svcrpc: fix wspace-checking race
authorJ. Bruce Fields <bfields@redhat.com>
Tue, 26 Oct 2010 15:32:03 +0000 (11:32 -0400)
committerJ. Bruce Fields <bfields@redhat.com>
Fri, 19 Nov 2010 23:35:12 +0000 (18:35 -0500)
We call svc_xprt_enqueue() after something happens which we think may
require handling from a server thread.  To avoid such events being lost,
svc_xprt_enqueue() must guarantee that there will be a svc_serv() call
from a server thread following any such event.  It does that by either
waking up a server thread itself, or checking that XPT_BUSY is set (in
which case somebody else is doing it).

But the check of XPT_BUSY could occur just as someone finishes
processing some other event, and just before they clear XPT_BUSY.

Therefore it's important not to clear XPT_BUSY without subsequently
doing another svc_export_enqueue() to check whether the xprt should be
requeued.

The xpo_wspace() check in svc_xprt_enqueue() breaks this rule, allowing
an event to be missed in situations like:

data arrives
call svc_tcp_data_ready():
call svc_xprt_enqueue():
set BUSY
find no write space
svc_reserve():
free up write space
call svc_enqueue():
test BUSY
clear BUSY

So, instead, check wspace in the same places that the state flags are
checked: before taking BUSY, and in svc_receive().

Signed-off-by: J. Bruce Fields <bfields@redhat.com>
net/sunrpc/svc_xprt.c

index bfbda676574ab575fcca7deabdb0e0a209441718..b6f47da170b368e2427f4c5eb986863df47992a8 100644 (file)
@@ -302,6 +302,15 @@ static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
        list_del(&rqstp->rq_list);
 }
 
+static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
+{
+       if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
+               return true;
+       if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED)))
+               return xprt->xpt_ops->xpo_has_wspace(xprt);
+       return false;
+}
+
 /*
  * Queue up a transport with data pending. If there are idle nfsd
  * processes, wake 'em up.
@@ -314,8 +323,7 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
        struct svc_rqst *rqstp;
        int cpu;
 
-       if (!(xprt->xpt_flags &
-             ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
+       if (!svc_xprt_has_something_to_do(xprt))
                return;
 
        cpu = get_cpu();
@@ -345,25 +353,6 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
        BUG_ON(xprt->xpt_pool != NULL);
        xprt->xpt_pool = pool;
 
-       /* Handle pending connection */
-       if (test_bit(XPT_CONN, &xprt->xpt_flags))
-               goto process;
-
-       /* Handle close in-progress */
-       if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
-               goto process;
-
-       /* Check if we have space to reply to a request */
-       if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
-               /* Don't enqueue while not enough space for reply */
-               dprintk("svc: no write space, transport %p  not enqueued\n",
-                       xprt);
-               xprt->xpt_pool = NULL;
-               clear_bit(XPT_BUSY, &xprt->xpt_flags);
-               goto out_unlock;
-       }
-
- process:
        if (!list_empty(&pool->sp_threads)) {
                rqstp = list_entry(pool->sp_threads.next,
                                   struct svc_rqst,
@@ -744,7 +733,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
                        spin_unlock_bh(&serv->sv_lock);
                        svc_xprt_received(newxpt);
                }
-       } else {
+       } else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {
                dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
                        rqstp, pool->sp_id, xprt,
                        atomic_read(&xprt->xpt_ref.refcount));