From 9d659ae14b545c4296e812c70493bfdc999b5c1c Mon Sep 17 00:00:00 2001 From: Peter Zijlstra Date: Tue, 23 Aug 2016 14:40:16 +0200 Subject: [PATCH] locking/mutex: Add lock handoff to avoid starvation Implement lock handoff to avoid lock starvation. Lock starvation is possible because mutex_lock() allows lock stealing, where a running (or optimistic spinning) task beats the woken waiter to the acquire. Lock stealing is an important performance optimization because waiting for a waiter to wake up and get runtime can take a significant time, during which everyboy would stall on the lock. The down-side is of course that it allows for starvation. This patch has the waiter requesting a handoff if it fails to acquire the lock upon waking. This re-introduces some of the wait time, because once we do a handoff we have to wait for the waiter to wake up again. A future patch will add a round of optimistic spinning to attempt to alleviate this penalty, but if that turns out to not be enough, we can add a counter and only request handoff after multiple failed wakeups. There are a few tricky implementation details: - accepting a handoff must only be done in the wait-loop. Since the handoff condition is owner == current, it can easily cause recursive locking trouble. - accepting the handoff must be careful to provide the ACQUIRE semantics. - having the HANDOFF bit set on unlock requires care, we must not clear the owner. - we must be careful to not leave HANDOFF set after we've acquired the lock. The tricky scenario is setting the HANDOFF bit on an unlocked mutex. Tested-by: Jason Low Signed-off-by: Peter Zijlstra (Intel) Reviewed-by: Waiman Long Cc: Andrew Morton Cc: Linus Torvalds Cc: Paul E. McKenney Cc: Peter Zijlstra Cc: Thomas Gleixner Cc: linux-kernel@vger.kernel.org Signed-off-by: Ingo Molnar --- kernel/locking/mutex.c | 142 ++++++++++++++++++++++++++++++++++------- 1 file changed, 119 insertions(+), 23 deletions(-) diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c index de1ce0bae0d5..b4ebd8b9bcd5 100644 --- a/kernel/locking/mutex.c +++ b/kernel/locking/mutex.c @@ -54,8 +54,10 @@ EXPORT_SYMBOL(__mutex_init); * bits to store extra state. * * Bit0 indicates a non-empty waiter list; unlock must issue a wakeup. + * Bit1 indicates unlock needs to hand the lock to the top-waiter */ #define MUTEX_FLAG_WAITERS 0x01 +#define MUTEX_FLAG_HANDOFF 0x02 #define MUTEX_FLAGS 0x03 @@ -71,20 +73,48 @@ static inline unsigned long __owner_flags(unsigned long owner) /* * Actual trylock that will work on any unlocked state. + * + * When setting the owner field, we must preserve the low flag bits. + * + * Be careful with @handoff, only set that in a wait-loop (where you set + * HANDOFF) to avoid recursive lock attempts. */ -static inline bool __mutex_trylock(struct mutex *lock) +static inline bool __mutex_trylock(struct mutex *lock, const bool handoff) { unsigned long owner, curr = (unsigned long)current; owner = atomic_long_read(&lock->owner); for (;;) { /* must loop, can race against a flag */ - unsigned long old; + unsigned long old, flags = __owner_flags(owner); + + if (__owner_task(owner)) { + if (handoff && unlikely(__owner_task(owner) == current)) { + /* + * Provide ACQUIRE semantics for the lock-handoff. + * + * We cannot easily use load-acquire here, since + * the actual load is a failed cmpxchg, which + * doesn't imply any barriers. + * + * Also, this is a fairly unlikely scenario, and + * this contains the cost. + */ + smp_mb(); /* ACQUIRE */ + return true; + } - if (__owner_task(owner)) return false; + } + + /* + * We set the HANDOFF bit, we must make sure it doesn't live + * past the point where we acquire it. This would be possible + * if we (accidentally) set the bit on an unlocked mutex. + */ + if (handoff) + flags &= ~MUTEX_FLAG_HANDOFF; - old = atomic_long_cmpxchg_acquire(&lock->owner, owner, - curr | __owner_flags(owner)); + old = atomic_long_cmpxchg_acquire(&lock->owner, owner, curr | flags); if (old == owner) return true; @@ -134,6 +164,39 @@ static inline void __mutex_clear_flag(struct mutex *lock, unsigned long flag) atomic_long_andnot(flag, &lock->owner); } +static inline bool __mutex_waiter_is_first(struct mutex *lock, struct mutex_waiter *waiter) +{ + return list_first_entry(&lock->wait_list, struct mutex_waiter, list) == waiter; +} + +/* + * Give up ownership to a specific task, when @task = NULL, this is equivalent + * to a regular unlock. Clears HANDOFF, preserves WAITERS. Provides RELEASE + * semantics like a regular unlock, the __mutex_trylock() provides matching + * ACQUIRE semantics for the handoff. + */ +static void __mutex_handoff(struct mutex *lock, struct task_struct *task) +{ + unsigned long owner = atomic_long_read(&lock->owner); + + for (;;) { + unsigned long old, new; + +#ifdef CONFIG_DEBUG_MUTEXES + DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current); +#endif + + new = (owner & MUTEX_FLAG_WAITERS); + new |= (unsigned long)task; + + old = atomic_long_cmpxchg_release(&lock->owner, owner, new); + if (old == owner) + break; + + owner = old; + } +} + #ifndef CONFIG_DEBUG_LOCK_ALLOC /* * We split the mutex lock/unlock logic into separate fastpath and @@ -398,7 +461,7 @@ static bool mutex_optimistic_spin(struct mutex *lock, break; /* Try to acquire the mutex if it is unlocked. */ - if (__mutex_trylock(lock)) { + if (__mutex_trylock(lock, false)) { osq_unlock(&lock->osq); return true; } @@ -523,6 +586,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, struct task_struct *task = current; struct mutex_waiter waiter; unsigned long flags; + bool first = false; int ret; if (use_ww_ctx) { @@ -534,7 +598,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, preempt_disable(); mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip); - if (__mutex_trylock(lock) || mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) { + if (__mutex_trylock(lock, false) || + mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) { /* got the lock, yay! */ lock_acquired(&lock->dep_map, ip); if (use_ww_ctx) { @@ -551,7 +616,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, /* * After waiting to acquire the wait_lock, try again. */ - if (__mutex_trylock(lock)) + if (__mutex_trylock(lock, false)) goto skip_wait; debug_mutex_lock_common(lock, &waiter); @@ -561,13 +626,13 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, list_add_tail(&waiter.list, &lock->wait_list); waiter.task = task; - if (list_first_entry(&lock->wait_list, struct mutex_waiter, list) == &waiter) + if (__mutex_waiter_is_first(lock, &waiter)) __mutex_set_flag(lock, MUTEX_FLAG_WAITERS); lock_contended(&lock->dep_map, ip); for (;;) { - if (__mutex_trylock(lock)) + if (__mutex_trylock(lock, first)) break; /* @@ -586,17 +651,20 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass, } __set_task_state(task, state); - - /* didn't get the lock, go to sleep: */ spin_unlock_mutex(&lock->wait_lock, flags); schedule_preempt_disabled(); spin_lock_mutex(&lock->wait_lock, flags); + + if (!first && __mutex_waiter_is_first(lock, &waiter)) { + first = true; + __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF); + } } __set_task_state(task, TASK_RUNNING); mutex_remove_waiter(lock, &waiter, task); if (likely(list_empty(&lock->wait_list))) - __mutex_clear_flag(lock, MUTEX_FLAG_WAITERS); + __mutex_clear_flag(lock, MUTEX_FLAGS); debug_mutex_free_waiter(&waiter); @@ -724,33 +792,61 @@ EXPORT_SYMBOL_GPL(__ww_mutex_lock_interruptible); */ static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip) { + struct task_struct *next = NULL; unsigned long owner, flags; WAKE_Q(wake_q); mutex_release(&lock->dep_map, 1, ip); /* - * Release the lock before (potentially) taking the spinlock - * such that other contenders can get on with things ASAP. + * Release the lock before (potentially) taking the spinlock such that + * other contenders can get on with things ASAP. + * + * Except when HANDOFF, in that case we must not clear the owner field, + * but instead set it to the top waiter. */ - owner = atomic_long_fetch_and_release(MUTEX_FLAGS, &lock->owner); - if (!__owner_flags(owner)) - return; + owner = atomic_long_read(&lock->owner); + for (;;) { + unsigned long old; + +#ifdef CONFIG_DEBUG_MUTEXES + DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current); +#endif + + if (owner & MUTEX_FLAG_HANDOFF) + break; + + old = atomic_long_cmpxchg_release(&lock->owner, owner, + __owner_flags(owner)); + if (old == owner) { + if (owner & MUTEX_FLAG_WAITERS) + break; + + return; + } + + owner = old; + } spin_lock_mutex(&lock->wait_lock, flags); debug_mutex_unlock(lock); - if (!list_empty(&lock->wait_list)) { /* get the first entry from the wait-list: */ struct mutex_waiter *waiter = - list_entry(lock->wait_list.next, - struct mutex_waiter, list); + list_first_entry(&lock->wait_list, + struct mutex_waiter, list); + + next = waiter->task; debug_mutex_wake_waiter(lock, waiter); - wake_q_add(&wake_q, waiter->task); + wake_q_add(&wake_q, next); } + if (owner & MUTEX_FLAG_HANDOFF) + __mutex_handoff(lock, next); + spin_unlock_mutex(&lock->wait_lock, flags); + wake_up_q(&wake_q); } @@ -853,7 +949,7 @@ __ww_mutex_lock_interruptible_slowpath(struct ww_mutex *lock, */ int __sched mutex_trylock(struct mutex *lock) { - bool locked = __mutex_trylock(lock); + bool locked = __mutex_trylock(lock, false); if (locked) mutex_acquire(&lock->dep_map, 0, 1, _RET_IP_); -- 2.20.1