rcu: Enforce expedited-GP fairness via funnel wait queue
authorPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Sun, 31 Jan 2016 01:57:35 +0000 (17:57 -0800)
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Thu, 31 Mar 2016 20:34:08 +0000 (13:34 -0700)
The current mutex-based funnel-locking approach used by expedited grace
periods is subject to severe unfairness.  The problem arises when a
few tasks, making a path from leaves to root, all wake up before other
tasks do.  A new task can then follow this path all the way to the root,
which needlessly delays tasks whose grace period is done, but who do
not happen to acquire the lock quickly enough.

This commit avoids this problem by maintaining per-rcu_node wait queues,
along with a per-rcu_node counter that tracks the latest grace period
sought by an earlier task to visit this node.  If that grace period
would satisfy the current task, instead of proceeding up the tree,
it waits on the current rcu_node structure using a pair of wait queues
provided for that purpose.  This decouples awakening of old tasks from
the arrival of new tasks.

If the wakeups prove to be a bottleneck, additional kthreads can be
brought to bear for that purpose.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
include/trace/events/rcu.h
kernel/rcu/tree.c
kernel/rcu/tree.h
kernel/rcu/tree_plugin.h

index aacc172eba7ee78c68d11906569ee090bddbf89c..d3e756539d44c6c94941ee457914785c0566a8e4 100644 (file)
@@ -179,6 +179,7 @@ TRACE_EVENT(rcu_grace_period_init,
  *     "snap": Captured snapshot of expedited grace period sequence number.
  *     "start": Started a real expedited grace period.
  *     "end": Ended a real expedited grace period.
+ *     "endwake": Woke piggybackers up.
  *     "done": Someone else did the expedited grace period for us.
  */
 TRACE_EVENT(rcu_exp_grace_period,
@@ -210,8 +211,8 @@ TRACE_EVENT(rcu_exp_grace_period,
  * and highest-numbered CPU associated with the current rcu_node structure,
  * and a string.  identifying the grace-period-related event as follows:
  *
- *     "acq": Acquired a level of funnel lock
- *     "rel": Released a level of funnel lock
+ *     "nxtlvl": Advance to next level of rcu_node funnel
+ *     "wait": Wait for someone else to do expedited GP
  */
 TRACE_EVENT(rcu_exp_funnel_lock,
 
index 89f028767765705193ba8e06d8557c65a7a0efef..bd2658edce0069fbaa4ed94b290fac25f6efd078 100644 (file)
@@ -102,6 +102,7 @@ struct rcu_state sname##_state = { \
        .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
        .name = RCU_STATE_NAME(sname), \
        .abbr = sabbr, \
+       .exp_mutex = __MUTEX_INITIALIZER(sname##_state.exp_mutex), \
 }
 
 RCU_STATE_INITIALIZER(rcu_sched, 's', call_rcu_sched);
@@ -3484,7 +3485,7 @@ static void __maybe_unused sync_exp_reset_tree(struct rcu_state *rsp)
  * for the current expedited grace period.  Works only for preemptible
  * RCU -- other RCU implementation use other means.
  *
- * Caller must hold the root rcu_node's exp_funnel_mutex.
+ * Caller must hold the rcu_state's exp_mutex.
  */
 static int sync_rcu_preempt_exp_done(struct rcu_node *rnp)
 {
@@ -3500,8 +3501,8 @@ static int sync_rcu_preempt_exp_done(struct rcu_node *rnp)
  * recursively up the tree.  (Calm down, calm down, we do the recursion
  * iteratively!)
  *
- * Caller must hold the root rcu_node's exp_funnel_mutex and the
- * specified rcu_node structure's ->lock.
+ * Caller must hold the rcu_state's exp_mutex and the specified rcu_node
+ * structure's ->lock.
  */
 static void __rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
                                 bool wake, unsigned long flags)
@@ -3538,7 +3539,7 @@ static void __rcu_report_exp_rnp(struct rcu_state *rsp, struct rcu_node *rnp,
  * Report expedited quiescent state for specified node.  This is a
  * lock-acquisition wrapper function for __rcu_report_exp_rnp().
  *
- * Caller must hold the root rcu_node's exp_funnel_mutex.
+ * Caller must hold the rcu_state's exp_mutex.
  */
 static void __maybe_unused rcu_report_exp_rnp(struct rcu_state *rsp,
                                              struct rcu_node *rnp, bool wake)
@@ -3551,8 +3552,8 @@ static void __maybe_unused rcu_report_exp_rnp(struct rcu_state *rsp,
 
 /*
  * Report expedited quiescent state for multiple CPUs, all covered by the
- * specified leaf rcu_node structure.  Caller must hold the root
- * rcu_node's exp_funnel_mutex.
+ * specified leaf rcu_node structure.  Caller must hold the rcu_state's
+ * exp_mutex.
  */
 static void rcu_report_exp_cpu_mult(struct rcu_state *rsp, struct rcu_node *rnp,
                                    unsigned long mask, bool wake)
@@ -3570,7 +3571,6 @@ static void rcu_report_exp_cpu_mult(struct rcu_state *rsp, struct rcu_node *rnp,
 
 /*
  * Report expedited quiescent state for specified rcu_data (CPU).
- * Caller must hold the root rcu_node's exp_funnel_mutex.
  */
 static void rcu_report_exp_rdp(struct rcu_state *rsp, struct rcu_data *rdp,
                               bool wake)
@@ -3579,24 +3579,11 @@ static void rcu_report_exp_rdp(struct rcu_state *rsp, struct rcu_data *rdp,
 }
 
 /* Common code for synchronize_{rcu,sched}_expedited() work-done checking. */
-static bool sync_exp_work_done(struct rcu_state *rsp, struct rcu_node *rnp,
-                              struct rcu_data *rdp,
-                              atomic_long_t *stat, unsigned long s)
+static bool sync_exp_work_done(struct rcu_state *rsp, atomic_long_t *stat,
+                              unsigned long s)
 {
        if (rcu_exp_gp_seq_done(rsp, s)) {
                trace_rcu_exp_grace_period(rsp->name, s, TPS("done"));
-               if (rnp) {
-                       trace_rcu_exp_funnel_lock(rsp->name, rnp->level,
-                                                 rnp->grplo, rnp->grphi,
-                                                 TPS("rel"));
-                       mutex_unlock(&rnp->exp_funnel_mutex);
-               } else if (rdp) {
-                       trace_rcu_exp_funnel_lock(rsp->name,
-                                                 rdp->mynode->level + 1,
-                                                 rdp->cpu, rdp->cpu,
-                                                 TPS("rel"));
-                       mutex_unlock(&rdp->exp_funnel_mutex);
-               }
                /* Ensure test happens before caller kfree(). */
                smp_mb__before_atomic(); /* ^^^ */
                atomic_long_inc(stat);
@@ -3606,53 +3593,53 @@ static bool sync_exp_work_done(struct rcu_state *rsp, struct rcu_node *rnp,
 }
 
 /*
- * Funnel-lock acquisition for expedited grace periods.  Returns a
- * pointer to the root rcu_node structure, or NULL if some other
- * task did the expedited grace period for us.
+ * Funnel-lock acquisition for expedited grace periods.  Returns true
+ * if some other task completed an expedited grace period that this task
+ * can piggy-back on, and with no mutex held.  Otherwise, returns false
+ * with the mutex held, indicating that the caller must actually do the
+ * expedited grace period.
  */
-static struct rcu_node *exp_funnel_lock(struct rcu_state *rsp, unsigned long s)
+static bool exp_funnel_lock(struct rcu_state *rsp, unsigned long s)
 {
        struct rcu_data *rdp = per_cpu_ptr(rsp->rda, raw_smp_processor_id());
-       struct rcu_node *rnp0;
-       struct rcu_node *rnp1 = NULL;
+       struct rcu_node *rnp = rdp->mynode;
 
        /*
-        * Each pass through the following loop works its way
-        * up the rcu_node tree, returning if others have done the
-        * work or otherwise falls through holding the root rnp's
-        * ->exp_funnel_mutex.  The mapping from CPU to rcu_node structure
-        * can be inexact, as it is just promoting locality and is not
-        * strictly needed for correctness.
+        * Each pass through the following loop works its way up
+        * the rcu_node tree, returning if others have done the work or
+        * otherwise falls through to acquire rsp->exp_mutex.  The mapping
+        * from CPU to rcu_node structure can be inexact, as it is just
+        * promoting locality and is not strictly needed for correctness.
         */
-       if (sync_exp_work_done(rsp, NULL, NULL, &rdp->exp_workdone1, s))
-               return NULL;
-       mutex_lock(&rdp->exp_funnel_mutex);
-       trace_rcu_exp_funnel_lock(rsp->name, rdp->mynode->level + 1,
-                                 rdp->cpu, rdp->cpu, TPS("acq"));
-       rnp0 = rdp->mynode;
-       for (; rnp0 != NULL; rnp0 = rnp0->parent) {
-               if (sync_exp_work_done(rsp, rnp1, rdp, &rdp->exp_workdone2, s))
-                       return NULL;
-               mutex_lock(&rnp0->exp_funnel_mutex);
-               trace_rcu_exp_funnel_lock(rsp->name, rnp0->level,
-                                         rnp0->grplo, rnp0->grphi, TPS("acq"));
-               if (rnp1) {
-                       trace_rcu_exp_funnel_lock(rsp->name, rnp1->level,
-                                                 rnp1->grplo, rnp1->grphi,
-                                                 TPS("rel"));
-                       mutex_unlock(&rnp1->exp_funnel_mutex);
-               } else {
-                       trace_rcu_exp_funnel_lock(rsp->name,
-                                                 rdp->mynode->level + 1,
-                                                 rdp->cpu, rdp->cpu,
-                                                 TPS("rel"));
-                       mutex_unlock(&rdp->exp_funnel_mutex);
+       for (; rnp != NULL; rnp = rnp->parent) {
+               if (sync_exp_work_done(rsp, &rdp->exp_workdone1, s))
+                       return true;
+
+               /* Work not done, either wait here or go up. */
+               spin_lock(&rnp->exp_lock);
+               if (ULONG_CMP_GE(rnp->exp_seq_rq, s)) {
+
+                       /* Someone else doing GP, so wait for them. */
+                       spin_unlock(&rnp->exp_lock);
+                       trace_rcu_exp_funnel_lock(rsp->name, rnp->level,
+                                                 rnp->grplo, rnp->grphi,
+                                                 TPS("wait"));
+                       wait_event(rnp->exp_wq[(s >> 1) & 0x1],
+                                  sync_exp_work_done(rsp,
+                                                     &rdp->exp_workdone2, s));
+                       return true;
                }
-               rnp1 = rnp0;
+               rnp->exp_seq_rq = s; /* Followers can wait on us. */
+               spin_unlock(&rnp->exp_lock);
+               trace_rcu_exp_funnel_lock(rsp->name, rnp->level, rnp->grplo,
+                                         rnp->grphi, TPS("nxtlvl"));
        }
-       if (sync_exp_work_done(rsp, rnp1, rdp, &rdp->exp_workdone3, s))
-               return NULL;
-       return rnp1;
+       mutex_lock(&rsp->exp_mutex);
+       if (sync_exp_work_done(rsp, &rdp->exp_workdone3, s)) {
+               mutex_unlock(&rsp->exp_mutex);
+               return true;
+       }
+       return false;
 }
 
 /* Invoked on each online non-idle CPU for expedited quiescent state. */
@@ -3841,6 +3828,27 @@ static void synchronize_sched_expedited_wait(struct rcu_state *rsp)
        }
 }
 
+/*
+ * Wake up everyone who piggybacked on the just-completed expedited
+ * grace period.  Also update all the ->exp_seq_rq counters as needed
+ * in order to avoid counter-wrap problems.
+ */
+static void rcu_exp_wake(struct rcu_state *rsp, unsigned long s)
+{
+       struct rcu_node *rnp;
+
+       rcu_for_each_node_breadth_first(rsp, rnp) {
+               if (ULONG_CMP_LT(READ_ONCE(rnp->exp_seq_rq), s)) {
+                       spin_lock(&rnp->exp_lock);
+                       /* Recheck, avoid hang in case someone just arrived. */
+                       if (ULONG_CMP_LT(rnp->exp_seq_rq, s))
+                               rnp->exp_seq_rq = s;
+                       spin_unlock(&rnp->exp_lock);
+               }
+               wake_up_all(&rnp->exp_wq[(rsp->expedited_sequence >> 1) & 0x1]);
+       }
+}
+
 /**
  * synchronize_sched_expedited - Brute-force RCU-sched grace period
  *
@@ -3860,7 +3868,6 @@ static void synchronize_sched_expedited_wait(struct rcu_state *rsp)
 void synchronize_sched_expedited(void)
 {
        unsigned long s;
-       struct rcu_node *rnp;
        struct rcu_state *rsp = &rcu_sched_state;
 
        /* If only one CPU, this is automatically a grace period. */
@@ -3877,20 +3884,23 @@ void synchronize_sched_expedited(void)
        s = rcu_exp_gp_seq_snap(rsp);
        trace_rcu_exp_grace_period(rsp->name, s, TPS("snap"));
 
-       rnp = exp_funnel_lock(rsp, s);
-       if (rnp == NULL)
+       if (exp_funnel_lock(rsp, s))
                return;  /* Someone else did our work for us. */
 
        rcu_exp_gp_seq_start(rsp);
        trace_rcu_exp_grace_period(rsp->name, s, TPS("start"));
+
+       /* Initialize the rcu_node tree in preparation for the wait. */
        sync_rcu_exp_select_cpus(rsp, sync_sched_exp_handler);
-       synchronize_sched_expedited_wait(rsp);
 
+       /* Wait and clean up, including waking everyone. */
+       synchronize_sched_expedited_wait(rsp);
        rcu_exp_gp_seq_end(rsp);
        trace_rcu_exp_grace_period(rsp->name, s, TPS("end"));
-       trace_rcu_exp_funnel_lock(rsp->name, rnp->level,
-                                 rnp->grplo, rnp->grphi, TPS("rel"));
-       mutex_unlock(&rnp->exp_funnel_mutex);
+       rcu_exp_wake(rsp, s);
+
+       trace_rcu_exp_grace_period(rsp->name, s, TPS("endwake"));
+       mutex_unlock(&rsp->exp_mutex);
 }
 EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
 
@@ -4190,7 +4200,6 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
        WARN_ON_ONCE(atomic_read(&rdp->dynticks->dynticks) != 1);
        rdp->cpu = cpu;
        rdp->rsp = rsp;
-       mutex_init(&rdp->exp_funnel_mutex);
        rcu_boot_init_nocb_percpu_data(rdp);
        raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
 }
@@ -4448,10 +4457,8 @@ static void __init rcu_init_one(struct rcu_state *rsp)
 {
        static const char * const buf[] = RCU_NODE_NAME_INIT;
        static const char * const fqs[] = RCU_FQS_NAME_INIT;
-       static const char * const exp[] = RCU_EXP_NAME_INIT;
        static struct lock_class_key rcu_node_class[RCU_NUM_LVLS];
        static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
-       static struct lock_class_key rcu_exp_class[RCU_NUM_LVLS];
        static u8 fl_mask = 0x1;
 
        int levelcnt[RCU_NUM_LVLS];             /* # nodes in each level. */
@@ -4510,9 +4517,9 @@ static void __init rcu_init_one(struct rcu_state *rsp)
                        rnp->level = i;
                        INIT_LIST_HEAD(&rnp->blkd_tasks);
                        rcu_init_one_nocb(rnp);
-                       mutex_init(&rnp->exp_funnel_mutex);
-                       lockdep_set_class_and_name(&rnp->exp_funnel_mutex,
-                                                  &rcu_exp_class[i], exp[i]);
+                       init_waitqueue_head(&rnp->exp_wq[0]);
+                       init_waitqueue_head(&rnp->exp_wq[1]);
+                       spin_lock_init(&rnp->exp_lock);
                }
        }
 
index 6a8f09446924903ccb7cd023481e20244b996e39..f9d4fbb1e014eec3a9c9f71d5beb6f85261b7fef 100644 (file)
@@ -70,7 +70,6 @@
 #  define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0 }
 #  define RCU_NODE_NAME_INIT  { "rcu_node_0" }
 #  define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0" }
-#  define RCU_EXP_NAME_INIT   { "rcu_node_exp_0" }
 #elif NR_CPUS <= RCU_FANOUT_2
 #  define RCU_NUM_LVLS       2
 #  define NUM_RCU_LVL_0              1
@@ -79,7 +78,6 @@
 #  define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1 }
 #  define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1" }
 #  define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1" }
-#  define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1" }
 #elif NR_CPUS <= RCU_FANOUT_3
 #  define RCU_NUM_LVLS       3
 #  define NUM_RCU_LVL_0              1
@@ -89,7 +87,6 @@
 #  define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2 }
 #  define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1", "rcu_node_2" }
 #  define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2" }
-#  define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1", "rcu_node_exp_2" }
 #elif NR_CPUS <= RCU_FANOUT_4
 #  define RCU_NUM_LVLS       4
 #  define NUM_RCU_LVL_0              1
 #  define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2, NUM_RCU_LVL_3 }
 #  define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1", "rcu_node_2", "rcu_node_3" }
 #  define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2", "rcu_node_fqs_3" }
-#  define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1", "rcu_node_exp_2", "rcu_node_exp_3" }
 #else
 # error "CONFIG_RCU_FANOUT insufficient for NR_CPUS"
 #endif /* #if (NR_CPUS) <= RCU_FANOUT_1 */
@@ -252,7 +248,9 @@ struct rcu_node {
                                /* Counts of upcoming no-CB GP requests. */
        raw_spinlock_t fqslock ____cacheline_internodealigned_in_smp;
 
-       struct mutex exp_funnel_mutex ____cacheline_internodealigned_in_smp;
+       spinlock_t exp_lock ____cacheline_internodealigned_in_smp;
+       unsigned long exp_seq_rq;
+       wait_queue_head_t exp_wq[2];
 } ____cacheline_internodealigned_in_smp;
 
 /*
@@ -387,7 +385,6 @@ struct rcu_data {
 #ifdef CONFIG_RCU_FAST_NO_HZ
        struct rcu_head oom_head;
 #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */
-       struct mutex exp_funnel_mutex;
        atomic_long_t exp_workdone1;    /* # done by others #1. */
        atomic_long_t exp_workdone2;    /* # done by others #2. */
        atomic_long_t exp_workdone3;    /* # done by others #3. */
@@ -504,6 +501,7 @@ struct rcu_state {
                                                /*  _rcu_barrier(). */
        /* End of fields guarded by barrier_mutex. */
 
+       struct mutex exp_mutex;                 /* Serialize expedited GP. */
        unsigned long expedited_sequence;       /* Take a ticket. */
        atomic_long_t expedited_normal;         /* # fallbacks to normal. */
        atomic_t expedited_need_qs;             /* # CPUs left to check in. */
index 36e94aed38a7c822d2f1d343b2d542851959e6fb..c82c3640493f3a4acf6a4ff10057d617cd71ff34 100644 (file)
@@ -738,8 +738,6 @@ static void sync_rcu_exp_handler(void *info)
  */
 void synchronize_rcu_expedited(void)
 {
-       struct rcu_node *rnp;
-       struct rcu_node *rnp_unlock;
        struct rcu_state *rsp = rcu_state_p;
        unsigned long s;
 
@@ -752,8 +750,7 @@ void synchronize_rcu_expedited(void)
        s = rcu_exp_gp_seq_snap(rsp);
        trace_rcu_exp_grace_period(rsp->name, s, TPS("snap"));
 
-       rnp_unlock = exp_funnel_lock(rsp, s);
-       if (rnp_unlock == NULL)
+       if (exp_funnel_lock(rsp, s))
                return;  /* Someone else did our work for us. */
 
        rcu_exp_gp_seq_start(rsp);
@@ -763,16 +760,13 @@ void synchronize_rcu_expedited(void)
        sync_rcu_exp_select_cpus(rsp, sync_rcu_exp_handler);
 
        /* Wait for snapshotted ->blkd_tasks lists to drain. */
-       rnp = rcu_get_root(rsp);
        synchronize_sched_expedited_wait(rsp);
-
-       /* Clean up and exit. */
        rcu_exp_gp_seq_end(rsp);
        trace_rcu_exp_grace_period(rsp->name, s, TPS("end"));
-       mutex_unlock(&rnp_unlock->exp_funnel_mutex);
-       trace_rcu_exp_funnel_lock(rsp->name, rnp_unlock->level,
-                                 rnp_unlock->grplo, rnp_unlock->grphi,
-                                 TPS("rel"));
+       rcu_exp_wake(rsp, s);
+
+       trace_rcu_exp_grace_period(rsp->name, s, TPS("endwake"));
+       mutex_unlock(&rsp->exp_mutex);
 }
 EXPORT_SYMBOL_GPL(synchronize_rcu_expedited);