Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Thu, 20 Dec 2012 22:00:13 +0000 (14:00 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Thu, 20 Dec 2012 22:00:13 +0000 (14:00 -0800)
Pull Ceph update from Sage Weil:
 "There are a few different groups of commits here.  The largest is
  Alex's ongoing work to enable the coming RBD features (cloning,
  striping).  There is some cleanup in libceph that goes along with it.

  Cyril and David have fixed some problems with NFS reexport (leaking
  dentries and page locks), and there is a batch of patches from Yan
  fixing problems with the fs client when running against a clustered
  MDS.  There are a few bug fixes mixed in for good measure, many of
  which will be going to the stable trees once they're upstream.

  My apologies for the late pull.  There is still a gremlin in the rbd
  map/unmap code and I was hoping to include the fix for that as well,
  but we haven't been able to confirm the fix is correct yet; I'll send
  that in a separate pull once it's nailed down."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (68 commits)
  rbd: get rid of rbd_{get,put}_dev()
  libceph: register request before unregister linger
  libceph: don't use rb_init_node() in ceph_osdc_alloc_request()
  libceph: init event->node in ceph_osdc_create_event()
  libceph: init osd->o_node in create_osd()
  libceph: report connection fault with warning
  libceph: socket can close in any connection state
  rbd: don't use ENOTSUPP
  rbd: remove linger unconditionally
  rbd: get rid of RBD_MAX_SEG_NAME_LEN
  libceph: avoid using freed osd in __kick_osd_requests()
  ceph: don't reference req after put
  rbd: do not allow remove of mounted-on image
  libceph: Unlock unprocessed pages in start_read() error path
  ceph: call handle_cap_grant() for cap import message
  ceph: Fix __ceph_do_pending_vmtruncate
  ceph: Don't add dirty inode to dirty list if caps is in migration
  ceph: Fix infinite loop in __wake_requests
  ceph: Don't update i_max_size when handling non-auth cap
  bdi_register: add __printf verification, fix arg mismatch
  ...

17 files changed:
Documentation/ABI/testing/sysfs-bus-rbd
drivers/block/rbd.c
drivers/block/rbd_types.h
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/super.c
include/linux/backing-dev.h
include/linux/ceph/libceph.h
include/linux/ceph/osdmap.h
include/linux/ceph/rados.h
net/ceph/ceph_common.c
net/ceph/messenger.c
net/ceph/osd_client.c
net/ceph/osdmap.c

index 1cf2adf46b118cd4b2f7bc2449a1065d764a2bd6..cd9213ccf3dc263ba2e87ff2ecc660ef370c1f1a 100644 (file)
@@ -70,6 +70,10 @@ snap_*
 
        A directory per each snapshot
 
+parent
+
+       Information identifying the pool, image, and snapshot id for
+       the parent image in a layered rbd image (format 2 only).
 
 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
 -------------------------------------------------------------
index bb3d9be3b1b4eecb82ba82f8bc1045e825ff7a75..89576a0b3f2ed5b099ecf7ce675104d282b0dd48 100644 (file)
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
-#define RBD_MAX_SNAP_NAME_LEN  32
+#define RBD_SNAP_DEV_NAME_PREFIX       "snap_"
+#define RBD_MAX_SNAP_NAME_LEN  \
+                       (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
 #define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+/* This allows a single page to hold an image name sent by OSD */
+#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
 #define RBD_IMAGE_ID_LEN_MAX   64
+
 #define RBD_OBJ_PREFIX_LEN_MAX 64
 
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING      1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL          (0)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@ struct rbd_image_header {
        u64 obj_version;
 };
 
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+       u64             pool_id;
+       char            *pool_name;
+
+       char            *image_id;
+       size_t          image_id_len;
+       char            *image_name;
+       size_t          image_name_len;
+
+       u64             snap_id;
+       char            *snap_name;
+
+       struct kref     kref;
+};
+
 struct rbd_options {
        bool    read_only;
 };
@@ -155,11 +190,8 @@ struct rbd_snap {
 };
 
 struct rbd_mapping {
-       char                    *snap_name;
-       u64                     snap_id;
        u64                     size;
        u64                     features;
-       bool                    snap_exists;
        bool                    read_only;
 };
 
@@ -173,7 +205,6 @@ struct rbd_device {
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
 
        u32                     image_format;   /* Either 1 or 2 */
-       struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
-       char                    *image_id;
-       size_t                  image_id_len;
-       char                    *image_name;
-       size_t                  image_name_len;
+       bool                    exists;
+       struct rbd_spec         *spec;
+
        char                    *header_name;
-       char                    *pool_name;
-       int                     pool_id;
 
        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;
 
+       struct rbd_spec         *parent_spec;
+       u64                     parent_overlap;
+
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
 
@@ -204,6 +235,7 @@ struct rbd_device {
 
        /* sysfs related */
        struct device           dev;
+       unsigned long           open_count;
 };
 
 static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
 static void rbd_dev_release(struct device *dev);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
@@ -258,17 +290,8 @@ static struct device rbd_root_dev = {
 #  define rbd_assert(expr)     ((void) 0)
 #endif /* !RBD_DEBUG */
 
-static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
-{
-       return get_device(&rbd_dev->dev);
-}
-
-static void rbd_put_dev(struct rbd_device *rbd_dev)
-{
-       put_device(&rbd_dev->dev);
-}
-
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -277,8 +300,11 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;
 
-       rbd_get_dev(rbd_dev);
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
+       rbd_dev->open_count++;
+       mutex_unlock(&ctl_mutex);
 
        return 0;
 }
@@ -287,7 +313,11 @@ static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
        struct rbd_device *rbd_dev = disk->private_data;
 
-       rbd_put_dev(rbd_dev);
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       rbd_assert(rbd_dev->open_count > 0);
+       rbd_dev->open_count--;
+       put_device(&rbd_dev->dev);
+       mutex_unlock(&ctl_mutex);
 
        return 0;
 }
@@ -388,7 +418,7 @@ enum {
 static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
-       {Opt_read_only, "mapping.read_only"},
+       {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
@@ -441,33 +471,17 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-                               size_t mon_addr_len, char *options)
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
-       struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
-       struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;
 
-       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
-       ceph_opts = ceph_parse_options(options, mon_addr,
-                                       mon_addr + mon_addr_len,
-                                       parse_rbd_opts_token, rbd_opts);
-       if (IS_ERR(ceph_opts))
-               return PTR_ERR(ceph_opts);
-
        rbdc = rbd_client_find(ceph_opts);
-       if (rbdc) {
-               /* using an existing client */
+       if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
-       } else {
+       else
                rbdc = rbd_client_create(ceph_opts);
-               if (IS_ERR(rbdc))
-                       return PTR_ERR(rbdc);
-       }
-       rbd_dev->rbd_client = rbdc;
 
-       return 0;
+       return rbdc;
 }
 
 /*
@@ -492,10 +506,10 @@ static void rbd_client_release(struct kref *kref)
  * Drop reference to ceph client node. If it's not referenced anymore, release
  * it.
  */
-static void rbd_put_client(struct rbd_device *rbd_dev)
+static void rbd_put_client(struct rbd_client *rbdc)
 {
-       kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-       rbd_dev->rbd_client = NULL;
+       if (rbdc)
+               kref_put(&rbdc->kref, rbd_client_release);
 }
 
 /*
@@ -524,6 +538,16 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;
 
+       /* The bio layer requires at least sector-sized I/O */
+
+       if (ondisk->options.order < SECTOR_SHIFT)
+               return false;
+
+       /* If we use u64 in a few spots we may be able to loosen this */
+
+       if (ondisk->options.order > 8 * sizeof (int) - 1)
+               return false;
+
        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
@@ -635,6 +659,20 @@ out_err:
        return -ENOMEM;
 }
 
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
+       struct rbd_snap *snap;
+
+       if (snap_id == CEPH_NOSNAP)
+               return RBD_SNAP_HEAD_NAME;
+
+       list_for_each_entry(snap, &rbd_dev->snaps, node)
+               if (snap_id == snap->id)
+                       return snap->name;
+
+       return NULL;
+}
+
 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 {
 
@@ -642,7 +680,7 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 
        list_for_each_entry(snap, &rbd_dev->snaps, node) {
                if (!strcmp(snap_name, snap->name)) {
-                       rbd_dev->mapping.snap_id = snap->id;
+                       rbd_dev->spec->snap_id = snap->id;
                        rbd_dev->mapping.size = snap->size;
                        rbd_dev->mapping.features = snap->features;
 
@@ -653,26 +691,23 @@ static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
        return -ENOENT;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+       if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+               rbd_dev->spec->snap_id = CEPH_NOSNAP;
                rbd_dev->mapping.size = rbd_dev->header.image_size;
                rbd_dev->mapping.features = rbd_dev->header.features;
-               rbd_dev->mapping.snap_exists = false;
-               rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
                ret = 0;
        } else {
-               ret = snap_by_name(rbd_dev, snap_name);
+               ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
                if (ret < 0)
                        goto done;
-               rbd_dev->mapping.snap_exists = true;
                rbd_dev->mapping.read_only = true;
        }
-       rbd_dev->mapping.snap_name = snap_name;
+       rbd_dev->exists = true;
 done:
        return ret;
 }
@@ -695,13 +730,13 @@ static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
        u64 segment;
        int ret;
 
-       name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+       name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
-       ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+       ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
                        rbd_dev->header.object_prefix, segment);
-       if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+       if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                kfree(name);
@@ -800,77 +835,144 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-                                  struct bio_pair **bp,
-                                  int len, gfp_t gfpmask)
-{
-       struct bio *old_chain = *old;
-       struct bio *new_chain = NULL;
-       struct bio *tail;
-       int total = 0;
-
-       if (*bp) {
-               bio_pair_release(*bp);
-               *bp = NULL;
-       }
+static struct bio *bio_clone_range(struct bio *bio_src,
+                                       unsigned int offset,
+                                       unsigned int len,
+                                       gfp_t gfpmask)
+{
+       struct bio_vec *bv;
+       unsigned int resid;
+       unsigned short idx;
+       unsigned int voff;
+       unsigned short end_idx;
+       unsigned short vcnt;
+       struct bio *bio;
 
-       while (old_chain && (total < len)) {
-               struct bio *tmp;
+       /* Handle the easy case for the caller */
 
-               tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-               if (!tmp)
-                       goto err_out;
-               gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
+       if (!offset && len == bio_src->bi_size)
+               return bio_clone(bio_src, gfpmask);
 
-               if (total + old_chain->bi_size > len) {
-                       struct bio_pair *bp;
+       if (WARN_ON_ONCE(!len))
+               return NULL;
+       if (WARN_ON_ONCE(len > bio_src->bi_size))
+               return NULL;
+       if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+               return NULL;
 
-                       /*
-                        * this split can only happen with a single paged bio,
-                        * split_bio will BUG_ON if this is not the case
-                        */
-                       dout("bio_chain_clone split! total=%d remaining=%d"
-                            "bi_size=%u\n",
-                            total, len - total, old_chain->bi_size);
+       /* Find first affected segment... */
 
-                       /* split the bio. We'll release it either in the next
-                          call, or it will have to be released outside */
-                       bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-                       if (!bp)
-                               goto err_out;
+       resid = offset;
+       __bio_for_each_segment(bv, bio_src, idx, 0) {
+               if (resid < bv->bv_len)
+                       break;
+               resid -= bv->bv_len;
+       }
+       voff = resid;
 
-                       __bio_clone(tmp, &bp->bio1);
+       /* ...and the last affected segment */
 
-                       *next = &bp->bio2;
-               } else {
-                       __bio_clone(tmp, old_chain);
-                       *next = old_chain->bi_next;
-               }
+       resid += len;
+       __bio_for_each_segment(bv, bio_src, end_idx, idx) {
+               if (resid <= bv->bv_len)
+                       break;
+               resid -= bv->bv_len;
+       }
+       vcnt = end_idx - idx + 1;
+
+       /* Build the clone */
 
-               tmp->bi_bdev = NULL;
-               tmp->bi_next = NULL;
-               if (new_chain)
-                       tail->bi_next = tmp;
-               else
-                       new_chain = tmp;
-               tail = tmp;
-               old_chain = old_chain->bi_next;
+       bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+       if (!bio)
+               return NULL;    /* ENOMEM */
 
-               total += tmp->bi_size;
+       bio->bi_bdev = bio_src->bi_bdev;
+       bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+       bio->bi_rw = bio_src->bi_rw;
+       bio->bi_flags |= 1 << BIO_CLONED;
+
+       /*
+        * Copy over our part of the bio_vec, then update the first
+        * and last (or only) entries.
+        */
+       memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+                       vcnt * sizeof (struct bio_vec));
+       bio->bi_io_vec[0].bv_offset += voff;
+       if (vcnt > 1) {
+               bio->bi_io_vec[0].bv_len -= voff;
+               bio->bi_io_vec[vcnt - 1].bv_len = resid;
+       } else {
+               bio->bi_io_vec[0].bv_len = len;
        }
 
-       rbd_assert(total == len);
+       bio->bi_vcnt = vcnt;
+       bio->bi_size = len;
+       bio->bi_idx = 0;
+
+       return bio;
+}
+
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+                                       unsigned int *offset,
+                                       unsigned int len,
+                                       gfp_t gfpmask)
+{
+       struct bio *bi = *bio_src;
+       unsigned int off = *offset;
+       struct bio *chain = NULL;
+       struct bio **end;
+
+       /* Build up a chain of clone bios up to the limit */
+
+       if (!bi || off >= bi->bi_size || !len)
+               return NULL;            /* Nothing to clone */
 
-       *old = old_chain;
+       end = &chain;
+       while (len) {
+               unsigned int bi_size;
+               struct bio *bio;
+
+               if (!bi)
+                       goto out_err;   /* EINVAL; ran out of bio's */
+               bi_size = min_t(unsigned int, bi->bi_size - off, len);
+               bio = bio_clone_range(bi, off, bi_size, gfpmask);
+               if (!bio)
+                       goto out_err;   /* ENOMEM */
+
+               *end = bio;
+               end = &bio->bi_next;
+
+               off += bi_size;
+               if (off == bi->bi_size) {
+                       bi = bi->bi_next;
+                       off = 0;
+               }
+               len -= bi_size;
+       }
+       *bio_src = bi;
+       *offset = off;
 
-       return new_chain;
+       return chain;
+out_err:
+       bio_chain_put(chain);
 
-err_out:
-       dout("bio_chain_clone with err\n");
-       bio_chain_put(new_chain);
        return NULL;
 }
 
@@ -988,8 +1090,9 @@ static int rbd_do_request(struct request *rq,
                req_data->coll_index = coll_index;
        }
 
-       dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-               (unsigned long long) ofs, (unsigned long long) len);
+       dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+               object_name, (unsigned long long) ofs,
+               (unsigned long long) len, coll, coll_index);
 
        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-       layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
+       layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
        ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                                   req, ops);
        rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@ done:
 static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
-                    u64 snapid,
-                    int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@ static int rbd_do_op(struct request *rq,
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;
+       int opcode;
+       int flags;
+       u64 snapid;
 
        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
@@ -1174,7 +1278,18 @@ static int rbd_do_op(struct request *rq,
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
-       payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+       if (rq_data_dir(rq) == WRITE) {
+               opcode = CEPH_OSD_OP_WRITE;
+               flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+               snapid = CEPH_NOSNAP;
+               payload_len = seg_len;
+       } else {
+               opcode = CEPH_OSD_OP_READ;
+               flags = CEPH_OSD_FLAG_READ;
+               snapc = NULL;
+               snapid = rbd_dev->spec->snap_id;
+               payload_len = 0;
+       }
 
        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1201,41 +1316,6 @@ done:
        return ret;
 }
 
-/*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
-                        struct rbd_device *rbd_dev,
-                        struct ceph_snap_context *snapc,
-                        u64 ofs, u64 len,
-                        struct bio *bio,
-                        struct rbd_req_coll *coll,
-                        int coll_index)
-{
-       return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
-                        CEPH_OSD_OP_WRITE,
-                        CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                        ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
-                        struct rbd_device *rbd_dev,
-                        u64 snapid,
-                        u64 ofs, u64 len,
-                        struct bio *bio,
-                        struct rbd_req_coll *coll,
-                        int coll_index)
-{
-       return rbd_do_op(rq, rbd_dev, NULL,
-                        snapid,
-                        CEPH_OSD_OP_READ,
-                        CEPH_OSD_FLAG_READ,
-                        ofs, len, bio, coll, coll_index);
-}
-
 /*
  * Request sync osd read
  */
@@ -1304,7 +1384,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
-       rc = rbd_refresh_header(rbd_dev, &hver);
+       rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@ static void rbd_rq_fn(struct request_queue *q)
 {
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
-       struct bio_pair *bp = NULL;
 
        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
-               struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
-               u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
+               unsigned int bio_offset;
 
                dout("fetched request\n");
 
@@ -1483,10 +1561,6 @@ static void rbd_rq_fn(struct request_queue *q)
 
                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);
-
-               size = blk_rq_bytes(rq);
-               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-               rq_bio = rq->bio;
                if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
@@ -1496,8 +1570,8 @@ static void rbd_rq_fn(struct request_queue *q)
 
                down_read(&rbd_dev->header_rwsem);
 
-               if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
-                               !rbd_dev->mapping.snap_exists) {
+               if (!rbd_dev->exists) {
+                       rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@ static void rbd_rq_fn(struct request_queue *q)
 
                up_read(&rbd_dev->header_rwsem);
 
+               size = blk_rq_bytes(rq);
+               ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+               bio = rq->bio;
+
                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@ static void rbd_rq_fn(struct request_queue *q)
                        continue;
                }
 
+               bio_offset = 0;
                do {
-                       /* a bio clone to be passed down to OSD req */
+                       u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+                       unsigned int chain_size;
+                       struct bio *bio_chain;
+
+                       BUG_ON(limit > (u64) UINT_MAX);
+                       chain_size = (unsigned int) limit;
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-                       op_size = rbd_segment_length(rbd_dev, ofs, size);
+
                        kref_get(&coll->kref);
-                       bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-                                             op_size, GFP_ATOMIC);
-                       if (!bio) {
-                               rbd_coll_end_req_index(rq, coll, cur_seg,
-                                                      -ENOMEM, op_size);
-                               goto next_seg;
-                       }
 
+                       /* Pass a cloned bio chain via an osd request */
 
-                       /* init OSD command: write or read */
-                       if (do_write)
-                               rbd_req_write(rq, rbd_dev,
-                                             snapc,
-                                             ofs,
-                                             op_size, bio,
-                                             coll, cur_seg);
+                       bio_chain = bio_chain_clone_range(&bio,
+                                               &bio_offset, chain_size,
+                                               GFP_ATOMIC);
+                       if (bio_chain)
+                               (void) rbd_do_op(rq, rbd_dev, snapc,
+                                               ofs, chain_size,
+                                               bio_chain, coll, cur_seg);
                        else
-                               rbd_req_read(rq, rbd_dev,
-                                            rbd_dev->mapping.snap_id,
-                                            ofs,
-                                            op_size, bio,
-                                            coll, cur_seg);
-
-next_seg:
-                       size -= op_size;
-                       ofs += op_size;
+                               rbd_coll_end_req_index(rq, coll, cur_seg,
+                                                      -ENOMEM, chain_size);
+                       size -= chain_size;
+                       ofs += chain_size;
 
                        cur_seg++;
-                       rq_bio = next_bio;
                } while (size > 0);
                kref_put(&coll->kref, rbd_coll_release);
 
-               if (bp)
-                       bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);
 
                ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@ next_seg:
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
 {
        struct rbd_device *rbd_dev = q->queuedata;
-       unsigned int chunk_sectors;
-       sector_t sector;
-       unsigned int bio_sectors;
-       int max;
+       sector_t sector_offset;
+       sector_t sectors_per_obj;
+       sector_t obj_sector_offset;
+       int ret;
 
-       chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
-       sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-       bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+       /*
+        * Find how far into its rbd object the partition-relative
+        * bio start sector is to offset relative to the enclosing
+        * device.
+        */
+       sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+       sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+       obj_sector_offset = sector_offset & (sectors_per_obj - 1);
+
+       /*
+        * Compute the number of bytes from that offset to the end
+        * of the object.  Account for what's already used by the bio.
+        */
+       ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+       if (ret > bmd->bi_size)
+               ret -= bmd->bi_size;
+       else
+               ret = 0;
 
-       max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-                                + bio_sectors)) << SECTOR_SHIFT;
-       if (max < 0)
-               max = 0; /* bio_add cannot handle a negative return */
-       if (max <= bvec->bv_len && bio_sectors == 0)
-               return bvec->bv_len;
-       return max;
+       /*
+        * Don't send back more than was asked for.  And if the bio
+        * was empty, let the whole thing through because:  "Note
+        * that a block device *must* allow a single page to be
+        * added to an empty bio."
+        */
+       rbd_assert(bvec->bv_len <= PAGE_SIZE);
+       if (ret > (int) bvec->bv_len || !bmd->bi_size)
+               ret = (int) bvec->bv_len;
+
+       return ret;
 }
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                        " (want %zd got %d)\n",
-                               rbd_dev->image_name, size, ret);
+                               rbd_dev->spec->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
-                               rbd_dev->image_name);
+                               rbd_dev->spec->image_name);
                        goto out_err;
                }
 
@@ -1707,19 +1796,32 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
        return ret;
 }
 
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 {
        struct rbd_snap *snap;
        struct rbd_snap *next;
 
        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-               __rbd_remove_snap_dev(snap);
+               rbd_remove_snap_dev(snap);
+}
+
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
+{
+       sector_t size;
+
+       if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+               return;
+
+       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+       dout("setting size to %llu sectors", (unsigned long long) size);
+       rbd_dev->mapping.size = (u64) size;
+       set_capacity(rbd_dev->disk, size);
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
        struct rbd_image_header h;
@@ -1730,17 +1832,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 
        down_write(&rbd_dev->header_rwsem);
 
-       /* resized? */
-       if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
-               sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
-               if (size != (sector_t) rbd_dev->mapping.size) {
-                       dout("setting size to %llu sectors",
-                               (unsigned long long) size);
-                       rbd_dev->mapping.size = (u64) size;
-                       set_capacity(rbd_dev->disk, size);
-               }
-       }
+       /* Update image size, and check for resize of mapped image */
+       rbd_dev->header.image_size = h.image_size;
+       rbd_update_mapping_size(rbd_dev);
 
        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
        return ret;
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
 
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       ret = __rbd_refresh_header(rbd_dev, hver);
+       if (rbd_dev->image_format == 1)
+               ret = rbd_dev_v1_refresh(rbd_dev, hver);
+       else
+               ret = rbd_dev_v2_refresh(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);
 
        return ret;
@@ -1885,7 +1983,7 @@ static ssize_t rbd_pool_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->pool_name);
+       return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
 }
 
 static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@ static ssize_t rbd_pool_id_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%d\n", rbd_dev->pool_id);
+       return sprintf(buf, "%llu\n",
+               (unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@ static ssize_t rbd_name_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->image_name);
+       if (rbd_dev->spec->image_name)
+               return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
+
+       return sprintf(buf, "(unknown)\n");
 }
 
 static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@ static ssize_t rbd_image_id_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->image_id);
+       return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
 }
 
 /*
@@ -1922,7 +2024,50 @@ static ssize_t rbd_snap_show(struct device *dev,
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+       return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
+}
+
+/*
+ * For an rbd v2 image, shows the pool id, image id, and snapshot id
+ * for the parent image.  If there is no parent, simply shows
+ * "(no parent image)".
+ */
+static ssize_t rbd_parent_show(struct device *dev,
+                            struct device_attribute *attr,
+                            char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+       struct rbd_spec *spec = rbd_dev->parent_spec;
+       int count;
+       char *bufp = buf;
+
+       if (!spec)
+               return sprintf(buf, "(no parent image)\n");
+
+       count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
+                       (unsigned long long) spec->pool_id, spec->pool_name);
+       if (count < 0)
+               return count;
+       bufp += count;
+
+       count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
+                       spec->image_name ? spec->image_name : "(unknown)");
+       if (count < 0)
+               return count;
+       bufp += count;
+
+       count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
+                       (unsigned long long) spec->snap_id, spec->snap_name);
+       if (count < 0)
+               return count;
+       bufp += count;
+
+       count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
+       if (count < 0)
+               return count;
+       bufp += count;
+
+       return (ssize_t) (bufp - buf);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@ static ssize_t rbd_image_refresh(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
 
-       ret = rbd_refresh_header(rbd_dev, NULL);
+       ret = rbd_dev_refresh(rbd_dev, NULL);
 
        return ret < 0 ? ret : size;
 }
@@ -1948,6 +2093,7 @@ static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
 
 static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
@@ -1959,6 +2105,7 @@ static struct attribute *rbd_attrs[] = {
        &dev_attr_name.attr,
        &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
+       &dev_attr_parent.attr,
        &dev_attr_refresh.attr,
        NULL
 };
@@ -2047,6 +2194,74 @@ static struct device_type rbd_snap_device_type = {
        .release        = rbd_snap_dev_release,
 };
 
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+       kref_get(&spec->kref);
+
+       return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+       if (spec)
+               kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+       struct rbd_spec *spec;
+
+       spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+       if (!spec)
+               return NULL;
+       kref_init(&spec->kref);
+
+       rbd_spec_put(rbd_spec_get(spec));       /* TEMPORARY */
+
+       return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+       struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+       kfree(spec->pool_name);
+       kfree(spec->image_id);
+       kfree(spec->image_name);
+       kfree(spec->snap_name);
+       kfree(spec);
+}
+
+struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+                               struct rbd_spec *spec)
+{
+       struct rbd_device *rbd_dev;
+
+       rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+       if (!rbd_dev)
+               return NULL;
+
+       spin_lock_init(&rbd_dev->lock);
+       INIT_LIST_HEAD(&rbd_dev->node);
+       INIT_LIST_HEAD(&rbd_dev->snaps);
+       init_rwsem(&rbd_dev->header_rwsem);
+
+       rbd_dev->spec = spec;
+       rbd_dev->rbd_client = rbdc;
+
+       return rbd_dev;
+}
+
+static void rbd_dev_destroy(struct rbd_device *rbd_dev)
+{
+       rbd_spec_put(rbd_dev->parent_spec);
+       kfree(rbd_dev->header_name);
+       rbd_put_client(rbd_dev->rbd_client);
+       rbd_spec_put(rbd_dev->spec);
+       kfree(rbd_dev);
+}
+
 static bool rbd_snap_registered(struct rbd_snap *snap)
 {
        bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@ static bool rbd_snap_registered(struct rbd_snap *snap)
        return ret;
 }
 
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
 {
        list_del(&snap->node);
        if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
-       dev_set_name(dev, "snap_%s", snap->name);
+       dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
        dout("%s: registering device for snapshot %s\n", __func__, snap->name);
 
        ret = device_register(dev);
@@ -2189,6 +2404,7 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
+       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                __le64 features;
                __le64 incompat;
        } features_buf = { 0 };
+       u64 incompat;
        int ret;
 
        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
+
+       incompat = le64_to_cpu(features_buf.incompat);
+       if (incompat & ~RBD_FEATURES_ALL)
+               return -ENXIO;
+
        *snap_features = le64_to_cpu(features_buf.features);
 
        dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@ static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
                                                &rbd_dev->header.features);
 }
 
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
+{
+       struct rbd_spec *parent_spec;
+       size_t size;
+       void *reply_buf = NULL;
+       __le64 snapid;
+       void *p;
+       void *end;
+       char *image_id;
+       u64 overlap;
+       size_t len = 0;
+       int ret;
+
+       parent_spec = rbd_spec_alloc();
+       if (!parent_spec)
+               return -ENOMEM;
+
+       size = sizeof (__le64) +                                /* pool_id */
+               sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
+               sizeof (__le64) +                               /* snap_id */
+               sizeof (__le64);                                /* overlap */
+       reply_buf = kmalloc(size, GFP_KERNEL);
+       if (!reply_buf) {
+               ret = -ENOMEM;
+               goto out_err;
+       }
+
+       snapid = cpu_to_le64(CEPH_NOSNAP);
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_parent",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) reply_buf, size,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out_err;
+
+       ret = -ERANGE;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
+       if (parent_spec->pool_id == CEPH_NOPOOL)
+               goto out;       /* No parent?  No problem. */
+
+       image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+       if (IS_ERR(image_id)) {
+               ret = PTR_ERR(image_id);
+               goto out_err;
+       }
+       parent_spec->image_id = image_id;
+       parent_spec->image_id_len = len;
+       ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+       ceph_decode_64_safe(&p, end, overlap, out_err);
+
+       rbd_dev->parent_overlap = overlap;
+       rbd_dev->parent_spec = parent_spec;
+       parent_spec = NULL;     /* rbd_dev now owns this */
+out:
+       ret = 0;
+out_err:
+       kfree(reply_buf);
+       rbd_spec_put(parent_spec);
+
+       return ret;
+}
+
+static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
+{
+       size_t image_id_size;
+       char *image_id;
+       void *p;
+       void *end;
+       size_t size;
+       void *reply_buf = NULL;
+       size_t len = 0;
+       char *image_name = NULL;
+       int ret;
+
+       rbd_assert(!rbd_dev->spec->image_name);
+
+       image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+       image_id = kmalloc(image_id_size, GFP_KERNEL);
+       if (!image_id)
+               return NULL;
+
+       p = image_id;
+       end = (char *) image_id + image_id_size;
+       ceph_encode_string(&p, end, rbd_dev->spec->image_id,
+                               (u32) rbd_dev->spec->image_id_len);
+
+       size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
+       reply_buf = kmalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               goto out;
+
+       ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+                               "rbd", "dir_get_name",
+                               image_id, image_id_size,
+                               (char *) reply_buf, size,
+                               CEPH_OSD_FLAG_READ, NULL);
+       if (ret < 0)
+               goto out;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+       if (IS_ERR(image_name))
+               image_name = NULL;
+       else
+               dout("%s: name is %s len is %zd\n", __func__, image_name, len);
+out:
+       kfree(reply_buf);
+       kfree(image_id);
+
+       return image_name;
+}
+
+/*
+ * When a parent image gets probed, we only have the pool, image,
+ * and snapshot ids but not the names of any of them.  This call
+ * is made later to fill in those names.  It has to be done after
+ * rbd_dev_snaps_update() has completed because some of the
+ * information (in particular, snapshot name) is not available
+ * until then.
+ */
+static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+{
+       struct ceph_osd_client *osdc;
+       const char *name;
+       void *reply_buf = NULL;
+       int ret;
+
+       if (rbd_dev->spec->pool_name)
+               return 0;       /* Already have the names */
+
+       /* Look up the pool name */
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
+       if (!name)
+               return -EIO;    /* pool id too large (>= 2^31) */
+
+       rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
+       if (!rbd_dev->spec->pool_name)
+               return -ENOMEM;
+
+       /* Fetch the image name; tolerate failure here */
+
+       name = rbd_dev_image_name(rbd_dev);
+       if (name) {
+               rbd_dev->spec->image_name_len = strlen(name);
+               rbd_dev->spec->image_name = (char *) name;
+       } else {
+               pr_warning(RBD_DRV_NAME "%d "
+                       "unable to get image name for image id %s\n",
+                       rbd_dev->major, rbd_dev->spec->image_id);
+       }
+
+       /* Look up the snapshot name. */
+
+       name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
+       if (!name) {
+               ret = -EIO;
+               goto out_err;
+       }
+       rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
+       if(!rbd_dev->spec->snap_name)
+               goto out_err;
+
+       return 0;
+out_err:
+       kfree(reply_buf);
+       kfree(rbd_dev->spec->pool_name);
+       rbd_dev->spec->pool_name = NULL;
+
+       return ret;
+}
+
 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
 {
        size_t size;
@@ -2328,7 +2727,6 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
        int ret;
        void *p;
        void *end;
-       size_t snap_name_len;
        char *snap_name;
 
        size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
 
        p = reply_buf;
        end = (char *) reply_buf + size;
-       snap_name_len = 0;
-       snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
-                               GFP_KERNEL);
+       snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(snap_name)) {
                ret = PTR_ERR(snap_name);
                goto out;
@@ -2397,6 +2793,41 @@ static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
        return ERR_PTR(-EINVAL);
 }
 
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+       int ret;
+       __u8 obj_order;
+
+       down_write(&rbd_dev->header_rwsem);
+
+       /* Grab old order first, to see if it changes */
+
+       obj_order = rbd_dev->header.obj_order,
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret)
+               goto out;
+       if (rbd_dev->header.obj_order != obj_order) {
+               ret = -EIO;
+               goto out;
+       }
+       rbd_update_mapping_size(rbd_dev);
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+       dout("rbd_dev_v2_snap_context returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_update(rbd_dev);
+       dout("rbd_dev_snaps_update returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_register(rbd_dev);
+       dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+       up_write(&rbd_dev->header_rwsem);
+
+       return ret;
+}
+
 /*
  * Scan the rbd device's current snapshot list and compare it to the
  * newly-received snapshot context.  Remove any existing snapshots
@@ -2436,12 +2867,12 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
 
                        /* Existing snapshot not in the new snap context */
 
-                       if (rbd_dev->mapping.snap_id == snap->id)
-                               rbd_dev->mapping.snap_exists = false;
-                       __rbd_remove_snap_dev(snap);
+                       if (rbd_dev->spec->snap_id == snap->id)
+                               rbd_dev->exists = false;
+                       rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
-                               rbd_dev->mapping.snap_id == snap->id ?
-                                                               "mapped " : "",
+                               rbd_dev->spec->snap_id == snap->id ?
+                                                       "mapped " : "",
                                (unsigned long long) snap->id);
 
                        /* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
-                       rc = rbd_refresh_header(rbd_dev, NULL);
+                       rc = rbd_dev_refresh(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
@@ -2621,8 +3052,8 @@ static void rbd_dev_id_put(struct rbd_device *rbd_dev)
                struct rbd_device *rbd_dev;
 
                rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_id > max_id)
-                       max_id = rbd_id;
+               if (rbd_dev->dev_id > max_id)
+                       max_id = rbd_dev->dev_id;
        }
        spin_unlock(&rbd_dev_list_lock);
 
@@ -2722,73 +3153,140 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
+ *
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ *  ceph_opts
+ *      The address of a pointer that will refer to a ceph options
+ *      structure.  Caller must release the returned pointer using
+ *      ceph_destroy_options() when it is no longer needed.
+ *  rbd_opts
+ *     Address of an rbd options pointer.  Fully initialized by
+ *     this function; caller must release with kfree().
+ *  spec
+ *     Address of an rbd image specification pointer.  Fully
+ *     initialized by this function based on parsed options.
+ *     Caller must release with rbd_spec_put().
  *
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The options passed take this form:
+ *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ *  <mon_addrs>
+ *      A comma-separated list of one or more monitor addresses.
+ *      A monitor address is an ip address, optionally followed
+ *      by a port number (separated by a colon).
+ *        I.e.:  ip1[:port1][,ip2[:port2]...]
+ *  <options>
+ *      A comma-separated list of ceph and/or rbd options.
+ *  <pool_name>
+ *      The name of the rados pool containing the rbd image.
+ *  <image_name>
+ *      The name of the image in that pool to map.
+ *  <snap_id>
+ *      An optional snapshot id.  If provided, the mapping will
+ *      present data from the image at the time that snapshot was
+ *      created.  The image head is used if no snapshot id is
+ *      provided.  Snapshot mappings are always read-only.
  */
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
-                               const char *buf,
-                               const char **mon_addrs,
-                               size_t *mon_addrs_size,
-                               char *options,
-                               size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+                               struct ceph_options **ceph_opts,
+                               struct rbd_options **opts,
+                               struct rbd_spec **rbd_spec)
 {
        size_t len;
-       char *err_ptr = ERR_PTR(-EINVAL);
-       char *snap_name;
+       char *options;
+       const char *mon_addrs;
+       size_t mon_addrs_size;
+       struct rbd_spec *spec = NULL;
+       struct rbd_options *rbd_opts = NULL;
+       struct ceph_options *copts;
+       int ret;
 
        /* The first four tokens are required */
 
        len = next_token(&buf);
        if (!len)
-               return err_ptr;
-       *mon_addrs_size = len + 1;
-       *mon_addrs = buf;
-
+               return -EINVAL; /* Missing monitor address(es) */
+       mon_addrs = buf;
+       mon_addrs_size = len + 1;
        buf += len;
 
-       len = copy_token(&buf, options, options_size);
-       if (!len || len >= options_size)
-               return err_ptr;
+       ret = -EINVAL;
+       options = dup_token(&buf, NULL);
+       if (!options)
+               return -ENOMEM;
+       if (!*options)
+               goto out_err;   /* Missing options */
 
-       err_ptr = ERR_PTR(-ENOMEM);
-       rbd_dev->pool_name = dup_token(&buf, NULL);
-       if (!rbd_dev->pool_name)
-               goto out_err;
+       spec = rbd_spec_alloc();
+       if (!spec)
+               goto out_mem;
 
-       rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
-       if (!rbd_dev->image_name)
-               goto out_err;
+       spec->pool_name = dup_token(&buf, NULL);
+       if (!spec->pool_name)
+               goto out_mem;
+       if (!*spec->pool_name)
+               goto out_err;   /* Missing pool name */
 
-       /* Snapshot name is optional */
+       spec->image_name = dup_token(&buf, &spec->image_name_len);
+       if (!spec->image_name)
+               goto out_mem;
+       if (!*spec->image_name)
+               goto out_err;   /* Missing image name */
+
+       /*
+        * Snapshot name is optional; default is to use "-"
+        * (indicating the head/no snapshot).
+        */
        len = next_token(&buf);
        if (!len) {
                buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
                len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
-       }
-       snap_name = kmalloc(len + 1, GFP_KERNEL);
-       if (!snap_name)
+       } else if (len > RBD_MAX_SNAP_NAME_LEN) {
+               ret = -ENAMETOOLONG;
                goto out_err;
-       memcpy(snap_name, buf, len);
-       *(snap_name + len) = '\0';
+       }
+       spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+       if (!spec->snap_name)
+               goto out_mem;
+       memcpy(spec->snap_name, buf, len);
+       *(spec->snap_name + len) = '\0';
 
-dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+       /* Initialize all rbd options to the defaults */
 
-       return snap_name;
+       rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+       if (!rbd_opts)
+               goto out_mem;
+
+       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+       copts = ceph_parse_options(options, mon_addrs,
+                                       mon_addrs + mon_addrs_size - 1,
+                                       parse_rbd_opts_token, rbd_opts);
+       if (IS_ERR(copts)) {
+               ret = PTR_ERR(copts);
+               goto out_err;
+       }
+       kfree(options);
 
+       *ceph_opts = copts;
+       *opts = rbd_opts;
+       *rbd_spec = spec;
+
+       return 0;
+out_mem:
+       ret = -ENOMEM;
 out_err:
-       kfree(rbd_dev->image_name);
-       rbd_dev->image_name = NULL;
-       rbd_dev->image_name_len = 0;
-       kfree(rbd_dev->pool_name);
-       rbd_dev->pool_name = NULL;
+       kfree(rbd_opts);
+       rbd_spec_put(spec);
+       kfree(options);
 
-       return err_ptr;
+       return ret;
 }
 
 /*
@@ -2813,15 +3311,23 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        void *response;
        void *p;
 
+       /*
+        * When probing a parent image, the image id is already
+        * known (and the image name likely is not).  There's no
+        * need to fetch the image id again in this case.
+        */
+       if (rbd_dev->spec->image_id)
+               return 0;
+
        /*
         * First, see if the format 2 image id file exists, and if
         * so, get the image's persistent id from it.
         */
-       size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+       size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
        object_name = kmalloc(size, GFP_NOIO);
        if (!object_name)
                return -ENOMEM;
-       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
        dout("rbd id object name is %s\n", object_name);
 
        /* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
        dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
+       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = response;
-       rbd_dev->image_id = ceph_extract_encoded_string(&p,
+       rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
                                                p + RBD_IMAGE_ID_LEN_MAX,
-                                               &rbd_dev->image_id_len,
+                                               &rbd_dev->spec->image_id_len,
                                                GFP_NOIO);
-       if (IS_ERR(rbd_dev->image_id)) {
-               ret = PTR_ERR(rbd_dev->image_id);
-               rbd_dev->image_id = NULL;
+       if (IS_ERR(rbd_dev->spec->image_id)) {
+               ret = PTR_ERR(rbd_dev->spec->image_id);
+               rbd_dev->spec->image_id = NULL;
        } else {
-               dout("image_id is %s\n", rbd_dev->image_id);
+               dout("image_id is %s\n", rbd_dev->spec->image_id);
        }
 out:
        kfree(response);
@@ -2867,26 +3374,33 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
 
        /* Version 1 images have no id; empty string is used */
 
-       rbd_dev->image_id = kstrdup("", GFP_KERNEL);
-       if (!rbd_dev->image_id)
+       rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+       if (!rbd_dev->spec->image_id)
                return -ENOMEM;
-       rbd_dev->image_id_len = 0;
+       rbd_dev->spec->image_id_len = 0;
 
        /* Record the header object name for this rbd image. */
 
-       size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+       size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name) {
                ret = -ENOMEM;
                goto out_err;
        }
-       sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+       sprintf(rbd_dev->header_name, "%s%s",
+               rbd_dev->spec->image_name, RBD_SUFFIX);
 
        /* Populate rbd image metadata */
 
        ret = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (ret < 0)
                goto out_err;
+
+       /* Version 1 images have no parent (no layering) */
+
+       rbd_dev->parent_spec = NULL;
+       rbd_dev->parent_overlap = 0;
+
        rbd_dev->image_format = 1;
 
        dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@ static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
 out_err:
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
-       kfree(rbd_dev->image_id);
-       rbd_dev->image_id = NULL;
+       kfree(rbd_dev->spec->image_id);
+       rbd_dev->spec->image_id = NULL;
 
        return ret;
 }
@@ -2913,12 +3427,12 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
         * Image id was filled in by the caller.  Record the header
         * object name for this rbd image.
         */
-       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
        rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
        if (!rbd_dev->header_name)
                return -ENOMEM;
        sprintf(rbd_dev->header_name, "%s%s",
-                       RBD_HEADER_PREFIX, rbd_dev->image_id);
+                       RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
 
        /* Get the size and object order for the image */
 
@@ -2932,12 +3446,20 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
        if (ret < 0)
                goto out_err;
 
-       /* Get the features for the image */
+       /* Get the and check features for the image */
 
        ret = rbd_dev_v2_features(rbd_dev);
        if (ret < 0)
                goto out_err;
 
+       /* If the image supports layering, get the parent info */
+
+       if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+               ret = rbd_dev_v2_parent_info(rbd_dev);
+               if (ret < 0)
+                       goto out_err;
+       }
+
        /* crypto and compression type aren't (yet) supported for v2 images */
 
        rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@ static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
        dout("discovered version 2 image, header name is %s\n",
                rbd_dev->header_name);
 
-       return -ENOTSUPP;
+       return 0;
 out_err:
+       rbd_dev->parent_overlap = 0;
+       rbd_spec_put(rbd_dev->parent_spec);
+       rbd_dev->parent_spec = NULL;
        kfree(rbd_dev->header_name);
        rbd_dev->header_name = NULL;
        kfree(rbd_dev->header.object_prefix);
@@ -2965,91 +3490,22 @@ out_err:
        return ret;
 }
 
-/*
- * Probe for the existence of the header object for the given rbd
- * device.  For format 2 images this includes determining the image
- * id.
- */
-static int rbd_dev_probe(struct rbd_device *rbd_dev)
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
 {
        int ret;
 
-       /*
-        * Get the id from the image id object.  If it's not a
-        * format 2 image, we'll get ENOENT back, and we'll assume
-        * it's a format 1 image.
-        */
-       ret = rbd_dev_image_id(rbd_dev);
-       if (ret)
-               ret = rbd_dev_v1_probe(rbd_dev);
-       else
-               ret = rbd_dev_v2_probe(rbd_dev);
+       /* no need to lock here, as rbd_dev is not registered yet */
+       ret = rbd_dev_snaps_update(rbd_dev);
        if (ret)
-               dout("probe failed, returning %d\n", ret);
-
-       return ret;
-}
-
-static ssize_t rbd_add(struct bus_type *bus,
-                      const char *buf,
-                      size_t count)
-{
-       char *options;
-       struct rbd_device *rbd_dev = NULL;
-       const char *mon_addrs = NULL;
-       size_t mon_addrs_size = 0;
-       struct ceph_osd_client *osdc;
-       int rc = -ENOMEM;
-       char *snap_name;
-
-       if (!try_module_get(THIS_MODULE))
-               return -ENODEV;
-
-       options = kmalloc(count, GFP_KERNEL);
-       if (!options)
-               goto err_out_mem;
-       rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
-       if (!rbd_dev)
-               goto err_out_mem;
-
-       /* static rbd_device initialization */
-       spin_lock_init(&rbd_dev->lock);
-       INIT_LIST_HEAD(&rbd_dev->node);
-       INIT_LIST_HEAD(&rbd_dev->snaps);
-       init_rwsem(&rbd_dev->header_rwsem);
-
-       /* parse add command */
-       snap_name = rbd_add_parse_args(rbd_dev, buf,
-                               &mon_addrs, &mon_addrs_size, options, count);
-       if (IS_ERR(snap_name)) {
-               rc = PTR_ERR(snap_name);
-               goto err_out_mem;
-       }
-
-       rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
-       if (rc < 0)
-               goto err_out_args;
-
-       /* pick the pool */
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
-       if (rc < 0)
-               goto err_out_client;
-       rbd_dev->pool_id = rc;
-
-       rc = rbd_dev_probe(rbd_dev);
-       if (rc < 0)
-               goto err_out_client;
-       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+               return ret;
 
-       /* no need to lock here, as rbd_dev is not registered yet */
-       rc = rbd_dev_snaps_update(rbd_dev);
-       if (rc)
-               goto err_out_header;
+       ret = rbd_dev_probe_update_spec(rbd_dev);
+       if (ret)
+               goto err_out_snaps;
 
-       rc = rbd_dev_set_mapping(rbd_dev, snap_name);
-       if (rc)
-               goto err_out_header;
+       ret = rbd_dev_set_mapping(rbd_dev);
+       if (ret)
+               goto err_out_snaps;
 
        /* generate unique id: find highest unique id, add one */
        rbd_dev_id_get(rbd_dev);
@@ -3061,34 +3517,33 @@ static ssize_t rbd_add(struct bus_type *bus,
 
        /* Get our block major device number. */
 
-       rc = register_blkdev(0, rbd_dev->name);
-       if (rc < 0)
+       ret = register_blkdev(0, rbd_dev->name);
+       if (ret < 0)
                goto err_out_id;
-       rbd_dev->major = rc;
+       rbd_dev->major = ret;
 
        /* Set up the blkdev mapping. */
 
-       rc = rbd_init_disk(rbd_dev);
-       if (rc)
+       ret = rbd_init_disk(rbd_dev);
+       if (ret)
                goto err_out_blkdev;
 
-       rc = rbd_bus_add_dev(rbd_dev);
-       if (rc)
+       ret = rbd_bus_add_dev(rbd_dev);
+       if (ret)
                goto err_out_disk;
 
        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
         */
-
        down_write(&rbd_dev->header_rwsem);
-       rc = rbd_dev_snaps_register(rbd_dev);
+       ret = rbd_dev_snaps_register(rbd_dev);
        up_write(&rbd_dev->header_rwsem);
-       if (rc)
+       if (ret)
                goto err_out_bus;
 
-       rc = rbd_init_watch_dev(rbd_dev);
-       if (rc)
+       ret = rbd_init_watch_dev(rbd_dev);
+       if (ret)
                goto err_out_bus;
 
        /* Everything's ready.  Announce the disk to the world. */
@@ -3098,37 +3553,119 @@ static ssize_t rbd_add(struct bus_type *bus,
        pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
                (unsigned long long) rbd_dev->mapping.size);
 
-       return count;
-
+       return ret;
 err_out_bus:
        /* this will also clean up rest of rbd_dev stuff */
 
        rbd_bus_del_dev(rbd_dev);
-       kfree(options);
-       return rc;
 
+       return ret;
 err_out_disk:
        rbd_free_disk(rbd_dev);
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 err_out_id:
        rbd_dev_id_put(rbd_dev);
-err_out_header:
-       rbd_header_free(&rbd_dev->header);
+err_out_snaps:
+       rbd_remove_all_snaps(rbd_dev);
+
+       return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       /*
+        * Get the id from the image id object.  If it's not a
+        * format 2 image, we'll get ENOENT back, and we'll assume
+        * it's a format 1 image.
+        */
+       ret = rbd_dev_image_id(rbd_dev);
+       if (ret)
+               ret = rbd_dev_v1_probe(rbd_dev);
+       else
+               ret = rbd_dev_v2_probe(rbd_dev);
+       if (ret) {
+               dout("probe failed, returning %d\n", ret);
+
+               return ret;
+       }
+
+       ret = rbd_dev_probe_finish(rbd_dev);
+       if (ret)
+               rbd_header_free(&rbd_dev->header);
+
+       return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+                      const char *buf,
+                      size_t count)
+{
+       struct rbd_device *rbd_dev = NULL;
+       struct ceph_options *ceph_opts = NULL;
+       struct rbd_options *rbd_opts = NULL;
+       struct rbd_spec *spec = NULL;
+       struct rbd_client *rbdc;
+       struct ceph_osd_client *osdc;
+       int rc = -ENOMEM;
+
+       if (!try_module_get(THIS_MODULE))
+               return -ENODEV;
+
+       /* parse add command */
+       rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
+       if (rc < 0)
+               goto err_out_module;
+
+       rbdc = rbd_get_client(ceph_opts);
+       if (IS_ERR(rbdc)) {
+               rc = PTR_ERR(rbdc);
+               goto err_out_args;
+       }
+       ceph_opts = NULL;       /* rbd_dev client now owns this */
+
+       /* pick the pool */
+       osdc = &rbdc->client->osdc;
+       rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+       if (rc < 0)
+               goto err_out_client;
+       spec->pool_id = (u64) rc;
+
+       rbd_dev = rbd_dev_create(rbdc, spec);
+       if (!rbd_dev)
+               goto err_out_client;
+       rbdc = NULL;            /* rbd_dev now owns this */
+       spec = NULL;            /* rbd_dev now owns this */
+
+       rbd_dev->mapping.read_only = rbd_opts->read_only;
+       kfree(rbd_opts);
+       rbd_opts = NULL;        /* done with this */
+
+       rc = rbd_dev_probe(rbd_dev);
+       if (rc < 0)
+               goto err_out_rbd_dev;
+
+       return count;
+err_out_rbd_dev:
+       rbd_dev_destroy(rbd_dev);
 err_out_client:
-       kfree(rbd_dev->header_name);
-       rbd_put_client(rbd_dev);
-       kfree(rbd_dev->image_id);
+       rbd_put_client(rbdc);
 err_out_args:
-       kfree(rbd_dev->mapping.snap_name);
-       kfree(rbd_dev->image_name);
-       kfree(rbd_dev->pool_name);
-err_out_mem:
-       kfree(rbd_dev);
-       kfree(options);
+       if (ceph_opts)
+               ceph_destroy_options(ceph_opts);
+       kfree(rbd_opts);
+       rbd_spec_put(spec);
+err_out_module:
+       module_put(THIS_MODULE);
 
        dout("Error adding device %s\n", buf);
-       module_put(THIS_MODULE);
 
        return (ssize_t) rc;
 }
@@ -3163,7 +3700,6 @@ static void rbd_dev_release(struct device *dev)
        if (rbd_dev->watch_event)
                rbd_req_sync_unwatch(rbd_dev);
 
-       rbd_put_client(rbd_dev);
 
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@ static void rbd_dev_release(struct device *dev)
        rbd_header_free(&rbd_dev->header);
 
        /* done with the id, and with the rbd_dev */
-       kfree(rbd_dev->mapping.snap_name);
-       kfree(rbd_dev->image_id);
-       kfree(rbd_dev->header_name);
-       kfree(rbd_dev->pool_name);
-       kfree(rbd_dev->image_name);
        rbd_dev_id_put(rbd_dev);
-       kfree(rbd_dev);
+       rbd_assert(rbd_dev->rbd_client != NULL);
+       rbd_dev_destroy(rbd_dev);
 
        /* release module ref */
        module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       __rbd_remove_all_snaps(rbd_dev);
+       if (rbd_dev->open_count) {
+               ret = -EBUSY;
+               goto done;
+       }
+
+       rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
 
 done:
index cbe77fa105baace065c8714c348c3cf782391b78..49d77cbcf8bdb0601ac9db5f1ad80309a80cb32e 100644 (file)
@@ -46,8 +46,6 @@
 #define RBD_MIN_OBJ_ORDER       16
 #define RBD_MAX_OBJ_ORDER       30
 
-#define RBD_MAX_SEG_NAME_LEN   128
-
 #define RBD_COMP_NONE          0
 #define RBD_CRYPT_NONE         0
 
index 6690269f5dde2c5617cff3238167788f93636985..064d1a68d2c1d561cc3cd2b4bf63b511b4a76430 100644 (file)
@@ -267,6 +267,14 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
        kfree(req->r_pages);
 }
 
+static void ceph_unlock_page_vector(struct page **pages, int num_pages)
+{
+       int i;
+
+       for (i = 0; i < num_pages; i++)
+               unlock_page(pages[i]);
+}
+
 /*
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
@@ -347,6 +355,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        return nr_pages;
 
 out_pages:
+       ceph_unlock_page_vector(pages, nr_pages);
        ceph_release_page_vector(pages, nr_pages);
 out:
        ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
                            struct page **pagep, void **fsdata)
 {
        struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_file_info *fi = file->private_data;
        struct page *page;
        pgoff_t index = pos >> PAGE_CACHE_SHIFT;
-       int r;
+       int r, want, got = 0;
+
+       if (fi->fmode & CEPH_FILE_MODE_LAZY)
+               want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+       else
+               want = CEPH_CAP_FILE_BUFFER;
+
+       dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+            inode, ceph_vinop(inode), pos, len, inode->i_size);
+       r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
+       if (r < 0)
+               return r;
+       dout("write_begin %p %llx.%llx %llu~%u  got cap refs on %s\n",
+            inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+       if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+               ceph_put_cap_refs(ci, got);
+               return -EAGAIN;
+       }
 
        do {
                /* get a page */
                page = grab_cache_page_write_begin(mapping, index, 0);
-               if (!page)
-                       return -ENOMEM;
-               *pagep = page;
+               if (!page) {
+                       r = -ENOMEM;
+                       break;
+               }
 
                dout("write_begin file %p inode %p page %p %d~%d\n", file,
                     inode, page, (int)pos, (int)len);
 
                r = ceph_update_writeable_page(file, pos, len, page);
+               if (r)
+                       page_cache_release(page);
        } while (r == -EAGAIN);
 
+       if (r) {
+               ceph_put_cap_refs(ci, got);
+       } else {
+               *pagep = page;
+               *(int *)fsdata = got;
+       }
        return r;
 }
 
@@ -1108,10 +1145,12 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
                          struct page *page, void *fsdata)
 {
        struct inode *inode = file->f_dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;
+       int got = (unsigned long)fsdata;
 
        dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
             inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
        up_read(&mdsc->snap_rwsem);
        page_cache_release(page);
 
+       if (copied > 0) {
+               int dirty;
+               spin_lock(&ci->i_ceph_lock);
+               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+               spin_unlock(&ci->i_ceph_lock);
+               if (dirty)
+                       __mark_inode_dirty(inode, dirty);
+       }
+
+       dout("write_end %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
+            inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+       ceph_put_cap_refs(ci, got);
+
        if (check_cap)
                ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
 
index 3251e9cc64014fc99f7660d46388427732c88114..a1d9bb30c1bf9fc5460286ff465e2bcbc338bf30 100644 (file)
@@ -236,8 +236,10 @@ static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
        if (!ctx) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
                if (cap) {
+                       spin_lock(&mdsc->caps_list_lock);
                        mdsc->caps_use_count++;
                        mdsc->caps_total_count++;
+                       spin_unlock(&mdsc->caps_list_lock);
                }
                return cap;
        }
@@ -1349,11 +1351,15 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
                if (!ci->i_head_snapc)
                        ci->i_head_snapc = ceph_get_snap_context(
                                ci->i_snap_realm->cached_context);
-               dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
-                       ci->i_head_snapc);
+               dout(" inode %p now dirty snapc %p auth cap %p\n",
+                    &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
                BUG_ON(!list_empty(&ci->i_dirty_item));
                spin_lock(&mdsc->cap_dirty_lock);
-               list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+               if (ci->i_auth_cap)
+                       list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+               else
+                       list_add(&ci->i_dirty_item,
+                                &mdsc->cap_dirty_migrating);
                spin_unlock(&mdsc->cap_dirty_lock);
                if (ci->i_flushing_caps == 0) {
                        ihold(inode);
@@ -2388,7 +2394,7 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
                            &atime);
 
        /* max size increase? */
-       if (max_size != ci->i_max_size) {
+       if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
                dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
                ci->i_max_size = max_size;
                if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@ static void handle_cap_import(struct ceph_mds_client *mdsc,
 
        /* make sure we re-request max_size, if necessary */
        spin_lock(&ci->i_ceph_lock);
+       ci->i_wanted_max_size = 0;  /* reset */
        ci->i_requested_max_size = 0;
        spin_unlock(&ci->i_ceph_lock);
 }
@@ -2840,8 +2847,6 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
                                  snaptrace, snaptrace_len);
-               ceph_check_caps(ceph_inode(inode), 0, session);
-               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -2858,6 +2863,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
+       case CEPH_CAP_OP_IMPORT:
                handle_cap_grant(inode, h, session, cap, msg->middle);
                goto done_unlocked;
 
index d4dfdcf76d7fdfbff4c318e1e4f5d9a2487d0088..e51558fca3a346a93797561e26e148de037d971a 100644 (file)
@@ -712,63 +712,53 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
        struct ceph_osd_client *osdc =
                &ceph_sb_to_client(inode->i_sb)->client->osdc;
        loff_t endoff = pos + iov->iov_len;
-       int want, got = 0;
-       int ret, err;
+       int got = 0;
+       int ret, err, written;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
 retry_snap:
+       written = 0;
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
                return -ENOSPC;
        __ceph_do_pending_vmtruncate(inode);
-       dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
-            inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-            inode->i_size);
-       if (fi->fmode & CEPH_FILE_MODE_LAZY)
-               want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
-       else
-               want = CEPH_CAP_FILE_BUFFER;
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
-       if (ret < 0)
-               goto out_put;
-
-       dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
-            inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-            ceph_cap_string(got));
-
-       if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-           (iocb->ki_filp->f_flags & O_DIRECT) ||
-           (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
-           (fi->flags & CEPH_F_SYNC)) {
-               ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
-                       &iocb->ki_pos);
-       } else {
-               /*
-                * buffered write; drop Fw early to avoid slow
-                * revocation if we get stuck on balance_dirty_pages
-                */
-               int dirty;
-
-               spin_lock(&ci->i_ceph_lock);
-               dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
-               spin_unlock(&ci->i_ceph_lock);
-               ceph_put_cap_refs(ci, got);
 
+       /*
+        * try to do a buffered write.  if we don't have sufficient
+        * caps, we'll get -EAGAIN from generic_file_aio_write, or a
+        * short write if we only get caps for some pages.
+        */
+       if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
+           !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
+           !(fi->flags & CEPH_F_SYNC)) {
                ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
+               if (ret >= 0)
+                       written = ret;
+
                if ((ret >= 0 || ret == -EIOCBQUEUED) &&
                    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
                     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
-                       err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
+                       err = vfs_fsync_range(file, pos, pos + written - 1, 1);
                        if (err < 0)
                                ret = err;
                }
+               if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
+                       goto out;
+       }
 
-               if (dirty)
-                       __mark_inode_dirty(inode, dirty);
+       dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+            inode, ceph_vinop(inode), pos + written,
+            (unsigned)iov->iov_len - written, inode->i_size);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
+       if (ret < 0)
                goto out;
-       }
 
+       dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
+            inode, ceph_vinop(inode), pos + written,
+            (unsigned)iov->iov_len - written, ceph_cap_string(got));
+       ret = ceph_sync_write(file, iov->iov_base + written,
+                             iov->iov_len - written, &iocb->ki_pos);
        if (ret >= 0) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@ retry_snap:
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
        }
-
-out_put:
        dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-            inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-            ceph_cap_string(got));
+            inode, ceph_vinop(inode), pos + written,
+            (unsigned)iov->iov_len - written, ceph_cap_string(got));
        ceph_put_cap_refs(ci, got);
-
 out:
        if (ret == -EOLDSNAPC) {
                dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
index ba95eea201bf1ef8e809159d6b41019fb9169752..2971eaa65cdce9b3b31edca64ea979eeb814ab7a 100644 (file)
@@ -1466,7 +1466,7 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 to;
-       int wrbuffer_refs, wake = 0;
+       int wrbuffer_refs, finish = 0;
 
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@ retry:
        truncate_inode_pages(inode->i_mapping, to);
 
        spin_lock(&ci->i_ceph_lock);
-       ci->i_truncate_pending--;
-       if (ci->i_truncate_pending == 0)
-               wake = 1;
+       if (to == ci->i_truncate_size) {
+               ci->i_truncate_pending = 0;
+               finish = 1;
+       }
        spin_unlock(&ci->i_ceph_lock);
+       if (!finish)
+               goto retry;
 
        if (wrbuffer_refs == 0)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
-       if (wake)
-               wake_up_all(&ci->i_cap_wq);
+
+       wake_up_all(&ci->i_cap_wq);
 }
 
 
index 1bcf712655d90a6001db4e8de8e86cfa3a70187d..9165eb8309eba442efa237aaa313a43c92641607 100644 (file)
@@ -1590,7 +1590,7 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
        } else if (rpath || rino) {
                *ino = rino;
                *ppath = rpath;
-               *pathlen = strlen(rpath);
+               *pathlen = rpath ? strlen(rpath) : 0;
                dout(" path %.*s\n", *pathlen, rpath);
        }
 
@@ -1876,9 +1876,14 @@ finish:
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head)
 {
-       struct ceph_mds_request *req, *nreq;
+       struct ceph_mds_request *req;
+       LIST_HEAD(tmp_list);
+
+       list_splice_init(head, &tmp_list);
 
-       list_for_each_entry_safe(req, nreq, head, r_wait) {
+       while (!list_empty(&tmp_list)) {
+               req = list_entry(tmp_list.next,
+                                struct ceph_mds_request, r_wait);
                list_del_init(&req->r_wait);
                __do_request(mdsc, req);
        }
index 2eb43f211325860a9d8c4eb870fa32c494568966..e86aa9948124b3601e1734515e4fb629627855de 100644 (file)
@@ -403,8 +403,6 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)
                seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
        if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
                seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
-       if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
-               seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
        if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
                seq_printf(m, ",osdkeepalivetimeout=%d",
                           opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@ static int ceph_register_bdi(struct super_block *sb,
                fsc->backing_dev_info.ra_pages =
                        default_backing_dev_info.ra_pages;
 
-       err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
+       err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
                           atomic_long_inc_return(&bdi_seq));
        if (!err)
                sb->s_bdi = &fsc->backing_dev_info;
index 2a9a9abc91260c09a7940136a965a08209c5828b..12731a19ef060792b5500c3731551b636c437fe8 100644 (file)
@@ -114,6 +114,7 @@ struct backing_dev_info {
 int bdi_init(struct backing_dev_info *bdi);
 void bdi_destroy(struct backing_dev_info *bdi);
 
+__printf(3, 4)
 int bdi_register(struct backing_dev_info *bdi, struct device *parent,
                const char *fmt, ...);
 int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
index 6470792b13d3c79734a9a3ae7321a9df691cec78..084d3c622b12a8ac58ad626da2e8f325d6d3d1c9 100644 (file)
@@ -43,7 +43,6 @@ struct ceph_options {
        struct ceph_entity_addr my_addr;
        int mount_timeout;
        int osd_idle_ttl;
-       int osd_timeout;
        int osd_keepalive_timeout;
 
        /*
@@ -63,7 +62,6 @@ struct ceph_options {
  * defaults
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
-#define CEPH_OSD_TIMEOUT_DEFAULT    60  /* seconds */
 #define CEPH_OSD_KEEPALIVE_DEFAULT  5
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
 
index e37acbe989a906c8cdead48c6eea2e68f53e6ed8..10a417f9f76fa9ccd2e2fffbc27950dbc355df59 100644 (file)
@@ -123,6 +123,7 @@ extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
                                struct ceph_pg pgid);
 
+extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
 
 #endif
index de91fbdf127e64b343f3fc3530d6d7fb9bd4e59e..2c04afeead1cb3d776feea90e51f93731417b1a3 100644 (file)
@@ -87,6 +87,8 @@ struct ceph_pg {
  *
  *  lpgp_num -- as above.
  */
+#define CEPH_NOPOOL  ((__u64) (-1))  /* pool id not defined */
+
 #define CEPH_PG_TYPE_REP     1
 #define CEPH_PG_TYPE_RAID4   2
 #define CEPH_PG_POOL_VERSION 2
index a8020293f34210620eff72f6193ee35e064f5d69..ee71ea26777ae892fafd13a8338b3f556d9f427b 100644 (file)
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
 
        /* start with defaults */
        opt->flags = CEPH_OPT_DEFAULT;
-       opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
        opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
        opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
        opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
 
                        /* misc */
                case Opt_osdtimeout:
-                       opt->osd_timeout = intval;
+                       pr_warning("ignoring deprecated osdtimeout option\n");
                        break;
                case Opt_osdkeepalivetimeout:
                        opt->osd_keepalive_timeout = intval;
index 3ef1759403b411fe53595e2ddf1eb6314a4f9ef8..4d111fd2b4923f7fe7cd287de00029b19c61f274 100644 (file)
@@ -2244,22 +2244,62 @@ bad_tag:
 
 
 /*
- * Atomically queue work on a connection.  Bump @con reference to
- * avoid races with connection teardown.
+ * Atomically queue work on a connection after the specified delay.
+ * Bump @con reference to avoid races with connection teardown.
+ * Returns 0 if work was queued, or an error code otherwise.
  */
-static void queue_con(struct ceph_connection *con)
+static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 {
        if (!con->ops->get(con)) {
-               dout("queue_con %p ref count 0\n", con);
-               return;
+               dout("%s %p ref count 0\n", __func__, con);
+
+               return -ENOENT;
        }
 
-       if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
-               dout("queue_con %p - already queued\n", con);
+       if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
+               dout("%s %p - already queued\n", __func__, con);
                con->ops->put(con);
-       } else {
-               dout("queue_con %p\n", con);
+
+               return -EBUSY;
        }
+
+       dout("%s %p %lu\n", __func__, con, delay);
+
+       return 0;
+}
+
+static void queue_con(struct ceph_connection *con)
+{
+       (void) queue_con_delay(con, 0);
+}
+
+static bool con_sock_closed(struct ceph_connection *con)
+{
+       if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+               return false;
+
+#define CASE(x)                                                                \
+       case CON_STATE_ ## x:                                           \
+               con->error_msg = "socket closed (con state " #x ")";    \
+               break;
+
+       switch (con->state) {
+       CASE(CLOSED);
+       CASE(PREOPEN);
+       CASE(CONNECTING);
+       CASE(NEGOTIATING);
+       CASE(OPEN);
+       CASE(STANDBY);
+       default:
+               pr_warning("%s con %p unrecognized state %lu\n",
+                       __func__, con, con->state);
+               con->error_msg = "unrecognized con state";
+               BUG();
+               break;
+       }
+#undef CASE
+
+       return true;
 }
 
 /*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
 
        mutex_lock(&con->mutex);
 restart:
-       if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
-               switch (con->state) {
-               case CON_STATE_CONNECTING:
-                       con->error_msg = "connection failed";
-                       break;
-               case CON_STATE_NEGOTIATING:
-                       con->error_msg = "negotiation failed";
-                       break;
-               case CON_STATE_OPEN:
-                       con->error_msg = "socket closed";
-                       break;
-               default:
-                       dout("unrecognized con state %d\n", (int)con->state);
-                       con->error_msg = "unrecognized con state";
-                       BUG();
-               }
+       if (con_sock_closed(con))
                goto fault;
-       }
 
        if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
                dout("con_work %p backing off\n", con);
-               if (queue_delayed_work(ceph_msgr_wq, &con->work,
-                                      round_jiffies_relative(con->delay))) {
-                       dout("con_work %p backoff %lu\n", con, con->delay);
-                       mutex_unlock(&con->mutex);
-                       return;
-               } else {
+               ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+               if (ret) {
                        dout("con_work %p FAILED to back off %lu\n", con,
                             con->delay);
+                       BUG_ON(ret == -ENOENT);
                        set_bit(CON_FLAG_BACKOFF, &con->flags);
                }
                goto done;
@@ -2356,7 +2377,7 @@ fault:
 static void ceph_fault(struct ceph_connection *con)
        __releases(con->mutex)
 {
-       pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+       pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
               ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
        dout("fault %p state %lu to peer %s\n",
             con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
                        con->delay = BASE_DELAY_INTERVAL;
                else if (con->delay < MAX_DELAY_INTERVAL)
                        con->delay *= 2;
-               con->ops->get(con);
-               if (queue_delayed_work(ceph_msgr_wq, &con->work,
-                                      round_jiffies_relative(con->delay))) {
-                       dout("fault queued %p delay %lu\n", con, con->delay);
-               } else {
-                       con->ops->put(con);
-                       dout("fault failed to queue %p delay %lu, backoff\n",
-                            con, con->delay);
-                       /*
-                        * In many cases we see a socket state change
-                        * while con_work is running and end up
-                        * queuing (non-delayed) work, such that we
-                        * can't backoff with a delay.  Set a flag so
-                        * that when con_work restarts we schedule the
-                        * delay then.
-                        */
-                       set_bit(CON_FLAG_BACKOFF, &con->flags);
-               }
+               set_bit(CON_FLAG_BACKOFF, &con->flags);
+               queue_con(con);
        }
 
 out_unlock:
index c1d756cc74486178ed8d1f662d14172e3e59bb1c..780caf6b049188cd20e043f2206b23600223b820 100644 (file)
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
+       RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
-       if (err == -EAGAIN)
+       if (err)
                return;
 
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        }
 }
 
-static void kick_osd_requests(struct ceph_osd_client *osdc,
-                             struct ceph_osd *kickosd)
-{
-       mutex_lock(&osdc->request_mutex);
-       __kick_osd_requests(osdc, kickosd);
-       mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * If the osd connection drops, we need to resubmit all requests.
  */
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
-       kick_osd_requests(osdc, osd);
+       mutex_lock(&osdc->request_mutex);
+       __kick_osd_requests(osdc, osd);
+       mutex_unlock(&osdc->request_mutex);
        send_queued(osdc);
        up_read(&osdc->map_sem);
 }
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        osd->o_osd = onum;
+       RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
+               ret = -ENODEV;
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
                          sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                        req->r_osd = NULL;
        }
 
+       list_del_init(&req->r_req_lru_item);
        ceph_osdc_put_request(req);
 
-       list_del_init(&req->r_req_lru_item);
        if (osdc->num_requests == 0) {
                dout(" no requests, canceling timeout\n");
                __cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
 {
        dout("__unregister_linger_request %p\n", req);
+       list_del_init(&req->r_linger_item);
        if (req->r_osd) {
-               list_del_init(&req->r_linger_item);
                list_del_init(&req->r_linger_osd);
 
                if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req, *last_req = NULL;
+       struct ceph_osd_request *req;
        struct ceph_osd *osd;
-       unsigned long timeout = osdc->client->options->osd_timeout * HZ;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
-       unsigned long last_stamp = 0;
        struct list_head slow_osds;
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -1104,37 +1099,6 @@ static void handle_timeout(struct work_struct *work)
 
        mutex_lock(&osdc->request_mutex);
 
-       /*
-        * reset osds that appear to be _really_ unresponsive.  this
-        * is a failsafe measure.. we really shouldn't be getting to
-        * this point if the system is working properly.  the monitors
-        * should mark the osd as failed and we should find out about
-        * it from an updated osd map.
-        */
-       while (timeout && !list_empty(&osdc->req_lru)) {
-               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
-                                r_req_lru_item);
-
-               /* hasn't been long enough since we sent it? */
-               if (time_before(jiffies, req->r_stamp + timeout))
-                       break;
-
-               /* hasn't been long enough since it was acked? */
-               if (req->r_request->ack_stamp == 0 ||
-                   time_before(jiffies, req->r_request->ack_stamp + timeout))
-                       break;
-
-               BUG_ON(req == last_req && req->r_stamp == last_stamp);
-               last_req = req;
-               last_stamp = req->r_stamp;
-
-               osd = req->r_osd;
-               BUG_ON(!osd);
-               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
-                          req->r_tid, osd->o_osd);
-               __kick_osd_requests(osdc, osd);
-       }
-
        /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 
                dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
                     req->r_osd ? req->r_osd->o_osd : -1);
-               __unregister_linger_request(osdc, req);
                __register_request(osdc, req);
+               __unregister_linger_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
        event->data = data;
        event->osdc = osdc;
        INIT_LIST_HEAD(&event->osd_node);
+       RB_CLEAR_NODE(&event->node);
        kref_init(&event->kref);   /* one ref for us */
        kref_get(&event->kref);    /* one ref for the caller */
        init_completion(&event->completion);
index 5433fb0eb3c68596b375388b3df74a03c6e8441c..de73214b5d26c04989905bafc62bd9c056f2131c 100644 (file)
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
        return NULL;
 }
 
+const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
+{
+       struct ceph_pg_pool_info *pi;
+
+       if (id == CEPH_NOPOOL)
+               return NULL;
+
+       if (WARN_ON_ONCE(id > (u64) INT_MAX))
+               return NULL;
+
+       pi = __lookup_pg_pool(&map->pg_pools, (int) id);
+
+       return pi ? pi->name : NULL;
+}
+EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
+
 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
 {
        struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
        ceph_decode_32_safe(p, end, max, bad);
        while (max--) {
                ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+               err = -ENOMEM;
                pi = kzalloc(sizeof(*pi), GFP_NOFS);
                if (!pi)
                        goto bad;
                pi->id = ceph_decode_32(p);
+               err = -EINVAL;
                ev = ceph_decode_8(p); /* encoding version */
                if (ev > CEPH_PG_POOL_VERSION) {
                        pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
                __insert_pg_pool(&map->pg_pools, pi);
        }
 
-       if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-               goto bad;
+       if (version >= 5) {
+               err = __decode_pool_names(p, end, map);
+               if (err < 0) {
+                       dout("fail to decode pool names");
+                       goto bad;
+               }
+       }
 
        ceph_decode_32_safe(p, end, map->pool_max, bad);
 
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
        return map;
 
 bad:
-       dout("osdmap_decode fail\n");
+       dout("osdmap_decode fail err %d\n", err);
        ceph_osdmap_destroy(map);
        return ERR_PTR(err);
 }
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                if (ev > CEPH_PG_POOL_VERSION) {
                        pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
                                   ev, CEPH_PG_POOL_VERSION);
+                       err = -EINVAL;
                        goto bad;
                }
                pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                if (err < 0)
                        goto bad;
        }
-       if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-               goto bad;
+       if (version >= 5) {
+               err = __decode_pool_names(p, end, map);
+               if (err < 0)
+                       goto bad;
+       }
 
        /* old_pool */
        ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
                        (void) __remove_pg_mapping(&map->pg_temp, pgid);
 
                        /* insert */
-                       if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
-                               err = -EINVAL;
+                       err = -EINVAL;
+                       if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
                                goto bad;
-                       }
+                       err = -ENOMEM;
                        pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
-                       if (!pg) {
-                               err = -ENOMEM;
+                       if (!pg)
                                goto bad;
-                       }
                        pg->pgid = pgid;
                        pg->len = pglen;
                        for (j = 0; j < pglen; j++)