struct list_head node;
};
+struct rbd_req_coll;
+
/*
* a single io request
*/
struct bio *bio; /* cloned bio */
struct page **pages; /* list of used pages */
u64 len;
+ int coll_index;
+ struct rbd_req_coll *coll;
+};
+
+struct rbd_req_status {
+ int done;
+ int rc;
+ u64 bytes;
+};
+
+/*
+ * a collection of requests
+ */
+struct rbd_req_coll {
+ int total;
+ int num_done;
+ struct kref kref;
+ struct rbd_req_status status[];	/* one status entry per segment */
};
struct rbd_snap {
rbd_dev->client = NULL;
}
+/*
+ * Destroy a request collection (kref release callback)
+ */
+static void rbd_coll_release(struct kref *kref)
+{
+ struct rbd_req_coll *coll =
+ container_of(kref, struct rbd_req_coll, kref);
+
+ dout("rbd_coll_release %p\n", coll);
+ kfree(coll);
+}
/*
* Create a new header structure, translate header format from the on-disk
return len;
}
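+/*
+ * Return the number of objects spanned by the byte range [ofs, ofs + len),
+ * where each object covers 1 << header->obj_order bytes.
+ */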
+static int rbd_get_num_segments(struct rbd_image_header *header,
+ u64 ofs, u64 len)
+{
+ u64 start_seg = ofs >> header->obj_order;
+ u64 end_seg = (ofs + len - 1) >> header->obj_order;
+ return end_seg - start_seg + 1;
+}
+
/*
* bio helpers
*/
kfree(ops);
}
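+/*
+ * Record completion of one segment, then retire any contiguous run of
+ * finished segments against the original block request, in order.
+ */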
+static void rbd_coll_end_req_index(struct request *rq,
+ struct rbd_req_coll *coll,
+ int index,
+ int ret, u64 len)
+{
+ struct request_queue *q;
+ int min, max, i;
+
+ dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
+ coll, index, ret, len);
+
+ if (!rq)
+ return;
+
+ if (!coll) {
+ blk_end_request(rq, ret, len);
+ return;
+ }
+
+ q = rq->q;
+
+ spin_lock_irq(q->queue_lock);
+ coll->status[index].done = 1;
+ coll->status[index].rc = ret;
+ coll->status[index].bytes = len;
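+ /*
+  * __blk_end_request() consumes the request's bytes in order, so only
+  * the contiguous run of completed segments starting at num_done can
+  * be retired here.
+  */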
+ max = min = coll->num_done;
+ while (max < coll->total && coll->status[max].done)
+ max++;
+
+ for (i = min; i < max; i++) {
+ __blk_end_request(rq, coll->status[i].rc,
+ coll->status[i].bytes);
+ coll->num_done++;
+ kref_put(&coll->kref, rbd_coll_release);
+ }
+ spin_unlock_irq(q->queue_lock);
+}
+
+static void rbd_coll_end_req(struct rbd_request *req,
+ int ret, u64 len)
+{
+ rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
+}
+
/*
* Send ceph osd request
*/
int flags,
struct ceph_osd_req_op *ops,
int num_reply,
+ struct rbd_req_coll *coll,
+ int coll_index,
void (*rbd_cb)(struct ceph_osd_request *req,
struct ceph_msg *msg),
struct ceph_osd_request **linger_req,
struct ceph_osd_request_head *reqhead;
struct rbd_image_header *header = &dev->header;
- ret = -ENOMEM;
req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
- if (!req_data)
- goto done;
+ if (!req_data) {
+ if (coll)
+ rbd_coll_end_req_index(rq, coll, coll_index,
+ -ENOMEM, len);
+ return -ENOMEM;
+ }
+
+ if (coll) {
+ req_data->coll = coll;
+ req_data->coll_index = coll_index;
+ }
- dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
+ dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
down_read(&header->snap_rwsem);
ret = ceph_osdc_wait_request(&dev->client->osdc, req);
if (ver)
*ver = le64_to_cpu(req->r_reassert_version.version);
- dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
+ dout("reassert_ver=%lld\n",
+ le64_to_cpu(req->r_reassert_version.version));
ceph_osdc_put_request(req);
}
return ret;
bio_chain_put(req_data->bio);
ceph_osdc_put_request(req);
done_pages:
+ rbd_coll_end_req(req_data, ret, len);
kfree(req_data);
-done:
- if (rq)
- blk_end_request(rq, ret, len);
return ret;
}
bytes = req_data->len;
}
- blk_end_request(req_data->rq, rc, bytes);
+ rbd_coll_end_req(req_data, rc, bytes);
if (req_data->bio)
bio_chain_put(req_data->bio);
flags,
ops,
2,
+ NULL, 0,
NULL,
linger_req, ver);
if (ret < 0)
u64 snapid,
int opcode, int flags, int num_reply,
u64 ofs, u64 len,
- struct bio *bio)
+ struct bio *bio,
+ struct rbd_req_coll *coll,
+ int coll_index)
{
char *seg_name;
u64 seg_ofs;
flags,
ops,
num_reply,
+ coll, coll_index,
rbd_req_cb, 0, NULL);
rbd_destroy_ops(ops);
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 ofs, u64 len,
- struct bio *bio)
+ struct bio *bio,
+ struct rbd_req_coll *coll,
+ int coll_index)
{
return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
2,
- ofs, len, bio);
+ ofs, len, bio, coll, coll_index);
}
/*
struct rbd_device *rbd_dev,
u64 snapid,
u64 ofs, u64 len,
- struct bio *bio)
+ struct bio *bio,
+ struct rbd_req_coll *coll,
+ int coll_index)
{
return rbd_do_op(rq, rbd_dev, NULL,
(snapid ? snapid : CEPH_NOSNAP),
CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ,
2,
- ofs, len, bio);
+ ofs, len, bio, coll, coll_index);
}
/*
CEPH_OSD_FLAG_READ,
ops,
1,
+ NULL, 0,
rbd_simple_req_cb, 0, NULL);
rbd_destroy_ops(ops);
return ret;
}
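+/*
+ * Allocate a collection with room for num_reqs per-segment statuses;
+ * the initial reference belongs to the caller.
+ */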
+static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
+{
+ struct rbd_req_coll *coll =
+ kzalloc(sizeof(struct rbd_req_coll) +
+ sizeof(struct rbd_req_status) * num_reqs,
+ GFP_ATOMIC);
+
+ if (!coll)
+ return NULL;
+ coll->total = num_reqs;
+ kref_init(&coll->kref);
+ return coll;
+}
+
/*
* block device queue callback
*/
bool do_write;
int size, op_size = 0;
u64 ofs;
+ int num_segs, cur_seg = 0;
+ struct rbd_req_coll *coll;
/* peek at request from block layer */
if (!rq)
do_write ? "write" : "read",
size, blk_rq_pos(rq) * 512ULL);
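+ /* one status slot per object segment this request will be split into */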
+ num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+ coll = rbd_alloc_coll(num_segs);
+ if (!coll) {
+ spin_lock_irq(q->queue_lock);
+ __blk_end_request_all(rq, -ENOMEM);
+ goto next;
+ }
+
do {
/* a bio clone to be passed down to OSD req */
dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
rbd_dev->header.block_name,
ofs, size,
NULL, NULL);
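+ /* one reference per segment; dropped when the segment completes */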
+ kref_get(&coll->kref);
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
op_size, GFP_ATOMIC);
if (!bio) {
- spin_lock_irq(q->queue_lock);
- __blk_end_request_all(rq, -ENOMEM);
- goto next;
+ rbd_coll_end_req_index(rq, coll, cur_seg,
+ -ENOMEM, op_size);
+ goto next_seg;
}
+
/* init OSD command: write or read */
if (do_write)
rbd_req_write(rq, rbd_dev,
rbd_dev->header.snapc,
ofs,
- op_size, bio);
+ op_size, bio,
+ coll, cur_seg);
else
rbd_req_read(rq, rbd_dev,
cur_snap_id(rbd_dev),
ofs,
- op_size, bio);
+ op_size, bio,
+ coll, cur_seg);
+next_seg:
size -= op_size;
ofs += op_size;
+ cur_seg++;
rq_bio = next_bio;
} while (size > 0);
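+ /* drop the initial reference taken in rbd_alloc_coll() */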
+ kref_put(&coll->kref, rbd_coll_release);
if (bp)
bio_pair_release(bp);
-
spin_lock_irq(q->queue_lock);
next:
rq = blk_fetch_request(q);