xprtrdma: Split the completion queue
authorChuck Lever <chuck.lever@oracle.com>
Wed, 28 May 2014 14:33:25 +0000 (10:33 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 4 Jun 2014 12:56:42 +0000 (08:56 -0400)
The current CQ handler uses the ib_wc.opcode field to distinguish
between event types. However, the contents of that field are not
reliable if the completion status is not IB_WC_SUCCESS.

When an error completion occurs on a send event, the CQ handler
schedules a tasklet with something that is not a struct rpcrdma_rep.
This is never correct behavior, and sometimes it results in a panic.

To resolve this issue, split the completion queue into a send CQ and
a receive CQ. The send CQ handler now handles only struct rpcrdma_mw
wr_id's, and the receive CQ handler now handles only struct
rpcrdma_rep wr_id's.

Fix suggested by Shirley Ma <shirley.ma@oracle.com>

Reported-by: Rafael Reiter <rafael.reiter@ims.co.at>
Fixes: 5c635e09cec0feeeb310968e51dad01040244851
BugLink: https://bugzilla.kernel.org/show_bug.cgi?id=73211
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Klemens Senn <klemens.senn@ims.co.at>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index edc951e1f1d9a043890ec183d4eed20616024d9f..af2d097c221fc9a0dd3b1a619e9f78b01bc3e8f1 100644 (file)
@@ -142,96 +142,115 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
        }
 }
 
-static inline
-void rpcrdma_event_process(struct ib_wc *wc)
+static void
+rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
-       struct rpcrdma_mw *frmr;
-       struct rpcrdma_rep *rep =
-                       (struct rpcrdma_rep *)(unsigned long) wc->wr_id;
+       struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
 
-       dprintk("RPC:       %s: event rep %p status %X opcode %X length %u\n",
-               __func__, rep, wc->status, wc->opcode, wc->byte_len);
+       dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
+               __func__, frmr, wc->status, wc->opcode);
 
-       if (!rep) /* send completion that we don't care about */
+       if (wc->wr_id == 0ULL)
                return;
-
-       if (IB_WC_SUCCESS != wc->status) {
-               dprintk("RPC:       %s: WC opcode %d status %X, connection lost\n",
-                       __func__, wc->opcode, wc->status);
-               rep->rr_len = ~0U;
-               if (wc->opcode != IB_WC_FAST_REG_MR && wc->opcode != IB_WC_LOCAL_INV)
-                       rpcrdma_schedule_tasklet(rep);
+       if (wc->status != IB_WC_SUCCESS)
                return;
-       }
 
-       switch (wc->opcode) {
-       case IB_WC_FAST_REG_MR:
-               frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+       if (wc->opcode == IB_WC_FAST_REG_MR)
                frmr->r.frmr.state = FRMR_IS_VALID;
-               break;
-       case IB_WC_LOCAL_INV:
-               frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+       else if (wc->opcode == IB_WC_LOCAL_INV)
                frmr->r.frmr.state = FRMR_IS_INVALID;
-               break;
-       case IB_WC_RECV:
-               rep->rr_len = wc->byte_len;
-               ib_dma_sync_single_for_cpu(
-                       rdmab_to_ia(rep->rr_buffer)->ri_id->device,
-                       rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-               /* Keep (only) the most recent credits, after check validity */
-               if (rep->rr_len >= 16) {
-                       struct rpcrdma_msg *p =
-                                       (struct rpcrdma_msg *) rep->rr_base;
-                       unsigned int credits = ntohl(p->rm_credit);
-                       if (credits == 0) {
-                               dprintk("RPC:       %s: server"
-                                       " dropped credits to 0!\n", __func__);
-                               /* don't deadlock */
-                               credits = 1;
-                       } else if (credits > rep->rr_buffer->rb_max_requests) {
-                               dprintk("RPC:       %s: server"
-                                       " over-crediting: %d (%d)\n",
-                                       __func__, credits,
-                                       rep->rr_buffer->rb_max_requests);
-                               credits = rep->rr_buffer->rb_max_requests;
-                       }
-                       atomic_set(&rep->rr_buffer->rb_credits, credits);
-               }
-               rpcrdma_schedule_tasklet(rep);
-               break;
-       default:
-               dprintk("RPC:       %s: unexpected WC event %X\n",
-                       __func__, wc->opcode);
-               break;
-       }
 }
 
-static inline int
-rpcrdma_cq_poll(struct ib_cq *cq)
+static int
+rpcrdma_sendcq_poll(struct ib_cq *cq)
 {
        struct ib_wc wc;
        int rc;
 
-       for (;;) {
-               rc = ib_poll_cq(cq, 1, &wc);
-               if (rc < 0) {
-                       dprintk("RPC:       %s: ib_poll_cq failed %i\n",
-                               __func__, rc);
-                       return rc;
-               }
-               if (rc == 0)
-                       break;
+       while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
+               rpcrdma_sendcq_process_wc(&wc);
+       return rc;
+}
 
-               rpcrdma_event_process(&wc);
+/*
+ * Handle send, fast_reg_mr, and local_inv completions.
+ *
+ * Send events are typically suppressed and thus do not result
+ * in an upcall. Occasionally one is signaled, however. This
+ * prevents the provider's completion queue from wrapping and
+ * losing a completion.
+ */
+static void
+rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+{
+       int rc;
+
+       rc = rpcrdma_sendcq_poll(cq);
+       if (rc) {
+               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
+                       __func__, rc);
+               return;
        }
 
-       return 0;
+       rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+       if (rc) {
+               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
+                       __func__, rc);
+               return;
+       }
+
+       rpcrdma_sendcq_poll(cq);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
+{
+       struct rpcrdma_rep *rep =
+                       (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+
+       dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
+               __func__, rep, wc->status, wc->opcode, wc->byte_len);
+
+       if (wc->status != IB_WC_SUCCESS) {
+               rep->rr_len = ~0U;
+               goto out_schedule;
+       }
+       if (wc->opcode != IB_WC_RECV)
+               return;
+
+       rep->rr_len = wc->byte_len;
+       ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+                       rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
+
+       if (rep->rr_len >= 16) {
+               struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
+               unsigned int credits = ntohl(p->rm_credit);
+
+               if (credits == 0)
+                       credits = 1;    /* don't deadlock */
+               else if (credits > rep->rr_buffer->rb_max_requests)
+                       credits = rep->rr_buffer->rb_max_requests;
+               atomic_set(&rep->rr_buffer->rb_credits, credits);
+       }
+
+out_schedule:
+       rpcrdma_schedule_tasklet(rep);
+}
+
+static int
+rpcrdma_recvcq_poll(struct ib_cq *cq)
+{
+       struct ib_wc wc;
+       int rc;
+
+       while ((rc = ib_poll_cq(cq, 1, &wc)) == 1)
+               rpcrdma_recvcq_process_wc(&wc);
+       return rc;
 }
 
 /*
- * rpcrdma_cq_event_upcall
+ * Handle receive completions.
  *
- * This upcall handles recv and send events.
  * It is reentrant but processes single events in order to maintain
  * ordering of receives to keep server credits.
  *
@@ -240,26 +259,27 @@ rpcrdma_cq_poll(struct ib_cq *cq)
  * connection shutdown. That is, the structures required for
  * the completion of the reply handler must remain intact until
  * all memory has been reclaimed.
- *
- * Note that send events are suppressed and do not result in an upcall.
  */
 static void
-rpcrdma_cq_event_upcall(struct ib_cq *cq, void *context)
+rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
 {
        int rc;
 
-       rc = rpcrdma_cq_poll(cq);
-       if (rc)
+       rc = rpcrdma_recvcq_poll(cq);
+       if (rc) {
+               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
+                       __func__, rc);
                return;
+       }
 
        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (rc) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed %i\n",
+               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                return;
        }
 
-       rpcrdma_cq_poll(cq);
+       rpcrdma_recvcq_poll(cq);
 }
 
 #ifdef RPC_DEBUG
@@ -610,6 +630,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                                struct rpcrdma_create_data_internal *cdata)
 {
        struct ib_device_attr devattr;
+       struct ib_cq *sendcq, *recvcq;
        int rc, err;
 
        rc = ib_query_device(ia->ri_id->device, &devattr);
@@ -685,7 +706,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                ep->rep_attr.cap.max_recv_sge);
 
        /* set trigger for requesting send completion */
-       ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 /*  - 1*/;
+       ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;
        INIT_CQCOUNT(ep);
@@ -693,26 +714,43 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        init_waitqueue_head(&ep->rep_connect_wait);
        INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
-       ep->rep_cq = ib_create_cq(ia->ri_id->device, rpcrdma_cq_event_upcall,
+       sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
                                  rpcrdma_cq_async_error_upcall, NULL,
-                                 ep->rep_attr.cap.max_recv_wr +
                                  ep->rep_attr.cap.max_send_wr + 1, 0);
-       if (IS_ERR(ep->rep_cq)) {
-               rc = PTR_ERR(ep->rep_cq);
-               dprintk("RPC:       %s: ib_create_cq failed: %i\n",
+       if (IS_ERR(sendcq)) {
+               rc = PTR_ERR(sendcq);
+               dprintk("RPC:       %s: failed to create send CQ: %i\n",
                        __func__, rc);
                goto out1;
        }
 
-       rc = ib_req_notify_cq(ep->rep_cq, IB_CQ_NEXT_COMP);
+       rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
        if (rc) {
                dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
                        __func__, rc);
                goto out2;
        }
 
-       ep->rep_attr.send_cq = ep->rep_cq;
-       ep->rep_attr.recv_cq = ep->rep_cq;
+       recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
+                                 rpcrdma_cq_async_error_upcall, NULL,
+                                 ep->rep_attr.cap.max_recv_wr + 1, 0);
+       if (IS_ERR(recvcq)) {
+               rc = PTR_ERR(recvcq);
+               dprintk("RPC:       %s: failed to create recv CQ: %i\n",
+                       __func__, rc);
+               goto out2;
+       }
+
+       rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
+       if (rc) {
+               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
+                       __func__, rc);
+               ib_destroy_cq(recvcq);
+               goto out2;
+       }
+
+       ep->rep_attr.send_cq = sendcq;
+       ep->rep_attr.recv_cq = recvcq;
 
        /* Initialize cma parameters */
 
@@ -734,7 +772,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        return 0;
 
 out2:
-       err = ib_destroy_cq(ep->rep_cq);
+       err = ib_destroy_cq(sendcq);
        if (err)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, err);
@@ -774,8 +812,14 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
                ep->rep_pad_mr = NULL;
        }
 
-       rpcrdma_clean_cq(ep->rep_cq);
-       rc = ib_destroy_cq(ep->rep_cq);
+       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+       rc = ib_destroy_cq(ep->rep_attr.recv_cq);
+       if (rc)
+               dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
+                       __func__, rc);
+
+       rpcrdma_clean_cq(ep->rep_attr.send_cq);
+       rc = ib_destroy_cq(ep->rep_attr.send_cq);
        if (rc)
                dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
                        __func__, rc);
@@ -798,7 +842,9 @@ retry:
                if (rc && rc != -ENOTCONN)
                        dprintk("RPC:       %s: rpcrdma_ep_disconnect"
                                " status %i\n", __func__, rc);
-               rpcrdma_clean_cq(ep->rep_cq);
+
+               rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+               rpcrdma_clean_cq(ep->rep_attr.send_cq);
 
                xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
                id = rpcrdma_create_id(xprt, ia,
@@ -907,7 +953,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
        int rc;
 
-       rpcrdma_clean_cq(ep->rep_cq);
+       rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+       rpcrdma_clean_cq(ep->rep_attr.send_cq);
        rc = rdma_disconnect(ia->ri_id);
        if (!rc) {
                /* returns without wait if not connected */
@@ -1727,7 +1774,6 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
        ib_dma_sync_single_for_cpu(ia->ri_id->device,
                rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
 
-       DECR_CQCOUNT(ep);
        rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
 
        if (rc)
index 362a19d16440a4f2ed12090027f549335a479813..334ab6ee041a934724aa7fc1e647173a88ec2d7e 100644 (file)
@@ -79,7 +79,6 @@ struct rpcrdma_ep {
        int                     rep_cqinit;
        int                     rep_connected;
        struct rpcrdma_ia       *rep_ia;
-       struct ib_cq            *rep_cq;
        struct ib_qp_init_attr  rep_attr;
        wait_queue_head_t       rep_connect_wait;
        struct ib_sge           rep_pad;        /* holds zeroed pad */