xprtrdma: Refactor MR recovery work queues
authorChuck Lever <chuck.lever@oracle.com>
Wed, 29 Jun 2016 17:52:54 +0000 (13:52 -0400)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Mon, 11 Jul 2016 19:50:43 +0000 (15:50 -0400)
I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.

The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up separate individual work queues for this,
schedule a delayed worker to deal with them, since recovering MRs is
not performance-critical.

Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/fmr_ops.c
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index df5fe17861056ea3b891191cad7e93b6e9579458..4837ced20b65c5cd7e5ba4c01bbbb1186ba4dfa5 100644 (file)
  * verb (fmr_op_unmap).
  */
 
-/* Transport recovery
- *
- * After a transport reconnect, fmr_op_map re-uses the MR already
- * allocated for the RPC, but generates a fresh rkey then maps the
- * MR again. This process is synchronous.
- */
-
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -41,30 +34,6 @@ enum {
                                          IB_ACCESS_REMOTE_READ,
 };
 
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS          (WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
-       fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
-       return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
-       struct workqueue_struct *wq;
-
-       if (!fmr_recovery_wq)
-               return;
-
-       wq = fmr_recovery_wq;
-       fmr_recovery_wq = NULL;
-       destroy_workqueue(wq);
-}
-
 static int
 __fmr_init(struct rpcrdma_mw *mw, struct ib_pd *pd)
 {
@@ -115,66 +84,56 @@ __fmr_unmap(struct rpcrdma_mw *mw)
        return rc;
 }
 
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
-       struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
-       ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
-                       mw->mw_sg, mw->mw_nents, mw->mw_dir);
-       rpcrdma_put_mw(r_xprt, mw);
-}
-
-static void
-__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
-{
-       int rc;
-
-       /* ORDER */
-       rc = __fmr_unmap(mw);
-       if (rc) {
-               pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
-                       rc, mw);
-               return;
-       }
-       __fmr_dma_unmap(mw);
-}
-
 static void
 __fmr_release(struct rpcrdma_mw *r)
 {
+       LIST_HEAD(unmap_list);
        int rc;
 
        kfree(r->fmr.fm_physaddrs);
        kfree(r->mw_sg);
 
+       /* In case this one was left mapped, try to unmap it
+        * to prevent dealloc_fmr from failing with EBUSY
+        */
+       rc = __fmr_unmap(r);
+       if (rc)
+               pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
+                      r, rc);
+
        rc = ib_dealloc_fmr(r->fmr.fm_mr);
        if (rc)
                pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
                       r, rc);
 }
 
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
  */
 static void
-__fmr_recovery_worker(struct work_struct *work)
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
 {
-       struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
-                                            mw_work);
+       struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+       int rc;
 
-       __fmr_reset_and_unmap(mw);
-       return;
-}
+       /* ORDER: invalidate first */
+       rc = __fmr_unmap(mw);
 
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
-       INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
-       queue_work(fmr_recovery_wq, &mw->mw_work);
+       /* ORDER: then DMA unmap */
+       ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+                       mw->mw_sg, mw->mw_nents, mw->mw_dir);
+       if (rc) {
+               pr_err("rpcrdma: FMR reset status %d, %p orphaned\n",
+                      rc, mw);
+               r_xprt->rx_stats.mrs_orphaned++;
+               return;
+       }
+
+       rpcrdma_put_mw(r_xprt, mw);
+       r_xprt->rx_stats.mrs_recovered++;
 }
 
 static int
@@ -245,16 +204,11 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 
        mw = seg1->rl_mw;
        seg1->rl_mw = NULL;
-       if (!mw) {
-               mw = rpcrdma_get_mw(r_xprt);
-               if (!mw)
-                       return -ENOMEM;
-       } else {
-               /* this is a retransmit; generate a fresh rkey */
-               rc = __fmr_unmap(mw);
-               if (rc)
-                       return rc;
-       }
+       if (mw)
+               rpcrdma_defer_mr_recovery(mw);
+       mw = rpcrdma_get_mw(r_xprt);
+       if (!mw)
+               return -ENOMEM;
 
        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;     /* start of page */
@@ -309,7 +263,7 @@ out_maperr:
        pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
               len, (unsigned long long)dma_pages[0],
               pageoff, mw->mw_nents, rc);
-       __fmr_dma_unmap(mw);
+       rpcrdma_defer_mr_recovery(mw);
        return rc;
 }
 
@@ -332,7 +286,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        /* ORDER: Invalidate all of the req's MRs first
         *
         * ib_unmap_fmr() is slow, so use a single call instead
-        * of one call per mapped MR.
+        * of one call per mapped FMR.
         */
        for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
                seg = &req->rl_segments[i];
@@ -344,7 +298,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        }
        rc = ib_unmap_fmr(&unmap_list);
        if (rc)
-               pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
+               goto out_reset;
 
        /* ORDER: Now DMA unmap all of the req's MRs, and return
         * them to the free MW list.
@@ -354,7 +308,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
                mw = seg->rl_mw;
 
                list_del_init(&mw->fmr.fm_mr->list);
-               __fmr_dma_unmap(mw);
+               ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+                               mw->mw_sg, mw->mw_nents, mw->mw_dir);
+               rpcrdma_put_mw(r_xprt, mw);
 
                i += seg->mr_nsegs;
                seg->mr_nsegs = 0;
@@ -362,6 +318,20 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
        }
 
        req->rl_nchunks = 0;
+       return;
+
+out_reset:
+       pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
+
+       for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+               seg = &req->rl_segments[i];
+               mw = seg->rl_mw;
+
+               list_del_init(&mw->fmr.fm_mr->list);
+               fmr_op_recover_mr(mw);
+
+               i += seg->mr_nsegs;
+       }
 }
 
 /* Use a slow, safe mechanism to invalidate all memory regions
@@ -380,9 +350,9 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                mw = seg->rl_mw;
 
                if (sync)
-                       __fmr_reset_and_unmap(mw);
+                       fmr_op_recover_mr(mw);
                else
-                       __fmr_queue_recovery(mw);
+                       rpcrdma_defer_mr_recovery(mw);
 
                i += seg->mr_nsegs;
                seg->mr_nsegs = 0;
@@ -407,6 +377,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
        .ro_map                         = fmr_op_map,
        .ro_unmap_sync                  = fmr_op_unmap_sync,
        .ro_unmap_safe                  = fmr_op_unmap_safe,
+       .ro_recover_mr                  = fmr_op_recover_mr,
        .ro_open                        = fmr_op_open,
        .ro_maxpages                    = fmr_op_maxpages,
        .ro_init                        = fmr_op_init,
index 9cd60bf0917da0d66474f1db12c4fbd342c565a2..cbb2d05be57fbc794b9a0ddca46237da4ebd01b3 100644 (file)
 # define RPCDBG_FACILITY       RPCDBG_TRANS
 #endif
 
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS         (WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
-       frwr_recovery_wq = alloc_workqueue("frwr_recovery",
-                                          FRWR_RECOVERY_WQ_FLAGS, 0);
-       return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
-       struct workqueue_struct *wq;
-
-       if (!frwr_recovery_wq)
-               return;
-
-       wq = frwr_recovery_wq;
-       frwr_recovery_wq = NULL;
-       destroy_workqueue(wq);
-}
-
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
 {
@@ -168,8 +143,14 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
        return 0;
 }
 
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
 static void
-__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
 {
        struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
@@ -177,35 +158,15 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
 
        rc = __frwr_reset_mr(ia, mw);
        ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
-       if (rc)
+       if (rc) {
+               pr_err("rpcrdma: FRMR reset status %d, %p orphaned\n",
+                      rc, mw);
+               r_xprt->rx_stats.mrs_orphaned++;
                return;
-       rpcrdma_put_mw(r_xprt, mw);
-}
-
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
-       struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
-                                           mw_work);
-
-       __frwr_reset_and_unmap(r);
-}
+       }
 
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
-{
-       INIT_WORK(&r->mw_work, __frwr_recovery_worker);
-       queue_work(frwr_recovery_wq, &r->mw_work);
+       rpcrdma_put_mw(r_xprt, mw);
+       r_xprt->rx_stats.mrs_recovered++;
 }
 
 static int
@@ -401,7 +362,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        seg1->rl_mw = NULL;
        do {
                if (mw)
-                       __frwr_queue_recovery(mw);
+                       rpcrdma_defer_mr_recovery(mw);
                mw = rpcrdma_get_mw(r_xprt);
                if (!mw)
                        return -ENOMEM;
@@ -483,12 +444,11 @@ out_mapmr_err:
        pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
               frmr->fr_mr, n, mw->mw_nents);
        rc = n < 0 ? n : -EIO;
-       __frwr_queue_recovery(mw);
+       rpcrdma_defer_mr_recovery(mw);
        return rc;
 
 out_senderr:
-       pr_err("rpcrdma: ib_post_send status %i\n", rc);
-       __frwr_queue_recovery(mw);
+       rpcrdma_defer_mr_recovery(mw);
        return rc;
 }
 
@@ -627,9 +587,9 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
                mw = seg->rl_mw;
 
                if (sync)
-                       __frwr_reset_and_unmap(mw);
+                       frwr_op_recover_mr(mw);
                else
-                       __frwr_queue_recovery(mw);
+                       rpcrdma_defer_mr_recovery(mw);
 
                i += seg->mr_nsegs;
                seg->mr_nsegs = 0;
@@ -642,9 +602,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
 {
        struct rpcrdma_mw *r;
 
-       /* Ensure stale MWs for "buf" are no longer in flight */
-       flush_workqueue(frwr_recovery_wq);
-
        while (!list_empty(&buf->rb_all)) {
                r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
                list_del(&r->mw_all);
@@ -657,6 +614,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
        .ro_map                         = frwr_op_map,
        .ro_unmap_sync                  = frwr_op_unmap_sync,
        .ro_unmap_safe                  = frwr_op_unmap_safe,
+       .ro_recover_mr                  = frwr_op_recover_mr,
        .ro_open                        = frwr_op_open,
        .ro_maxpages                    = frwr_op_maxpages,
        .ro_init                        = frwr_op_init,
index 99d2e5b72726abd00f1ac5e5732d5fa02119f55a..4c8e7f11b906fd74c0cc1fc2133f7efe76671b4e 100644 (file)
@@ -660,7 +660,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                   xprt->stat.bad_xids,
                   xprt->stat.req_u,
                   xprt->stat.bklog_u);
-       seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
+       seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
                   r_xprt->rx_stats.read_chunk_count,
                   r_xprt->rx_stats.write_chunk_count,
                   r_xprt->rx_stats.reply_chunk_count,
@@ -672,6 +672,9 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
                   r_xprt->rx_stats.failed_marshal_count,
                   r_xprt->rx_stats.bad_reply_count,
                   r_xprt->rx_stats.nomsg_call_count);
+       seq_printf(seq, "%lu %lu\n",
+                  r_xprt->rx_stats.mrs_recovered,
+                  r_xprt->rx_stats.mrs_orphaned);
 }
 
 static int
@@ -741,7 +744,6 @@ void xprt_rdma_cleanup(void)
                        __func__, rc);
 
        rpcrdma_destroy_wq();
-       frwr_destroy_recovery_wq();
 
        rc = xprt_unregister_transport(&xprt_rdma_bc);
        if (rc)
@@ -753,20 +755,13 @@ int xprt_rdma_init(void)
 {
        int rc;
 
-       rc = frwr_alloc_recovery_wq();
-       if (rc)
-               return rc;
-
        rc = rpcrdma_alloc_wq();
-       if (rc) {
-               frwr_destroy_recovery_wq();
+       if (rc)
                return rc;
-       }
 
        rc = xprt_register_transport(&xprt_rdma);
        if (rc) {
                rpcrdma_destroy_wq();
-               frwr_destroy_recovery_wq();
                return rc;
        }
 
@@ -774,7 +769,6 @@ int xprt_rdma_init(void)
        if (rc) {
                xprt_unregister_transport(&xprt_rdma);
                rpcrdma_destroy_wq();
-               frwr_destroy_recovery_wq();
                return rc;
        }
 
index b044d98a1370207422d129689bb43b35766fc76f..77a371d3cde8416ec6226eff99005036ce9197c8 100644 (file)
@@ -777,6 +777,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
        ib_drain_qp(ia->ri_id->qp);
 }
 
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+       struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+                                                 rb_recovery_worker.work);
+       struct rpcrdma_mw *mw;
+
+       spin_lock(&buf->rb_recovery_lock);
+       while (!list_empty(&buf->rb_stale_mrs)) {
+               mw = list_first_entry(&buf->rb_stale_mrs,
+                                     struct rpcrdma_mw, mw_list);
+               list_del_init(&mw->mw_list);
+               spin_unlock(&buf->rb_recovery_lock);
+
+               dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
+               mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+               spin_lock(&buf->rb_recovery_lock);
+       };
+       spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+       struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+       spin_lock(&buf->rb_recovery_lock);
+       list_add(&mw->mw_list, &buf->rb_stale_mrs);
+       spin_unlock(&buf->rb_recovery_lock);
+
+       schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
 struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
@@ -837,8 +872,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 
        buf->rb_max_requests = r_xprt->rx_data.max_requests;
        buf->rb_bc_srv_max_requests = 0;
-       spin_lock_init(&buf->rb_lock);
        atomic_set(&buf->rb_credits, 1);
+       spin_lock_init(&buf->rb_lock);
+       spin_lock_init(&buf->rb_recovery_lock);
+       INIT_LIST_HEAD(&buf->rb_stale_mrs);
+       INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+                         rpcrdma_mr_recovery_worker);
 
        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
@@ -923,6 +962,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 {
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
 
+       cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
        while (!list_empty(&buf->rb_recv_bufs)) {
                struct rpcrdma_rep *rep;
 
index 04696c046dc573e7da4658981ce49872279511fd..4e03037d042c36cd14c2440c64a2fb91bd170abf 100644 (file)
@@ -245,7 +245,6 @@ struct rpcrdma_mw {
                struct rpcrdma_fmr      fmr;
                struct rpcrdma_frmr     frmr;
        };
-       struct work_struct      mw_work;
        struct rpcrdma_xprt     *mw_xprt;
        struct list_head        mw_all;
 };
@@ -341,6 +340,10 @@ struct rpcrdma_buffer {
        struct list_head        rb_allreqs;
 
        u32                     rb_bc_max_requests;
+
+       spinlock_t              rb_recovery_lock; /* protect rb_stale_mrs */
+       struct list_head        rb_stale_mrs;
+       struct delayed_work     rb_recovery_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -387,6 +390,8 @@ struct rpcrdma_stats {
        unsigned long           bad_reply_count;
        unsigned long           nomsg_call_count;
        unsigned long           bcall_count;
+       unsigned long           mrs_recovered;
+       unsigned long           mrs_orphaned;
 };
 
 /*
@@ -400,6 +405,7 @@ struct rpcrdma_memreg_ops {
                                         struct rpcrdma_req *);
        void            (*ro_unmap_safe)(struct rpcrdma_xprt *,
                                         struct rpcrdma_req *, bool);
+       void            (*ro_recover_mr)(struct rpcrdma_mw *);
        int             (*ro_open)(struct rpcrdma_ia *,
                                   struct rpcrdma_ep *,
                                   struct rpcrdma_create_data_internal *);
@@ -477,6 +483,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
 struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
                                            size_t, gfp_t);
 void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -484,9 +492,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 
 int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
 int rpcrdma_alloc_wq(void);
 void rpcrdma_destroy_wq(void);