rbd: implement sync object read with new code
authorAlex Elder <elder@inktank.com>
Thu, 17 Jan 2013 18:25:27 +0000 (12:25 -0600)
committerSage Weil <sage@inktank.com>
Thu, 14 Feb 2013 02:29:08 +0000 (18:29 -0800)
Reimplement the synchronous read operation used for reading a
version 1 header using the new request tracking code.  Name the
resulting function rbd_obj_read_sync() to better reflect that
it's a full object operation, not an object request.  To do this,
implement a new OBJ_REQUEST_PAGES object request type.

This implements a new mechanism to allow the caller to wait for
completion for an rbd_obj_request by calling rbd_obj_request_wait().

This partially resolves:
    http://tracker.newdream.net/issues/3755

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
drivers/block/rbd.c

index c1bb649b4ad11a5a20bf10ac0cb62e39b88764d0..3f5eaea444a0d08fb816c8d49d2f49946f2ccb5a 100644 (file)
@@ -170,7 +170,7 @@ typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
 struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
-enum obj_request_type { OBJ_REQUEST_BIO };     /* More types to come */
+enum obj_request_type { OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES };
 
 struct rbd_obj_request {
        const char              *object_name;
@@ -182,7 +182,13 @@ struct rbd_obj_request {
        u32                     which;          /* posn image request list */
 
        enum obj_request_type   type;
-       struct bio              *bio_list;
+       union {
+               struct bio      *bio_list;
+               struct {
+                       struct page     **pages;
+                       u32             page_count;
+               };
+       };
 
        struct ceph_osd_request *osd_req;
 
@@ -192,6 +198,7 @@ struct rbd_obj_request {
        atomic_t                done;
 
        rbd_obj_callback_t      callback;
+       struct completion       completion;
 
        struct kref             kref;
 };
@@ -1077,6 +1084,7 @@ static bool obj_request_type_valid(enum obj_request_type type)
 {
        switch (type) {
        case OBJ_REQUEST_BIO:
+       case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
@@ -1291,14 +1299,23 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
                rbd_img_request_put(img_request);
 }
 
+/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
+
+static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
+{
+       return wait_for_completion_interruptible(&obj_request->completion);
+}
+
 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
 {
        if (obj_request->callback)
                obj_request->callback(obj_request);
+       else
+               complete_all(&obj_request->completion);
 }
 
 /*
- * Request sync osd read
+ * Synchronously read a range from an object into a provided buffer
  */
 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                          const char *object_name,
@@ -1556,6 +1573,11 @@ static struct ceph_osd_request *rbd_osd_req_create(
                /* osd client requires "num pages" even for bio */
                osd_req->r_num_pages = calc_pages_for(offset, length);
                break;
+       case OBJ_REQUEST_PAGES:
+               osd_req->r_pages = obj_request->pages;
+               osd_req->r_num_pages = obj_request->page_count;
+               osd_req->r_page_alignment = offset & ~PAGE_MASK;
+               break;
        }
 
        if (write_request) {
@@ -1616,6 +1638,7 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
        atomic_set(&obj_request->done, 0);
+       init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);
 
        return obj_request;
@@ -1639,6 +1662,11 @@ static void rbd_obj_request_destroy(struct kref *kref)
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
                break;
+       case OBJ_REQUEST_PAGES:
+               if (obj_request->pages)
+                       ceph_release_page_vector(obj_request->pages,
+                                               obj_request->page_count);
+               break;
        }
 
        kfree(obj_request);
@@ -1987,6 +2015,65 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        put_disk(disk);
 }
 
+static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
+                               const char *object_name,
+                               u64 offset, u64 length,
+                               char *buf, u64 *version)
+
+{
+       struct ceph_osd_req_op *op;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_client *osdc;
+       struct page **pages = NULL;
+       u32 page_count;
+       int ret;
+
+       page_count = (u32) calc_pages_for(offset, length);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               ret = PTR_ERR(pages);
+
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(object_name, offset, length,
+                                               OBJ_REQUEST_PAGES);
+       if (!obj_request)
+               goto out;
+
+       obj_request->pages = pages;
+       obj_request->page_count = page_count;
+
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out;
+
+       ret = obj_request->result;
+       if (ret < 0)
+               goto out;
+       ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
+       if (version)
+               *version = obj_request->version;
+out:
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
+       else
+               ceph_release_page_vector(pages, page_count);
+
+       return ret;
+}
+
 /*
  * Read the complete header for the given rbd device.
  *
@@ -2025,7 +2112,8 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);
 
-               ret = rbd_req_sync_read(rbd_dev, rbd_dev->header_name,
+               (void) rbd_req_sync_read;       /* avoid a warning */
+               ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);