kref_put(&rbdc->kref, rbd_client_release);
}
-/*
- * Destroy requests collection
- */
-static void rbd_coll_release(struct kref *kref)
-{
- struct rbd_req_coll *coll =
- container_of(kref, struct rbd_req_coll, kref);
-
- dout("rbd_coll_release %p\n", coll);
- kfree(coll);
-}
-
static bool rbd_image_format_valid(u32 image_format)
{
return image_format == 1 || image_format == 2;
@@ ... @@
return length;
}
-static int rbd_get_num_segments(struct rbd_image_header *header,
- u64 ofs, u64 len)
-{
- u64 start_seg;
- u64 end_seg;
- u64 result;
-
- if (!len)
- return 0;
- if (len - 1 > U64_MAX - ofs)
- return -ERANGE;
-
- start_seg = ofs >> header->obj_order;
- end_seg = (ofs + len - 1) >> header->obj_order;
-
- result = end_seg - start_seg + 1;
- if (result > (u64) INT_MAX)
- return -ERANGE;
-
- return (int) result;
-}
-
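For anyone following the arithmetic being retired here: rbd_get_num_segments() only needed the object order from the header, since backing objects are 2^obj_order bytes each (4 MiB at RBD's default order of 22), so the object count for a byte range falls out of two shifts. Below is a standalone model of that calculation, with made-up names and no kernel dependencies, purely for illustration.

#include <stdint.h>
#include <stdio.h>

/* Illustrative model only: backing objects are 2^obj_order bytes each.
 * The removed helper also range-checked ofs + len; omitted in this sketch. */
static uint64_t num_objects_spanned(uint64_t ofs, uint64_t len, int obj_order)
{
	uint64_t first, last;

	if (!len)
		return 0;
	first = ofs >> obj_order;
	last = (ofs + len - 1) >> obj_order;
	return last - first + 1;
}

int main(void)
{
	/* A 6 MiB I/O at image offset 3 MiB with 4 MiB objects (order 22)
	 * touches objects 0, 1 and 2, so three segments. */
	printf("%llu\n",
	       (unsigned long long)num_objects_spanned(3 << 20, 6 << 20, 22));
	return 0;
}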
/*
* returns the size of an object in the image
*/
@@ ... @@
kfree(op);
}
-static void rbd_coll_end_req_index(struct request *rq,
- struct rbd_req_coll *coll,
- int index,
- s32 ret, u64 len)
-{
- struct request_queue *q;
- int min, max, i;
-
- dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
- coll, index, (int)ret, (unsigned long long)len);
-
- if (!rq)
- return;
-
- if (!coll) {
- blk_end_request(rq, ret, len);
- return;
- }
-
- q = rq->q;
-
- spin_lock_irq(q->queue_lock);
- coll->status[index].done = 1;
- coll->status[index].rc = ret;
- coll->status[index].bytes = len;
- max = min = coll->num_done;
- while (max < coll->total && coll->status[max].done)
- max++;
-
- for (i = min; i<max; i++) {
- __blk_end_request(rq, (int)coll->status[i].rc,
- coll->status[i].bytes);
- coll->num_done++;
- kref_put(&coll->kref, rbd_coll_release);
- }
- spin_unlock_irq(q->queue_lock);
-}
-
-static void rbd_coll_end_req(struct rbd_request *rbd_req,
- s32 ret, u64 len)
-{
- rbd_coll_end_req_index(rbd_req->rq,
- rbd_req->coll, rbd_req->coll_index,
- ret, len);
-}
-
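As a reminder of what the collection machinery being deleted actually did: each per-object reply filled in its slot in coll->status[], and only the contiguous prefix of completed slots was ended on the block request, so completions reached the block layer in offset order regardless of OSD reply order. Here is a small userspace sketch of that bookkeeping, with hypothetical names and a printf standing in for __blk_end_request().

#include <stdbool.h>
#include <stdio.h>

#define MAX_PIECES 16

struct piece_status {
	bool done;
	int rc;
	unsigned long long bytes;
};

struct coll_model {
	int total;
	int num_done;
	struct piece_status status[MAX_PIECES];
};

/* Mark piece 'index' complete, then "end" every piece in the contiguous
 * finished prefix, in order, the way the removed code drove
 * __blk_end_request(). */
static void piece_complete(struct coll_model *coll, int index,
			   int rc, unsigned long long bytes)
{
	coll->status[index].done = true;
	coll->status[index].rc = rc;
	coll->status[index].bytes = bytes;

	while (coll->num_done < coll->total &&
	       coll->status[coll->num_done].done) {
		struct piece_status *s = &coll->status[coll->num_done];

		printf("end piece %d: rc=%d bytes=%llu\n",
		       coll->num_done, s->rc, s->bytes);
		coll->num_done++;
	}
}

int main(void)
{
	struct coll_model coll = { .total = 3 };

	piece_complete(&coll, 1, 0, 4096);	/* out of order: nothing ends */
	piece_complete(&coll, 0, 0, 4096);	/* pieces 0 and 1 end now */
	piece_complete(&coll, 2, 0, 4096);	/* piece 2 ends last */
	return 0;
}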
/*
* Send ceph osd request
*/
@@ ... @@
return ret;
}
-/*
- * Ceph osd op callback
- */
-static void rbd_req_cb(struct ceph_osd_request *osd_req, struct ceph_msg *msg)
-{
- struct rbd_request *rbd_req = osd_req->r_priv;
- struct ceph_osd_reply_head *replyhead;
- struct ceph_osd_op *op;
- s32 rc;
- u64 bytes;
- int read_op;
-
- /* parse reply */
- replyhead = msg->front.iov_base;
- WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
- op = (void *)(replyhead + 1);
- rc = (s32)le32_to_cpu(replyhead->result);
- bytes = le64_to_cpu(op->extent.length);
- read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
-
- dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
- (unsigned long long) bytes, read_op, (int) rc);
-
- if (rc == (s32)-ENOENT && read_op) {
- zero_bio_chain(rbd_req->bio, 0);
- rc = 0;
- } else if (rc == 0 && read_op && bytes < rbd_req->len) {
- zero_bio_chain(rbd_req->bio, bytes);
- bytes = rbd_req->len;
- }
-
- rbd_coll_end_req(rbd_req, rc, bytes);
-
- if (rbd_req->bio)
- bio_chain_put(rbd_req->bio);
-
- ceph_osdc_put_request(osd_req);
- kfree(rbd_req);
-}
-
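Two read-path rules live in the callback above and have to survive into the new object-request code: a read that hits a nonexistent object (-ENOENT) is turned into an all-zero success, and a short read is zero-padded out to the requested length. A minimal standalone sketch of just that fix-up, with a printf standing in for zero_bio_chain() and hypothetical names:

#include <errno.h>
#include <stdio.h>

/* Stand-in for zero_bio_chain(): zero the read buffer from 'from' onward. */
static void zero_from(unsigned long long from)
{
	printf("zero data from offset %llu\n", from);
}

/*
 * Model of the removed read-completion rules: a read of a nonexistent
 * object (-ENOENT) becomes an all-zero success, and a short read is
 * zero-padded out to the requested length.
 */
static int finish_read(int rc, unsigned long long *bytes,
		       unsigned long long requested)
{
	if (rc == -ENOENT) {
		zero_from(0);
		rc = 0;
	} else if (rc == 0 && *bytes < requested) {
		zero_from(*bytes);
		*bytes = requested;
	}
	return rc;
}

int main(void)
{
	unsigned long long bytes = 1024;
	int rc = finish_read(0, &bytes, 4096);	/* a 1024-byte short read */

	printf("rc=%d bytes=%llu\n", rc, bytes);
	return 0;
}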
static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
{
@@ ... @@
return ret;
}
-/*
- * Do an asynchronous ceph osd operation
- */
-static int rbd_do_op(struct request *rq,
- struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 ofs, u64 len,
- struct bio *bio,
- struct rbd_req_coll *coll,
- int coll_index)
-{
- const char *seg_name;
- u64 seg_ofs;
- u64 seg_len;
- int ret;
- struct ceph_osd_req_op *op;
- int opcode;
- int flags;
- u64 snapid;
-
- seg_name = rbd_segment_name(rbd_dev, ofs);
- if (!seg_name)
- return -ENOMEM;
- seg_len = rbd_segment_length(rbd_dev, ofs, len);
- seg_ofs = rbd_segment_offset(rbd_dev, ofs);
-
- if (rq_data_dir(rq) == WRITE) {
- opcode = CEPH_OSD_OP_WRITE;
- flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
- snapid = CEPH_NOSNAP;
- } else {
- opcode = CEPH_OSD_OP_READ;
- flags = CEPH_OSD_FLAG_READ;
- rbd_assert(!snapc);
- snapid = rbd_dev->spec->snap_id;
- }
-
- ret = -ENOMEM;
- op = rbd_osd_req_op_create(opcode, seg_ofs, seg_len);
- if (!op)
- goto done;
-
- /* we've taken care of segment sizes earlier when we
- cloned the bios. We should never have a segment
- truncated at this point */
- rbd_assert(seg_len == len);
-
- ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
- seg_name, seg_ofs, seg_len,
- bio,
- NULL, 0,
- flags,
- op,
- coll, coll_index,
- rbd_req_cb, NULL);
- if (ret < 0)
- rbd_coll_end_req_index(rq, coll, coll_index,
- (s32)ret, seg_len);
- rbd_osd_req_op_destroy(op);
-done:
- kfree(seg_name);
- return ret;
-}
-
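Also worth keeping in mind from rbd_do_op(): writes always go to the head object (snapid CEPH_NOSNAP) with write+ondisk flags and carry the snapshot context, while reads carry no snapshot context and target the currently mapped snapshot id. A sketch of that selection follows; the constants and names below are illustrative stand-ins, not the ceph headers.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-ins for the ceph osd constants used above. */
enum op { OP_READ, OP_WRITE };
#define FLAG_READ	0x1
#define FLAG_WRITE	0x2
#define FLAG_ONDISK	0x4
#define NOSNAP		((uint64_t)(-2))	/* stand-in for CEPH_NOSNAP */

struct op_setup {
	enum op opcode;
	unsigned int flags;
	uint64_t snapid;	/* snapshot to read from, or NOSNAP */
	bool needs_snapc;	/* writes carry the snapshot context */
};

static struct op_setup setup_op(bool write, uint64_t mapped_snap_id)
{
	struct op_setup s;

	if (write) {
		s.opcode = OP_WRITE;
		s.flags = FLAG_WRITE | FLAG_ONDISK;
		s.snapid = NOSNAP;	/* writes go to the head */
		s.needs_snapc = true;
	} else {
		s.opcode = OP_READ;
		s.flags = FLAG_READ;
		s.snapid = mapped_snap_id;
		s.needs_snapc = false;
	}
	return s;
}

int main(void)
{
	struct op_setup s = setup_op(false, 42);

	printf("read: flags=%#x snapid=%llu\n",
	       s.flags, (unsigned long long)s.snapid);
	return 0;
}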
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
{
@@ ... @@
return ret;
}
-static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
-{
- struct rbd_req_coll *coll =
- kzalloc(sizeof(struct rbd_req_coll) +
- sizeof(struct rbd_req_status) * num_reqs,
- GFP_ATOMIC);
-
- if (!coll)
- return NULL;
- coll->total = num_reqs;
- kref_init(&coll->kref);
- return coll;
-}
-
-static int rbd_dev_do_request(struct request *rq,
- struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 ofs, unsigned int size,
- struct bio *bio_chain)
-{
- int num_segs;
- struct rbd_req_coll *coll;
- unsigned int bio_offset;
- int cur_seg = 0;
-
- dout("%s 0x%x bytes at 0x%llx\n",
- rq_data_dir(rq) == WRITE ? "write" : "read",
- size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
-
- num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
- if (num_segs <= 0)
- return num_segs;
-
- coll = rbd_alloc_coll(num_segs);
- if (!coll)
- return -ENOMEM;
-
- bio_offset = 0;
- do {
- u64 limit = rbd_segment_length(rbd_dev, ofs, size);
- unsigned int clone_size;
- struct bio *bio_clone;
-
- BUG_ON(limit > (u64)UINT_MAX);
- clone_size = (unsigned int)limit;
- dout("bio_chain->bi_vcnt=%hu\n", bio_chain->bi_vcnt);
-
- kref_get(&coll->kref);
-
- /* Pass a cloned bio chain via an osd request */
-
- bio_clone = bio_chain_clone_range(&bio_chain,
- &bio_offset, clone_size,
- GFP_ATOMIC);
- if (bio_clone)
- (void)rbd_do_op(rq, rbd_dev, snapc,
- ofs, clone_size,
- bio_clone, coll, cur_seg);
- else
- rbd_coll_end_req_index(rq, coll, cur_seg,
- (s32)-ENOMEM,
- clone_size);
- size -= clone_size;
- ofs += clone_size;
-
- cur_seg++;
- } while (size > 0);
- kref_put(&coll->kref, rbd_coll_release);
-
- return 0;
-}
-
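The loop being removed here boils down to: peel the request's byte range into per-object pieces, clamping each piece at the object boundary, and submit one OSD op per piece. A standalone sketch of just the splitting loop, using the same shift/mask arithmetic as the model earlier (names are illustrative):

#include <stdint.h>
#include <stdio.h>

/* Peel an image byte range into per-object pieces, as the removed
 * rbd_dev_do_request() loop did before handing each piece to an OSD op. */
static void split_range(uint64_t ofs, uint64_t len, int obj_order)
{
	uint64_t obj_size = 1ULL << obj_order;

	while (len > 0) {
		uint64_t in_obj = ofs & (obj_size - 1);
		uint64_t chunk = obj_size - in_obj;

		if (chunk > len)
			chunk = len;
		printf("object %llu: off %llu len %llu\n",
		       (unsigned long long)(ofs >> obj_order),
		       (unsigned long long)in_obj,
		       (unsigned long long)chunk);
		ofs += chunk;
		len -= chunk;
	}
}

int main(void)
{
	/* 6 MiB at offset 3 MiB with 4 MiB objects: pieces of 1, 4 and 1 MiB. */
	split_range(3 << 20, 6 << 20, 22);
	return 0;
}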
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
struct ceph_osd_op *op)
{
@@ ... @@
}
}
-/*
- * block device queue callback
- */
-static void rbd_rq_fn(struct request_queue *q)
-{
- struct rbd_device *rbd_dev = q->queuedata;
- bool read_only = rbd_dev->mapping.read_only;
- struct request *rq;
-
- while ((rq = blk_fetch_request(q))) {
- struct ceph_snap_context *snapc = NULL;
- unsigned int size = 0;
- int result;
-
- dout("fetched request\n");
-
- /* Filter out block requests we don't understand */
-
- if ((rq->cmd_type != REQ_TYPE_FS)) {
- __blk_end_request_all(rq, 0);
- continue;
- }
- spin_unlock_irq(q->queue_lock);
-
- /* Write requests need a reference to the snapshot context */
-
- if (rq_data_dir(rq) == WRITE) {
- result = -EROFS;
- if (read_only) /* Can't write to a read-only device */
- goto out_end_request;
-
- /*
- * Note that each osd request will take its
- * own reference to the snapshot context
- * supplied. The reference we take here
- * just guarantees the one we provide stays
- * valid.
- */
- down_read(&rbd_dev->header_rwsem);
- snapc = ceph_get_snap_context(rbd_dev->header.snapc);
- up_read(&rbd_dev->header_rwsem);
- rbd_assert(snapc != NULL);
- } else if (!atomic_read(&rbd_dev->exists)) {
- rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
- dout("request for non-existent snapshot");
- result = -ENXIO;
- goto out_end_request;
- }
-
- size = blk_rq_bytes(rq);
- result = rbd_dev_do_request(rq, rbd_dev, snapc,
- blk_rq_pos(rq) * SECTOR_SIZE,
- size, rq->bio);
-out_end_request:
- if (snapc)
- ceph_put_snap_context(snapc);
- spin_lock_irq(q->queue_lock);
- if (!size || result < 0)
- __blk_end_request_all(rq, result);
- }
-}
-
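Beyond driving the queue, rbd_rq_fn() encoded request-level policy that the replacement path has to keep: non-FS requests are ended immediately, writes are refused on a read-only mapping with -EROFS, and reads of a mapped snapshot that has since been removed fail with -ENXIO. Here is the write/read part of that policy, reduced to a pure decision function for illustration (the cmd_type filter is left out; hypothetical name, userspace errno constants):

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Decision table of the removed rbd_rq_fn() checks, as a pure function:
 * returns 0 to submit the I/O, or a negative errno to fail the request.
 */
static int check_request(bool is_write, bool read_only, bool snap_exists)
{
	if (is_write && read_only)
		return -EROFS;	/* can't write a read-only mapping */
	if (!is_write && !snap_exists)
		return -ENXIO;	/* mapped snapshot has been removed */
	return 0;
}

int main(void)
{
	printf("write to read-only mapping: %d\n",
	       check_request(true, true, true));
	printf("read from removed snapshot: %d\n",
	       check_request(false, true, false));
	return 0;
}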
/*
* a queue callback. Makes sure that we don't create a bio that spans across
* multiple osd objects. One exception would be with a single page bios,
@@ ... @@
disk->fops = &rbd_bd_ops;
disk->private_data = rbd_dev;
- (void) rbd_rq_fn; /* avoid a warning */
q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
if (!q)
goto out_disk;