drbd: add lists to find oldest pending requests
author Lars Ellenberg <lars.ellenberg@linbit.com>
Fri, 22 Nov 2013 11:52:03 +0000 (12:52 +0100)
committer Philipp Reisner <philipp.reisner@linbit.com>
Thu, 10 Jul 2014 16:35:12 +0000 (18:35 +0200)
Adding requests to per-device fifo lists as soon as possible after
allocating them leaves a simple list_first_entry_or_null() to find the
oldest request, regardless of what it is still waiting for.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_req.c

index 08fa2dc8cdba7d704e6d2321a1c6c6e98c056ca1..f29f107be9b8f645572640443a1bff55f8249423 100644 (file)
@@ -318,6 +318,10 @@ struct drbd_request {
        struct list_head tl_requests; /* ring list in the transfer log */
        struct bio *master_bio;       /* master bio pointer */
 
+       /* see struct drbd_device */
+       struct list_head req_pending_master_completion;
+       struct list_head req_pending_local;
+
        /* for generic IO accounting */
        unsigned long start_jif;
 
@@ -738,7 +742,7 @@ struct submit_worker {
        struct workqueue_struct *wq;
        struct work_struct worker;
 
-       spinlock_t lock;
+       /* protected by ..->resource->req_lock */
        struct list_head writes;
 };
 
@@ -795,6 +799,11 @@ struct drbd_device {
        struct rb_root read_requests;
        struct rb_root write_requests;
 
+       /* for statistics and timeouts */
+       /* [0] read, [1] write */
+       struct list_head pending_master_completion[2];
+       struct list_head pending_completion[2];
+
        /* use checksums for *this* resync */
        bool use_csums;
        /* blocks to resync in this run [unit BM_BLOCK_SIZE] */
index 0baec7a3fa81e2ba5693853140d5cf087c3200e2..58865969c9f475994a8ed8b25354fe44b31c0040 100644 (file)
@@ -1934,6 +1934,10 @@ void drbd_init_set_defaults(struct drbd_device *device)
        INIT_LIST_HEAD(&device->resync_work.list);
        INIT_LIST_HEAD(&device->unplug_work.list);
        INIT_LIST_HEAD(&device->bm_io_work.w.list);
+       INIT_LIST_HEAD(&device->pending_master_completion[0]);
+       INIT_LIST_HEAD(&device->pending_master_completion[1]);
+       INIT_LIST_HEAD(&device->pending_completion[0]);
+       INIT_LIST_HEAD(&device->pending_completion[1]);
 
        device->resync_work.cb  = w_resync_timer;
        device->unplug_work.cb  = w_send_write_hint;
@@ -2268,6 +2272,8 @@ static void do_retry(struct work_struct *ws)
        }
 }
 
+/* called via drbd_req_put_completion_ref(),
+ * holds resource->req_lock */
 void drbd_restart_request(struct drbd_request *req)
 {
        unsigned long flags;
@@ -2687,7 +2693,6 @@ static int init_submitter(struct drbd_device *device)
                return -ENOMEM;
 
        INIT_WORK(&device->submit.worker, do_submit);
-       spin_lock_init(&device->submit.lock);
        INIT_LIST_HEAD(&device->submit.writes);
        return 0;
 }
index 1319beab1b370f74779db985db8577d932d2b891..23cd909dc7f10ac16f24f5c7b113edadc36fd048 100644 (file)
@@ -84,6 +84,8 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
 
        INIT_LIST_HEAD(&req->tl_requests);
        INIT_LIST_HEAD(&req->w.list);
+       INIT_LIST_HEAD(&req->req_pending_master_completion);
+       INIT_LIST_HEAD(&req->req_pending_local);
 
        /* one reference to be put by __drbd_make_request */
        atomic_set(&req->completion_ref, 1);
@@ -120,12 +122,14 @@ void drbd_req_destroy(struct kref *kref)
                return;
        }
 
-       /* remove it from the transfer log.
-        * well, only if it had been there in the first
-        * place... if it had not (local only or conflicting
-        * and never sent), it should still be "empty" as
-        * initialized in drbd_req_new(), so we can list_del() it
-        * here unconditionally */
+       /* If called from mod_rq_state (expected normal case) or
+        * drbd_send_and_submit (the less likely normal path), this holds the
+        * req_lock, and req->tl_requests will typically be on ->transfer_log,
+        * though it may be still empty (never added to the transfer log).
+        *
+        * If called from do_retry(), we do NOT hold the req_lock, but we are
+        * still allowed to unconditionally list_del(&req->tl_requests),
+        * because it will be on a local on-stack list only. */
        list_del_init(&req->tl_requests);
 
        /* finally remove the request from the conflict detection
@@ -312,8 +316,15 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
 
        if (req->i.waiting)
                wake_up(&device->misc_wait);
+
+       /* Either we are about to complete to upper layers,
+        * or we will restart this request.
+        * In either case, the request object will be destroyed soon,
+        * so better remove it from all lists. */
+       list_del_init(&req->req_pending_master_completion);
 }
 
+/* still holds resource->req_lock */
 static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
 {
        struct drbd_device *device = req->device;
@@ -400,6 +411,7 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
                        ++k_put;
                else
                        ++c_put;
+               list_del_init(&req->req_pending_local);
        }
 
        if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
@@ -1070,9 +1082,11 @@ drbd_submit_req_private_bio(struct drbd_request *req)
 
 static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
 {
-       spin_lock(&device->submit.lock);
+       spin_lock_irq(&device->resource->req_lock);
        list_add_tail(&req->tl_requests, &device->submit.writes);
-       spin_unlock(&device->submit.lock);
+       list_add_tail(&req->req_pending_master_completion,
+                       &device->pending_master_completion[1 /* WRITE */]);
+       spin_unlock_irq(&device->resource->req_lock);
        queue_work(device->submit.wq, &device->submit.worker);
 }
 
@@ -1186,8 +1200,15 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
                        no_remote = true;
        }
 
+       /* If it took the fast path in drbd_request_prepare, add it here.
+        * The slow path has added it already. */
+       if (list_empty(&req->req_pending_master_completion))
+               list_add_tail(&req->req_pending_master_completion,
+                       &device->pending_master_completion[rw == WRITE]);
        if (req->private_bio) {
                /* needs to be marked within the same spinlock */
+               list_add_tail(&req->req_pending_local,
+                       &device->pending_completion[rw == WRITE]);
                _req_mod(req, TO_BE_SUBMITTED);
                /* but we need to give up the spinlock to submit */
                submit_private_bio = true;
@@ -1278,9 +1299,9 @@ void do_submit(struct work_struct *ws)
        struct drbd_request *req, *tmp;
 
        for (;;) {
-               spin_lock(&device->submit.lock);
+               spin_lock_irq(&device->resource->req_lock);
                list_splice_tail_init(&device->submit.writes, &incoming);
-               spin_unlock(&device->submit.lock);
+               spin_unlock_irq(&device->resource->req_lock);
 
                submit_fast_path(device, &incoming);
                if (list_empty(&incoming))
@@ -1304,9 +1325,9 @@ skip_fast_path:
                        if (list_empty(&device->submit.writes))
                                break;
 
-                       spin_lock(&device->submit.lock);
+                       spin_lock_irq(&device->resource->req_lock);
                        list_splice_tail_init(&device->submit.writes, &more_incoming);
-                       spin_unlock(&device->submit.lock);
+                       spin_unlock_irq(&device->resource->req_lock);
 
                        if (list_empty(&more_incoming))
                                break;