drbd: fix potential distributed deadlock
authorLars Ellenberg <lars.ellenberg@linbit.com>
Tue, 8 Mar 2011 16:11:40 +0000 (17:11 +0100)
committerPhilipp Reisner <philipp.reisner@linbit.com>
Tue, 24 May 2011 08:02:41 +0000 (10:02 +0200)
We limit ourselves to a configurable maximum number of pages used as
temporary bio pages.

If the configured "max_buffers" is not big enough to match the bandwidth
of the respective deployment, a distributed deadlock could be triggered
by e.g. fast online verify and heavy application IO.

TCP connections would block on congestion, because both receivers
would wait on pages to become available.

Fortunately the respective senders in this case would be able to give
back some pages already. So do that.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_worker.c

index f7e6c92f8d03d001c1a452fe92b0c605c450bd38..b5e53695fd7e0d2ad56df80d7bc18f011b1477ec 100644 (file)
@@ -297,42 +297,48 @@ void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *
        crypto_hash_final(&desc, digest);
 }
 
-static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
+/* TODO merge common code with w_e_end_ov_req */
+int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 {
        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
        int digest_size;
        void *digest;
-       int ok;
+       int ok = 1;
 
        D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
 
-       if (unlikely(cancel)) {
-               drbd_free_ee(mdev, e);
-               return 1;
-       }
+       if (unlikely(cancel))
+               goto out;
 
-       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
-               digest_size = crypto_hash_digestsize(mdev->csums_tfm);
-               digest = kmalloc(digest_size, GFP_NOIO);
-               if (digest) {
-                       drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
+       if (likely((e->flags & EE_WAS_ERROR) != 0))
+               goto out;
 
-                       inc_rs_pending(mdev);
-                       ok = drbd_send_drequest_csum(mdev,
-                                                    e->sector,
-                                                    e->size,
-                                                    digest,
-                                                    digest_size,
-                                                    P_CSUM_RS_REQUEST);
-                       kfree(digest);
-               } else {
-                       dev_err(DEV, "kmalloc() of digest failed.\n");
-                       ok = 0;
-               }
-       } else
-               ok = 1;
+       digest_size = crypto_hash_digestsize(mdev->csums_tfm);
+       digest = kmalloc(digest_size, GFP_NOIO);
+       if (digest) {
+               sector_t sector = e->sector;
+               unsigned int size = e->size;
+               drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
+               /* Free e and pages before send.
+                * In case we block on congestion, we could otherwise run into
+                * some distributed deadlock, if the other side blocks on
+                * congestion as well, because our receiver blocks in
+                * drbd_pp_alloc due to pp_in_use > max_buffers. */
+               drbd_free_ee(mdev, e);
+               e = NULL;
+               inc_rs_pending(mdev);
+               ok = drbd_send_drequest_csum(mdev, sector, size,
+                                            digest, digest_size,
+                                            P_CSUM_RS_REQUEST);
+               kfree(digest);
+       } else {
+               dev_err(DEV, "kmalloc() of digest failed.\n");
+               ok = 0;
+       }
 
-       drbd_free_ee(mdev, e);
+out:
+       if (e)
+               drbd_free_ee(mdev, e);
 
        if (unlikely(!ok))
                dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
@@ -1071,9 +1077,12 @@ int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        return ok;
 }
 
+/* TODO merge common code with w_e_send_csum */
 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 {
        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
+       sector_t sector = e->sector;
+       unsigned int size = e->size;
        int digest_size;
        void *digest;
        int ok = 1;
@@ -1093,17 +1102,25 @@ int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
        else
                memset(digest, 0, digest_size);
 
+       /* Free e and pages before send.
+        * In case we block on congestion, we could otherwise run into
+        * some distributed deadlock, if the other side blocks on
+        * congestion as well, because our receiver blocks in
+        * drbd_pp_alloc due to pp_in_use > max_buffers. */
+       drbd_free_ee(mdev, e);
+       e = NULL;
        inc_rs_pending(mdev);
-       ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
-                                    digest, digest_size, P_OV_REPLY);
+       ok = drbd_send_drequest_csum(mdev, sector, size,
+                                    digest, digest_size,
+                                    P_OV_REPLY);
        if (!ok)
                dec_rs_pending(mdev);
        kfree(digest);
 
 out:
-       drbd_free_ee(mdev, e);
+       if (e)
+               drbd_free_ee(mdev, e);
        dec_unacked(mdev);
-
        return ok;
 }
 
@@ -1122,8 +1139,10 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 {
        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
        struct digest_info *di;
-       int digest_size;
        void *digest;
+       sector_t sector = e->sector;
+       unsigned int size = e->size;
+       int digest_size;
        int ok, eq = 0;
 
        if (unlikely(cancel)) {
@@ -1153,16 +1172,21 @@ int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
                }
        }
 
-       dec_unacked(mdev);
+               /* Free e and pages before send.
+                * In case we block on congestion, we could otherwise run into
+                * some distributed deadlock, if the other side blocks on
+                * congestion as well, because our receiver blocks in
+                * drbd_pp_alloc due to pp_in_use > max_buffers. */
+       drbd_free_ee(mdev, e);
        if (!eq)
-               drbd_ov_oos_found(mdev, e->sector, e->size);
+               drbd_ov_oos_found(mdev, sector, size);
        else
                ov_oos_print(mdev);
 
-       ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
+       ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
                              eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
 
-       drbd_free_ee(mdev, e);
+       dec_unacked(mdev);
 
        --mdev->ov_left;