libceph: rados pool namespace support
authorYan, Zheng <zyan@redhat.com>
Sun, 14 Feb 2016 03:24:31 +0000 (11:24 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 28 Jul 2016 00:55:37 +0000 (02:55 +0200)
Add pool namesapce pointer to struct ceph_file_layout and struct
ceph_object_locator. Pool namespace is used by when mapping object
to PG, it's also used when composing OSD request.

The namespace pointer in struct ceph_file_layout is RCU protected.
So libceph can read namespace without taking lock.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
[idryomov@gmail.com: ceph_oloc_destroy(), misc minor changes]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
drivers/block/rbd.c
fs/ceph/inode.c
include/linux/ceph/ceph_fs.h
include/linux/ceph/osdmap.h
net/ceph/debugfs.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index cc272ed46cfdc91aad134afa919b9fed09fa63ca..58fd02d4e534b6bbbf7654c1280a7ed75cc7b4b0 100644 (file)
@@ -3999,6 +3999,7 @@ static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
        rbd_dev->layout.stripe_count = 1;
        rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
        rbd_dev->layout.pool_id = spec->pool_id;
+       RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 
        /*
         * If this is a mapping rbd_dev (as opposed to a parent one),
index 6c5903e0a976bc187036b665af66cd16d68b1f4b..d035e0ab60294fb12a73e85d7e842a623469e1ff 100644 (file)
@@ -446,6 +446,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        ci->i_symlink = NULL;
 
        memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
+       RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
        ci->i_pool_ns_len = 0;
 
        ci->i_fragtree = RB_ROOT;
@@ -570,6 +571,8 @@ void ceph_destroy_inode(struct inode *inode)
        if (ci->i_xattrs.prealloc_blob)
                ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
+       ceph_put_string(ci->i_layout.pool_ns);
+
        call_rcu(&inode->i_rcu, ceph_i_callback);
 }
 
index e5a5fb9ca3f5872f3ae3e6f197ecba03cfb08211..08b8fd72261e24e6f7e7926b96fa132357137936 100644 (file)
@@ -53,6 +53,7 @@ struct ceph_file_layout_legacy {
        __le32 fl_pg_pool;      /* namespace, crush ruleset, rep level */
 } __attribute__ ((packed));
 
+struct ceph_string;
 /*
  * ceph_file_layout - describe data layout for a file/inode
  */
@@ -62,6 +63,7 @@ struct ceph_file_layout {
        u32 stripe_count;  /* over this many objects */
        u32 object_size;   /* until objects are this big */
        s64 pool_id;        /* rados pool id */
+       struct ceph_string __rcu *pool_ns; /* rados pool namespace */
 };
 
 extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
index 21d7f048959f1baad8dbc8ea6b56244f16b2e968..9a9041784dcff383169a169c39203dd79f383438 100644 (file)
@@ -63,11 +63,13 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
 
 struct ceph_object_locator {
        s64 pool;
+       struct ceph_string *pool_ns;
 };
 
 static inline void ceph_oloc_init(struct ceph_object_locator *oloc)
 {
        oloc->pool = -1;
+       oloc->pool_ns = NULL;
 }
 
 static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
@@ -75,11 +77,9 @@ static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
        return oloc->pool == -1;
 }
 
-static inline void ceph_oloc_copy(struct ceph_object_locator *dest,
-                                 const struct ceph_object_locator *src)
-{
-       dest->pool = src->pool;
-}
+void ceph_oloc_copy(struct ceph_object_locator *dest,
+                   const struct ceph_object_locator *src);
+void ceph_oloc_destroy(struct ceph_object_locator *oloc);
 
 /*
  * Maximum supported by kernel client object name length
index e77b04ca7802c046af213abd25650a11bcc6a6a3..c62b2b029a6e6fe66fa3440551d0602fc8c4123e 100644 (file)
@@ -156,8 +156,16 @@ static void dump_target(struct seq_file *s, struct ceph_osd_request_target *t)
        seq_printf(s, "]/%d\t[", t->up.primary);
        for (i = 0; i < t->acting.size; i++)
                seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
-       seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
-                  t->target_oid.name_len, t->target_oid.name, t->flags);
+       seq_printf(s, "]/%d\t", t->acting.primary);
+       if (t->target_oloc.pool_ns) {
+               seq_printf(s, "%*pE/%*pE\t0x%x",
+                       (int)t->target_oloc.pool_ns->len,
+                       t->target_oloc.pool_ns->str,
+                       t->target_oid.name_len, t->target_oid.name, t->flags);
+       } else {
+               seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len,
+                       t->target_oid.name, t->flags);
+       }
        if (t->paused)
                seq_puts(s, "\tP");
 }
index 23efcac8007271376bf31b0e1a983cbfdcaf4755..b68cc15e9cdc640ad504297291b812741da0153f 100644 (file)
@@ -387,7 +387,9 @@ static void target_copy(struct ceph_osd_request_target *dest,
 static void target_destroy(struct ceph_osd_request_target *t)
 {
        ceph_oid_destroy(&t->base_oid);
+       ceph_oloc_destroy(&t->base_oloc);
        ceph_oid_destroy(&t->target_oid);
+       ceph_oloc_destroy(&t->target_oloc);
 }
 
 /*
@@ -533,6 +535,11 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 }
 EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
+static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+{
+       return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
+}
+
 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 {
        struct ceph_osd_client *osdc = req->r_osdc;
@@ -540,11 +547,13 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
        int msg_size;
 
        WARN_ON(ceph_oid_empty(&req->r_base_oid));
+       WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
        /* create request message */
        msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
        msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
-       msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+       msg_size += CEPH_ENCODING_START_BLK_LEN +
+                       ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
        msg_size += 1 + 8 + 4 + 4; /* pgid */
        msg_size += 4 + req->r_base_oid.name_len; /* oid */
        msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
@@ -949,6 +958,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        req->r_flags = flags;
        req->r_base_oloc.pool = layout->pool_id;
+       req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
        ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
        req->r_snapid = vino.snap;
@@ -1489,12 +1499,16 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
        p += sizeof(req->r_replay_version);
 
        /* oloc */
-       ceph_encode_8(&p, 4);
-       ceph_encode_8(&p, 4);
-       ceph_encode_32(&p, 8 + 4 + 4);
+       ceph_start_encoding(&p, 5, 4,
+                           ceph_oloc_encoding_size(&req->r_t.target_oloc));
        ceph_encode_64(&p, req->r_t.target_oloc.pool);
        ceph_encode_32(&p, -1); /* preferred */
        ceph_encode_32(&p, 0); /* key len */
+       if (req->r_t.target_oloc.pool_ns)
+               ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
+                                  req->r_t.target_oloc.pool_ns->len);
+       else
+               ceph_encode_32(&p, 0);
 
        /* pgid */
        ceph_encode_8(&p, 1);
@@ -2596,8 +2610,8 @@ static int ceph_oloc_decode(void **p, void *end,
        if (struct_v >= 5) {
                len = ceph_decode_32(p);
                if (len > 0) {
-                       pr_warn("ceph_object_locator::nspace is set\n");
-                       goto e_inval;
+                       ceph_decode_need(p, end, len, e_inval);
+                       *p += len;
                }
        }
 
@@ -2835,7 +2849,11 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
                unlink_request(osd, req);
                mutex_unlock(&osd->lock);
 
-               ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
+               /*
+                * Not ceph_oloc_copy() - changing pool_ns is not
+                * supported.
+                */
+               req->r_t.target_oloc.pool = m.redirect.oloc.pool;
                req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
                req->r_tid = 0;
                __submit_request(req, false);
index 5947b2e9fb8ee660d7b532eacdd490e81649048e..d2436880b3056da8342845103c23aa59c66f8066 100644 (file)
@@ -1510,6 +1510,24 @@ bad:
        return ERR_PTR(err);
 }
 
+void ceph_oloc_copy(struct ceph_object_locator *dest,
+                   const struct ceph_object_locator *src)
+{
+       WARN_ON(!ceph_oloc_empty(dest));
+       WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
+
+       dest->pool = src->pool;
+       if (src->pool_ns)
+               dest->pool_ns = ceph_get_string(src->pool_ns);
+}
+EXPORT_SYMBOL(ceph_oloc_copy);
+
+void ceph_oloc_destroy(struct ceph_object_locator *oloc)
+{
+       ceph_put_string(oloc->pool_ns);
+}
+EXPORT_SYMBOL(ceph_oloc_destroy);
+
 void ceph_oid_copy(struct ceph_object_id *dest,
                   const struct ceph_object_id *src)
 {
@@ -1844,12 +1862,34 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
        if (!pi)
                return -ENOENT;
 
-       raw_pgid->pool = oloc->pool;
-       raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
-                                      oid->name_len);
-
-       dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
-            raw_pgid->pool, raw_pgid->seed);
+       if (!oloc->pool_ns) {
+               raw_pgid->pool = oloc->pool;
+               raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
+                                            oid->name_len);
+               dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
+                    raw_pgid->pool, raw_pgid->seed);
+       } else {
+               char stack_buf[256];
+               char *buf = stack_buf;
+               int nsl = oloc->pool_ns->len;
+               size_t total = nsl + 1 + oid->name_len;
+
+               if (total > sizeof(stack_buf)) {
+                       buf = kmalloc(total, GFP_NOIO);
+                       if (!buf)
+                               return -ENOMEM;
+               }
+               memcpy(buf, oloc->pool_ns->str, nsl);
+               buf[nsl] = '\037';
+               memcpy(buf + nsl + 1, oid->name, oid->name_len);
+               raw_pgid->pool = oloc->pool;
+               raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
+               if (buf != stack_buf)
+                       kfree(buf);
+               dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
+                    oid->name, nsl, oloc->pool_ns->str,
+                    raw_pgid->pool, raw_pgid->seed);
+       }
        return 0;
 }
 EXPORT_SYMBOL(ceph_object_locator_to_pg);