static int
cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
{
- return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+ return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
+ rdp->nxttail[RCU_DONE_TAIL] != NULL;
}
/*
static int
cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
{
- return *rdp->nxttail[RCU_DONE_TAIL +
- (ACCESS_ONCE(rsp->completed) != rdp->completed)] &&
+ struct rcu_head **ntp;
+
+ ntp = rdp->nxttail[RCU_DONE_TAIL +
+ (ACCESS_ONCE(rsp->completed) != rdp->completed)];
+ return rdp->nxttail[RCU_DONE_TAIL] && ntp && *ntp &&
!rcu_gp_in_progress(rsp);
}
rdp->nxtlist = NULL;
for (i = 0; i < RCU_NEXT_SIZE; i++)
rdp->nxttail[i] = &rdp->nxtlist;
+ init_nocb_callback_list(rdp);
}
/*
rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
struct rcu_node *rnp, struct rcu_data *rdp)
{
+ /* No-CBs CPUs do not have orphanable callbacks. */
+ if (is_nocb_cpu(rdp->cpu))
+ return;
+
/*
* Orphan the callbacks. First adjust the counts. This is safe
* because _rcu_barrier() excludes CPU-hotplug operations, so it
int i;
struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
+ /* No-CBs CPUs are handled specially. */
+ if (rcu_nocb_adopt_orphan_cbs(rsp, rdp))
+ return;
+
/* Do the accounting first. */
rdp->qlen_lazy += rsp->qlen_lazy;
rdp->qlen += rsp->qlen;
}
}
+/*
+ * Helper function for call_rcu() and friends. The cpu argument will
+ * normally be -1, indicating "currently running CPU". It may specify
+ * a CPU only if that CPU is a no-CBs CPU. Currently, only _rcu_barrier()
+ * is expected to specify a CPU.
+ */
static void
__call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
- struct rcu_state *rsp, bool lazy)
+ struct rcu_state *rsp, int cpu, bool lazy)
{
unsigned long flags;
struct rcu_data *rdp;
rdp = this_cpu_ptr(rsp->rda);
/* Add the callback to our list. */
- if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL)) {
+ if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+ int offline;
+
+ if (cpu != -1)
+ rdp = per_cpu_ptr(rsp->rda, cpu);
+ offline = !__call_rcu_nocb(rdp, head, lazy);
+ WARN_ON_ONCE(offline);
/* _call_rcu() is illegal on offline CPU; leak the callback. */
- WARN_ON_ONCE(1);
local_irq_restore(flags);
return;
}
*/
void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
- __call_rcu(head, func, &rcu_sched_state, 0);
+ __call_rcu(head, func, &rcu_sched_state, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu_sched);
*/
void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
- __call_rcu(head, func, &rcu_bh_state, 0);
+ __call_rcu(head, func, &rcu_bh_state, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu_bh);
* When that callback is invoked, we will know that all of the
* corresponding CPU's preceding callbacks have been invoked.
*/
- for_each_online_cpu(cpu) {
+ for_each_possible_cpu(cpu) {
+ if (!cpu_online(cpu) && !is_nocb_cpu(cpu))
+ continue;
rdp = per_cpu_ptr(rsp->rda, cpu);
- if (ACCESS_ONCE(rdp->qlen)) {
+ if (is_nocb_cpu(cpu)) {
+ _rcu_barrier_trace(rsp, "OnlineNoCB", cpu,
+ rsp->n_barrier_done);
+ atomic_inc(&rsp->barrier_cpu_count);
+ __call_rcu(&rdp->barrier_head, rcu_barrier_callback,
+ rsp, cpu, 0);
+ } else if (ACCESS_ONCE(rdp->qlen)) {
_rcu_barrier_trace(rsp, "OnlineQ", cpu,
rsp->n_barrier_done);
smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
#endif
rdp->cpu = cpu;
rdp->rsp = rsp;
+ rcu_boot_init_nocb_percpu_data(rdp);
raw_spin_unlock_irqrestore(&rnp->lock, flags);
}
struct rcu_data *rdp = per_cpu_ptr(rcu_state->rda, cpu);
struct rcu_node *rnp = rdp->mynode;
struct rcu_state *rsp;
+ int ret = NOTIFY_OK;
trace_rcu_utilization("Start CPU hotplug");
switch (action) {
rcu_boost_kthread_setaffinity(rnp, -1);
break;
case CPU_DOWN_PREPARE:
- rcu_boost_kthread_setaffinity(rnp, cpu);
+ if (nocb_cpu_expendable(cpu))
+ rcu_boost_kthread_setaffinity(rnp, cpu);
+ else
+ ret = NOTIFY_BAD;
break;
case CPU_DYING:
case CPU_DYING_FROZEN:
break;
}
trace_rcu_utilization("End CPU hotplug");
- return NOTIFY_OK;
+ return ret;
}
/*
raw_spin_lock_irqsave(&rnp->lock, flags);
rsp->gp_kthread = t;
raw_spin_unlock_irqrestore(&rnp->lock, flags);
+ rcu_spawn_nocb_kthreads(rsp);
}
return 0;
}
rcu_init_one(&rcu_sched_state, &rcu_sched_data);
rcu_init_one(&rcu_bh_state, &rcu_bh_data);
__rcu_init_preempt();
+ rcu_init_nocb();
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
/*
*/
#include <linux/delay.h>
+#include <linux/gfp.h>
#include <linux/oom.h>
#include <linux/smpboot.h>
#define RCU_BOOST_PRIO RCU_KTHREAD_PRIO
#endif
+#ifdef CONFIG_RCU_NOCB_CPU
+static cpumask_var_t rcu_nocb_mask; /* CPUs to have callbacks offloaded. */
+static bool have_rcu_nocb_mask; /* Was rcu_nocb_mask allocated? */
+static bool rcu_nocb_poll; /* Offload kthread are to poll. */
+module_param(rcu_nocb_poll, bool, 0444);
+static char __initdata nocb_buf[NR_CPUS * 5];
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
+
/*
* Check the RCU kernel configuration parameters and print informative
* messages about anything out of the ordinary. If you like #ifdef, you
printk(KERN_INFO "\tExperimental boot-time adjustment of leaf fanout to %d.\n", rcu_fanout_leaf);
if (nr_cpu_ids != NR_CPUS)
printk(KERN_INFO "\tRCU restricting CPUs from NR_CPUS=%d to nr_cpu_ids=%d.\n", NR_CPUS, nr_cpu_ids);
+#ifdef CONFIG_RCU_NOCB_CPU
+ if (have_rcu_nocb_mask) {
+ if (cpumask_test_cpu(0, rcu_nocb_mask)) {
+ cpumask_clear_cpu(0, rcu_nocb_mask);
+ pr_info("\tCPU 0: illegal no-CBs CPU (cleared).\n");
+ }
+ cpulist_scnprintf(nocb_buf, sizeof(nocb_buf), rcu_nocb_mask);
+ pr_info("\tExperimental no-CBs CPUs: %s.\n", nocb_buf);
+ if (rcu_nocb_poll)
+ pr_info("\tExperimental polled no-CBs CPUs.\n");
+ }
+#endif /* #ifdef CONFIG_RCU_NOCB_CPU */
}
#ifdef CONFIG_TREE_PREEMPT_RCU
*/
void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
{
- __call_rcu(head, func, &rcu_preempt_state, 0);
+ __call_rcu(head, func, &rcu_preempt_state, -1, 0);
}
EXPORT_SYMBOL_GPL(call_rcu);
void kfree_call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
{
- __call_rcu(head, func, &rcu_preempt_state, 1);
+ __call_rcu(head, func, &rcu_preempt_state, -1, 1);
}
EXPORT_SYMBOL_GPL(kfree_call_rcu);
void kfree_call_rcu(struct rcu_head *head,
void (*func)(struct rcu_head *rcu))
{
- __call_rcu(head, func, &rcu_sched_state, 1);
+ __call_rcu(head, func, &rcu_sched_state, -1, 1);
}
EXPORT_SYMBOL_GPL(kfree_call_rcu);
}
#endif /* #else #ifdef CONFIG_RCU_CPU_STALL_INFO */
+
+#ifdef CONFIG_RCU_NOCB_CPU
+
+/*
+ * Offload callback processing from the boot-time-specified set of CPUs
+ * specified by rcu_nocb_mask. For each CPU in the set, there is a
+ * kthread created that pulls the callbacks from the corresponding CPU,
+ * waits for a grace period to elapse, and invokes the callbacks.
+ * The no-CBs CPUs do a wake_up() on their kthread when they insert
+ * a callback into any empty list, unless the rcu_nocb_poll boot parameter
+ * has been specified, in which case each kthread actively polls its
+ * CPU. (Which isn't so great for energy efficiency, but which does
+ * reduce RCU's overhead on that CPU.)
+ *
+ * This is intended to be used in conjunction with Frederic Weisbecker's
+ * adaptive-idle work, which would seriously reduce OS jitter on CPUs
+ * running CPU-bound user-mode computations.
+ *
+ * Offloading of callback processing could also in theory be used as
+ * an energy-efficiency measure because CPUs with no RCU callbacks
+ * queued are more aggressive about entering dyntick-idle mode.
+ */
+
+
+/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
+static int __init rcu_nocb_setup(char *str)
+{
+ alloc_bootmem_cpumask_var(&rcu_nocb_mask);
+ have_rcu_nocb_mask = true;
+ cpulist_parse(str, rcu_nocb_mask);
+ return 1;
+}
+__setup("rcu_nocbs=", rcu_nocb_setup);
+
+/* Is the specified CPU a no-CPUs CPU? */
+static bool is_nocb_cpu(int cpu)
+{
+ if (have_rcu_nocb_mask)
+ return cpumask_test_cpu(cpu, rcu_nocb_mask);
+ return false;
+}
+
+/*
+ * Enqueue the specified string of rcu_head structures onto the specified
+ * CPU's no-CBs lists. The CPU is specified by rdp, the head of the
+ * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy
+ * counts are supplied by rhcount and rhcount_lazy.
+ *
+ * If warranted, also wake up the kthread servicing this CPUs queues.
+ */
+static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
+ struct rcu_head *rhp,
+ struct rcu_head **rhtp,
+ int rhcount, int rhcount_lazy)
+{
+ int len;
+ struct rcu_head **old_rhpp;
+ struct task_struct *t;
+
+ /* Enqueue the callback on the nocb list and update counts. */
+ old_rhpp = xchg(&rdp->nocb_tail, rhtp);
+ ACCESS_ONCE(*old_rhpp) = rhp;
+ atomic_long_add(rhcount, &rdp->nocb_q_count);
+ atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
+
+ /* If we are not being polled and there is a kthread, awaken it ... */
+ t = ACCESS_ONCE(rdp->nocb_kthread);
+ if (rcu_nocb_poll | !t)
+ return;
+ len = atomic_long_read(&rdp->nocb_q_count);
+ if (old_rhpp == &rdp->nocb_head) {
+ wake_up(&rdp->nocb_wq); /* ... only if queue was empty ... */
+ rdp->qlen_last_fqs_check = 0;
+ } else if (len > rdp->qlen_last_fqs_check + qhimark) {
+ wake_up_process(t); /* ... or if many callbacks queued. */
+ rdp->qlen_last_fqs_check = LONG_MAX / 2;
+ }
+ return;
+}
+
+/*
+ * This is a helper for __call_rcu(), which invokes this when the normal
+ * callback queue is inoperable. If this is not a no-CBs CPU, this
+ * function returns failure back to __call_rcu(), which can complain
+ * appropriately.
+ *
+ * Otherwise, this function queues the callback where the corresponding
+ * "rcuo" kthread can find it.
+ */
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+ bool lazy)
+{
+
+ if (!is_nocb_cpu(rdp->cpu))
+ return 0;
+ __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy);
+ return 1;
+}
+
+/*
+ * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is
+ * not a no-CBs CPU.
+ */
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+ struct rcu_data *rdp)
+{
+ long ql = rsp->qlen;
+ long qll = rsp->qlen_lazy;
+
+ /* If this is not a no-CBs CPU, tell the caller to do it the old way. */
+ if (!is_nocb_cpu(smp_processor_id()))
+ return 0;
+ rsp->qlen = 0;
+ rsp->qlen_lazy = 0;
+
+ /* First, enqueue the donelist, if any. This preserves CB ordering. */
+ if (rsp->orphan_donelist != NULL) {
+ __call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
+ rsp->orphan_donetail, ql, qll);
+ ql = qll = 0;
+ rsp->orphan_donelist = NULL;
+ rsp->orphan_donetail = &rsp->orphan_donelist;
+ }
+ if (rsp->orphan_nxtlist != NULL) {
+ __call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
+ rsp->orphan_nxttail, ql, qll);
+ ql = qll = 0;
+ rsp->orphan_nxtlist = NULL;
+ rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+ }
+ return 1;
+}
+
+/*
+ * There must be at least one non-no-CBs CPU in operation at any given
+ * time, because no-CBs CPUs are not capable of initiating grace periods
+ * independently. This function therefore complains if the specified
+ * CPU is the last non-no-CBs CPU, allowing the CPU-hotplug system to
+ * avoid offlining the last such CPU. (Recursion is a wonderful thing,
+ * but you have to have a base case!)
+ */
+static bool nocb_cpu_expendable(int cpu)
+{
+ cpumask_var_t non_nocb_cpus;
+ int ret;
+
+ /*
+ * If there are no no-CB CPUs or if this CPU is not a no-CB CPU,
+ * then offlining this CPU is harmless. Let it happen.
+ */
+ if (!have_rcu_nocb_mask || is_nocb_cpu(cpu))
+ return 1;
+
+ /* If no memory, play it safe and keep the CPU around. */
+ if (!alloc_cpumask_var(&non_nocb_cpus, GFP_NOIO))
+ return 0;
+ cpumask_andnot(non_nocb_cpus, cpu_online_mask, rcu_nocb_mask);
+ cpumask_clear_cpu(cpu, non_nocb_cpus);
+ ret = !cpumask_empty(non_nocb_cpus);
+ free_cpumask_var(non_nocb_cpus);
+ return ret;
+}
+
+/*
+ * Helper structure for remote registry of RCU callbacks.
+ * This is needed for when a no-CBs CPU needs to start a grace period.
+ * If it just invokes call_rcu(), the resulting callback will be queued,
+ * which can result in deadlock.
+ */
+struct rcu_head_remote {
+ struct rcu_head *rhp;
+ call_rcu_func_t *crf;
+ void (*func)(struct rcu_head *rhp);
+};
+
+/*
+ * Register a callback as specified by the rcu_head_remote struct.
+ * This function is intended to be invoked via smp_call_function_single().
+ */
+static void call_rcu_local(void *arg)
+{
+ struct rcu_head_remote *rhrp =
+ container_of(arg, struct rcu_head_remote, rhp);
+
+ rhrp->crf(rhrp->rhp, rhrp->func);
+}
+
+/*
+ * Set up an rcu_head_remote structure and the invoke call_rcu_local()
+ * on CPU 0 (which is guaranteed to be a non-no-CBs CPU) via
+ * smp_call_function_single().
+ */
+static void invoke_crf_remote(struct rcu_head *rhp,
+ void (*func)(struct rcu_head *rhp),
+ call_rcu_func_t crf)
+{
+ struct rcu_head_remote rhr;
+
+ rhr.rhp = rhp;
+ rhr.crf = crf;
+ rhr.func = func;
+ smp_call_function_single(0, call_rcu_local, &rhr, 1);
+}
+
+/*
+ * Helper functions to be passed to wait_rcu_gp(), each of which
+ * invokes invoke_crf_remote() to register a callback appropriately.
+ */
+static void __maybe_unused
+call_rcu_preempt_remote(struct rcu_head *rhp,
+ void (*func)(struct rcu_head *rhp))
+{
+ invoke_crf_remote(rhp, func, call_rcu);
+}
+static void call_rcu_bh_remote(struct rcu_head *rhp,
+ void (*func)(struct rcu_head *rhp))
+{
+ invoke_crf_remote(rhp, func, call_rcu_bh);
+}
+static void call_rcu_sched_remote(struct rcu_head *rhp,
+ void (*func)(struct rcu_head *rhp))
+{
+ invoke_crf_remote(rhp, func, call_rcu_sched);
+}
+
+/*
+ * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes
+ * callbacks queued by the corresponding no-CBs CPU.
+ */
+static int rcu_nocb_kthread(void *arg)
+{
+ int c, cl;
+ struct rcu_head *list;
+ struct rcu_head *next;
+ struct rcu_head **tail;
+ struct rcu_data *rdp = arg;
+
+ /* Each pass through this loop invokes one batch of callbacks */
+ for (;;) {
+ /* If not polling, wait for next batch of callbacks. */
+ if (!rcu_nocb_poll)
+ wait_event(rdp->nocb_wq, rdp->nocb_head);
+ list = ACCESS_ONCE(rdp->nocb_head);
+ if (!list) {
+ schedule_timeout_interruptible(1);
+ continue;
+ }
+
+ /*
+ * Extract queued callbacks, update counts, and wait
+ * for a grace period to elapse.
+ */
+ ACCESS_ONCE(rdp->nocb_head) = NULL;
+ tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+ c = atomic_long_xchg(&rdp->nocb_q_count, 0);
+ cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
+ ACCESS_ONCE(rdp->nocb_p_count) += c;
+ ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
+ wait_rcu_gp(rdp->rsp->call_remote);
+
+ /* Each pass through the following loop invokes a callback. */
+ trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
+ c = cl = 0;
+ while (list) {
+ next = list->next;
+ /* Wait for enqueuing to complete, if needed. */
+ while (next == NULL && &list->next != tail) {
+ schedule_timeout_interruptible(1);
+ next = list->next;
+ }
+ debug_rcu_head_unqueue(list);
+ local_bh_disable();
+ if (__rcu_reclaim(rdp->rsp->name, list))
+ cl++;
+ c++;
+ local_bh_enable();
+ list = next;
+ }
+ trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
+ ACCESS_ONCE(rdp->nocb_p_count) -= c;
+ ACCESS_ONCE(rdp->nocb_p_count_lazy) -= cl;
+ rdp->n_cbs_invoked += c;
+ }
+ return 0;
+}
+
+/* Initialize per-rcu_data variables for no-CBs CPUs. */
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+ rdp->nocb_tail = &rdp->nocb_head;
+ init_waitqueue_head(&rdp->nocb_wq);
+}
+
+/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+ int cpu;
+ struct rcu_data *rdp;
+ struct task_struct *t;
+
+ if (rcu_nocb_mask == NULL)
+ return;
+ for_each_cpu(cpu, rcu_nocb_mask) {
+ rdp = per_cpu_ptr(rsp->rda, cpu);
+ t = kthread_run(rcu_nocb_kthread, rdp, "rcuo%d", cpu);
+ BUG_ON(IS_ERR(t));
+ ACCESS_ONCE(rdp->nocb_kthread) = t;
+ }
+}
+
+/* Prevent __call_rcu() from enqueuing callbacks on no-CBs CPUs */
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+ if (rcu_nocb_mask == NULL ||
+ !cpumask_test_cpu(rdp->cpu, rcu_nocb_mask))
+ return;
+ rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+}
+
+/* Initialize the ->call_remote fields in the rcu_state structures. */
+static void __init rcu_init_nocb(void)
+{
+#ifdef CONFIG_PREEMPT_RCU
+ rcu_preempt_state.call_remote = call_rcu_preempt_remote;
+#endif /* #ifdef CONFIG_PREEMPT_RCU */
+ rcu_bh_state.call_remote = call_rcu_bh_remote;
+ rcu_sched_state.call_remote = call_rcu_sched_remote;
+}
+
+#else /* #ifdef CONFIG_RCU_NOCB_CPU */
+
+static bool is_nocb_cpu(int cpu)
+{
+ return false;
+}
+
+static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
+ bool lazy)
+{
+ return 0;
+}
+
+static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
+ struct rcu_data *rdp)
+{
+ return 0;
+}
+
+static bool nocb_cpu_expendable(int cpu)
+{
+ return 1;
+}
+
+static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
+{
+}
+
+static void init_nocb_callback_list(struct rcu_data *rdp)
+{
+}
+
+static void __init rcu_init_nocb(void)
+{
+}
+
+#endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */