ceph: use ceph_pagelist for mds reconnect message; change encoding (protocol change)
authorSage Weil <sage@newdream.net>
Wed, 23 Dec 2009 20:21:51 +0000 (12:21 -0800)
committerSage Weil <sage@newdream.net>
Wed, 23 Dec 2009 20:21:51 +0000 (12:21 -0800)
Use the ceph_pagelist to encode the MDS reconnect message.  We change the
message encoding (protocol change!) at the same time to make our life
easier (we don't know how many snaprealms we have when we start encoding).

An empty message implies the session is closed/does not exist.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/ceph_fs.h
fs/ceph/mds_client.c

index db3fed33c4aac3e9892b187b6aa5fcc846eccb7e..d0f2557bb41b6f0a10863d0d9f6094c58dc4a3aa 100644 (file)
@@ -39,7 +39,7 @@
 #define CEPH_MDS_PROTOCOL     9 /* cluster internal */
 #define CEPH_MON_PROTOCOL     5 /* cluster internal */
 #define CEPH_OSDC_PROTOCOL   22 /* server/client */
-#define CEPH_MDSC_PROTOCOL   30 /* server/client */
+#define CEPH_MDSC_PROTOCOL   31 /* server/client */
 #define CEPH_MONC_PROTOCOL   15 /* server/client */
 
 
index ec884e2845dbc21d50b55bf297a8a13857ceaa10..6e08f488a30f0d8535dcb43779855531e82bef38 100644 (file)
@@ -9,6 +9,7 @@
 #include "messenger.h"
 #include "decode.h"
 #include "auth.h"
+#include "pagelist.h"
 
 /*
  * A cluster of MDS (metadata server) daemons is responsible for
@@ -1971,20 +1972,12 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 /*
  * Encode information about a cap for a reconnect with the MDS.
  */
-struct encode_caps_data {
-       void **pp;
-       void *end;
-       int *num_caps;
-};
-
 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                          void *arg)
 {
-       struct ceph_mds_cap_reconnect *rec;
+       struct ceph_mds_cap_reconnect rec;
        struct ceph_inode_info *ci;
-       struct encode_caps_data *data = (struct encode_caps_data *)arg;
-       void *p = *(data->pp);
-       void *end = data->end;
+       struct ceph_pagelist *pagelist = arg;
        char *path;
        int pathlen, err;
        u64 pathbase;
@@ -1995,8 +1988,9 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
             inode, ceph_vinop(inode), cap, cap->cap_id,
             ceph_cap_string(cap->issued));
-       ceph_decode_need(&p, end, sizeof(u64), needmore);
-       ceph_encode_64(&p, ceph_ino(inode));
+       err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
+       if (err)
+               return err;
 
        dentry = d_find_alias(inode);
        if (dentry) {
@@ -2009,33 +2003,29 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                path = NULL;
                pathlen = 0;
        }
-       ceph_decode_need(&p, end, pathlen+4, needmore);
-       ceph_encode_string(&p, end, path, pathlen);
+       err = ceph_pagelist_encode_string(pagelist, path, pathlen);
+       if (err)
+               goto out;
 
-       ceph_decode_need(&p, end, sizeof(*rec), needmore);
-       rec = p;
-       p += sizeof(*rec);
-       BUG_ON(p > end);
        spin_lock(&inode->i_lock);
        cap->seq = 0;        /* reset cap seq */
        cap->issue_seq = 0;  /* and issue_seq */
-       rec->cap_id = cpu_to_le64(cap->cap_id);
-       rec->pathbase = cpu_to_le64(pathbase);
-       rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
-       rec->issued = cpu_to_le32(cap->issued);
-       rec->size = cpu_to_le64(inode->i_size);
-       ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
-       ceph_encode_timespec(&rec->atime, &inode->i_atime);
-       rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+       rec.cap_id = cpu_to_le64(cap->cap_id);
+       rec.pathbase = cpu_to_le64(pathbase);
+       rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+       rec.issued = cpu_to_le32(cap->issued);
+       rec.size = cpu_to_le64(inode->i_size);
+       ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
+       ceph_encode_timespec(&rec.atime, &inode->i_atime);
+       rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
        spin_unlock(&inode->i_lock);
 
+       err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+
+out:
        kfree(path);
        dput(dentry);
-       (*data->num_caps)++;
-       *(data->pp) = p;
-       return 0;
-needmore:
-       return -ENOSPC;
+       return err;
 }
 
 
@@ -2053,19 +2043,26 @@ needmore:
  */
 static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 {
-       struct ceph_mds_session *session;
+       struct ceph_mds_session *session = NULL;
        struct ceph_msg *reply;
-       int newlen, len = 4 + 1;
-       void *p, *end;
        int err;
-       int num_caps, num_realms = 0;
        int got;
        u64 next_snap_ino = 0;
-       __le32 *pnum_caps, *pnum_realms;
-       struct encode_caps_data iter_args;
+       struct ceph_pagelist *pagelist;
 
        pr_info("reconnect to recovering mds%d\n", mds);
 
+       pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
+       if (!pagelist)
+               goto fail_nopagelist;
+       ceph_pagelist_init(pagelist);
+
+       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
+       if (IS_ERR(reply)) {
+               err = PTR_ERR(reply);
+               goto fail_nomsg;
+       }
+
        /* find session */
        session = __ceph_lookup_mds_session(mdsc, mds);
        mutex_unlock(&mdsc->mutex);    /* drop lock for duration */
@@ -2081,12 +2078,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 
                /* replay unsafe requests */
                replay_unsafe_requests(mdsc, session);
-
-               /* estimate needed space */
-               len += session->s_nr_caps *
-                       (100+sizeof(struct ceph_mds_cap_reconnect));
-               pr_info("estimating i need %d bytes for %d caps\n",
-                    len, session->s_nr_caps);
        } else {
                dout("no session for mds%d, will send short reconnect\n",
                     mds);
@@ -2094,41 +2085,18 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 
        down_read(&mdsc->snap_rwsem);
 
-retry:
-       /* build reply */
-       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
-       if (IS_ERR(reply)) {
-               err = PTR_ERR(reply);
-               pr_err("send_mds_reconnect ENOMEM on %d for mds%d\n",
-                      len, mds);
-               goto out;
-       }
-       p = reply->front.iov_base;
-       end = p + len;
-
-       if (!session) {
-               ceph_encode_8(&p, 1); /* session was closed */
-               ceph_encode_32(&p, 0);
+       if (!session)
                goto send;
-       }
        dout("session %p state %s\n", session,
             session_state_name(session->s_state));
 
        /* traverse this session's caps */
-       ceph_encode_8(&p, 0);
-       pnum_caps = p;
-       ceph_encode_32(&p, session->s_nr_caps);
-       num_caps = 0;
-
-       iter_args.pp = &p;
-       iter_args.end = end;
-       iter_args.num_caps = &num_caps;
-       err = iterate_session_caps(session, encode_caps_cb, &iter_args);
-       if (err == -ENOSPC)
-               goto needmore;
+       err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+       if (err)
+               goto fail;
+       err = iterate_session_caps(session, encode_caps_cb, pagelist);
        if (err < 0)
                goto out;
-       *pnum_caps = cpu_to_le32(num_caps);
 
        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
@@ -2136,14 +2104,9 @@ retry:
         * it will tell us.
         */
        next_snap_ino = 0;
-       /* save some space for the snaprealm count */
-       pnum_realms = p;
-       ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
-       p += sizeof(*pnum_realms);
-       num_realms = 0;
        while (1) {
                struct ceph_snap_realm *realm;
-               struct ceph_mds_snaprealm_reconnect *sr_rec;
+               struct ceph_mds_snaprealm_reconnect sr_rec;
                got = radix_tree_gang_lookup(&mdsc->snap_realms,
                                             (void **)&realm, next_snap_ino, 1);
                if (!got)
@@ -2151,22 +2114,19 @@ retry:
 
                dout(" adding snap realm %llx seq %lld parent %llx\n",
                     realm->ino, realm->seq, realm->parent_ino);
-               ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
-               sr_rec = p;
-               sr_rec->ino = cpu_to_le64(realm->ino);
-               sr_rec->seq = cpu_to_le64(realm->seq);
-               sr_rec->parent = cpu_to_le64(realm->parent_ino);
-               p += sizeof(*sr_rec);
-               num_realms++;
+               sr_rec.ino = cpu_to_le64(realm->ino);
+               sr_rec.seq = cpu_to_le64(realm->seq);
+               sr_rec.parent = cpu_to_le64(realm->parent_ino);
+               err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
+               if (err)
+                       goto fail;
                next_snap_ino = realm->ino + 1;
        }
-       *pnum_realms = cpu_to_le32(num_realms);
 
 send:
-       reply->front.iov_len = p - reply->front.iov_base;
-       reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
-       dout("final len was %u (guessed %d)\n",
-            (unsigned)reply->front.iov_len, len);
+       reply->pagelist = pagelist;
+       reply->hdr.data_len = cpu_to_le32(pagelist->length);
+       reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
 
        if (session) {
@@ -2183,18 +2143,14 @@ out:
        mutex_lock(&mdsc->mutex);
        return;
 
-needmore:
-       /*
-        * we need a larger buffer.  this doesn't very accurately
-        * factor in snap realms, but it's safe.
-        */
-       num_caps += num_realms;
-       newlen = len * ((100 * (session->s_nr_caps+3)) / (num_caps + 1)) / 100;
-       pr_info("i guessed %d, and did %d of %d caps, retrying with %d\n",
-            len, num_caps, session->s_nr_caps, newlen);
-       len = newlen;
+fail:
        ceph_msg_put(reply);
-       goto retry;
+fail_nomsg:
+       ceph_pagelist_release(pagelist);
+       kfree(pagelist);
+fail_nopagelist:
+       pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
+       goto out;
 }