rcu: Add callback-free CPUs
authorPaul E. McKenney <paul.mckenney@linaro.org>
Mon, 20 Aug 2012 04:35:53 +0000 (21:35 -0700)
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>
Fri, 16 Nov 2012 18:05:56 +0000 (10:05 -0800)
RCU callback execution can add significant OS jitter and also can
degrade both scheduling latency and, in asymmetric multiprocessors,
energy efficiency.  This commit therefore adds the ability for selected
CPUs ("rcu_nocbs=" boot parameter) to have their callbacks offloaded
to kthreads.  If the "rcu_nocb_poll" boot parameter is also specified,
these kthreads will do polling, removing the need for the offloaded
CPUs to do wakeups.  At least one CPU must be doing normal callback
processing: currently CPU 0 cannot be selected as a no-CBs CPU.
In addition, attempts to offline the last normal-CBs CPU will fail.

This feature was inspired by Jim Houston's and Joe Korty's JRCU, and
this commit includes fixes to problems located by Fengguang Wu's
kbuild test robot.

[ paulmck: Added gfp.h include file as suggested by Fengguang Wu. ]

Signed-off-by: Paul E. McKenney <paul.mckenney@linaro.org>
Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Documentation/kernel-parameters.txt
include/trace/events/rcu.h
init/Kconfig
kernel/rcutree.c
kernel/rcutree.h
kernel/rcutree_plugin.h
kernel/rcutree_trace.c

index 9776f068306b7f7ec6425ec4c02b17f5dac4788a..9d2e5cb3a95f636f3d16bc4f419b6f60e4b49904 100644 (file)
@@ -2394,6 +2394,27 @@ bytes respectively. Such letter suffixes can also be entirely omitted.
        ramdisk_size=   [RAM] Sizes of RAM disks in kilobytes
                        See Documentation/blockdev/ramdisk.txt.
 
+       rcu_nocbs=      [KNL,BOOT]
+                       In kernels built with CONFIG_RCU_NOCB_CPU=y, set
+                       the specified list of CPUs to be no-callback CPUs.
+                       Invocation of these CPUs' RCU callbacks will
+                       be offloaded to "rcuoN" kthreads created for
+                       that purpose.  This reduces OS jitter on the
+                       offloaded CPUs, which can be useful for HPC and
+                       real-time workloads.  It can also improve energy
+                       efficiency for asymmetric multiprocessors.
+
+       rcu_nocbs_poll  [KNL,BOOT]
+                       Rather than requiring that offloaded CPUs
+                       (specified by rcu_nocbs= above) explicitly
+                       awaken the corresponding "rcuoN" kthreads,
+                       make these kthreads poll for callbacks.
+                       This improves the real-time response for the
+                       offloaded CPUs by relieving them of the need to
+                       wake up the corresponding kthread, but degrades
+                       energy efficiency by requiring that the kthreads
+                       periodically wake up to do the polling.
+
        rcutree.blimit= [KNL,BOOT]
                        Set maximum number of finished RCU callbacks to process
                        in one batch.
index 5bde94d8585b77c71e957926ed9ca7b5b85bcc5c..d4f559b1ec3444eada48e8e9e1d8bd85f36da5b6 100644 (file)
@@ -549,6 +549,7 @@ TRACE_EVENT(rcu_torture_read,
  *     "EarlyExit": rcu_barrier_callback() piggybacked, thus early exit.
  *     "Inc1": rcu_barrier_callback() piggyback check counter incremented.
  *     "Offline": rcu_barrier_callback() found offline CPU
+ *     "OnlineNoCB": rcu_barrier_callback() found online no-CBs CPU.
  *     "OnlineQ": rcu_barrier_callback() found online CPU with callbacks.
  *     "OnlineNQ": rcu_barrier_callback() found online CPU, no callbacks.
  *     "IRQ": An rcu_barrier_callback() callback posted on remote CPU.
index ec62139207d3cf5930a23d4f0642309864291a7c..5ac6ee094225dc558365bbc80fd1739dec96bfc8 100644 (file)
@@ -654,6 +654,28 @@ config RCU_BOOST_DELAY
 
          Accept the default if unsure.
 
+config RCU_NOCB_CPU
+       bool "Offload RCU callback processing from boot-selected CPUs"
+       depends on TREE_RCU || TREE_PREEMPT_RCU
+       default n
+       help
+         Use this option to reduce OS jitter for aggressive HPC or
+         real-time workloads.  It can also be used to offload RCU
+         callback invocation to energy-efficient CPUs in battery-powered
+         asymmetric multiprocessors.
+
+         This option offloads callback invocation from the set of
+         CPUs specified at boot time by the rcu_nocbs parameter.
+         For each such CPU, a kthread ("rcuoN") will be created to
+         invoke callbacks, where the "N" is the CPU being offloaded.
+         Nothing prevents this kthread from running on the specified
+         CPUs, but (1) the kthreads may be preempted between each
+         callback, and (2) affinity or cgroups can be used to force
+         the kthreads to run on whatever set of CPUs is desired.
+
+         Say Y here if you want reduced OS jitter on selected CPUs.
+         Say N here if you are unsure.
+
 endmenu # "RCU Subsystem"
 
 config IKCONFIG
index 5ffadcc3bb26e6030580e6a9df23124af638e851..7733eb56e156a865efdc306a39ae31d09fdea4ba 100644 (file)
@@ -303,7 +303,8 @@ EXPORT_SYMBOL_GPL(rcu_sched_force_quiescent_state);
 static int
 cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 {
-       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+              rdp->nxttail[RCU_DONE_TAIL] != NULL;
 }
 
 /*
@@ -312,8 +313,11 @@ cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
 static int
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-       return *rdp->nxttail[RCU_DONE_TAIL +
-                            (ACCESS_ONCE(rsp->completed) != rdp->completed)] &&
+       struct rcu_head **ntp;
+
+       ntp = rdp->nxttail[RCU_DONE_TAIL +
+                          (ACCESS_ONCE(rsp->completed) != rdp->completed)];
+       return rdp->nxttail[RCU_DONE_TAIL] && ntp && *ntp &&
               !rcu_gp_in_progress(rsp);
 }
 
@@ -1123,6 +1127,7 @@ static void init_callback_list(struct rcu_data *rdp)
        rdp->nxtlist = NULL;
        for (i = 0; i < RCU_NEXT_SIZE; i++)
                rdp->nxttail[i] = &rdp->nxtlist;
+       init_nocb_callback_list(rdp);
 }
 
 /*
@@ -1633,6 +1638,10 @@ static void
 rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
                          struct rcu_node *rnp, struct rcu_data *rdp)
 {
+       /* No-CBs CPUs do not have orphanable callbacks. */
+       if (is_nocb_cpu(rdp->cpu))
+               return;
+
        /*
         * Orphan the callbacks.  First adjust the counts.  This is safe
         * because _rcu_barrier() excludes CPU-hotplug operations, so it
@@ -1684,6 +1693,10 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
        int i;
        struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
+       /* No-CBs CPUs are handled specially. */
+       if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+               return;
+
        /* Do the accounting first. */
        rdp->qlen_lazy += rsp->qlen_lazy;
        rdp->qlen += rsp->qlen;
@@ -2162,9 +2175,15 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
        }
 }
 
+/*
+ * Helper function for call_rcu() and friends.  The cpu argument will
+ * normally be -1, indicating "currently running CPU".  It may specify
+ * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
 static void
 __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
-          struct rcu_state *rsp, bool lazy)
+          struct rcu_state *rsp, int cpu, bool lazy)
 {
        unsigned long flags;
        struct rcu_data *rdp;
@@ -2184,9 +2203,14 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+               int offline;
+
+               if (cpu != -1)
+                       rdp = per_cpu_ptr(rsp->rda, cpu);
+               offline = !__call_rcu_nocb(rdp, head, lazy);
+               WARN_ON_ONCE(offline);
                /* _call_rcu() is illegal on offline CPU; leak the callback. */
-               WARN_ON_ONCE(1);
                local_irq_restore(flags);
                return;
        }
@@ -2215,7 +2239,7 @@ __call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
  */
 void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_sched_state, 0);
+       __call_rcu(head, func, &rcu_sched_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_sched);
 
@@ -2224,7 +2248,7 @@ EXPORT_SYMBOL_GPL(call_rcu_sched);
  */
 void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_bh_state, 0);
+       __call_rcu(head, func, &rcu_bh_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu_bh);
 
@@ -2676,9 +2700,17 @@ static void _rcu_barrier(struct rcu_state *rsp)
         * When that callback is invoked, we will know that all of the
         * corresponding CPU's preceding callbacks have been invoked.
         */
-       for_each_online_cpu(cpu) {
+       for_each_possible_cpu(cpu) {
+               if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+                       continue;
                rdp = per_cpu_ptr(rsp->rda, cpu);
-               if (ACCESS_ONCE(rdp->qlen)) {
+               if (is_nocb_cpu(cpu)) {
+                       _rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+                                          rsp->n_barrier_done);
+                       atomic_inc(&rsp->barrier_cpu_count);
+                       __call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+                                  rsp, cpu, 0);
+               } else if (ACCESS_ONCE(rdp->qlen)) {
                        _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                           rsp->n_barrier_done);
                        smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -2752,6 +2784,7 @@ rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
 #endif
        rdp->cpu = cpu;
        rdp->rsp = rsp;
+       rcu_boot_init_nocb_percpu_data(rdp);
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
 }
 
@@ -2833,6 +2866,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
        struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
        struct rcu_node *rnp = rdp->mynode;
        struct rcu_state *rsp;
+       int ret = NOTIFY_OK;
 
        trace_rcu_utilization("Start CPU hotplug");
        switch (action) {
@@ -2846,7 +2880,10 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                rcu_boost_kthread_setaffinity(rnp, -1);
                break;
        case CPU_DOWN_PREPARE:
-               rcu_boost_kthread_setaffinity(rnp, cpu);
+               if (nocb_cpu_expendable(cpu))
+                       rcu_boost_kthread_setaffinity(rnp, cpu);
+               else
+                       ret = NOTIFY_BAD;
                break;
        case CPU_DYING:
        case CPU_DYING_FROZEN:
@@ -2870,7 +2907,7 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
                break;
        }
        trace_rcu_utilization("End CPU hotplug");
-       return NOTIFY_OK;
+       return ret;
 }
 
 /*
@@ -2890,6 +2927,7 @@ static int __init rcu_spawn_gp_kthread(void)
                raw_spin_lock_irqsave(&rnp->lock, flags);
                rsp->gp_kthread = t;
                raw_spin_unlock_irqrestore(&rnp->lock, flags);
+               rcu_spawn_nocb_kthreads(rsp);
        }
        return 0;
 }
@@ -3085,6 +3123,7 @@ void __init rcu_init(void)
        rcu_init_one(&rcu_sched_state, &rcu_sched_data);
        rcu_init_one(&rcu_bh_state, &rcu_bh_data);
        __rcu_init_preempt();
+       rcu_init_nocb();
         open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 
        /*
index d274af3572103d93580d70c5c46d954b25bf5048..488f2ec6b663917f8570a4df82236133d44990d7 100644 (file)
@@ -317,6 +317,18 @@ struct rcu_data {
        struct rcu_head oom_head;
 #endif /* #ifdef CONFIG_RCU_FAST_NO_HZ */
 
+       /* 7) Callback offloading. */
+#ifdef CONFIG_RCU_NOCB_CPU
+       struct rcu_head *nocb_head;     /* CBs waiting for kthread. */
+       struct rcu_head **nocb_tail;
+       atomic_long_t nocb_q_count;     /* # CBs waiting for kthread */
+       atomic_long_t nocb_q_count_lazy; /*  (approximate). */
+       int nocb_p_count;               /* # CBs being invoked by kthread */
+       int nocb_p_count_lazy;          /*  (approximate). */
+       wait_queue_head_t nocb_wq;      /* For nocb kthreads to sleep on. */
+       struct task_struct *nocb_kthread;
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
        int cpu;
        struct rcu_state *rsp;
 };
@@ -369,6 +381,12 @@ struct rcu_state {
        struct rcu_data __percpu *rda;          /* pointer of percu rcu_data. */
        void (*call)(struct rcu_head *head,     /* call_rcu() flavor. */
                     void (*func)(struct rcu_head *head));
+#ifdef CONFIG_RCU_NOCB_CPU
+       void (*call_remote)(struct rcu_head *head,
+                    void (*func)(struct rcu_head *head));
+                                               /* call_rcu() flavor, but for */
+                                               /*  placing on remote CPU. */
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
        /* The following fields are guarded by the root rcu_node's lock. */
 
@@ -439,6 +457,8 @@ struct rcu_state {
 #define RCU_GP_FLAG_FQS  0x2   /* Need grace-period quiescent-state forcing. */
 
 extern struct list_head rcu_struct_flavors;
+
+/* Sequence through rcu_state structures for each RCU flavor. */
 #define for_each_rcu_flavor(rsp) \
        list_for_each_entry((rsp), &rcu_struct_flavors, flavors)
 
@@ -515,5 +535,32 @@ static void print_cpu_stall_info(struct rcu_state *rsp, int cpu);
 static void print_cpu_stall_info_end(void);
 static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static void increment_cpu_stall_ticks(void);
+static bool is_nocb_cpu(int cpu);
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy);
+static bool rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                     struct rcu_data *rdp);
+static bool nocb_cpu_expendable(int cpu);
+static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp);
+static void rcu_spawn_nocb_kthreads(struct rcu_state *rsp);
+static void init_nocb_callback_list(struct rcu_data *rdp);
+static void __init rcu_init_nocb(void);
 
 #endif /* #ifndef RCU_TREE_NONCORE */
+
+#ifdef CONFIG_RCU_TRACE
+#ifdef CONFIG_RCU_NOCB_CPU
+/* Sum up queue lengths for tracing. */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+       *ql = atomic_long_read(&rdp->nocb_q_count) + rdp->nocb_p_count;
+       *qll = atomic_long_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy;
+}
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
+{
+       *ql = 0;
+       *qll = 0;
+}
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
+#endif /* #ifdef CONFIG_RCU_TRACE */
index 5ce3352505e94fc222136e47a7443e4bbe351af7..6cdc372de34c41e181b16d9c48cdaa6745840f9b 100644 (file)
@@ -25,6 +25,7 @@
  */
 
 #include <linux/delay.h>
+#include <linux/gfp.h>
 #include <linux/oom.h>
 #include <linux/smpboot.h>
 
 #define RCU_BOOST_PRIO RCU_KTHREAD_PRIO
 #endif
 
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool have_rcu_nocb_mask;            /* Was rcu_nocb_mask allocated? */
+static bool rcu_nocb_poll;         /* Offload kthread are to poll. */
+module_param(rcu_nocb_poll, bool, 0444);
+static char __initdata nocb_buf[NR_CPUS * 5];
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
 /*
  * Check the RCU kernel configuration parameters and print informative
  * messages about anything out of the ordinary.  If you like #ifdef, you
@@ -76,6 +85,18 @@ static void __init rcu_bootup_announce_oddness(void)
                printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf);
        if (nr_cpu_ids != NR_CPUS)
                printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
+#ifdef CONFIG_RCU_NOCB_CPU
+       if (have_rcu_nocb_mask) {
+               if (cpumask_test_cpu(0, rcu_nocb_mask)) {
+                       cpumask_clear_cpu(0, rcu_nocb_mask);
+                       pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
+               }
+               cpulist_scnprintf(nocb_buf, sizeof(nocb_buf), rcu_nocb_mask);
+               pr_info("\tExperimental no-CBs CPUs: %s.\n", nocb_buf);
+               if (rcu_nocb_poll)
+                       pr_info("\tExperimental polled no-CBs CPUs.\n");
+       }
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 }
 
 #ifdef CONFIG_TREE_PREEMPT_RCU
@@ -642,7 +663,7 @@ static void rcu_preempt_do_callbacks(void)
  */
 void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_preempt_state, 0);
+       __call_rcu(head, func, &rcu_preempt_state, -1, 0);
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
@@ -656,7 +677,7 @@ EXPORT_SYMBOL_GPL(call_rcu);
 void kfree_call_rcu(struct rcu_head *head,
                    void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_preempt_state, 1);
+       __call_rcu(head, func, &rcu_preempt_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -1025,7 +1046,7 @@ static void rcu_preempt_check_callbacks(int cpu)
 void kfree_call_rcu(struct rcu_head *head,
                    void (*func)(struct rcu_head *rcu))
 {
-       __call_rcu(head, func, &rcu_sched_state, 1);
+       __call_rcu(head, func, &rcu_sched_state, -1, 1);
 }
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
@@ -2104,3 +2125,373 @@ static void increment_cpu_stall_ticks(void)
 }
 
 #endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask.  For each CPU in the set, there is a
+ * kthread created that pulls the callbacks from the corresponding CPU,
+ * waits for a grace period to elapse, and invokes the callbacks.
+ * The no-CBs CPUs do a wake_up() on their kthread when they insert
+ * a callback into any empty list, unless the rcu_nocb_poll boot parameter
+ * has been specified, in which case each kthread actively polls its
+ * CPU.  (Which isn't so great for energy efficiency, but which does
+ * reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callback processing could also in theory be used as
+ * an energy-efficiency measure because CPUs with no RCU callbacks
+ * queued are more aggressive about entering dyntick-idle mode.
+ */
+
+
+/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
+static int __init rcu_nocb_setup(char *str)
+{
+       alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+       have_rcu_nocb_mask = true;
+       cpulist_parse(str, rcu_nocb_mask);
+       return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+/* Is the specified CPU a no-CPUs CPU? */
+static bool is_nocb_cpu(int cpu)
+{
+       if (have_rcu_nocb_mask)
+               return cpumask_test_cpu(cpu, rcu_nocb_mask);
+       return false;
+}
+
+/*
+ * Enqueue the specified string of rcu_head structures onto the specified
+ * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the
+ * string by rhp, and the tail of the string by rhtp.  The non-lazy/lazy
+ * counts are supplied by rhcount and rhcount_lazy.
+ *
+ * If warranted, also wake up the kthread servicing this CPUs queues.
+ */
+static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
+                                   struct rcu_head *rhp,
+                                   struct rcu_head **rhtp,
+                                   int rhcount, int rhcount_lazy)
+{
+       int len;
+       struct rcu_head **old_rhpp;
+       struct task_struct *t;
+
+       /* Enqueue the callback on the nocb list and update counts. */
+       old_rhpp = xchg(&rdp->nocb_tail, rhtp);
+       ACCESS_ONCE(*old_rhpp) = rhp;
+       atomic_long_add(rhcount, &rdp->nocb_q_count);
+       atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
+
+       /* If we are not being polled and there is a kthread, awaken it ... */
+       t = ACCESS_ONCE(rdp->nocb_kthread);
+       if (rcu_nocb_poll | !t)
+               return;
+       len = atomic_long_read(&rdp->nocb_q_count);
+       if (old_rhpp == &rdp->nocb_head) {
+               wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+               rdp->qlen_last_fqs_check = 0;
+       } else if (len > rdp->qlen_last_fqs_check + qhimark) {
+               wake_up_process(t); /* ... or if many callbacks queued. */
+               rdp->qlen_last_fqs_check = LONG_MAX / 2;
+       }
+       return;
+}
+
+/*
+ * This is a helper for __call_rcu(), which invokes this when the normal
+ * callback queue is inoperable.  If this is not a no-CBs CPU, this
+ * function returns failure back to __call_rcu(), which can complain
+ * appropriately.
+ *
+ * Otherwise, this function queues the callback where the corresponding
+ * "rcuo" kthread can find it.
+ */
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy)
+{
+
+       if (!is_nocb_cpu(rdp->cpu))
+               return 0;
+       __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+       return 1;
+}
+
+/*
+ * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
+ * not a no-CBs CPU.
+ */
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                                    struct rcu_data *rdp)
+{
+       long ql = rsp->qlen;
+       long qll = rsp->qlen_lazy;
+
+       /* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+       if (!is_nocb_cpu(smp_processor_id()))
+               return 0;
+       rsp->qlen = 0;
+       rsp->qlen_lazy = 0;
+
+       /* First, enqueue the donelist, if any.  This preserves CB ordering. */
+       if (rsp->orphan_donelist != NULL) {
+               __call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
+                                       rsp->orphan_donetail, ql, qll);
+               ql = qll = 0;
+               rsp->orphan_donelist = NULL;
+               rsp->orphan_donetail = &rsp->orphan_donelist;
+       }
+       if (rsp->orphan_nxtlist != NULL) {
+               __call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
+                                       rsp->orphan_nxttail, ql, qll);
+               ql = qll = 0;
+               rsp->orphan_nxtlist = NULL;
+               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+       }
+       return 1;
+}
+
+/*
+ * There must be at least one non-no-CBs CPU in operation at any given
+ * time, because no-CBs CPUs are not capable of initiating grace periods
+ * independently.  This function therefore complains if the specified
+ * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
+ * avoid offlining the last such CPU.  (Recursion is a wonderful thing,
+ * but you have to have a base case!)
+ */
+static bool nocb_cpu_expendable(int cpu)
+{
+       cpumask_var_t non_nocb_cpus;
+       int ret;
+
+       /*
+        * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
+        * then offlining this CPU is harmless.  Let it happen.
+        */
+       if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
+               return 1;
+
+       /* If no memory, play it safe and keep the CPU around. */
+       if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
+               return 0;
+       cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
+       cpumask_clear_cpu(cpu, non_nocb_cpus);
+       ret = !cpumask_empty(non_nocb_cpus);
+       free_cpumask_var(non_nocb_cpus);
+       return ret;
+}
+
+/*
+ * Helper structure for remote registry of RCU callbacks.
+ * This is needed for when a no-CBs CPU needs to start a grace period.
+ * If it just invokes call_rcu(), the resulting callback will be queued,
+ * which can result in deadlock.
+ */
+struct rcu_head_remote {
+       struct rcu_head *rhp;
+       call_rcu_func_t *crf;
+       void (*func)(struct rcu_head *rhp);
+};
+
+/*
+ * Register a callback as specified by the rcu_head_remote struct.
+ * This function is intended to be invoked via smp_call_function_single().
+ */
+static void call_rcu_local(void *arg)
+{
+       struct rcu_head_remote *rhrp =
+               container_of(arg, struct rcu_head_remote, rhp);
+
+       rhrp->crf(rhrp->rhp, rhrp->func);
+}
+
+/*
+ * Set up an rcu_head_remote structure and the invoke call_rcu_local()
+ * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
+ * smp_call_function_single().
+ */
+static void invoke_crf_remote(struct rcu_head *rhp,
+                             void (*func)(struct rcu_head *rhp),
+                             call_rcu_func_t crf)
+{
+       struct rcu_head_remote rhr;
+
+       rhr.rhp = rhp;
+       rhr.crf = crf;
+       rhr.func = func;
+       smp_call_function_single(0, call_rcu_local, &rhr, 1);
+}
+
+/*
+ * Helper functions to be passed to wait_rcu_gp(), each of which
+ * invokes invoke_crf_remote() to register a callback appropriately.
+ */
+static void __maybe_unused
+call_rcu_preempt_remote(struct rcu_head *rhp,
+                       void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu);
+}
+static void call_rcu_bh_remote(struct rcu_head *rhp,
+                              void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu_bh);
+}
+static void call_rcu_sched_remote(struct rcu_head *rhp,
+                                 void (*func)(struct rcu_head *rhp))
+{
+       invoke_crf_remote(rhp, func, call_rcu_sched);
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
+ * callbacks queued by the corresponding no-CBs CPU.
+ */
+static int rcu_nocb_kthread(void *arg)
+{
+       int c, cl;
+       struct rcu_head *list;
+       struct rcu_head *next;
+       struct rcu_head **tail;
+       struct rcu_data *rdp = arg;
+
+       /* Each pass through this loop invokes one batch of callbacks */
+       for (;;) {
+               /* If not polling, wait for next batch of callbacks. */
+               if (!rcu_nocb_poll)
+                       wait_event(rdp->nocb_wq, rdp->nocb_head);
+               list = ACCESS_ONCE(rdp->nocb_head);
+               if (!list) {
+                       schedule_timeout_interruptible(1);
+                       continue;
+               }
+
+               /*
+                * Extract queued callbacks, update counts, and wait
+                * for a grace period to elapse.
+                */
+               ACCESS_ONCE(rdp->nocb_head) = NULL;
+               tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+               c = atomic_long_xchg(&rdp->nocb_q_count, 0);
+               cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
+               ACCESS_ONCE(rdp->nocb_p_count) += c;
+               ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
+               wait_rcu_gp(rdp->rsp->call_remote);
+
+               /* Each pass through the following loop invokes a callback. */
+               trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+               c = cl = 0;
+               while (list) {
+                       next = list->next;
+                       /* Wait for enqueuing to complete, if needed. */
+                       while (next == NULL && &list->next != tail) {
+                               schedule_timeout_interruptible(1);
+                               next = list->next;
+                       }
+                       debug_rcu_head_unqueue(list);
+                       local_bh_disable();
+                       if (__rcu_reclaim(rdp->rsp->name, list))
+                               cl++;
+                       c++;
+                       local_bh_enable();
+                       list = next;
+               }
+               trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
+               ACCESS_ONCE(rdp->nocb_p_count) -= c;
+               ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
+               rdp->n_cbs_invoked += c;
+       }
+       return 0;
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+       rdp->nocb_tail = &rdp->nocb_head;
+       init_waitqueue_head(&rdp->nocb_wq);
+}
+
+/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+       int cpu;
+       struct rcu_data *rdp;
+       struct task_struct *t;
+
+       if (rcu_nocb_mask == NULL)
+               return;
+       for_each_cpu(cpu, rcu_nocb_mask) {
+               rdp = per_cpu_ptr(rsp->rda, cpu);
+               t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
+               BUG_ON(IS_ERR(t));
+               ACCESS_ONCE(rdp->nocb_kthread) = t;
+       }
+}
+
+/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+       if (rcu_nocb_mask == NULL ||
+           !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
+               return;
+       rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+}
+
+/* Initialize the ->call_remote fields in the rcu_state structures. */
+static void __init rcu_init_nocb(void)
+{
+#ifdef CONFIG_PREEMPT_RCU
+       rcu_preempt_state.call_remote = call_rcu_preempt_remote;
+#endif /* #ifdef CONFIG_PREEMPT_RCU */
+       rcu_bh_state.call_remote = call_rcu_bh_remote;
+       rcu_sched_state.call_remote = call_rcu_sched_remote;
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static bool is_nocb_cpu(int cpu)
+{
+       return false;
+}
+
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+                           bool lazy)
+{
+       return 0;
+}
+
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+                                                    struct rcu_data *rdp)
+{
+       return 0;
+}
+
+static bool nocb_cpu_expendable(int cpu)
+{
+       return 1;
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+}
+
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_init_nocb(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
index f9512687a6e529d0048e143ea43ff1e7960611d7..3189f9aa3e84a2fa6ba7f0bae95138c3daf50ac4 100644 (file)
@@ -113,6 +113,8 @@ static char convert_kthread_status(unsigned int kthread_status)
 
 static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 {
+       long ql, qll;
+
        if (!rdp->beenonline)
                return;
        seq_printf(m, "%3d%cc=%ld g=%ld pq=%d qp=%d",
@@ -126,8 +128,11 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
                   rdp->dynticks->dynticks_nmi_nesting,
                   rdp->dynticks_fqs);
        seq_printf(m, " of=%lu", rdp->offline_fqs);
+       rcu_nocb_q_lengths(rdp, &ql, &qll);
+       qll += rdp->qlen_lazy;
+       ql += rdp->qlen;
        seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
-                  rdp->qlen_lazy, rdp->qlen,
+                  qll, ql,
                   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
                        rdp->nxttail[RCU_NEXT_TAIL]],
                   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=