rbd: introduce a per-device ordered workqueue
authorIlya Dryomov <idryomov@gmail.com>
Fri, 12 Aug 2016 13:45:52 +0000 (15:45 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 24 Aug 2016 21:49:16 +0000 (23:49 +0200)
This is going to be used for re-registering watch requests and
exclusive-lock tasks: acquire/request lock, notify-acquired, release
lock, notify-released.  Some refactoring in the map/unmap paths was
necessary to give this workqueue a meaningful name: "rbdX-tasks".

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Mike Christie <mchristi@redhat.com>
drivers/block/rbd.c

index e0585e9040f10fbdbff32840fc3d564e7c8814fa..1c805eea676750d4dbe68ec21786b923c1fdeca0 100644 (file)
@@ -128,11 +128,8 @@ static int atomic_dec_return_safe(atomic_t *v)
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
- * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
- * enough to hold all possible device names.
  */
 #define DEV_NAME_LEN           32
-#define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
 /*
  * block device image metadata (in-memory version)
@@ -353,10 +350,12 @@ struct rbd_device {
        struct ceph_object_id   header_oid;
        struct ceph_object_locator header_oloc;
 
-       struct ceph_file_layout layout;
+       struct ceph_file_layout layout;         /* used for all rbd requests */
 
        struct ceph_osd_linger_request *watch_handle;
 
+       struct workqueue_struct *task_wq;
+
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
        atomic_t                parent_ref;
@@ -3944,11 +3943,8 @@ static void rbd_spec_free(struct kref *kref)
        kfree(spec);
 }
 
-static void rbd_dev_release(struct device *dev)
+static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       bool need_put = !!rbd_dev->opts;
-
        ceph_oid_destroy(&rbd_dev->header_oid);
        ceph_oloc_destroy(&rbd_dev->header_oloc);
 
@@ -3956,6 +3952,19 @@ static void rbd_dev_release(struct device *dev)
        rbd_spec_put(rbd_dev->spec);
        kfree(rbd_dev->opts);
        kfree(rbd_dev);
+}
+
+static void rbd_dev_release(struct device *dev)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       bool need_put = !!rbd_dev->opts;
+
+       if (need_put) {
+               destroy_workqueue(rbd_dev->task_wq);
+               ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+       }
+
+       rbd_dev_free(rbd_dev);
 
        /*
         * This is racy, but way better than putting module outside of
@@ -3966,19 +3975,16 @@ static void rbd_dev_release(struct device *dev)
                module_put(THIS_MODULE);
 }
 
-static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
-                                        struct rbd_spec *spec,
-                                        struct rbd_options *opts)
+static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
+                                          struct rbd_spec *spec)
 {
        struct rbd_device *rbd_dev;
 
-       rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+       rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
                return NULL;
 
        spin_lock_init(&rbd_dev->lock);
-       rbd_dev->flags = 0;
-       atomic_set(&rbd_dev->parent_ref, 0);
        INIT_LIST_HEAD(&rbd_dev->node);
        init_rwsem(&rbd_dev->header_rwsem);
 
@@ -3992,9 +3998,6 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
 
        rbd_dev->rbd_client = rbdc;
        rbd_dev->spec = spec;
-       rbd_dev->opts = opts;
-
-       /* Initialize the layout used for all rbd requests */
 
        rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
        rbd_dev->layout.stripe_count = 1;
@@ -4002,15 +4005,48 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->layout.pool_id = spec->pool_id;
        RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 
-       /*
-        * If this is a mapping rbd_dev (as opposed to a parent one),
-        * pin our module.  We have a ref from do_rbd_add(), so use
-        * __module_get().
-        */
-       if (rbd_dev->opts)
-               __module_get(THIS_MODULE);
+       return rbd_dev;
+}
+
+/*
+ * Create a mapping rbd_dev.
+ */
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+                                        struct rbd_spec *spec,
+                                        struct rbd_options *opts)
+{
+       struct rbd_device *rbd_dev;
+
+       rbd_dev = __rbd_dev_create(rbdc, spec);
+       if (!rbd_dev)
+               return NULL;
+
+       rbd_dev->opts = opts;
+
+       /* get an id and fill in device name */
+       rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
+                                        minor_to_rbd_dev_id(1 << MINORBITS),
+                                        GFP_KERNEL);
+       if (rbd_dev->dev_id < 0)
+               goto fail_rbd_dev;
+
+       sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
+       rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
+                                                  rbd_dev->name);
+       if (!rbd_dev->task_wq)
+               goto fail_dev_id;
 
+       /* we have a ref from do_rbd_add() */
+       __module_get(THIS_MODULE);
+
+       dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
        return rbd_dev;
+
+fail_dev_id:
+       ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+fail_rbd_dev:
+       rbd_dev_free(rbd_dev);
+       return NULL;
 }
 
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
@@ -4645,46 +4681,6 @@ static int rbd_dev_header_info(struct rbd_device *rbd_dev)
        return rbd_dev_v2_header_info(rbd_dev);
 }
 
-/*
- * Get a unique rbd identifier for the given new rbd_dev, and add
- * the rbd_dev to the global list.
- */
-static int rbd_dev_id_get(struct rbd_device *rbd_dev)
-{
-       int new_dev_id;
-
-       new_dev_id = ida_simple_get(&rbd_dev_id_ida,
-                                   0, minor_to_rbd_dev_id(1 << MINORBITS),
-                                   GFP_KERNEL);
-       if (new_dev_id < 0)
-               return new_dev_id;
-
-       rbd_dev->dev_id = new_dev_id;
-
-       spin_lock(&rbd_dev_list_lock);
-       list_add_tail(&rbd_dev->node, &rbd_dev_list);
-       spin_unlock(&rbd_dev_list_lock);
-
-       dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
-
-       return 0;
-}
-
-/*
- * Remove an rbd_dev from the global list, and record that its
- * identifier is no longer in use.
- */
-static void rbd_dev_id_put(struct rbd_device *rbd_dev)
-{
-       spin_lock(&rbd_dev_list_lock);
-       list_del_init(&rbd_dev->node);
-       spin_unlock(&rbd_dev_list_lock);
-
-       ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
-
-       dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
-}
-
 /*
  * Skips over white space at *buf, and updates *buf to point to the
  * first found non-space character (if any). Returns the length of
@@ -5077,8 +5073,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
                goto out_err;
        }
 
-       parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
-                               NULL);
+       parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
        if (!parent) {
                ret = -ENOMEM;
                goto out_err;
@@ -5113,22 +5108,12 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       /* Get an id and fill in device name. */
-
-       ret = rbd_dev_id_get(rbd_dev);
-       if (ret)
-               goto err_out_unlock;
-
-       BUILD_BUG_ON(DEV_NAME_LEN
-                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
        /* Record our major and minor device numbers. */
 
        if (!single_major) {
                ret = register_blkdev(0, rbd_dev->name);
                if (ret < 0)
-                       goto err_out_id;
+                       goto err_out_unlock;
 
                rbd_dev->major = ret;
                rbd_dev->minor = 0;
@@ -5160,6 +5145,10 @@ static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
        set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        up_write(&rbd_dev->header_rwsem);
 
+       spin_lock(&rbd_dev_list_lock);
+       list_add_tail(&rbd_dev->node, &rbd_dev_list);
+       spin_unlock(&rbd_dev_list_lock);
+
        add_disk(rbd_dev->disk);
        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);
@@ -5173,8 +5162,6 @@ err_out_disk:
 err_out_blkdev:
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_id:
-       rbd_dev_id_put(rbd_dev);
 err_out_unlock:
        up_write(&rbd_dev->header_rwsem);
        return ret;
@@ -5406,12 +5393,16 @@ static ssize_t rbd_add_single_major(struct bus_type *bus,
 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
        rbd_free_disk(rbd_dev);
+
+       spin_lock(&rbd_dev_list_lock);
+       list_del_init(&rbd_dev->node);
+       spin_unlock(&rbd_dev_list_lock);
+
        clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
        device_del(&rbd_dev->dev);
        rbd_dev_mapping_clear(rbd_dev);
        if (!single_major)
                unregister_blkdev(rbd_dev->major, rbd_dev->name);
-       rbd_dev_id_put(rbd_dev);
 }
 
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)