xprtrdma: Use workqueue to process RPC/RDMA replies
author Chuck Lever <chuck.lever@oracle.com>
Sat, 24 Oct 2015 21:27:10 +0000 (17:27 -0400)
committer Anna Schumaker <Anna.Schumaker@Netapp.com>
Mon, 2 Nov 2015 18:45:15 +0000 (13:45 -0500)
The reply tasklet is fast, but it's single threaded. After reply
traffic saturates a single CPU, there's no more reply processing
capacity.

Replace the tasklet with a workqueue to spread reply handling across
all CPUs.  This also moves RPC/RDMA reply handling out of the soft
IRQ context and into a context that allows sleeps.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Reviewed-by: Sagi Grimberg <sagig@mellanox.com>
Tested-By: Devesh Sharma <devesh.sharma@avagotech.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 60ffa63096dd9b624e1f42b10980b8b45ffa4fce..95774fcc1b4352ca13be01a6823554f17cec26fc 100644 (file)
@@ -723,8 +723,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
        schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -752,13 +752,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
        if (headerp->rm_vers != rpcrdma_version)
                goto out_badversion;
 
-       /* Get XID and try for a match. */
-       spin_lock(&xprt->transport_lock);
+       /* Match incoming rpcrdma_rep to an rpcrdma_req to
+        * get context for handling any incoming chunks.
+        */
+       spin_lock_bh(&xprt->transport_lock);
        rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
        if (!rqst)
                goto out_nomatch;
 
-       /* get request object */
        req = rpcr_to_rdmar(rqst);
        if (req->rl_reply)
                goto out_duplicate;
@@ -859,7 +860,7 @@ badheader:
                xprt_release_rqst_cong(rqst->rq_task);
 
        xprt_complete_rqst(rqst->rq_task, status);
-       spin_unlock(&xprt->transport_lock);
+       spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
                        __func__, xprt, rqst, status);
        return;
@@ -882,14 +883,14 @@ out_badversion:
        goto repost;
 
 out_nomatch:
-       spin_unlock(&xprt->transport_lock);
+       spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
                __func__, be32_to_cpu(headerp->rm_xid),
                rep->rr_len);
        goto repost;
 
 out_duplicate:
-       spin_unlock(&xprt->transport_lock);
+       spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: "
                "duplicate reply %p to RPC request %p: xid 0x%08x\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7a57fe10a7888626229d74b686e6d0974d..897a2f3a96202a402e3ab6ee78425326cb7cb9ff 100644 (file)
@@ -732,6 +732,7 @@ void xprt_rdma_cleanup(void)
                dprintk("RPC:       %s: xprt_unregister returned %i\n",
                        __func__, rc);
 
+       rpcrdma_destroy_wq();
        frwr_destroy_recovery_wq();
 }
 
@@ -743,8 +744,15 @@ int xprt_rdma_init(void)
        if (rc)
                return rc;
 
+       rc = rpcrdma_alloc_wq();
+       if (rc) {
+               frwr_destroy_recovery_wq();
+               return rc;
+       }
+
        rc = xprt_register_transport(&xprt_rdma);
        if (rc) {
+               rpcrdma_destroy_wq();
                frwr_destroy_recovery_wq();
                return rc;
        }
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c09f1b6c3f0a1c6eec56e95292066c7598fe6070..5c20629544bbb88005a19f9c183be9c8c13b37dd 100644 (file)
@@ -100,6 +100,35 @@ rpcrdma_run_tasklet(unsigned long data)
 
 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
 
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+       struct workqueue_struct *recv_wq;
+
+       recv_wq = alloc_workqueue("xprtrdma_receive",
+                                 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+                                 0);
+       if (!recv_wq)
+               return -ENOMEM;
+
+       rpcrdma_receive_wq = recv_wq;
+       return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+       struct workqueue_struct *wq;
+
+       if (rpcrdma_receive_wq) {
+               wq = rpcrdma_receive_wq;
+               rpcrdma_receive_wq = NULL;
+               destroy_workqueue(wq);
+       }
+}
+
 static void
 rpcrdma_schedule_tasklet(struct list_head *sched_list)
 {
@@ -196,7 +225,16 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_receive_worker(struct work_struct *work)
+{
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
+
+       rpcrdma_reply_handler(rep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-       list_add_tail(&rep->rr_list, sched_list);
+       queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;
+
 out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC:       %s: rep %p: %s\n",
@@ -239,7 +278,6 @@ static void
 rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
        struct ib_wc *pos, wcs[4];
-       LIST_HEAD(sched_list);
        int count, rc;
 
        do {
@@ -251,10 +289,8 @@ rpcrdma_recvcq_poll(struct ib_cq *cq)
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(pos++, &sched_list);
+                       rpcrdma_recvcq_process_wc(pos++);
        } while (rc == ARRAY_SIZE(wcs));
-
-       rpcrdma_schedule_tasklet(&sched_list);
 }
 
 /* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@ static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
        struct ib_wc wc;
-       LIST_HEAD(sched_list);
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc, &sched_list);
-       if (!list_empty(&sched_list))
-               rpcrdma_schedule_tasklet(&sched_list);
+               rpcrdma_recvcq_process_wc(&wc);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
 }
@@ -913,6 +946,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
        rep->rr_device = ia->ri_device;
        rep->rr_rxprt = r_xprt;
+       INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
 
 out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358fd1f1d6b31c9e780d96c813524a0cd94ba..6ea1dbe46e88354f66bf8cb056dd615a29e0b569 100644 (file)
@@ -164,6 +164,7 @@ struct rpcrdma_rep {
        unsigned int            rr_len;
        struct ib_device        *rr_device;
        struct rpcrdma_xprt     *rr_rxprt;
+       struct work_struct      rr_work;
        struct list_head        rr_list;
        struct rpcrdma_regbuf   *rr_rdmabuf;
 };
@@ -430,6 +431,9 @@ unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
 
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */