rbd: support updating the lock cookie without releasing the lock
authorIlya Dryomov <idryomov@gmail.com>
Thu, 13 Apr 2017 10:17:38 +0000 (12:17 +0200)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 4 May 2017 07:19:23 +0000 (09:19 +0200)
As we no longer release the lock before potentially raising BLACKLISTED
in rbd_reregister_watch(), the "either locked or blacklisted" assert in
rbd_queue_workfn() needs to go: we can be both locked and blacklisted
at that point now.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Jason Dillaman <dillaman@redhat.com>
drivers/block/rbd.c
include/linux/ceph/cls_lock_client.h
net/ceph/cls_lock_client.c

index 5f563db598202f9d02a0ef639fc6d99840c31ae1..063c8f06fb9c97f208f2bc009783a05725d4354f 100644 (file)
@@ -3820,24 +3820,51 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev)
        ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       char cookie[32];
+       int ret;
+
+       WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+
+       format_lock_cookie(rbd_dev, cookie);
+       ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
+                                 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+                                 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
+                                 RBD_LOCK_TAG, cookie);
+       if (ret) {
+               if (ret != -EOPNOTSUPP)
+                       rbd_warn(rbd_dev, "failed to update lock cookie: %d",
+                                ret);
+
+               /*
+                * Lock cookie cannot be updated on older OSDs, so do
+                * a manual release and queue an acquire.
+                */
+               if (rbd_release_lock(rbd_dev))
+                       queue_delayed_work(rbd_dev->task_wq,
+                                          &rbd_dev->lock_dwork, 0);
+       } else {
+               strcpy(rbd_dev->lock_cookie, cookie);
+       }
+}
+
 static void rbd_reregister_watch(struct work_struct *work)
 {
        struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
                                            struct rbd_device, watch_dwork);
-       bool was_lock_owner = false;
-       bool need_to_wake = false;
        int ret;
 
        dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
-       down_write(&rbd_dev->lock_rwsem);
-       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
-               was_lock_owner = rbd_release_lock(rbd_dev);
-
        mutex_lock(&rbd_dev->watch_mutex);
        if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
                mutex_unlock(&rbd_dev->watch_mutex);
-               goto out;
+               return;
        }
 
        ret = __rbd_register_watch(rbd_dev);
@@ -3845,36 +3872,28 @@ static void rbd_reregister_watch(struct work_struct *work)
                rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
                if (ret == -EBLACKLISTED || ret == -ENOENT) {
                        set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
-                       need_to_wake = true;
+                       wake_requests(rbd_dev, true);
                } else {
                        queue_delayed_work(rbd_dev->task_wq,
                                           &rbd_dev->watch_dwork,
                                           RBD_RETRY_DELAY);
                }
                mutex_unlock(&rbd_dev->watch_mutex);
-               goto out;
+               return;
        }
 
-       need_to_wake = true;
        rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
        rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
        mutex_unlock(&rbd_dev->watch_mutex);
 
+       down_write(&rbd_dev->lock_rwsem);
+       if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+               rbd_reacquire_lock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
+
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
                rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
-
-       if (was_lock_owner) {
-               ret = rbd_try_lock(rbd_dev);
-               if (ret)
-                       rbd_warn(rbd_dev, "reregisteration lock failed: %d",
-                                ret);
-       }
-
-out:
-       up_write(&rbd_dev->lock_rwsem);
-       if (need_to_wake)
-               wake_requests(rbd_dev, true);
 }
 
 /*
@@ -4052,9 +4071,6 @@ static void rbd_queue_workfn(struct work_struct *work)
                if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
                    !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
                        rbd_wait_state_locked(rbd_dev);
-
-               WARN_ON((rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) ^
-                       !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
                if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
                        result = -EBLACKLISTED;
                        goto err_unlock;
index 84884d8d4710bc568b1590e2ede3775c863e63a1..0594d3bba774c5a78c0bc7e4fde5fbd741bd1c17 100644 (file)
@@ -37,6 +37,11 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
                        struct ceph_object_locator *oloc,
                        char *lock_name, char *cookie,
                        struct ceph_entity_name *locker);
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+                       struct ceph_object_id *oid,
+                       struct ceph_object_locator *oloc,
+                       char *lock_name, u8 type, char *old_cookie,
+                       char *tag, char *new_cookie);
 
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
 
index b9233b9903990bd38721213ce287a8045debd8c7..08ada893f01e6acb61b5f91425539a33d7b2c0d4 100644 (file)
@@ -179,6 +179,57 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_cls_break_lock);
 
+int ceph_cls_set_cookie(struct ceph_osd_client *osdc,
+                       struct ceph_object_id *oid,
+                       struct ceph_object_locator *oloc,
+                       char *lock_name, u8 type, char *old_cookie,
+                       char *tag, char *new_cookie)
+{
+       int cookie_op_buf_size;
+       int name_len = strlen(lock_name);
+       int old_cookie_len = strlen(old_cookie);
+       int tag_len = strlen(tag);
+       int new_cookie_len = strlen(new_cookie);
+       void *p, *end;
+       struct page *cookie_op_page;
+       int ret;
+
+       cookie_op_buf_size = name_len + sizeof(__le32) +
+                            old_cookie_len + sizeof(__le32) +
+                            tag_len + sizeof(__le32) +
+                            new_cookie_len + sizeof(__le32) +
+                            sizeof(u8) + CEPH_ENCODING_START_BLK_LEN;
+       if (cookie_op_buf_size > PAGE_SIZE)
+               return -E2BIG;
+
+       cookie_op_page = alloc_page(GFP_NOIO);
+       if (!cookie_op_page)
+               return -ENOMEM;
+
+       p = page_address(cookie_op_page);
+       end = p + cookie_op_buf_size;
+
+       /* encode cls_lock_set_cookie_op struct */
+       ceph_start_encoding(&p, 1, 1,
+                           cookie_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+       ceph_encode_string(&p, end, lock_name, name_len);
+       ceph_encode_8(&p, type);
+       ceph_encode_string(&p, end, old_cookie, old_cookie_len);
+       ceph_encode_string(&p, end, tag, tag_len);
+       ceph_encode_string(&p, end, new_cookie, new_cookie_len);
+
+       dout("%s lock_name %s type %d old_cookie %s tag %s new_cookie %s\n",
+            __func__, lock_name, type, old_cookie, tag, new_cookie);
+       ret = ceph_osdc_call(osdc, oid, oloc, "lock", "set_cookie",
+                            CEPH_OSD_FLAG_WRITE, cookie_op_page,
+                            cookie_op_buf_size, NULL, NULL);
+
+       dout("%s: status %d\n", __func__, ret);
+       __free_page(cookie_op_page);
+       return ret;
+}
+EXPORT_SYMBOL(ceph_cls_set_cookie);
+
 void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
 {
        int i;