drbd: fix potential deadlock during "restart" of conflicting writes
authorLars Ellenberg <lars.ellenberg@linbit.com>
Thu, 24 Nov 2011 09:36:25 +0000 (10:36 +0100)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Thu, 8 Nov 2012 15:58:21 +0000 (16:58 +0100)
w_restart_write(), run from worker context, calls __drbd_make_request()
and further drbd_al_begin_io(, delegate=true), which then
potentially deadlocks.  The previous patch moved a BUG_ON to expose
such call paths, which would now be triggered.

Also, if we call __drbd_make_request() from resource worker context,
like w_restart_write() did, and that should block for whatever reason
(!drbd_state_is_stable(), resource suspended, ...),
we potentially deadlock the whole resource, as the worker
is needed for state changes and other things.

Create a dedicated retry workqueue for this instead.

Also make sure that inc_ap_bio()/dec_ap_bio() are properly paired,
even if do_retry() needs to retry itself,
in case __drbd_make_request() returns != 0.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_receiver.c
drivers/block/drbd/drbd_req.c
drivers/block/drbd/drbd_req.h

index a0045ac880406306a12c19d8c8c4e3f4da7bd1a6..5529d392e5d37e042f0f37fade76ca84def2df28 100644 (file)
@@ -2383,6 +2383,73 @@ void drbd_minor_destroy(struct kref *kref)
        kref_put(&tconn->kref, &conn_destroy);
 }
 
+/* One global retry thread, if we need to push back some bio and have it
+ * reinserted through our make request function.
+ */
+static struct retry_worker {
+       struct workqueue_struct *wq;
+       struct work_struct worker;
+
+       spinlock_t lock;
+       struct list_head writes;
+} retry;
+
+static void do_retry(struct work_struct *ws)
+{
+       struct retry_worker *retry = container_of(ws, struct retry_worker, worker);
+       LIST_HEAD(writes);
+       struct drbd_request *req, *tmp;
+
+       spin_lock_irq(&retry->lock);
+       list_splice_init(&retry->writes, &writes);
+       spin_unlock_irq(&retry->lock);
+
+       list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+               struct drbd_conf *mdev = req->w.mdev;
+               struct bio *bio = req->master_bio;
+               unsigned long start_time = req->start_time;
+
+               /* We have exclusive access to this request object.
+                * If it had not been RQ_POSTPONED, the code path which queued
+                * it here would have completed and freed it already.
+                */
+               mempool_free(req, drbd_request_mempool);
+
+               /* A single suspended or otherwise blocking device may stall
+                * all others as well.  Fortunately, this code path is to
+                * recover from a situation that "should not happen":
+                * concurrent writes in multi-primary setup.
+                * In a "normal" lifecycle, this workqueue is supposed to be
+                * destroyed without ever doing anything.
+                * If it turns out to be an issue anyways, we can do per
+                * resource (replication group) or per device (minor) retry
+                * workqueues instead.
+                */
+
+               /* We are not just doing generic_make_request(),
+                * as we want to keep the start_time information. */
+               do {
+                       inc_ap_bio(mdev);
+               } while(__drbd_make_request(mdev, bio, start_time));
+       }
+}
+
+void drbd_restart_write(struct drbd_request *req)
+{
+       unsigned long flags;
+       spin_lock_irqsave(&retry.lock, flags);
+       list_move_tail(&req->tl_requests, &retry.writes);
+       spin_unlock_irqrestore(&retry.lock, flags);
+
+       /* Drop the extra reference that would otherwise
+        * have been dropped by complete_master_bio.
+        * do_retry() needs to grab a new one. */
+       dec_ap_bio(req->w.mdev);
+
+       queue_work(retry.wq, &retry.worker);
+}
+
+
 static void drbd_cleanup(void)
 {
        unsigned int i;
@@ -2402,6 +2469,9 @@ static void drbd_cleanup(void)
        if (drbd_proc)
                remove_proc_entry("drbd", NULL);
 
+       if (retry.wq)
+               destroy_workqueue(retry.wq);
+
        drbd_genl_unregister();
 
        idr_for_each_entry(&minors, mdev, i) {
@@ -2851,6 +2921,15 @@ int __init drbd_init(void)
        rwlock_init(&global_state_lock);
        INIT_LIST_HEAD(&drbd_tconns);
 
+       retry.wq = create_singlethread_workqueue("drbd-reissue");
+       if (!retry.wq) {
+               printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+               goto fail;
+       }
+       INIT_WORK(&retry.worker, do_retry);
+       spin_lock_init(&retry.lock);
+       INIT_LIST_HEAD(&retry.writes);
+
        printk(KERN_INFO "drbd: initialized. "
               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
index 8a7f61ba74a89e08406f150de0393f0df116313c..b159ad15abe5f8a60fa5b311ccba730a276a95b9 100644 (file)
@@ -1748,30 +1748,6 @@ static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
        return err;
 }
 
-static int w_restart_write(struct drbd_work *w, int cancel)
-{
-       struct drbd_request *req = container_of(w, struct drbd_request, w);
-       struct drbd_conf *mdev = w->mdev;
-       struct bio *bio;
-       unsigned long start_time;
-       unsigned long flags;
-
-       spin_lock_irqsave(&mdev->tconn->req_lock, flags);
-       if (!expect(req->rq_state & RQ_POSTPONED)) {
-               spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
-               return -EIO;
-       }
-       bio = req->master_bio;
-       start_time = req->start_time;
-       /* Postponed requests will not have their master_bio completed!  */
-       __req_mod(req, DISCARD_WRITE, NULL);
-       spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
-
-       while (__drbd_make_request(mdev, bio, start_time))
-               /* retry */ ;
-       return 0;
-}
-
 static void restart_conflicting_writes(struct drbd_conf *mdev,
                                       sector_t sector, int size)
 {
@@ -1785,11 +1761,9 @@ static void restart_conflicting_writes(struct drbd_conf *mdev,
                if (req->rq_state & RQ_LOCAL_PENDING ||
                    !(req->rq_state & RQ_POSTPONED))
                        continue;
-               if (expect(list_empty(&req->w.list))) {
-                       req->w.mdev = mdev;
-                       req->w.cb = w_restart_write;
-                       drbd_queue_work(&mdev->tconn->data.work, &req->w);
-               }
+               /* as it is RQ_POSTPONED, this will cause it to
+                * be queued on the retry workqueue. */
+               __req_mod(req, DISCARD_WRITE, NULL);
        }
 }
 
index c3f99bde0e1125d920bf3a6cee3e04092ad5265e..5f4436c3abb3d3388c393df2980adfe09e7338fd 100644 (file)
@@ -104,7 +104,7 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
         * and never sent), it should still be "empty" as
         * initialized in drbd_req_new(), so we can list_del() it
         * here unconditionally */
-       list_del(&req->tl_requests);
+       list_del_init(&req->tl_requests);
 
        /* if it was a write, we may have to set the corresponding
         * bit(s) out-of-sync first. If it had a local part, we need to
@@ -143,7 +143,10 @@ static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const
                }
        }
 
-       drbd_req_free(req);
+       if (s & RQ_POSTPONED)
+               drbd_restart_write(req);
+       else
+               drbd_req_free(req);
 }
 
 static void queue_barrier(struct drbd_conf *mdev)
@@ -289,8 +292,16 @@ void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
                if (!(s & RQ_POSTPONED)) {
                        m->error = ok ? 0 : (error ?: -EIO);
                        m->bio = req->master_bio;
+                       req->master_bio = NULL;
+               } else {
+                       /* Assert that this will be _req_is_done()
+                        * with this very invokation. */
+                       /* FIXME:
+                        * what about (RQ_LOCAL_PENDING | RQ_LOCAL_ABORTED)?
+                        */
+                       D_ASSERT(!(s & RQ_LOCAL_PENDING));
+                       D_ASSERT(s & RQ_NET_DONE);
                }
-               req->master_bio = NULL;
        }
 
        if (s & RQ_LOCAL_PENDING)
index 68f54050b7ca4459411f85f5820eec66e49dce29..492f81d3765a16ca3503ddb37ccf906b64932f20 100644 (file)
@@ -268,6 +268,9 @@ extern void request_timer_fn(unsigned long data);
 extern void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what);
 extern void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what);
 
+/* this is in drbd_main.c */
+extern void drbd_restart_write(struct drbd_request *req);
+
 /* use this if you don't want to deal with calling complete_master_bio()
  * outside the spinlock, e.g. when walking some list on cleanup. */
 static inline int _req_mod(struct drbd_request *req, enum drbd_req_event what)