rbd: use watch/notify for changes in rbd header
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 21 Mar 2011 22:10:11 +0000 (15:10 -0700)
committerSage Weil <sage@newdream.net>
Tue, 22 Mar 2011 18:33:56 +0000 (11:33 -0700)
Send notifications when we change the rbd header (e.g. create a snapshot)
and wait for such notifications.  This allows synchronizing the snapshot
creation between different rbd clients/rools.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
drivers/block/rbd.c

index e1e38b11f48ae3b60f15a27f0242b4b9c100dde5..16dc3645291cd7bbb3eab203b83dde68939b61d7 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
+#include <linux/parser.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
@@ -54,6 +55,8 @@
 
 #define DEV_NAME_LEN           32
 
+#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+
 /*
  * block device image metadata (in-memory version)
  */
@@ -71,6 +74,12 @@ struct rbd_image_header {
 
        char *snap_names;
        u64 *snap_sizes;
+
+       u64 obj_version;
+};
+
+struct rbd_options {
+       int     notify_timeout;
 };
 
 /*
@@ -78,6 +87,7 @@ struct rbd_image_header {
  */
 struct rbd_client {
        struct ceph_client      *client;
+       struct rbd_options      *rbd_opts;
        struct kref             kref;
        struct list_head        node;
 };
@@ -124,6 +134,9 @@ struct rbd_device {
        char                    pool_name[RBD_MAX_POOL_NAME_LEN];
        int                     poolid;
 
+       struct ceph_osd_event   *watch_event;
+       struct ceph_osd_request *watch_request;
+
        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
        u32 cur_snap;   /* index+1 of current snapshot within snap context
                           0 - for the head */
@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
+static int __rbd_update_snaps(struct rbd_device *rbd_dev);
+
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct gendisk *disk = bdev->bd_disk;
@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = {
  * Initialize an rbd client instance.
  * We own *opt.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *opt)
+static struct rbd_client *rbd_client_create(struct ceph_options *opt,
+                                           struct rbd_options *rbd_opts)
 {
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt)
        if (ret < 0)
                goto out_err;
 
+       rbdc->rbd_opts = rbd_opts;
+
        spin_lock(&node_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&node_lock);
@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
        return NULL;
 }
 
+/*
+ * mount options
+ */
+enum {
+       Opt_notify_timeout,
+       Opt_last_int,
+       /* int args above */
+       Opt_last_string,
+       /* string args above */
+};
+
+static match_table_t rbdopt_tokens = {
+       {Opt_notify_timeout, "notify_timeout=%d"},
+       /* int args above */
+       /* string args above */
+       {-1, NULL}
+};
+
+static int parse_rbd_opts_token(char *c, void *private)
+{
+       struct rbd_options *rbdopt = private;
+       substring_t argstr[MAX_OPT_ARGS];
+       int token, intval, ret;
+
+       token = match_token((char *)c, rbdopt_tokens, argstr);
+       if (token < 0)
+               return -EINVAL;
+
+       if (token < Opt_last_int) {
+               ret = match_int(&argstr[0], &intval);
+               if (ret < 0) {
+                       pr_err("bad mount option arg (not int) "
+                              "at '%s'\n", c);
+                       return ret;
+               }
+               dout("got int token %d val %d\n", token, intval);
+       } else if (token > Opt_last_int && token < Opt_last_string) {
+               dout("got string token %d val %s\n", token,
+                    argstr[0].from);
+       } else {
+               dout("got token %d\n", token);
+       }
+
+       switch (token) {
+       case Opt_notify_timeout:
+               rbdopt->notify_timeout = intval;
+               break;
+       default:
+               BUG_ON(token);
+       }
+       return 0;
+}
+
 /*
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
        struct rbd_client *rbdc;
        struct ceph_options *opt;
        int ret;
+       struct rbd_options *rbd_opts;
+
+       rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
+       if (!rbd_opts)
+               return -ENOMEM;
+
+       rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
        ret = ceph_parse_options(&opt, options, mon_addr,
-                                mon_addr + strlen(mon_addr), NULL, NULL);
+                                mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
        if (ret < 0)
-               return ret;
+               goto done_err;
 
        spin_lock(&node_lock);
        rbdc = __rbd_client_find(opt);
@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
        }
        spin_unlock(&node_lock);
 
-       rbdc = rbd_client_create(opt);
-       if (IS_ERR(rbdc))
-               return PTR_ERR(rbdc);
+       rbdc = rbd_client_create(opt, rbd_opts);
+       if (IS_ERR(rbdc)) {
+               ret = PTR_ERR(rbdc);
+               goto done_err;
+       }
 
        rbd_dev->rbd_client = rbdc;
        rbd_dev->client = rbdc->client;
        return 0;
+done_err:
+       kfree(rbd_opts);
+       return ret;
 }
 
 /*
@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref)
        spin_unlock(&node_lock);
 
        ceph_destroy_client(rbdc->client);
+       kfree(rbdc->rbd_opts);
        kfree(rbdc);
 }
 
@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq,
                          struct ceph_osd_req_op *ops,
                          int num_reply,
                          void (*rbd_cb)(struct ceph_osd_request *req,
-                                        struct ceph_msg *msg))
+                                        struct ceph_msg *msg),
+                         struct ceph_osd_request **linger_req,
+                         u64 *ver)
 {
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq,
                                req->r_oid, req->r_oid_len);
        up_read(&header->snap_rwsem);
 
+       if (linger_req) {
+               ceph_osdc_set_request_linger(&dev->client->osdc, req);
+               *linger_req = req;
+       }
+
        ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
        if (ret < 0)
                goto done_err;
 
        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+               if (ver)
+                       *ver = le64_to_cpu(req->r_reassert_version.version);
+               dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;
@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
        kfree(req_data);
 }
 
+static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+       ceph_osdc_put_request(req);
+}
+
 /*
  * Do a synchronous ceph osd operation
  */
@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev,
                           int num_reply,
                           const char *obj,
                           u64 ofs, u64 len,
-                          char *buf)
+                          char *buf,
+                          struct ceph_osd_request **linger_req,
+                          u64 *ver)
 {
        int ret;
        struct page **pages;
@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
                          flags,
                          ops,
                          2,
-                         NULL);
+                         NULL,
+                         linger_req, ver);
        if (ret < 0)
                goto done_ops;
 
@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq,
                             flags,
                             ops,
                             num_reply,
-                            rbd_req_cb);
+                            rbd_req_cb, 0, NULL);
 done:
        kfree(seg_name);
        return ret;
@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev,
                          u64 snapid,
                          const char *obj,
                          u64 ofs, u64 len,
-                         char *buf)
+                         char *buf,
+                         u64 *ver)
 {
        return rbd_req_sync_op(dev, NULL,
                               (snapid ? snapid : CEPH_NOSNAP),
                               CEPH_OSD_OP_READ,
                               CEPH_OSD_FLAG_READ,
                               NULL,
-                              1, obj, ofs, len, buf);
+                              1, obj, ofs, len, buf, NULL, ver);
 }
 
 /*
- * Request sync osd read
+ * Request sync osd watch
+ */
+static int rbd_req_sync_notify_ack(struct rbd_device *dev,
+                                  u64 ver,
+                                  u64 notify_id,
+                                  const char *obj)
+{
+       struct ceph_osd_req_op *ops;
+       struct page **pages = NULL;
+       int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
+       if (ret < 0)
+               return ret;
+
+       ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+       ops[0].watch.cookie = notify_id;
+       ops[0].watch.flag = 0;
+
+       ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
+                         obj, 0, 0, NULL,
+                         pages, 0,
+                         CEPH_OSD_FLAG_READ,
+                         ops,
+                         1,
+                         rbd_simple_req_cb, 0, NULL);
+
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+       struct rbd_device *dev = (struct rbd_device *)data;
+       if (!dev)
+               return;
+
+       dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+               notify_id, (int)opcode);
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       __rbd_update_snaps(dev);
+       mutex_unlock(&ctl_mutex);
+
+       rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+}
+
+/*
+ * Request sync osd watch
+ */
+static int rbd_req_sync_watch(struct rbd_device *dev,
+                             const char *obj,
+                             u64 ver)
+{
+       struct ceph_osd_req_op *ops;
+       struct ceph_osd_client *osdc = &dev->client->osdc;
+
+       int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
+       if (ret < 0)
+               return ret;
+
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
+                                    (void *)dev, &dev->watch_event);
+       if (ret < 0)
+               goto fail;
+
+       ops[0].watch.ver = cpu_to_le64(ver);
+       ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+       ops[0].watch.flag = 1;
+
+       ret = rbd_req_sync_op(dev, NULL,
+                             CEPH_NOSNAP,
+                             0,
+                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+                             ops,
+                             1, obj, 0, 0, NULL,
+                             &dev->watch_request, NULL);
+
+       if (ret < 0)
+               goto fail_event;
+
+       rbd_destroy_ops(ops);
+       return 0;
+
+fail_event:
+       ceph_osdc_cancel_event(dev->watch_event);
+       dev->watch_event = NULL;
+fail:
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+struct rbd_notify_info {
+       struct rbd_device *dev;
+};
+
+static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+       struct rbd_device *dev = (struct rbd_device *)data;
+       if (!dev)
+               return;
+
+       dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+               notify_id, (int)opcode);
+}
+
+/*
+ * Request sync osd notify
+ */
+static int rbd_req_sync_notify(struct rbd_device *dev,
+                         const char *obj)
+{
+       struct ceph_osd_req_op *ops;
+       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_event *event;
+       struct rbd_notify_info info;
+       int payload_len = sizeof(u32) + sizeof(u32);
+       int ret;
+
+       ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
+       if (ret < 0)
+               return ret;
+
+       info.dev = dev;
+
+       ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
+                                    (void *)&info, &event);
+       if (ret < 0)
+               goto fail;
+
+       ops[0].watch.ver = 1;
+       ops[0].watch.flag = 1;
+       ops[0].watch.cookie = event->cookie;
+       ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
+       ops[0].watch.timeout = 12;
+
+       ret = rbd_req_sync_op(dev, NULL,
+                              CEPH_NOSNAP,
+                              0,
+                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+                              ops,
+                              1, obj, 0, 0, NULL, NULL, NULL);
+       if (ret < 0)
+               goto fail_event;
+
+       ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
+       dout("ceph_osdc_wait_event returned %d\n", ret);
+       rbd_destroy_ops(ops);
+       return 0;
+
+fail_event:
+       ceph_osdc_cancel_event(event);
+fail:
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+/*
+ * Request sync osd rollback
  */
 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
                                     u64 snapid,
@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL);
+                              1, obj, 0, 0, NULL, NULL, NULL);
 
        rbd_destroy_ops(ops);
 
-       if (ret < 0)
-               return ret;
-
        return ret;
 }
 
@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
                             const char *cls,
                             const char *method,
                             const char *data,
-                            int len)
+                            int len,
+                            u64 *ver)
 {
        struct ceph_osd_req_op *ops;
        int cls_len = strlen(cls);
@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL);
+                              1, obj, 0, 0, NULL, NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
        struct rbd_image_header_ondisk *dh;
        int snap_count = 0;
        u64 snap_names_len = 0;
+       u64 ver;
 
        while (1) {
                int len = sizeof(*dh) +
@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
                                       NULL, CEPH_NOSNAP,
                                       rbd_dev->obj_md_name,
                                       0, len,
-                                      (char *)dh);
+                                      (char *)dh, &ver);
                if (rc < 0)
                        goto out_dh;
 
@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
                }
                break;
        }
+       header->obj_version = ver;
 
 out_dh:
        kfree(dh);
@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        u64 new_snapid;
        int ret;
        void *data, *data_start, *data_end;
+       u64 ver;
 
        /* we should create a snapshot only if we're pointing at the head */
        if (dev->cur_snap)
@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        ceph_encode_64_safe(&data, data_end, new_snapid, bad);
 
        ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
-                               data_start, data - data_start);
+                               data_start, data - data_start, &ver);
 
        kfree(data_start);
 
@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        int ret;
        struct rbd_image_header h;
        u64 snap_seq;
+       int follow_seq = 0;
 
        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        down_write(&rbd_dev->header.snap_rwsem);
 
        snap_seq = rbd_dev->header.snapc->seq;
+       if (rbd_dev->header.total_snaps &&
+           rbd_dev->header.snapc->snaps[0] == snap_seq)
+               /* pointing at the head, will need to follow that
+                  if head moves */
+               follow_seq = 1;
 
        kfree(rbd_dev->header.snapc);
        kfree(rbd_dev->header.snap_names);
@@ -1277,7 +1542,10 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
-       rbd_dev->header.snapc->seq = snap_seq;
+       if (follow_seq)
+               rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
+       else
+               rbd_dev->header.snapc->seq = snap_seq;
 
        ret = __rbd_init_snaps_header(rbd_dev);
 
@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
        device_unregister(&rbd_dev->dev);
 }
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
+static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
+{
+       int ret, rc;
+
+       do {
+               ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
+                                        rbd_dev->header.obj_version);
+               if (ret == -ERANGE) {
+                       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+                       rc = __rbd_update_snaps(rbd_dev);
+                       mutex_unlock(&ctl_mutex);
+                       if (rc < 0)
+                               return rc;
+               }
+       } while (ret == -ERANGE);
+
+       return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+                      const char *buf,
+                      size_t count)
 {
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
        if (rc)
                goto err_out_bus;
 
+       rc = rbd_init_watch_dev(rbd_dev);
+       if (rc)
+               goto err_out_bus;
+
        return count;
 
 err_out_bus:
@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev)
        struct rbd_device *rbd_dev =
                        container_of(dev, struct rbd_device, dev);
 
+       if (rbd_dev->watch_request)
+               ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
+                                                   rbd_dev->watch_request);
+       if (rbd_dev->watch_event)
+               ceph_osdc_cancel_event(rbd_dev->watch_event);
+
        rbd_put_client(rbd_dev);
 
        /* clean up and free blkdev */
@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev,
        ret = rbd_header_add_snap(rbd_dev,
                                  name, GFP_KERNEL);
        if (ret < 0)
-               goto done_unlock;
+               goto err_unlock;
 
        ret = __rbd_update_snaps(rbd_dev);
        if (ret < 0)
-               goto done_unlock;
+               goto err_unlock;
+
+       /* shouldn't hold ctl_mutex when notifying.. notify might
+          trigger a watch callback that would need to get that mutex */
+       mutex_unlock(&ctl_mutex);
+
+       /* make a best effort, don't error if failed */
+       rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
 
        ret = count;
-done_unlock:
+       kfree(name);
+       return ret;
+
+err_unlock:
        mutex_unlock(&ctl_mutex);
        kfree(name);
        return ret;