rcu: Make rcu_barrier() less disruptive
authorPaul E. McKenney <paul.mckenney@linaro.org>
Thu, 1 Mar 2012 21:18:08 +0000 (13:18 -0800)
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Wed, 9 May 2012 21:27:54 +0000 (14:27 -0700)
The rcu_barrier() primitive interrupts each and every CPU, registering
a callback on every CPU.  Once all of these callbacks have been invoked,
rcu_barrier() knows that every callback that was registered before
the call to rcu_barrier() has also been invoked.

However, there is no point in registering a callback on a CPU that
currently has no callbacks, most especially if that CPU is in a
deep idle state.  This commit therefore makes rcu_barrier() avoid
interrupting CPUs that have no callbacks.  Doing this requires reworking
the handling of orphaned callbacks, otherwise callbacks could slip through
rcu_barrier()'s net by being orphaned from a CPU that rcu_barrier() had
not yet interrupted to a CPU that rcu_barrier() had already interrupted.
This reworking was needed anyway to take a first step towards weaning
RCU from the CPU_DYING notifier's use of stop_cpu().

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
kernel/rcutree.c
kernel/rcutree.h
kernel/rcutree_trace.c

index 403306b86e78bb40b8c9a9dd319db4f951fe7860..e578dd327c6465e19f6f2be3ff07e5b0b15a5ed6 100644 (file)
@@ -75,6 +75,8 @@ static struct lock_class_key rcu_node_class[NUM_RCU_LVLS];
        .gpnum = -300, \
        .completed = -300, \
        .onofflock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.onofflock), \
+       .orphan_nxttail = &structname##_state.orphan_nxtlist, \
+       .orphan_donetail = &structname##_state.orphan_donelist, \
        .fqslock = __RAW_SPIN_LOCK_UNLOCKED(&structname##_state.fqslock), \
        .n_force_qs = 0, \
        .n_force_qs_ngp = 0, \
@@ -145,6 +147,13 @@ static void invoke_rcu_callbacks(struct rcu_state *rsp, struct rcu_data *rdp);
 unsigned long rcutorture_testseq;
 unsigned long rcutorture_vernum;
 
+/* State information for rcu_barrier() and friends. */
+
+static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
+static atomic_t rcu_barrier_cpu_count;
+static DEFINE_MUTEX(rcu_barrier_mutex);
+static struct completion rcu_barrier_completion;
+
 /*
  * Return true if an RCU grace period is in progress.  The ACCESS_ONCE()s
  * permit this function to be invoked without holding the root rcu_node
@@ -1311,95 +1320,133 @@ rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
 #ifdef CONFIG_HOTPLUG_CPU
 
 /*
- * Move a dying CPU's RCU callbacks to online CPU's callback list.
- * Also record a quiescent state for this CPU for the current grace period.
- * Synchronization and interrupt disabling are not required because
- * this function executes in stop_machine() context.  Therefore, cleanup
- * operations that might block must be done later from the CPU_DEAD
- * notifier.
- *
- * Note that the outgoing CPU's bit has already been cleared in the
- * cpu_online_mask.  This allows us to randomly pick a callback
- * destination from the bits set in that mask.
+ * Send the specified CPU's RCU callbacks to the orphanage.  The
+ * specified CPU must be offline, and the caller must hold the
+ * ->onofflock.
  */
-static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
+static void
+rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
+                         struct rcu_node *rnp, struct rcu_data *rdp)
 {
        int i;
-       unsigned long mask;
-       int receive_cpu = cpumask_any(cpu_online_mask);
-       struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
-       struct rcu_data *receive_rdp = per_cpu_ptr(rsp->rda, receive_cpu);
-       RCU_TRACE(struct rcu_node *rnp = rdp->mynode); /* For dying CPU. */
 
-       /* First, adjust the counts. */
+       /*
+        * Orphan the callbacks.  First adjust the counts.  This is safe
+        * because ->onofflock excludes _rcu_barrier()'s adoption of
+        * the callbacks, thus no memory barrier is required.
+        */
        if (rdp->nxtlist != NULL) {
-               receive_rdp->qlen_lazy += rdp->qlen_lazy;
-               receive_rdp->qlen += rdp->qlen;
+               rsp->qlen_lazy += rdp->qlen_lazy;
+               rsp->qlen += rdp->qlen;
+               rdp->n_cbs_orphaned += rdp->qlen;
                rdp->qlen_lazy = 0;
                rdp->qlen = 0;
        }
 
        /*
-        * Next, move ready-to-invoke callbacks to be invoked on some
-        * other CPU.  These will not be required to pass through another
-        * grace period:  They are done, regardless of CPU.
+        * Next, move those callbacks still needing a grace period to
+        * the orphanage, where some other CPU will pick them up.
+        * Some of the callbacks might have gone partway through a grace
+        * period, but that is too bad.  They get to start over because we
+        * cannot assume that grace periods are synchronized across CPUs.
+        * We don't bother updating the ->nxttail[] array yet, instead
+        * we just reset the whole thing later on.
         */
-       if (rdp->nxtlist != NULL &&
-           rdp->nxttail[RCU_DONE_TAIL] != &rdp->nxtlist) {
-               struct rcu_head *oldhead;
-               struct rcu_head **oldtail;
-               struct rcu_head **newtail;
-
-               oldhead = rdp->nxtlist;
-               oldtail = receive_rdp->nxttail[RCU_DONE_TAIL];
-               rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-               *rdp->nxttail[RCU_DONE_TAIL] = *oldtail;
-               *receive_rdp->nxttail[RCU_DONE_TAIL] = oldhead;
-               newtail = rdp->nxttail[RCU_DONE_TAIL];
-               for (i = RCU_DONE_TAIL; i < RCU_NEXT_SIZE; i++) {
-                       if (receive_rdp->nxttail[i] == oldtail)
-                               receive_rdp->nxttail[i] = newtail;
-                       if (rdp->nxttail[i] == newtail)
-                               rdp->nxttail[i] = &rdp->nxtlist;
-               }
+       if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
+               *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
+               rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
+               *rdp->nxttail[RCU_DONE_TAIL] = NULL;
        }
 
        /*
-        * Finally, put the rest of the callbacks at the end of the list.
-        * The ones that made it partway through get to start over:  We
-        * cannot assume that grace periods are synchronized across CPUs.
-        * (We could splice RCU_WAIT_TAIL into RCU_NEXT_READY_TAIL, but
-        * this does not seem compelling.  Not yet, anyway.)
+        * Then move the ready-to-invoke callbacks to the orphanage,
+        * where some other CPU will pick them up.  These will not be
+        * required to pass though another grace period: They are done.
         */
        if (rdp->nxtlist != NULL) {
-               *receive_rdp->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
-               receive_rdp->nxttail[RCU_NEXT_TAIL] =
-                               rdp->nxttail[RCU_NEXT_TAIL];
-               receive_rdp->n_cbs_adopted += rdp->qlen;
-               rdp->n_cbs_orphaned += rdp->qlen;
-
-               rdp->nxtlist = NULL;
-               for (i = 0; i < RCU_NEXT_SIZE; i++)
-                       rdp->nxttail[i] = &rdp->nxtlist;
+               *rsp->orphan_donetail = rdp->nxtlist;
+               rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
        }
 
+       /* Finally, initialize the rcu_data structure's list to empty.  */
+       rdp->nxtlist = NULL;
+       for (i = 0; i < RCU_NEXT_SIZE; i++)
+               rdp->nxttail[i] = &rdp->nxtlist;
+}
+
+/*
+ * Adopt the RCU callbacks from the specified rcu_state structure's
+ * orphanage.  The caller must hold the ->onofflock.
+ */
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+       int i;
+       struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
+
        /*
-        * Record a quiescent state for the dying CPU.  This is safe
-        * only because we have already cleared out the callbacks.
-        * (Otherwise, the RCU core might try to schedule the invocation
-        * of callbacks on this now-offline CPU, which would be bad.)
+        * If there is an rcu_barrier() operation in progress, then
+        * only the task doing that operation is permitted to adopt
+        * callbacks.  To do otherwise breaks rcu_barrier() and friends
+        * by causing them to fail to wait for the callbacks in the
+        * orphanage.
         */
-       mask = rdp->grpmask;    /* rnp->grplo is constant. */
+       if (rsp->rcu_barrier_in_progress &&
+           rsp->rcu_barrier_in_progress != current)
+               return;
+
+       /* Do the accounting first. */
+       rdp->qlen_lazy += rsp->qlen_lazy;
+       rdp->qlen += rsp->qlen;
+       rdp->n_cbs_adopted += rsp->qlen;
+       rsp->qlen_lazy = 0;
+       rsp->qlen = 0;
+
+       /*
+        * We do not need a memory barrier here because the only way we
+        * can get here if there is an rcu_barrier() in flight is if
+        * we are the task doing the rcu_barrier().
+        */
+
+       /* First adopt the ready-to-invoke callbacks. */
+       if (rsp->orphan_donelist != NULL) {
+               *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
+               *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
+               for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
+                       if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
+                               rdp->nxttail[i] = rsp->orphan_donetail;
+               rsp->orphan_donelist = NULL;
+               rsp->orphan_donetail = &rsp->orphan_donelist;
+       }
+
+       /* And then adopt the callbacks that still need a grace period. */
+       if (rsp->orphan_nxtlist != NULL) {
+               *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
+               rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
+               rsp->orphan_nxtlist = NULL;
+               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+       }
+}
+
+/*
+ * Trace the fact that this CPU is going offline.
+ */
+static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
+{
+       RCU_TRACE(unsigned long mask);
+       RCU_TRACE(struct rcu_data *rdp = this_cpu_ptr(rsp->rda));
+       RCU_TRACE(struct rcu_node *rnp = rdp->mynode);
+
+       RCU_TRACE(mask = rdp->grpmask);
        trace_rcu_grace_period(rsp->name,
                               rnp->gpnum + 1 - !!(rnp->qsmask & mask),
                               "cpuofl");
-       rcu_report_qs_rdp(smp_processor_id(), rsp, rdp, rsp->gpnum);
-       /* Note that rcu_report_qs_rdp() might call trace_rcu_grace_period(). */
 }
 
 /*
  * The CPU has been completely removed, and some other CPU is reporting
- * this fact from process context.  Do the remainder of the cleanup.
+ * this fact from process context.  Do the remainder of the cleanup,
+ * including orphaning the outgoing CPU's RCU callbacks, and also
+ * adopting them, if there is no _rcu_barrier() instance running.
  * There can only be one CPU hotplug operation at a time, so no other
  * CPU can be attempting to update rcu_cpu_kthread_task.
  */
@@ -1409,17 +1456,21 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
        unsigned long mask;
        int need_report = 0;
        struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
-       struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rnp. */
+       struct rcu_node *rnp = rdp->mynode;  /* Outgoing CPU's rdp & rnp. */
 
        /* Adjust any no-longer-needed kthreads. */
        rcu_stop_cpu_kthread(cpu);
        rcu_node_kthread_setaffinity(rnp, -1);
 
-       /* Remove the dying CPU from the bitmasks in the rcu_node hierarchy. */
+       /* Remove the dead CPU from the bitmasks in the rcu_node hierarchy. */
 
        /* Exclude any attempts to start a new grace period. */
        raw_spin_lock_irqsave(&rsp->onofflock, flags);
 
+       /* Orphan the dead CPU's callbacks, and adopt them if appropriate. */
+       rcu_send_cbs_to_orphanage(cpu, rsp, rnp, rdp);
+       rcu_adopt_orphan_cbs(rsp);
+
        /* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
        mask = rdp->grpmask;    /* rnp->grplo is constant. */
        do {
@@ -1456,6 +1507,10 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 
 #else /* #ifdef CONFIG_HOTPLUG_CPU */
 
+static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
+{
+}
+
 static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
 {
 }
@@ -1524,9 +1579,6 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
                            rcu_is_callbacks_kthread());
 
        /* Update count, and requeue any remaining callbacks. */
-       rdp->qlen_lazy -= count_lazy;
-       rdp->qlen -= count;
-       rdp->n_cbs_invoked += count;
        if (list != NULL) {
                *tail = rdp->nxtlist;
                rdp->nxtlist = list;
@@ -1536,6 +1588,10 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
                        else
                                break;
        }
+       smp_mb(); /* List handling before counting for rcu_barrier(). */
+       rdp->qlen_lazy -= count_lazy;
+       rdp->qlen -= count;
+       rdp->n_cbs_invoked += count;
 
        /* Reinstate batch limit if we have worked down the excess. */
        if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
@@ -1824,13 +1880,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       *rdp->nxttail[RCU_NEXT_TAIL] = head;
-       rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
        rdp->qlen++;
        if (lazy)
                rdp->qlen_lazy++;
        else
                rcu_idle_count_callbacks_posted();
+       smp_mb();  /* Count before adding callback for rcu_barrier(). */
+       *rdp->nxttail[RCU_NEXT_TAIL] = head;
+       rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
@@ -2169,11 +2226,10 @@ static int rcu_cpu_has_callbacks(int cpu)
               rcu_preempt_cpu_has_callbacks(cpu);
 }
 
-static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head) = {NULL};
-static atomic_t rcu_barrier_cpu_count;
-static DEFINE_MUTEX(rcu_barrier_mutex);
-static struct completion rcu_barrier_completion;
-
+/*
+ * RCU callback function for _rcu_barrier().  If we are last, wake
+ * up the task executing _rcu_barrier().
+ */
 static void rcu_barrier_callback(struct rcu_head *notused)
 {
        if (atomic_dec_and_test(&rcu_barrier_cpu_count))
@@ -2203,27 +2259,94 @@ static void _rcu_barrier(struct rcu_state *rsp,
                         void (*call_rcu_func)(struct rcu_head *head,
                                               void (*func)(struct rcu_head *head)))
 {
-       BUG_ON(in_interrupt());
+       int cpu;
+       unsigned long flags;
+       struct rcu_data *rdp;
+       struct rcu_head rh;
+
+       init_rcu_head_on_stack(&rh);
+
        /* Take mutex to serialize concurrent rcu_barrier() requests. */
        mutex_lock(&rcu_barrier_mutex);
-       init_completion(&rcu_barrier_completion);
+
+       smp_mb();  /* Prevent any prior operations from leaking in. */
+
        /*
-        * Initialize rcu_barrier_cpu_count to 1, then invoke
-        * rcu_barrier_func() on each CPU, so that each CPU also has
-        * incremented rcu_barrier_cpu_count.  Only then is it safe to
-        * decrement rcu_barrier_cpu_count -- otherwise the first CPU
-        * might complete its grace period before all of the other CPUs
-        * did their increment, causing this function to return too
-        * early.  Note that on_each_cpu() disables irqs, which prevents
-        * any CPUs from coming online or going offline until each online
-        * CPU has queued its RCU-barrier callback.
+        * Initialize the count to one rather than to zero in order to
+        * avoid a too-soon return to zero in case of a short grace period
+        * (or preemption of this task).  Also flag this task as doing
+        * an rcu_barrier().  This will prevent anyone else from adopting
+        * orphaned callbacks, which could cause otherwise failure if a
+        * CPU went offline and quickly came back online.  To see this,
+        * consider the following sequence of events:
+        *
+        * 1.   We cause CPU 0 to post an rcu_barrier_callback() callback.
+        * 2.   CPU 1 goes offline, orphaning its callbacks.
+        * 3.   CPU 0 adopts CPU 1's orphaned callbacks.
+        * 4.   CPU 1 comes back online.
+        * 5.   We cause CPU 1 to post an rcu_barrier_callback() callback.
+        * 6.   Both rcu_barrier_callback() callbacks are invoked, awakening
+        *      us -- but before CPU 1's orphaned callbacks are invoked!!!
         */
+       init_completion(&rcu_barrier_completion);
        atomic_set(&rcu_barrier_cpu_count, 1);
-       on_each_cpu(rcu_barrier_func, (void *)call_rcu_func, 1);
+       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       rsp->rcu_barrier_in_progress = current;
+       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+
+       /*
+        * Force every CPU with callbacks to register a new callback
+        * that will tell us when all the preceding callbacks have
+        * been invoked.  If an offline CPU has callbacks, wait for
+        * it to either come back online or to finish orphaning those
+        * callbacks.
+        */
+       for_each_possible_cpu(cpu) {
+               preempt_disable();
+               rdp = per_cpu_ptr(rsp->rda, cpu);
+               if (cpu_is_offline(cpu)) {
+                       preempt_enable();
+                       while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
+                               schedule_timeout_interruptible(1);
+               } else if (ACCESS_ONCE(rdp->qlen)) {
+                       smp_call_function_single(cpu, rcu_barrier_func,
+                                                (void *)call_rcu_func, 1);
+                       preempt_enable();
+               } else {
+                       preempt_enable();
+               }
+       }
+
+       /*
+        * Now that all online CPUs have rcu_barrier_callback() callbacks
+        * posted, we can adopt all of the orphaned callbacks and place
+        * an rcu_barrier_callback() callback after them.  When that is done,
+        * we are guaranteed to have an rcu_barrier_callback() callback
+        * following every callback that could possibly have been
+        * registered before _rcu_barrier() was called.
+        */
+       raw_spin_lock_irqsave(&rsp->onofflock, flags);
+       rcu_adopt_orphan_cbs(rsp);
+       rsp->rcu_barrier_in_progress = NULL;
+       raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
+       atomic_inc(&rcu_barrier_cpu_count);
+       smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */
+       call_rcu_func(&rh, rcu_barrier_callback);
+
+       /*
+        * Now that we have an rcu_barrier_callback() callback on each
+        * CPU, and thus each counted, remove the initial count.
+        */
        if (atomic_dec_and_test(&rcu_barrier_cpu_count))
                complete(&rcu_barrier_completion);
+
+       /* Wait for all rcu_barrier_callback() callbacks to be invoked. */
        wait_for_completion(&rcu_barrier_completion);
+
+       /* Other rcu_barrier() invocations can now safely proceed. */
        mutex_unlock(&rcu_barrier_mutex);
+
+       destroy_rcu_head_on_stack(&rh);
 }
 
 /**
index 36ca28ecedc67028242dd6b9d51acb691120eb11..1e49c56859600f99ea655c025babc7a0f3af5d32 100644 (file)
@@ -371,6 +371,17 @@ struct rcu_state {
 
        raw_spinlock_t onofflock;               /* exclude on/offline and */
                                                /*  starting new GP. */
+       struct rcu_head *orphan_nxtlist;        /* Orphaned callbacks that */
+                                               /*  need a grace period. */
+       struct rcu_head **orphan_nxttail;       /* Tail of above. */
+       struct rcu_head *orphan_donelist;       /* Orphaned callbacks that */
+                                               /*  are ready to invoke. */
+       struct rcu_head **orphan_donetail;      /* Tail of above. */
+       long qlen_lazy;                         /* Number of lazy callbacks. */
+       long qlen;                              /* Total number of callbacks. */
+       struct task_struct *rcu_barrier_in_progress;
+                                               /* Task doing rcu_barrier(), */
+                                               /*  or NULL if no barrier. */
        raw_spinlock_t fqslock;                 /* Only one task forcing */
                                                /*  quiescent states. */
        unsigned long jiffies_force_qs;         /* Time at which to invoke */
index ed459edeff43826cdd642445c24e0b5aae03a836..d4bc16ddd1d4aa09efb52cbee140f1f0b45afc36 100644 (file)
@@ -271,13 +271,13 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 
        gpnum = rsp->gpnum;
        seq_printf(m, "c=%lu g=%lu s=%d jfq=%ld j=%x "
-                     "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu\n",
+                     "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
                   rsp->completed, gpnum, rsp->fqs_state,
                   (long)(rsp->jiffies_force_qs - jiffies),
                   (int)(jiffies & 0xffff),
                   rsp->n_force_qs, rsp->n_force_qs_ngp,
                   rsp->n_force_qs - rsp->n_force_qs_ngp,
-                  rsp->n_force_qs_lh);
+                  rsp->n_force_qs_lh, rsp->qlen_lazy, rsp->qlen);
        for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
                if (rnp->level != level) {
                        seq_puts(m, "\n");