ceph: handle epoch barriers in cap messages
authorJeff Layton <jlayton@redhat.com>
Thu, 13 Apr 2017 15:07:04 +0000 (11:07 -0400)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 4 May 2017 07:19:21 +0000 (09:19 +0200)
Have the client store and update the osdc epoch_barrier when a cap
message comes in with one.

When sending cap messages, send the epoch barrier as well. This allows
clients to inform servers that their released caps may not be used until
a particular OSD map epoch.

Signed-off-by: Jeff Layton <jlayton@redhat.com>
Reviewed-by: "Yan, Zhengā€¯ <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index 60185434162afc86e7ea1c8d67f476b1516e1d2c..a3ebb632294e7b90a315508c9b75671d2ce8ff6a 100644 (file)
@@ -1015,6 +1015,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
        void *p;
        size_t extra_len;
        struct timespec zerotime = {0};
+       struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
 
        dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
             " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
@@ -1076,8 +1077,12 @@ static int send_cap_msg(struct cap_msg_args *arg)
        ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
        /* inline data size */
        ceph_encode_32(&p, 0);
-       /* osd_epoch_barrier (version 5) */
-       ceph_encode_32(&p, 0);
+       /*
+        * osd_epoch_barrier (version 5)
+        * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
+        * case it was recently changed
+        */
+       ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
        /* oldest_flush_tid (version 6) */
        ceph_encode_64(&p, arg->oldest_flush_tid);
 
@@ -3633,13 +3638,19 @@ void ceph_handle_caps(struct ceph_mds_session *session,
                p += inline_len;
        }
 
+       if (le16_to_cpu(msg->hdr.version) >= 5) {
+               struct ceph_osd_client  *osdc = &mdsc->fsc->client->osdc;
+               u32                     epoch_barrier;
+
+               ceph_decode_32_safe(&p, end, epoch_barrier, bad);
+               ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
+       }
+
        if (le16_to_cpu(msg->hdr.version) >= 8) {
                u64 flush_tid;
                u32 caller_uid, caller_gid;
-               u32 osd_epoch_barrier;
                u32 pool_ns_len;
-               /* version >= 5 */
-               ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
+
                /* version >= 6 */
                ceph_decode_64_safe(&p, end, flush_tid, bad);
                /* version >= 7 */
index 8cc4d4e8b0773f8c09e75df3f5eea34360e5df20..f7bfc22eb39c6577cbf97a960f4cb033bc518557 100644 (file)
@@ -1552,9 +1552,15 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg = NULL;
        struct ceph_mds_cap_release *head;
        struct ceph_mds_cap_item *item;
+       struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
        struct ceph_cap *cap;
        LIST_HEAD(tmp_list);
        int num_cap_releases;
+       __le32  barrier, *cap_barrier;
+
+       down_read(&osdc->lock);
+       barrier = cpu_to_le32(osdc->epoch_barrier);
+       up_read(&osdc->lock);
 
        spin_lock(&session->s_cap_lock);
 again:
@@ -1572,7 +1578,11 @@ again:
                        head = msg->front.iov_base;
                        head->num = cpu_to_le32(0);
                        msg->front.iov_len = sizeof(*head);
+
+                       msg->hdr.version = cpu_to_le16(2);
+                       msg->hdr.compat_version = cpu_to_le16(1);
                }
+
                cap = list_first_entry(&tmp_list, struct ceph_cap,
                                        session_caps);
                list_del(&cap->session_caps);
@@ -1590,6 +1600,11 @@ again:
                ceph_put_cap(mdsc, cap);
 
                if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+                       // Append cap_barrier field
+                       cap_barrier = msg->front.iov_base + msg->front.iov_len;
+                       *cap_barrier = barrier;
+                       msg->front.iov_len += sizeof(*cap_barrier);
+
                        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                        dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                        ceph_con_send(&session->s_con, msg);
@@ -1605,6 +1620,11 @@ again:
        spin_unlock(&session->s_cap_lock);
 
        if (msg) {
+               // Append cap_barrier field
+               cap_barrier = msg->front.iov_base + msg->front.iov_len;
+               *cap_barrier = barrier;
+               msg->front.iov_len += sizeof(*cap_barrier);
+
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
                dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
                ceph_con_send(&session->s_con, msg);
index 3e67dd2169fa12eeb5d77cdd8eea8a109dfdebec..db57ae98ed345e280358a2ac65e75ac6c71e7c2b 100644 (file)
@@ -106,10 +106,13 @@ struct ceph_mds_reply_info_parsed {
 
 /*
  * cap releases are batched and sent to the MDS en masse.
+ *
+ * Account for per-message overhead of mds_cap_release header
+ * and __le32 for osd epoch barrier trailing field.
  */
-#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE -                    \
+#define CEPH_CAPS_PER_RELEASE ((PAGE_SIZE - sizeof(u32) -              \
                                sizeof(struct ceph_mds_cap_release)) /  \
-                              sizeof(struct ceph_mds_cap_item))
+                               sizeof(struct ceph_mds_cap_item))
 
 
 /*