*
* Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
*/
+#include <linux/ftrace_event.h>
#include <linux/ring_buffer.h>
#include <linux/trace_clock.h>
+#include <linux/trace_seq.h>
#include <linux/spinlock.h>
+#include <linux/irq_work.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/hardirq.h>
+#include <linux/kthread.h> /* for self test */
#include <linux/kmemcheck.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
+#include <linux/delay.h>
#include <linux/slab.h>
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/fs.h>
#include <asm/local.h>
-#include "trace.h"
static void update_pages_handler(struct work_struct *work);
return ret;
}
+struct rb_irq_work {
+ struct irq_work work;
+ wait_queue_head_t waiters;
+ bool waiters_pending;
+};
+
/*
* head_page == tail_page && head == tail then buffer is empty.
*/
struct list_head new_pages; /* new pages to add */
struct work_struct update_pages_work;
struct completion update_done;
+
+ struct rb_irq_work irq_work;
};
struct ring_buffer {
struct notifier_block cpu_notify;
#endif
u64 (*clock)(void);
+
+ struct rb_irq_work irq_work;
};
struct ring_buffer_iter {
u64 read_stamp;
};
+/*
+ * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
+ *
+ * Schedules a delayed work to wake up any task that is blocked on the
+ * ring buffer waiters queue.
+ */
+static void rb_wake_up_waiters(struct irq_work *work)
+{
+ struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
+
+ wake_up_all(&rbwork->waiters);
+}
+
+/**
+ * ring_buffer_wait - wait for input to the ring buffer
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ */
+void ring_buffer_wait(struct ring_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ DEFINE_WAIT(wait);
+ struct rb_irq_work *work;
+
+ /*
+ * Depending on what the caller is waiting for, either any
+ * data in any cpu buffer, or a specific buffer, put the
+ * caller on the appropriate wait queue.
+ */
+ if (cpu == RING_BUFFER_ALL_CPUS)
+ work = &buffer->irq_work;
+ else {
+ cpu_buffer = buffer->buffers[cpu];
+ work = &cpu_buffer->irq_work;
+ }
+
+
+ prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
+
+ /*
+ * The events can happen in critical sections where
+ * checking a work queue can cause deadlocks.
+ * After adding a task to the queue, this flag is set
+ * only to notify events to try to wake up the queue
+ * using irq_work.
+ *
+ * We don't clear it even if the buffer is no longer
+ * empty. The flag only causes the next event to run
+ * irq_work to do the work queue wake up. The worse
+ * that can happen if we race with !trace_empty() is that
+ * an event will cause an irq_work to try to wake up
+ * an empty queue.
+ *
+ * There's no reason to protect this flag either, as
+ * the work queue and irq_work logic will do the necessary
+ * synchronization for the wake ups. The only thing
+ * that is necessary is that the wake up happens after
+ * a task has been queued. It's OK for spurious wake ups.
+ */
+ work->waiters_pending = true;
+
+ if ((cpu == RING_BUFFER_ALL_CPUS && ring_buffer_empty(buffer)) ||
+ (cpu != RING_BUFFER_ALL_CPUS && ring_buffer_empty_cpu(buffer, cpu)))
+ schedule();
+
+ finish_wait(&work->waiters, &wait);
+}
+
+/**
+ * ring_buffer_poll_wait - poll on buffer input
+ * @buffer: buffer to wait on
+ * @cpu: the cpu buffer to wait on
+ * @filp: the file descriptor
+ * @poll_table: The poll descriptor
+ *
+ * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
+ * as data is added to any of the @buffer's cpu buffers. Otherwise
+ * it will wait for data to be added to a specific cpu buffer.
+ *
+ * Returns POLLIN | POLLRDNORM if data exists in the buffers,
+ * zero otherwise.
+ */
+int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
+ struct file *filp, poll_table *poll_table)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+ struct rb_irq_work *work;
+
+ if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+ (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+ return POLLIN | POLLRDNORM;
+
+ if (cpu == RING_BUFFER_ALL_CPUS)
+ work = &buffer->irq_work;
+ else {
+ cpu_buffer = buffer->buffers[cpu];
+ work = &cpu_buffer->irq_work;
+ }
+
+ work->waiters_pending = true;
+ poll_wait(filp, &work->waiters, poll_table);
+
+ if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
+ (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
+ return POLLIN | POLLRDNORM;
+ return 0;
+}
+
/* buffer may be either ring_buffer or ring_buffer_per_cpu */
#define RB_WARN_ON(b, cond) \
({ \
cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
init_completion(&cpu_buffer->update_done);
+ init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
+ init_waitqueue_head(&cpu_buffer->irq_work.waiters);
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
buffer->clock = trace_clock_local;
buffer->reader_lock_key = key;
+ init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
+ init_waitqueue_head(&buffer->irq_work.waiters);
+
/* need at least two pages */
if (nr_pages < 2)
nr_pages = 2;
if (!cpu_buffer->nr_pages_to_update)
continue;
- if (cpu_online(cpu))
+ /* The update must run on the CPU that is being updated. */
+ preempt_disable();
+ if (cpu == smp_processor_id() || !cpu_online(cpu)) {
+ rb_update_pages(cpu_buffer);
+ cpu_buffer->nr_pages_to_update = 0;
+ } else {
+ /*
+ * Can not disable preemption for schedule_work_on()
+ * on PREEMPT_RT.
+ */
+ preempt_enable();
schedule_work_on(cpu,
&cpu_buffer->update_pages_work);
- else
- rb_update_pages(cpu_buffer);
+ preempt_disable();
+ }
+ preempt_enable();
}
/* wait for all the updates to complete */
get_online_cpus();
- if (cpu_online(cpu_id)) {
+ preempt_disable();
+ /* The update must run on the CPU that is being updated. */
+ if (cpu_id == smp_processor_id() || !cpu_online(cpu_id))
+ rb_update_pages(cpu_buffer);
+ else {
+ /*
+ * Can not disable preemption for schedule_work_on()
+ * on PREEMPT_RT.
+ */
+ preempt_enable();
schedule_work_on(cpu_id,
&cpu_buffer->update_pages_work);
wait_for_completion(&cpu_buffer->update_done);
- } else
- rb_update_pages(cpu_buffer);
+ preempt_disable();
+ }
+ preempt_enable();
cpu_buffer->nr_pages_to_update = 0;
put_online_cpus();
#ifdef CONFIG_TRACING
-#define TRACE_RECURSIVE_DEPTH 16
+/*
+ * The lock and unlock are done within a preempt disable section.
+ * The current_context per_cpu variable can only be modified
+ * by the current task between lock and unlock. But it can
+ * be modified more than once via an interrupt. To pass this
+ * information from the lock to the unlock without having to
+ * access the 'in_interrupt()' functions again (which do show
+ * a bit of overhead in something as critical as function tracing,
+ * we use a bitmask trick.
+ *
+ * bit 0 = NMI context
+ * bit 1 = IRQ context
+ * bit 2 = SoftIRQ context
+ * bit 3 = normal context.
+ *
+ * This works because this is the order of contexts that can
+ * preempt other contexts. A SoftIRQ never preempts an IRQ
+ * context.
+ *
+ * When the context is determined, the corresponding bit is
+ * checked and set (if it was set, then a recursion of that context
+ * happened).
+ *
+ * On unlock, we need to clear this bit. To do so, just subtract
+ * 1 from the current_context and AND it to itself.
+ *
+ * (binary)
+ * 101 - 1 = 100
+ * 101 & 100 = 100 (clearing bit zero)
+ *
+ * 1010 - 1 = 1001
+ * 1010 & 1001 = 1000 (clearing bit 1)
+ *
+ * The least significant bit can be cleared this way, and it
+ * just so happens that it is the same bit corresponding to
+ * the current context.
+ */
+static DEFINE_PER_CPU(unsigned int, current_context);
-/* Keep this code out of the fast path cache */
-static noinline void trace_recursive_fail(void)
+static __always_inline int trace_recursive_lock(void)
{
- /* Disable all tracing before we do anything else */
- tracing_off_permanent();
-
- printk_once(KERN_WARNING "Tracing recursion: depth[%ld]:"
- "HC[%lu]:SC[%lu]:NMI[%lu]\n",
- trace_recursion_buffer(),
- hardirq_count() >> HARDIRQ_SHIFT,
- softirq_count() >> SOFTIRQ_SHIFT,
- in_nmi());
-
- WARN_ON_ONCE(1);
-}
+ unsigned int val = this_cpu_read(current_context);
+ int bit;
-static inline int trace_recursive_lock(void)
-{
- trace_recursion_inc();
+ if (in_interrupt()) {
+ if (in_nmi())
+ bit = 0;
+ else if (in_irq())
+ bit = 1;
+ else
+ bit = 2;
+ } else
+ bit = 3;
- if (likely(trace_recursion_buffer() < TRACE_RECURSIVE_DEPTH))
- return 0;
+ if (unlikely(val & (1 << bit)))
+ return 1;
- trace_recursive_fail();
+ val |= (1 << bit);
+ this_cpu_write(current_context, val);
- return -1;
+ return 0;
}
-static inline void trace_recursive_unlock(void)
+static __always_inline void trace_recursive_unlock(void)
{
- WARN_ON_ONCE(!trace_recursion_buffer());
+ unsigned int val = this_cpu_read(current_context);
- trace_recursion_dec();
+ val--;
+ val &= this_cpu_read(current_context);
+ this_cpu_write(current_context, val);
}
#else
rb_end_commit(cpu_buffer);
}
+static __always_inline void
+rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
+{
+ if (buffer->irq_work.waiters_pending) {
+ buffer->irq_work.waiters_pending = false;
+ /* irq_work_queue() supplies it's own memory barriers */
+ irq_work_queue(&buffer->irq_work.work);
+ }
+
+ if (cpu_buffer->irq_work.waiters_pending) {
+ cpu_buffer->irq_work.waiters_pending = false;
+ /* irq_work_queue() supplies it's own memory barriers */
+ irq_work_queue(&cpu_buffer->irq_work.work);
+ }
+}
+
/**
* ring_buffer_unlock_commit - commit a reserved
* @buffer: The buffer to commit to
rb_commit(cpu_buffer, event);
+ rb_wakeups(buffer, cpu_buffer);
+
trace_recursive_unlock();
preempt_enable_notrace();
rb_commit(cpu_buffer, event);
+ rb_wakeups(buffer, cpu_buffer);
+
ret = 0;
out:
preempt_enable_notrace();
}
EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
+/**
+ * ring_buffer_read_events_cpu - get the number of events successfully read
+ * @buffer: The ring buffer
+ * @cpu: The per CPU buffer to get the number of events read
+ */
+unsigned long
+ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
+{
+ struct ring_buffer_per_cpu *cpu_buffer;
+
+ if (!cpumask_test_cpu(cpu, buffer->cpumask))
+ return 0;
+
+ cpu_buffer = buffer->buffers[cpu];
+ return cpu_buffer->read;
+}
+EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
+
/**
* ring_buffer_entries - get the number of entries in a buffer
* @buffer: The ring buffer
/* check for end of page padding */
if ((iter->head >= rb_page_size(iter->head_page)) &&
(iter->head_page != cpu_buffer->commit_page))
- rb_advance_iter(iter);
+ rb_inc_iter(iter);
}
static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
return NOTIFY_OK;
}
#endif
+
+#ifdef CONFIG_RING_BUFFER_STARTUP_TEST
+/*
+ * This is a basic integrity check of the ring buffer.
+ * Late in the boot cycle this test will run when configured in.
+ * It will kick off a thread per CPU that will go into a loop
+ * writing to the per cpu ring buffer various sizes of data.
+ * Some of the data will be large items, some small.
+ *
+ * Another thread is created that goes into a spin, sending out
+ * IPIs to the other CPUs to also write into the ring buffer.
+ * this is to test the nesting ability of the buffer.
+ *
+ * Basic stats are recorded and reported. If something in the
+ * ring buffer should happen that's not expected, a big warning
+ * is displayed and all ring buffers are disabled.
+ */
+static struct task_struct *rb_threads[NR_CPUS] __initdata;
+
+struct rb_test_data {
+ struct ring_buffer *buffer;
+ unsigned long events;
+ unsigned long bytes_written;
+ unsigned long bytes_alloc;
+ unsigned long bytes_dropped;
+ unsigned long events_nested;
+ unsigned long bytes_written_nested;
+ unsigned long bytes_alloc_nested;
+ unsigned long bytes_dropped_nested;
+ int min_size_nested;
+ int max_size_nested;
+ int max_size;
+ int min_size;
+ int cpu;
+ int cnt;
+};
+
+static struct rb_test_data rb_data[NR_CPUS] __initdata;
+
+/* 1 meg per cpu */
+#define RB_TEST_BUFFER_SIZE 1048576
+
+static char rb_string[] __initdata =
+ "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
+ "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
+ "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
+
+static bool rb_test_started __initdata;
+
+struct rb_item {
+ int size;
+ char str[];
+};
+
+static __init int rb_write_something(struct rb_test_data *data, bool nested)
+{
+ struct ring_buffer_event *event;
+ struct rb_item *item;
+ bool started;
+ int event_len;
+ int size;
+ int len;
+ int cnt;
+
+ /* Have nested writes different that what is written */
+ cnt = data->cnt + (nested ? 27 : 0);
+
+ /* Multiply cnt by ~e, to make some unique increment */
+ size = (data->cnt * 68 / 25) % (sizeof(rb_string) - 1);
+
+ len = size + sizeof(struct rb_item);
+
+ started = rb_test_started;
+ /* read rb_test_started before checking buffer enabled */
+ smp_rmb();
+
+ event = ring_buffer_lock_reserve(data->buffer, len);
+ if (!event) {
+ /* Ignore dropped events before test starts. */
+ if (started) {
+ if (nested)
+ data->bytes_dropped += len;
+ else
+ data->bytes_dropped_nested += len;
+ }
+ return len;
+ }
+
+ event_len = ring_buffer_event_length(event);
+
+ if (RB_WARN_ON(data->buffer, event_len < len))
+ goto out;
+
+ item = ring_buffer_event_data(event);
+ item->size = size;
+ memcpy(item->str, rb_string, size);
+
+ if (nested) {
+ data->bytes_alloc_nested += event_len;
+ data->bytes_written_nested += len;
+ data->events_nested++;
+ if (!data->min_size_nested || len < data->min_size_nested)
+ data->min_size_nested = len;
+ if (len > data->max_size_nested)
+ data->max_size_nested = len;
+ } else {
+ data->bytes_alloc += event_len;
+ data->bytes_written += len;
+ data->events++;
+ if (!data->min_size || len < data->min_size)
+ data->max_size = len;
+ if (len > data->max_size)
+ data->max_size = len;
+ }
+
+ out:
+ ring_buffer_unlock_commit(data->buffer, event);
+
+ return 0;
+}
+
+static __init int rb_test(void *arg)
+{
+ struct rb_test_data *data = arg;
+
+ while (!kthread_should_stop()) {
+ rb_write_something(data, false);
+ data->cnt++;
+
+ set_current_state(TASK_INTERRUPTIBLE);
+ /* Now sleep between a min of 100-300us and a max of 1ms */
+ usleep_range(((data->cnt % 3) + 1) * 100, 1000);
+ }
+
+ return 0;
+}
+
+static __init void rb_ipi(void *ignore)
+{
+ struct rb_test_data *data;
+ int cpu = smp_processor_id();
+
+ data = &rb_data[cpu];
+ rb_write_something(data, true);
+}
+
+static __init int rb_hammer_test(void *arg)
+{
+ while (!kthread_should_stop()) {
+
+ /* Send an IPI to all cpus to write data! */
+ smp_call_function(rb_ipi, NULL, 1);
+ /* No sleep, but for non preempt, let others run */
+ schedule();
+ }
+
+ return 0;
+}
+
+static __init int test_ringbuffer(void)
+{
+ struct task_struct *rb_hammer;
+ struct ring_buffer *buffer;
+ int cpu;
+ int ret = 0;
+
+ pr_info("Running ring buffer tests...\n");
+
+ buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
+ if (WARN_ON(!buffer))
+ return 0;
+
+ /* Disable buffer so that threads can't write to it yet */
+ ring_buffer_record_off(buffer);
+
+ for_each_online_cpu(cpu) {
+ rb_data[cpu].buffer = buffer;
+ rb_data[cpu].cpu = cpu;
+ rb_data[cpu].cnt = cpu;
+ rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
+ "rbtester/%d", cpu);
+ if (WARN_ON(!rb_threads[cpu])) {
+ pr_cont("FAILED\n");
+ ret = -1;
+ goto out_free;
+ }
+
+ kthread_bind(rb_threads[cpu], cpu);
+ wake_up_process(rb_threads[cpu]);
+ }
+
+ /* Now create the rb hammer! */
+ rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
+ if (WARN_ON(!rb_hammer)) {
+ pr_cont("FAILED\n");
+ ret = -1;
+ goto out_free;
+ }
+
+ ring_buffer_record_on(buffer);
+ /*
+ * Show buffer is enabled before setting rb_test_started.
+ * Yes there's a small race window where events could be
+ * dropped and the thread wont catch it. But when a ring
+ * buffer gets enabled, there will always be some kind of
+ * delay before other CPUs see it. Thus, we don't care about
+ * those dropped events. We care about events dropped after
+ * the threads see that the buffer is active.
+ */
+ smp_wmb();
+ rb_test_started = true;
+
+ set_current_state(TASK_INTERRUPTIBLE);
+ /* Just run for 10 seconds */;
+ schedule_timeout(10 * HZ);
+
+ kthread_stop(rb_hammer);
+
+ out_free:
+ for_each_online_cpu(cpu) {
+ if (!rb_threads[cpu])
+ break;
+ kthread_stop(rb_threads[cpu]);
+ }
+ if (ret) {
+ ring_buffer_free(buffer);
+ return ret;
+ }
+
+ /* Report! */
+ pr_info("finished\n");
+ for_each_online_cpu(cpu) {
+ struct ring_buffer_event *event;
+ struct rb_test_data *data = &rb_data[cpu];
+ struct rb_item *item;
+ unsigned long total_events;
+ unsigned long total_dropped;
+ unsigned long total_written;
+ unsigned long total_alloc;
+ unsigned long total_read = 0;
+ unsigned long total_size = 0;
+ unsigned long total_len = 0;
+ unsigned long total_lost = 0;
+ unsigned long lost;
+ int big_event_size;
+ int small_event_size;
+
+ ret = -1;
+
+ total_events = data->events + data->events_nested;
+ total_written = data->bytes_written + data->bytes_written_nested;
+ total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
+ total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
+
+ big_event_size = data->max_size + data->max_size_nested;
+ small_event_size = data->min_size + data->min_size_nested;
+
+ pr_info("CPU %d:\n", cpu);
+ pr_info(" events: %ld\n", total_events);
+ pr_info(" dropped bytes: %ld\n", total_dropped);
+ pr_info(" alloced bytes: %ld\n", total_alloc);
+ pr_info(" written bytes: %ld\n", total_written);
+ pr_info(" biggest event: %d\n", big_event_size);
+ pr_info(" smallest event: %d\n", small_event_size);
+
+ if (RB_WARN_ON(buffer, total_dropped))
+ break;
+
+ ret = 0;
+
+ while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
+ total_lost += lost;
+ item = ring_buffer_event_data(event);
+ total_len += ring_buffer_event_length(event);
+ total_size += item->size + sizeof(struct rb_item);
+ if (memcmp(&item->str[0], rb_string, item->size) != 0) {
+ pr_info("FAILED!\n");
+ pr_info("buffer had: %.*s\n", item->size, item->str);
+ pr_info("expected: %.*s\n", item->size, rb_string);
+ RB_WARN_ON(buffer, 1);
+ ret = -1;
+ break;
+ }
+ total_read++;
+ }
+ if (ret)
+ break;
+
+ ret = -1;
+
+ pr_info(" read events: %ld\n", total_read);
+ pr_info(" lost events: %ld\n", total_lost);
+ pr_info(" total events: %ld\n", total_lost + total_read);
+ pr_info(" recorded len bytes: %ld\n", total_len);
+ pr_info(" recorded size bytes: %ld\n", total_size);
+ if (total_lost)
+ pr_info(" With dropped events, record len and size may not match\n"
+ " alloced and written from above\n");
+ if (!total_lost) {
+ if (RB_WARN_ON(buffer, total_len != total_alloc ||
+ total_size != total_written))
+ break;
+ }
+ if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
+ break;
+
+ ret = 0;
+ }
+ if (!ret)
+ pr_info("Ring buffer PASSED!\n");
+
+ ring_buffer_free(buffer);
+ return 0;
+}
+
+late_initcall(test_ringbuffer);
+#endif /* CONFIG_RING_BUFFER_STARTUP_TEST */