locking/rwsem: Prevent decrement of reader count before increment
authorWaiman Long <longman@redhat.com>
Sun, 28 Apr 2019 21:25:38 +0000 (17:25 -0400)
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>
Tue, 21 May 2019 16:50:12 +0000 (18:50 +0200)
[ Upstream commit a9e9bcb45b1525ba7aea26ed9441e8632aeeda58 ]

During my rwsem testing, it was found that after a down_read(), the
reader count may occasionally become 0 or even negative. Consequently,
a writer may steal the lock at that time and execute with the reader
in parallel thus breaking the mutual exclusion guarantee of the write
lock. In other words, both readers and writer can become rwsem owners
simultaneously.

The current reader wakeup code does it in one pass to clear waiter->task
and put them into wake_q before fully incrementing the reader count.
Once waiter->task is cleared, the corresponding reader may see it,
finish the critical section and do unlock to decrement the count before
the count is incremented. This is not a problem if there is only one
reader to wake up as the count has been pre-incremented by 1.  It is
a problem if there are more than one readers to be woken up and writer
can steal the lock.

The wakeup was actually done in 2 passes before the following v4.9 commit:

  70800c3c0cc5 ("locking/rwsem: Scan the wait_list for readers only once")

To fix this problem, the wakeup is now done in two passes
again. In the first pass, we collect the readers and count them.
The reader count is then fully incremented. In the second pass, the
waiter->task is then cleared and they are put into wake_q to be woken
up later.

Signed-off-by: Waiman Long <longman@redhat.com>
Acked-by: Linus Torvalds <torvalds@linux-foundation.org>
Cc: Borislav Petkov <bp@alien8.de>
Cc: Davidlohr Bueso <dave@stgolabs.net>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Thomas Gleixner <tglx@linutronix.de>
Cc: Tim Chen <tim.c.chen@linux.intel.com>
Cc: Will Deacon <will.deacon@arm.com>
Cc: huang ying <huang.ying.caritas@gmail.com>
Fixes: 70800c3c0cc5 ("locking/rwsem: Scan the wait_list for readers only once")
Link: http://lkml.kernel.org/r/20190428212557.13482-2-longman@redhat.com
Signed-off-by: Ingo Molnar <mingo@kernel.org>
Signed-off-by: Sasha Levin <sashal@kernel.org>
kernel/locking/rwsem-xadd.c

index c75017326c37aed4f06c1c1f5e419e5b36fbf92b..3f5be624c76494feba3f7dc901c161a73dd36b9f 100644 (file)
@@ -130,6 +130,7 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
 {
        struct rwsem_waiter *waiter, *tmp;
        long oldcount, woken = 0, adjustment = 0;
+       struct list_head wlist;
 
        /*
         * Take a peek at the queue head waiter such that we can determine
@@ -188,18 +189,42 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
         * of the queue. We know that woken will be at least 1 as we accounted
         * for above. Note we increment the 'active part' of the count by the
         * number of readers before waking any processes up.
+        *
+        * We have to do wakeup in 2 passes to prevent the possibility that
+        * the reader count may be decremented before it is incremented. It
+        * is because the to-be-woken waiter may not have slept yet. So it
+        * may see waiter->task got cleared, finish its critical section and
+        * do an unlock before the reader count increment.
+        *
+        * 1) Collect the read-waiters in a separate list, count them and
+        *    fully increment the reader count in rwsem.
+        * 2) For each waiters in the new list, clear waiter->task and
+        *    put them into wake_q to be woken up later.
         */
-       list_for_each_entry_safe(waiter, tmp, &sem->wait_list, list) {
-               struct task_struct *tsk;
-
+       list_for_each_entry(waiter, &sem->wait_list, list) {
                if (waiter->type == RWSEM_WAITING_FOR_WRITE)
                        break;
 
                woken++;
-               tsk = waiter->task;
+       }
+       list_cut_before(&wlist, &sem->wait_list, &waiter->list);
+
+       adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
+       if (list_empty(&sem->wait_list)) {
+               /* hit end of list above */
+               adjustment -= RWSEM_WAITING_BIAS;
+       }
+
+       if (adjustment)
+               atomic_long_add(adjustment, &sem->count);
+
+       /* 2nd pass */
+       list_for_each_entry_safe(waiter, tmp, &wlist, list) {
+               struct task_struct *tsk;
 
+               tsk = waiter->task;
                get_task_struct(tsk);
-               list_del(&waiter->list);
+
                /*
                 * Ensure calling get_task_struct() before setting the reader
                 * waiter to nil such that rwsem_down_read_failed() cannot
@@ -215,15 +240,6 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
                /* wake_q_add() already take the task ref */
                put_task_struct(tsk);
        }
-
-       adjustment = woken * RWSEM_ACTIVE_READ_BIAS - adjustment;
-       if (list_empty(&sem->wait_list)) {
-               /* hit end of list above */
-               adjustment -= RWSEM_WAITING_BIAS;
-       }
-
-       if (adjustment)
-               atomic_long_add(adjustment, &sem->count);
 }
 
 /*