From dbcd00eba99945acfc433508a58eadc5dcd18cad Mon Sep 17 00:00:00 2001 From: Tom Tucker Date: Tue, 6 May 2008 11:33:11 -0500 Subject: [PATCH] svcrdma: Fix race with dto_tasklet in svc_rdma_send The svc_rdma_send function will attempt to reap SQ WR to make room for a new request if it finds the SQ full. This function races with the dto_tasklet that also reaps SQ WR. To avoid polling and arming the CQ unnecessarily move the test_and_clear_bit of the RDMAXPRT_SQ_PENDING flag and arming of the CQ to the sq_cq_reap function. Refactor the rq_cq_reap function to match sq_cq_reap so that the code is easier to follow. Signed-off-by: Tom Tucker --- net/sunrpc/xprtrdma/svc_rdma_transport.c | 40 +++++++++++++----------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 1e0af2f205e9..73734173f994 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -228,23 +228,8 @@ static void dto_tasklet_func(unsigned long data) list_del_init(&xprt->sc_dto_q); spin_unlock_irqrestore(&dto_lock, flags); - if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) { - ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); - rq_cq_reap(xprt); - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); - /* - * If data arrived before established event, - * don't enqueue. This defers RPC I/O until the - * RDMA connection is complete. - */ - if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) - svc_xprt_enqueue(&xprt->sc_xprt); - } - - if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) { - ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP); - sq_cq_reap(xprt); - } + rq_cq_reap(xprt); + sq_cq_reap(xprt); svc_xprt_put(&xprt->sc_xprt); spin_lock_irqsave(&dto_lock, flags); @@ -297,6 +282,10 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt) struct ib_wc wc; struct svc_rdma_op_ctxt *ctxt = NULL; + if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) + return; + + ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP); atomic_inc(&rdma_stat_rq_poll); spin_lock_bh(&xprt->sc_rq_dto_lock); @@ -316,6 +305,15 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt) if (ctxt) atomic_inc(&rdma_stat_rq_prod); + + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); + /* + * If data arrived before established event, + * don't enqueue. This defers RPC I/O until the + * RDMA connection is complete. + */ + if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags)) + svc_xprt_enqueue(&xprt->sc_xprt); } /* @@ -328,6 +326,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt) struct ib_cq *cq = xprt->sc_sq_cq; int ret; + + if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) + return; + + ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP); atomic_inc(&rdma_stat_sq_poll); while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; @@ -1010,7 +1013,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { spin_unlock_bh(&xprt->sc_lock); atomic_inc(&rdma_stat_sq_starve); - /* See if we can reap some SQ WR */ + + /* See if we can opportunistically reap SQ WR to make room */ sq_cq_reap(xprt); /* Wait until SQ WR available if SQ still full */ -- 2.20.1