tracing: add ring_buffer_event_discard() to ring buffer
authorTom Zanussi <tzanussi@gmail.com>
Sun, 22 Mar 2009 08:30:49 +0000 (03:30 -0500)
committerIngo Molnar <mingo@elte.hu>
Sun, 22 Mar 2009 17:38:25 +0000 (18:38 +0100)
This patch overloads RINGBUF_TYPE_PADDING to provide a way to discard
events from the ring buffer, for the event-filtering mechanism
introduced in a subsequent patch.

I did the initial version but thanks to Steven Rostedt for adding
the parts that actually made it work. ;-)

Signed-off-by: Tom Zanussi <tzanussi@gmail.com>
Acked-by: Frederic Weisbecker <fweisbec@gmail.com>
Signed-off-by: Ingo Molnar <mingo@elte.hu>
include/linux/ring_buffer.h
kernel/trace/ring_buffer.c

index 9e6052bd1a1c9f42edafe3959b5b57e8f652e159..e1b7b2173885f8f14f4a8da3979d0a6eff08e186 100644 (file)
@@ -18,10 +18,13 @@ struct ring_buffer_event {
 /**
  * enum ring_buffer_type - internal ring buffer types
  *
- * @RINGBUF_TYPE_PADDING:      Left over page padding
- *                              array is ignored
- *                              size is variable depending on how much
+ * @RINGBUF_TYPE_PADDING:      Left over page padding or discarded event
+ *                              If time_delta is 0:
+ *                               array is ignored
+ *                               size is variable depending on how much
  *                               padding is needed
+ *                              If time_delta is non zero:
+ *                               everything else same as RINGBUF_TYPE_DATA
  *
  * @RINGBUF_TYPE_TIME_EXTEND:  Extend the time delta
  *                              array[0] = time delta (28 .. 59)
@@ -65,6 +68,8 @@ ring_buffer_event_time_delta(struct ring_buffer_event *event)
        return event->time_delta;
 }
 
+void ring_buffer_event_discard(struct ring_buffer_event *event);
+
 /*
  * size is in bytes for each per CPU buffer.
  */
index 384ca5d9d729dc8df1e547f8f64eaa2bd6fe5f3d..a09027ec17146dfd46b9729041181da75720e97f 100644 (file)
@@ -189,16 +189,65 @@ enum {
        RB_LEN_TIME_STAMP = 16,
 };
 
-/* inline for ring buffer fast paths */
+static inline int rb_null_event(struct ring_buffer_event *event)
+{
+       return event->type == RINGBUF_TYPE_PADDING && event->time_delta == 0;
+}
+
+static inline int rb_discarded_event(struct ring_buffer_event *event)
+{
+       return event->type == RINGBUF_TYPE_PADDING && event->time_delta;
+}
+
+static void rb_event_set_padding(struct ring_buffer_event *event)
+{
+       event->type = RINGBUF_TYPE_PADDING;
+       event->time_delta = 0;
+}
+
+/**
+ * ring_buffer_event_discard - discard an event in the ring buffer
+ * @buffer: the ring buffer
+ * @event: the event to discard
+ *
+ * Sometimes a event that is in the ring buffer needs to be ignored.
+ * This function lets the user discard an event in the ring buffer
+ * and then that event will not be read later.
+ *
+ * Note, it is up to the user to be careful with this, and protect
+ * against races. If the user discards an event that has been consumed
+ * it is possible that it could corrupt the ring buffer.
+ */
+void ring_buffer_event_discard(struct ring_buffer_event *event)
+{
+       event->type = RINGBUF_TYPE_PADDING;
+       /* time delta must be non zero */
+       if (!event->time_delta)
+               event->time_delta = 1;
+}
+
 static unsigned
-rb_event_length(struct ring_buffer_event *event)
+rb_event_data_length(struct ring_buffer_event *event)
 {
        unsigned length;
 
+       if (event->len)
+               length = event->len * RB_ALIGNMENT;
+       else
+               length = event->array[0];
+       return length + RB_EVNT_HDR_SIZE;
+}
+
+/* inline for ring buffer fast paths */
+static unsigned
+rb_event_length(struct ring_buffer_event *event)
+{
        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
-               /* undefined */
-               return -1;
+               if (rb_null_event(event))
+                       /* undefined */
+                       return -1;
+               return rb_event_data_length(event);
 
        case RINGBUF_TYPE_TIME_EXTEND:
                return RB_LEN_TIME_EXTEND;
@@ -207,11 +256,7 @@ rb_event_length(struct ring_buffer_event *event)
                return RB_LEN_TIME_STAMP;
 
        case RINGBUF_TYPE_DATA:
-               if (event->len)
-                       length = event->len * RB_ALIGNMENT;
-               else
-                       length = event->array[0];
-               return length + RB_EVNT_HDR_SIZE;
+               return rb_event_data_length(event);
        default:
                BUG();
        }
@@ -845,11 +890,6 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_resize);
 
-static inline int rb_null_event(struct ring_buffer_event *event)
-{
-       return event->type == RINGBUF_TYPE_PADDING;
-}
-
 static inline void *
 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
 {
@@ -1219,7 +1259,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
                if (tail < BUF_PAGE_SIZE) {
                        /* Mark the rest of the page with padding */
                        event = __rb_page_index(tail_page, tail);
-                       event->type = RINGBUF_TYPE_PADDING;
+                       rb_event_set_padding(event);
                }
 
                if (tail <= BUF_PAGE_SIZE)
@@ -1969,7 +2009,7 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 
        event = rb_reader_event(cpu_buffer);
 
-       if (event->type == RINGBUF_TYPE_DATA)
+       if (event->type == RINGBUF_TYPE_DATA || rb_discarded_event(event))
                cpu_buffer->entries--;
 
        rb_update_read_stamp(cpu_buffer, event);
@@ -2052,9 +2092,18 @@ rb_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
 
        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
-               RB_WARN_ON(cpu_buffer, 1);
+               if (rb_null_event(event))
+                       RB_WARN_ON(cpu_buffer, 1);
+               /*
+                * Because the writer could be discarding every
+                * event it creates (which would probably be bad)
+                * if we were to go back to "again" then we may never
+                * catch up, and will trigger the warn on, or lock
+                * the box. Return the padding, and we will release
+                * the current locks, and try again.
+                */
                rb_advance_reader(cpu_buffer);
-               return NULL;
+               return event;
 
        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
@@ -2115,8 +2164,12 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
 
        switch (event->type) {
        case RINGBUF_TYPE_PADDING:
-               rb_inc_iter(iter);
-               goto again;
+               if (rb_null_event(event)) {
+                       rb_inc_iter(iter);
+                       goto again;
+               }
+               rb_advance_iter(iter);
+               return event;
 
        case RINGBUF_TYPE_TIME_EXTEND:
                /* Internal data, OK to advance */
@@ -2163,10 +2216,16 @@ ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
        if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;
 
+ again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_buffer_peek(buffer, cpu, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+       if (event && event->type == RINGBUF_TYPE_PADDING) {
+               cpu_relax();
+               goto again;
+       }
+
        return event;
 }
 
@@ -2185,10 +2244,16 @@ ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_event *event;
        unsigned long flags;
 
+ again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+       if (event && event->type == RINGBUF_TYPE_PADDING) {
+               cpu_relax();
+               goto again;
+       }
+
        return event;
 }
 
@@ -2207,6 +2272,7 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
        struct ring_buffer_event *event = NULL;
        unsigned long flags;
 
+ again:
        /* might be called in atomic */
        preempt_disable();
 
@@ -2228,6 +2294,11 @@ ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
  out:
        preempt_enable();
 
+       if (event && event->type == RINGBUF_TYPE_PADDING) {
+               cpu_relax();
+               goto again;
+       }
+
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_consume);
@@ -2306,6 +2377,7 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
        unsigned long flags;
 
+ again:
        spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
        event = rb_iter_peek(iter, ts);
        if (!event)
@@ -2315,6 +2387,11 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
  out:
        spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
+       if (event && event->type == RINGBUF_TYPE_PADDING) {
+               cpu_relax();
+               goto again;
+       }
+
        return event;
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read);