}
/*
- * Kick the leader kthread for this NOCB group.
+ * Kick the leader kthread for this NOCB group. Caller holds ->nocb_lock
+ * and this function releases it.
*/
-static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+static void __wake_nocb_leader(struct rcu_data *rdp, bool force,
+ unsigned long flags)
+ __releases(rdp->nocb_lock)
{
struct rcu_data *rdp_leader = rdp->nocb_leader;
- if (!READ_ONCE(rdp_leader->nocb_kthread))
+ lockdep_assert_held(&rdp->nocb_lock);
+ if (!READ_ONCE(rdp_leader->nocb_kthread)) {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
- if (READ_ONCE(rdp_leader->nocb_leader_sleep) || force) {
+ }
+ if (rdp_leader->nocb_leader_sleep || force) {
/* Prior smp_mb__after_atomic() orders against prior enqueue. */
WRITE_ONCE(rdp_leader->nocb_leader_sleep, false);
+ del_timer(&rdp->nocb_timer);
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
smp_mb(); /* ->nocb_leader_sleep before swake_up(). */
swake_up(&rdp_leader->nocb_wq);
+ } else {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
}
}
+/*
+ * Kick the leader kthread for this NOCB group, but caller has not
+ * acquired locks.
+ */
+static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ __wake_nocb_leader(rdp, force, flags);
+}
+
+/*
+ * Arrange to wake the leader kthread for this NOCB group at some
+ * future time when it is safe to do so.
+ */
+static void wake_nocb_leader_defer(struct rcu_data *rdp, int waketype,
+ const char *reason)
+{
+ unsigned long flags;
+
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
+ mod_timer(&rdp->nocb_timer, jiffies + 1);
+ WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, reason);
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
+}
+
/*
* Does the specified CPU need an RCU callback for the specified flavor
* of rcu_barrier()?
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeEmpty"));
} else {
- WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE);
- /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
- smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- TPS("WakeEmptyIsDeferred"));
+ wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
+ TPS("WakeEmptyIsDeferred"));
}
rdp->qlen_last_fqs_check = 0;
} else if (len > rdp->qlen_last_fqs_check + qhimark) {
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WakeOvf"));
} else {
- WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_FORCE);
- /* Store ->nocb_defer_wakeup before ->rcu_urgent_qs. */
- smp_store_release(this_cpu_ptr(&rcu_dynticks.rcu_urgent_qs), true);
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- TPS("WakeOvfIsDeferred"));
+ wake_nocb_leader_defer(rdp, RCU_NOCB_WAKE,
+ TPS("WakeOvfIsDeferred"));
}
rdp->qlen_last_fqs_check = LONG_MAX / 2;
} else {
static void nocb_leader_wait(struct rcu_data *my_rdp)
{
bool firsttime = true;
+ unsigned long flags;
bool gotcbs;
struct rcu_data *rdp;
struct rcu_head **tail;
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
swait_event_interruptible(my_rdp->nocb_wq,
!READ_ONCE(my_rdp->nocb_leader_sleep));
- /* Memory barrier handled by smp_mb() calls below and repoll. */
+ raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
+ my_rdp->nocb_leader_sleep = true;
+ WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
+ del_timer(&my_rdp->nocb_timer);
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
} else if (firsttime) {
firsttime = false; /* Don't drown trace log with "Poll"! */
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
* nocb_gp_head, where they await a grace period.
*/
gotcbs = false;
- smp_mb(); /* wakeup before ->nocb_head reads. */
+ smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head);
if (!rdp->nocb_gp_head)
gotcbs = true;
}
- /*
- * If there were no callbacks, sleep a bit, rescan after a
- * memory barrier, and go retry.
- */
+ /* No callbacks? Sleep a bit if polling, and go retry. */
if (unlikely(!gotcbs)) {
- if (!rcu_nocb_poll)
+ WARN_ON(signal_pending(current));
+ if (rcu_nocb_poll) {
+ schedule_timeout_interruptible(1);
+ } else {
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
"WokeEmpty");
- WARN_ON(signal_pending(current));
- schedule_timeout_interruptible(1);
-
- /* Rescan in case we were a victim of memory ordering. */
- my_rdp->nocb_leader_sleep = true;
- smp_mb(); /* Ensure _sleep true before scan. */
- for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
- if (READ_ONCE(rdp->nocb_head)) {
- /* Found CB, so short-circuit next wait. */
- my_rdp->nocb_leader_sleep = false;
- break;
- }
+ }
goto wait_again;
}
/* Wait for one grace period. */
rcu_nocb_wait_gp(my_rdp);
- /*
- * We left ->nocb_leader_sleep unset to reduce cache thrashing.
- * We set it now, but recheck for new callbacks while
- * traversing our follower list.
- */
- my_rdp->nocb_leader_sleep = true;
- smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */
-
/* Each pass through the following loop wakes a follower, if needed. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
- if (READ_ONCE(rdp->nocb_head))
+ if (!rcu_nocb_poll &&
+ READ_ONCE(rdp->nocb_head) &&
+ READ_ONCE(my_rdp->nocb_leader_sleep)) {
+ raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags);
my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
+ raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags);
+ }
if (!rdp->nocb_gp_head)
continue; /* No CBs, so no need to wake follower. */
/* Append callbacks to follower's "done" list. */
- tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ tail = rdp->nocb_follower_tail;
+ rdp->nocb_follower_tail = rdp->nocb_gp_tail;
*tail = rdp->nocb_gp_head;
- smp_mb__after_atomic(); /* Store *tail before wakeup. */
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
- /*
- * List was empty, wake up the follower.
- * Memory barriers supplied by atomic_long_add().
- */
+ /* List was empty, so wake up the follower. */
swake_up(&rdp->nocb_wq);
}
}
*/
static void nocb_follower_wait(struct rcu_data *rdp)
{
- bool firsttime = true;
-
for (;;) {
- if (!rcu_nocb_poll) {
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- "FollowerSleep");
- swait_event_interruptible(rdp->nocb_wq,
- READ_ONCE(rdp->nocb_follower_head));
- } else if (firsttime) {
- /* Don't drown trace log with "Poll"! */
- firsttime = false;
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
- }
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "FollowerSleep");
+ swait_event_interruptible(rdp->nocb_wq,
+ READ_ONCE(rdp->nocb_follower_head));
if (smp_load_acquire(&rdp->nocb_follower_head)) {
/* ^^^ Ensure CB invocation follows _head test. */
return;
}
- if (!rcu_nocb_poll)
- trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
- "WokeEmpty");
WARN_ON(signal_pending(current));
- schedule_timeout_interruptible(1);
+ trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeEmpty");
}
}
static int rcu_nocb_kthread(void *arg)
{
int c, cl;
+ unsigned long flags;
struct rcu_head *list;
struct rcu_head *next;
struct rcu_head **tail;
nocb_follower_wait(rdp);
/* Pull the ready-to-invoke callbacks onto local list. */
- list = READ_ONCE(rdp->nocb_follower_head);
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ list = rdp->nocb_follower_head;
+ rdp->nocb_follower_head = NULL;
+ tail = rdp->nocb_follower_tail;
+ rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
BUG_ON(!list);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
- WRITE_ONCE(rdp->nocb_follower_head, NULL);
- tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
/* Each pass through the following loop invokes a callback. */
trace_rcu_batch_start(rdp->rsp->name,
}
/* Do a deferred wakeup of rcu_nocb_kthread(). */
-static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
{
+ unsigned long flags;
int ndw;
- if (!rcu_nocb_need_deferred_wakeup(rdp))
+ raw_spin_lock_irqsave(&rdp->nocb_lock, flags);
+ if (!rcu_nocb_need_deferred_wakeup(rdp)) {
+ raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags);
return;
+ }
ndw = READ_ONCE(rdp->nocb_defer_wakeup);
WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
- wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE);
+ __wake_nocb_leader(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWake"));
}
+/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
+static void do_nocb_deferred_wakeup_timer(unsigned long x)
+{
+ do_nocb_deferred_wakeup_common((struct rcu_data *)x);
+}
+
+/*
+ * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
+ * This means we do an inexact common-case check. Note that if
+ * we miss, ->nocb_timer will eventually clean things up.
+ */
+static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
+{
+ if (rcu_nocb_need_deferred_wakeup(rdp))
+ do_nocb_deferred_wakeup_common(rdp);
+}
+
void __init rcu_init_nohz(void)
{
int cpu;
rdp->nocb_tail = &rdp->nocb_head;
init_swait_queue_head(&rdp->nocb_wq);
rdp->nocb_follower_tail = &rdp->nocb_follower_head;
+ raw_spin_lock_init(&rdp->nocb_lock);
+ setup_timer(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer,
+ (unsigned long)rdp);
}
/*