svcrdma: Fix race with dto_tasklet in svc_rdma_send
authorTom Tucker <tom@opengridcomputing.com>
Tue, 6 May 2008 16:33:11 +0000 (11:33 -0500)
committerTom Tucker <tom@opengridcomputing.com>
Mon, 19 May 2008 12:33:44 +0000 (07:33 -0500)
The svc_rdma_send function will attempt to reap SQ WR to make room for
a new request if it finds the SQ full. This function races with the
dto_tasklet that also reaps SQ WR. To avoid polling and arming the CQ
unnecessarily move the test_and_clear_bit of the RDMAXPRT_SQ_PENDING
flag and arming of the CQ to the sq_cq_reap function.

Refactor the rq_cq_reap function to match sq_cq_reap so that the
code is easier to follow.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
net/sunrpc/xprtrdma/svc_rdma_transport.c

index 1e0af2f205e90e2a518949f3dfbdef94187b83a7..73734173f9942accec80260e975538b11fa2850c 100644 (file)
@@ -228,23 +228,8 @@ static void dto_tasklet_func(unsigned long data)
                list_del_init(&xprt->sc_dto_q);
                spin_unlock_irqrestore(&dto_lock, flags);
 
-               if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-                       ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-                       rq_cq_reap(xprt);
-                       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-                       /*
-                        * If data arrived before established event,
-                        * don't enqueue. This defers RPC I/O until the
-                        * RDMA connection is complete.
-                        */
-                       if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-                               svc_xprt_enqueue(&xprt->sc_xprt);
-               }
-
-               if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-                       ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-                       sq_cq_reap(xprt);
-               }
+               rq_cq_reap(xprt);
+               sq_cq_reap(xprt);
 
                svc_xprt_put(&xprt->sc_xprt);
                spin_lock_irqsave(&dto_lock, flags);
@@ -297,6 +282,10 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
        struct ib_wc wc;
        struct svc_rdma_op_ctxt *ctxt = NULL;
 
+       if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+               return;
+
+       ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
        atomic_inc(&rdma_stat_rq_poll);
 
        spin_lock_bh(&xprt->sc_rq_dto_lock);
@@ -316,6 +305,15 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 
        if (ctxt)
                atomic_inc(&rdma_stat_rq_prod);
+
+       set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+       /*
+        * If data arrived before established event,
+        * don't enqueue. This defers RPC I/O until the
+        * RDMA connection is complete.
+        */
+       if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+               svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
@@ -328,6 +326,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
        struct ib_cq *cq = xprt->sc_sq_cq;
        int ret;
 
+
+       if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+               return;
+
+       ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
        atomic_inc(&rdma_stat_sq_poll);
        while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
                ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -1010,7 +1013,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
                if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
                        spin_unlock_bh(&xprt->sc_lock);
                        atomic_inc(&rdma_stat_sq_starve);
-                       /* See if we can reap some SQ WR */
+
+                       /* See if we can opportunistically reap SQ WR to make room */
                        sq_cq_reap(xprt);
 
                        /* Wait until SQ WR available if SQ still full */