2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
61 * block device image metadata (in-memory version)
63 struct rbd_image_header
{
69 struct rw_semaphore snap_rwsem
;
70 struct ceph_snap_context
*snapc
;
71 size_t snap_names_len
;
86 * an instance of the client. multiple devices may share a client.
89 struct ceph_client
*client
;
90 struct rbd_options
*rbd_opts
;
92 struct list_head node
;
101 struct request
*rq
; /* blk layer request */
102 struct bio
*bio
; /* cloned bio */
103 struct page
**pages
; /* list of used pages */
106 struct rbd_req_coll
*coll
;
109 struct rbd_req_status
{
116 * a collection of requests
118 struct rbd_req_coll
{
122 struct rbd_req_status status
[0];
129 struct list_head node
;
137 int id
; /* blkdev unique id */
139 int major
; /* blkdev assigned major */
140 struct gendisk
*disk
; /* blkdev's gendisk and rq */
141 struct request_queue
*q
;
143 struct ceph_client
*client
;
144 struct rbd_client
*rbd_client
;
146 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
148 spinlock_t lock
; /* queue lock */
150 struct rbd_image_header header
;
151 char obj
[RBD_MAX_OBJ_NAME_LEN
]; /* rbd image name */
153 char obj_md_name
[RBD_MAX_MD_NAME_LEN
]; /* hdr nm. */
154 char pool_name
[RBD_MAX_POOL_NAME_LEN
];
157 struct ceph_osd_event
*watch_event
;
158 struct ceph_osd_request
*watch_request
;
160 char snap_name
[RBD_MAX_SNAP_NAME_LEN
];
161 u32 cur_snap
; /* index+1 of current snapshot within snap context
165 struct list_head node
;
167 /* list of snapshots */
168 struct list_head snaps
;
174 static struct bus_type rbd_bus_type
= {
178 static DEFINE_SPINLOCK(node_lock
); /* protects client get/put */
180 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list
); /* devices */
182 static LIST_HEAD(rbd_client_list
); /* clients */
184 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
185 static void rbd_dev_release(struct device
*dev
);
186 static ssize_t
rbd_snap_add(struct device
*dev
,
187 struct device_attribute
*attr
,
190 static void __rbd_remove_snap_dev(struct rbd_device
*rbd_dev
,
191 struct rbd_snap
*snap
);
194 static struct rbd_device
*dev_to_rbd(struct device
*dev
)
196 return container_of(dev
, struct rbd_device
, dev
);
199 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
201 return get_device(&rbd_dev
->dev
);
204 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
206 put_device(&rbd_dev
->dev
);
209 static int __rbd_update_snaps(struct rbd_device
*rbd_dev
);
211 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
213 struct gendisk
*disk
= bdev
->bd_disk
;
214 struct rbd_device
*rbd_dev
= disk
->private_data
;
216 rbd_get_dev(rbd_dev
);
218 set_device_ro(bdev
, rbd_dev
->read_only
);
220 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
226 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
228 struct rbd_device
*rbd_dev
= disk
->private_data
;
230 rbd_put_dev(rbd_dev
);
235 static const struct block_device_operations rbd_bd_ops
= {
236 .owner
= THIS_MODULE
,
238 .release
= rbd_release
,
242 * Initialize an rbd client instance.
245 static struct rbd_client
*rbd_client_create(struct ceph_options
*opt
,
246 struct rbd_options
*rbd_opts
)
248 struct rbd_client
*rbdc
;
251 dout("rbd_client_create\n");
252 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
256 kref_init(&rbdc
->kref
);
257 INIT_LIST_HEAD(&rbdc
->node
);
259 rbdc
->client
= ceph_create_client(opt
, rbdc
, 0, 0);
260 if (IS_ERR(rbdc
->client
))
262 opt
= NULL
; /* Now rbdc->client is responsible for opt */
264 ret
= ceph_open_session(rbdc
->client
);
268 rbdc
->rbd_opts
= rbd_opts
;
270 spin_lock(&node_lock
);
271 list_add_tail(&rbdc
->node
, &rbd_client_list
);
272 spin_unlock(&node_lock
);
274 dout("rbd_client_create created %p\n", rbdc
);
278 ceph_destroy_client(rbdc
->client
);
283 ceph_destroy_options(opt
);
288 * Find a ceph client with specific addr and configuration.
290 static struct rbd_client
*__rbd_client_find(struct ceph_options
*opt
)
292 struct rbd_client
*client_node
;
294 if (opt
->flags
& CEPH_OPT_NOSHARE
)
297 list_for_each_entry(client_node
, &rbd_client_list
, node
)
298 if (ceph_compare_options(opt
, client_node
->client
) == 0)
311 /* string args above */
314 static match_table_t rbdopt_tokens
= {
315 {Opt_notify_timeout
, "notify_timeout=%d"},
317 /* string args above */
321 static int parse_rbd_opts_token(char *c
, void *private)
323 struct rbd_options
*rbdopt
= private;
324 substring_t argstr
[MAX_OPT_ARGS
];
325 int token
, intval
, ret
;
327 token
= match_token(c
, rbdopt_tokens
, argstr
);
331 if (token
< Opt_last_int
) {
332 ret
= match_int(&argstr
[0], &intval
);
334 pr_err("bad mount option arg (not int) "
338 dout("got int token %d val %d\n", token
, intval
);
339 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
340 dout("got string token %d val %s\n", token
,
343 dout("got token %d\n", token
);
347 case Opt_notify_timeout
:
348 rbdopt
->notify_timeout
= intval
;
357 * Get a ceph client with specific addr and configuration, if one does
358 * not exist create it.
360 static int rbd_get_client(struct rbd_device
*rbd_dev
, const char *mon_addr
,
363 struct rbd_client
*rbdc
;
364 struct ceph_options
*opt
;
366 struct rbd_options
*rbd_opts
;
368 rbd_opts
= kzalloc(sizeof(*rbd_opts
), GFP_KERNEL
);
372 rbd_opts
->notify_timeout
= RBD_NOTIFY_TIMEOUT_DEFAULT
;
374 ret
= ceph_parse_options(&opt
, options
, mon_addr
,
375 mon_addr
+ strlen(mon_addr
),
376 parse_rbd_opts_token
, rbd_opts
);
380 spin_lock(&node_lock
);
381 rbdc
= __rbd_client_find(opt
);
383 ceph_destroy_options(opt
);
386 /* using an existing client */
387 kref_get(&rbdc
->kref
);
388 rbd_dev
->rbd_client
= rbdc
;
389 rbd_dev
->client
= rbdc
->client
;
390 spin_unlock(&node_lock
);
393 spin_unlock(&node_lock
);
395 rbdc
= rbd_client_create(opt
, rbd_opts
);
401 rbd_dev
->rbd_client
= rbdc
;
402 rbd_dev
->client
= rbdc
->client
;
410 * Destroy ceph client
412 * Caller must hold node_lock.
414 static void rbd_client_release(struct kref
*kref
)
416 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
418 dout("rbd_release_client %p\n", rbdc
);
419 list_del(&rbdc
->node
);
421 ceph_destroy_client(rbdc
->client
);
422 kfree(rbdc
->rbd_opts
);
427 * Drop reference to ceph client node. If it's not referenced anymore, release
430 static void rbd_put_client(struct rbd_device
*rbd_dev
)
432 spin_lock(&node_lock
);
433 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
434 spin_unlock(&node_lock
);
435 rbd_dev
->rbd_client
= NULL
;
436 rbd_dev
->client
= NULL
;
440 * Destroy requests collection
442 static void rbd_coll_release(struct kref
*kref
)
444 struct rbd_req_coll
*coll
=
445 container_of(kref
, struct rbd_req_coll
, kref
);
447 dout("rbd_coll_release %p\n", coll
);
452 * Create a new header structure, translate header format from the on-disk
455 static int rbd_header_from_disk(struct rbd_image_header
*header
,
456 struct rbd_image_header_ondisk
*ondisk
,
461 u32 snap_count
= le32_to_cpu(ondisk
->snap_count
);
464 if (memcmp(ondisk
, RBD_HEADER_TEXT
, sizeof(RBD_HEADER_TEXT
)))
467 init_rwsem(&header
->snap_rwsem
);
468 header
->snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
469 header
->snapc
= kmalloc(sizeof(struct ceph_snap_context
) +
470 snap_count
* sizeof (*ondisk
),
475 header
->snap_names
= kmalloc(header
->snap_names_len
,
477 if (!header
->snap_names
)
479 header
->snap_sizes
= kmalloc(snap_count
* sizeof(u64
),
481 if (!header
->snap_sizes
)
484 header
->snap_names
= NULL
;
485 header
->snap_sizes
= NULL
;
487 memcpy(header
->block_name
, ondisk
->block_name
,
488 sizeof(ondisk
->block_name
));
490 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
491 header
->obj_order
= ondisk
->options
.order
;
492 header
->crypt_type
= ondisk
->options
.crypt_type
;
493 header
->comp_type
= ondisk
->options
.comp_type
;
495 atomic_set(&header
->snapc
->nref
, 1);
496 header
->snap_seq
= le64_to_cpu(ondisk
->snap_seq
);
497 header
->snapc
->num_snaps
= snap_count
;
498 header
->total_snaps
= snap_count
;
500 if (snap_count
&& allocated_snaps
== snap_count
) {
501 for (i
= 0; i
< snap_count
; i
++) {
502 header
->snapc
->snaps
[i
] =
503 le64_to_cpu(ondisk
->snaps
[i
].id
);
504 header
->snap_sizes
[i
] =
505 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
508 /* copy snapshot names */
509 memcpy(header
->snap_names
, &ondisk
->snaps
[i
],
510 header
->snap_names_len
);
516 kfree(header
->snap_names
);
518 kfree(header
->snapc
);
522 static int snap_index(struct rbd_image_header
*header
, int snap_num
)
524 return header
->total_snaps
- snap_num
;
527 static u64
cur_snap_id(struct rbd_device
*rbd_dev
)
529 struct rbd_image_header
*header
= &rbd_dev
->header
;
531 if (!rbd_dev
->cur_snap
)
534 return header
->snapc
->snaps
[snap_index(header
, rbd_dev
->cur_snap
)];
537 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
541 char *p
= header
->snap_names
;
543 for (i
= 0; i
< header
->total_snaps
; i
++, p
+= strlen(p
) + 1) {
544 if (strcmp(snap_name
, p
) == 0)
547 if (i
== header
->total_snaps
)
550 *seq
= header
->snapc
->snaps
[i
];
553 *size
= header
->snap_sizes
[i
];
558 static int rbd_header_set_snap(struct rbd_device
*dev
,
559 const char *snap_name
,
562 struct rbd_image_header
*header
= &dev
->header
;
563 struct ceph_snap_context
*snapc
= header
->snapc
;
566 down_write(&header
->snap_rwsem
);
570 strcmp(snap_name
, "-") == 0 ||
571 strcmp(snap_name
, RBD_SNAP_HEAD_NAME
) == 0) {
572 if (header
->total_snaps
)
573 snapc
->seq
= header
->snap_seq
;
579 *size
= header
->image_size
;
581 ret
= snap_by_name(header
, snap_name
, &snapc
->seq
, size
);
585 dev
->cur_snap
= header
->total_snaps
- ret
;
591 up_write(&header
->snap_rwsem
);
595 static void rbd_header_free(struct rbd_image_header
*header
)
597 kfree(header
->snapc
);
598 kfree(header
->snap_names
);
599 kfree(header
->snap_sizes
);
603 * get the actual striped segment name, offset and length
605 static u64
rbd_get_segment(struct rbd_image_header
*header
,
606 const char *block_name
,
608 char *seg_name
, u64
*segofs
)
610 u64 seg
= ofs
>> header
->obj_order
;
613 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
614 "%s.%012llx", block_name
, seg
);
616 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
617 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
625 static int rbd_get_num_segments(struct rbd_image_header
*header
,
628 u64 start_seg
= ofs
>> header
->obj_order
;
629 u64 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
630 return end_seg
- start_seg
+ 1;
634 * returns the size of an object in the image
636 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
638 return 1 << header
->obj_order
;
645 static void bio_chain_put(struct bio
*chain
)
651 chain
= chain
->bi_next
;
657 * zeros a bio chain, starting at specific offset
659 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
668 bio_for_each_segment(bv
, chain
, i
) {
669 if (pos
+ bv
->bv_len
> start_ofs
) {
670 int remainder
= max(start_ofs
- pos
, 0);
671 buf
= bvec_kmap_irq(bv
, &flags
);
672 memset(buf
+ remainder
, 0,
673 bv
->bv_len
- remainder
);
674 bvec_kunmap_irq(buf
, &flags
);
679 chain
= chain
->bi_next
;
684 * bio_chain_clone - clone a chain of bios up to a certain length.
685 * might return a bio_pair that will need to be released.
687 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
688 struct bio_pair
**bp
,
689 int len
, gfp_t gfpmask
)
691 struct bio
*tmp
, *old_chain
= *old
, *new_chain
= NULL
, *tail
= NULL
;
695 bio_pair_release(*bp
);
699 while (old_chain
&& (total
< len
)) {
700 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
704 if (total
+ old_chain
->bi_size
> len
) {
708 * this split can only happen with a single paged bio,
709 * split_bio will BUG_ON if this is not the case
711 dout("bio_chain_clone split! total=%d remaining=%d"
713 (int)total
, (int)len
-total
,
714 (int)old_chain
->bi_size
);
716 /* split the bio. We'll release it either in the next
717 call, or it will have to be released outside */
718 bp
= bio_split(old_chain
, (len
- total
) / 512ULL);
722 __bio_clone(tmp
, &bp
->bio1
);
726 __bio_clone(tmp
, old_chain
);
727 *next
= old_chain
->bi_next
;
731 gfpmask
&= ~__GFP_WAIT
;
735 new_chain
= tail
= tmp
;
740 old_chain
= old_chain
->bi_next
;
742 total
+= tmp
->bi_size
;
748 tail
->bi_next
= NULL
;
755 dout("bio_chain_clone with err\n");
756 bio_chain_put(new_chain
);
761 * helpers for osd request op vectors.
763 static int rbd_create_rw_ops(struct ceph_osd_req_op
**ops
,
768 *ops
= kzalloc(sizeof(struct ceph_osd_req_op
) * (num_ops
+ 1),
772 (*ops
)[0].op
= opcode
;
774 * op extent offset and length will be set later on
775 * in calc_raw_layout()
777 (*ops
)[0].payload_len
= payload_len
;
/* Free an osd op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
786 static void rbd_coll_end_req_index(struct request
*rq
,
787 struct rbd_req_coll
*coll
,
791 struct request_queue
*q
;
794 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
795 coll
, index
, ret
, len
);
801 blk_end_request(rq
, ret
, len
);
807 spin_lock_irq(q
->queue_lock
);
808 coll
->status
[index
].done
= 1;
809 coll
->status
[index
].rc
= ret
;
810 coll
->status
[index
].bytes
= len
;
811 max
= min
= coll
->num_done
;
812 while (max
< coll
->total
&& coll
->status
[max
].done
)
815 for (i
= min
; i
<max
; i
++) {
816 __blk_end_request(rq
, coll
->status
[i
].rc
,
817 coll
->status
[i
].bytes
);
819 kref_put(&coll
->kref
, rbd_coll_release
);
821 spin_unlock_irq(q
->queue_lock
);
824 static void rbd_coll_end_req(struct rbd_request
*req
,
827 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
831 * Send ceph osd request
833 static int rbd_do_request(struct request
*rq
,
834 struct rbd_device
*dev
,
835 struct ceph_snap_context
*snapc
,
837 const char *obj
, u64 ofs
, u64 len
,
842 struct ceph_osd_req_op
*ops
,
844 struct rbd_req_coll
*coll
,
846 void (*rbd_cb
)(struct ceph_osd_request
*req
,
847 struct ceph_msg
*msg
),
848 struct ceph_osd_request
**linger_req
,
851 struct ceph_osd_request
*req
;
852 struct ceph_file_layout
*layout
;
855 struct timespec mtime
= CURRENT_TIME
;
856 struct rbd_request
*req_data
;
857 struct ceph_osd_request_head
*reqhead
;
858 struct rbd_image_header
*header
= &dev
->header
;
860 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
863 rbd_coll_end_req_index(rq
, coll
, coll_index
,
869 req_data
->coll
= coll
;
870 req_data
->coll_index
= coll_index
;
873 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj
, len
, ofs
);
875 down_read(&header
->snap_rwsem
);
877 req
= ceph_osdc_alloc_request(&dev
->client
->osdc
, flags
,
881 GFP_NOIO
, pages
, bio
);
883 up_read(&header
->snap_rwsem
);
888 req
->r_callback
= rbd_cb
;
892 req_data
->pages
= pages
;
895 req
->r_priv
= req_data
;
897 reqhead
= req
->r_request
->front
.iov_base
;
898 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
900 strncpy(req
->r_oid
, obj
, sizeof(req
->r_oid
));
901 req
->r_oid_len
= strlen(req
->r_oid
);
903 layout
= &req
->r_file_layout
;
904 memset(layout
, 0, sizeof(*layout
));
905 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
906 layout
->fl_stripe_count
= cpu_to_le32(1);
907 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
908 layout
->fl_pg_preferred
= cpu_to_le32(-1);
909 layout
->fl_pg_pool
= cpu_to_le32(dev
->poolid
);
910 ceph_calc_raw_layout(&dev
->client
->osdc
, layout
, snapid
,
911 ofs
, &len
, &bno
, req
, ops
);
913 ceph_osdc_build_request(req
, ofs
, &len
,
917 req
->r_oid
, req
->r_oid_len
);
918 up_read(&header
->snap_rwsem
);
921 ceph_osdc_set_request_linger(&dev
->client
->osdc
, req
);
925 ret
= ceph_osdc_start_request(&dev
->client
->osdc
, req
, false);
930 ret
= ceph_osdc_wait_request(&dev
->client
->osdc
, req
);
932 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
933 dout("reassert_ver=%lld\n",
934 le64_to_cpu(req
->r_reassert_version
.version
));
935 ceph_osdc_put_request(req
);
940 bio_chain_put(req_data
->bio
);
941 ceph_osdc_put_request(req
);
943 rbd_coll_end_req(req_data
, ret
, len
);
949 * Ceph osd op callback
951 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
953 struct rbd_request
*req_data
= req
->r_priv
;
954 struct ceph_osd_reply_head
*replyhead
;
955 struct ceph_osd_op
*op
;
961 replyhead
= msg
->front
.iov_base
;
962 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
963 op
= (void *)(replyhead
+ 1);
964 rc
= le32_to_cpu(replyhead
->result
);
965 bytes
= le64_to_cpu(op
->extent
.length
);
966 read_op
= (le32_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
968 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes
, read_op
, rc
);
970 if (rc
== -ENOENT
&& read_op
) {
971 zero_bio_chain(req_data
->bio
, 0);
973 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
974 zero_bio_chain(req_data
->bio
, bytes
);
975 bytes
= req_data
->len
;
978 rbd_coll_end_req(req_data
, rc
, bytes
);
981 bio_chain_put(req_data
->bio
);
983 ceph_osdc_put_request(req
);
/*
 * Completion callback for fire-and-forget osd requests: nothing to do
 * except drop the request reference.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
993 * Do a synchronous ceph osd operation
995 static int rbd_req_sync_op(struct rbd_device
*dev
,
996 struct ceph_snap_context
*snapc
,
1000 struct ceph_osd_req_op
*orig_ops
,
1005 struct ceph_osd_request
**linger_req
,
1009 struct page
**pages
;
1011 struct ceph_osd_req_op
*ops
= orig_ops
;
1014 num_pages
= calc_pages_for(ofs
, len
);
1015 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1017 return PTR_ERR(pages
);
1020 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? len
: 0);
1021 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1025 if ((flags
& CEPH_OSD_FLAG_WRITE
) && buf
) {
1026 ret
= ceph_copy_to_page_vector(pages
, buf
, ofs
, len
);
1032 ret
= rbd_do_request(NULL
, dev
, snapc
, snapid
,
1033 obj
, ofs
, len
, NULL
,
1044 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1045 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1049 rbd_destroy_ops(ops
);
1051 ceph_release_page_vector(pages
, num_pages
);
1056 * Do an asynchronous ceph osd operation
1058 static int rbd_do_op(struct request
*rq
,
1059 struct rbd_device
*rbd_dev
,
1060 struct ceph_snap_context
*snapc
,
1062 int opcode
, int flags
, int num_reply
,
1065 struct rbd_req_coll
*coll
,
1072 struct ceph_osd_req_op
*ops
;
1075 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
1079 seg_len
= rbd_get_segment(&rbd_dev
->header
,
1080 rbd_dev
->header
.block_name
,
1082 seg_name
, &seg_ofs
);
1084 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1086 ret
= rbd_create_rw_ops(&ops
, 1, opcode
, payload_len
);
1090 /* we've taken care of segment sizes earlier when we
1091 cloned the bios. We should never have a segment
1092 truncated at this point */
1093 BUG_ON(seg_len
< len
);
1095 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1096 seg_name
, seg_ofs
, seg_len
,
1103 rbd_req_cb
, 0, NULL
);
1105 rbd_destroy_ops(ops
);
1112 * Request async osd write
1114 static int rbd_req_write(struct request
*rq
,
1115 struct rbd_device
*rbd_dev
,
1116 struct ceph_snap_context
*snapc
,
1119 struct rbd_req_coll
*coll
,
1122 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1124 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1126 ofs
, len
, bio
, coll
, coll_index
);
1130 * Request async osd read
1132 static int rbd_req_read(struct request
*rq
,
1133 struct rbd_device
*rbd_dev
,
1137 struct rbd_req_coll
*coll
,
1140 return rbd_do_op(rq
, rbd_dev
, NULL
,
1141 (snapid
? snapid
: CEPH_NOSNAP
),
1145 ofs
, len
, bio
, coll
, coll_index
);
1149 * Request sync osd read
1151 static int rbd_req_sync_read(struct rbd_device
*dev
,
1152 struct ceph_snap_context
*snapc
,
1159 return rbd_req_sync_op(dev
, NULL
,
1160 (snapid
? snapid
: CEPH_NOSNAP
),
1164 1, obj
, ofs
, len
, buf
, NULL
, ver
);
1168 * Request sync osd watch
1170 static int rbd_req_sync_notify_ack(struct rbd_device
*dev
,
1175 struct ceph_osd_req_op
*ops
;
1176 struct page
**pages
= NULL
;
1179 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1183 ops
[0].watch
.ver
= cpu_to_le64(dev
->header
.obj_version
);
1184 ops
[0].watch
.cookie
= notify_id
;
1185 ops
[0].watch
.flag
= 0;
1187 ret
= rbd_do_request(NULL
, dev
, NULL
, CEPH_NOSNAP
,
1194 rbd_simple_req_cb
, 0, NULL
);
1196 rbd_destroy_ops(ops
);
1200 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1202 struct rbd_device
*dev
= (struct rbd_device
*)data
;
1208 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev
->obj_md_name
,
1209 notify_id
, (int)opcode
);
1210 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1211 rc
= __rbd_update_snaps(dev
);
1212 mutex_unlock(&ctl_mutex
);
1214 pr_warning(DRV_NAME
"%d got notification but failed to update"
1215 " snaps: %d\n", dev
->major
, rc
);
1217 rbd_req_sync_notify_ack(dev
, ver
, notify_id
, dev
->obj_md_name
);
1221 * Request sync osd watch
1223 static int rbd_req_sync_watch(struct rbd_device
*dev
,
1227 struct ceph_osd_req_op
*ops
;
1228 struct ceph_osd_client
*osdc
= &dev
->client
->osdc
;
1230 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1234 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1235 (void *)dev
, &dev
->watch_event
);
1239 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1240 ops
[0].watch
.cookie
= cpu_to_le64(dev
->watch_event
->cookie
);
1241 ops
[0].watch
.flag
= 1;
1243 ret
= rbd_req_sync_op(dev
, NULL
,
1246 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1249 &dev
->watch_request
, NULL
);
1254 rbd_destroy_ops(ops
);
1258 ceph_osdc_cancel_event(dev
->watch_event
);
1259 dev
->watch_event
= NULL
;
1261 rbd_destroy_ops(ops
);
1266 * Request sync osd unwatch
1268 static int rbd_req_sync_unwatch(struct rbd_device
*dev
,
1271 struct ceph_osd_req_op
*ops
;
1273 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_WATCH
, 0);
1277 ops
[0].watch
.ver
= 0;
1278 ops
[0].watch
.cookie
= cpu_to_le64(dev
->watch_event
->cookie
);
1279 ops
[0].watch
.flag
= 0;
1281 ret
= rbd_req_sync_op(dev
, NULL
,
1284 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1286 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1288 rbd_destroy_ops(ops
);
1289 ceph_osdc_cancel_event(dev
->watch_event
);
1290 dev
->watch_event
= NULL
;
1294 struct rbd_notify_info
{
1295 struct rbd_device
*dev
;
1298 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1300 struct rbd_device
*dev
= (struct rbd_device
*)data
;
1304 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev
->obj_md_name
,
1305 notify_id
, (int)opcode
);
1309 * Request sync osd notify
1311 static int rbd_req_sync_notify(struct rbd_device
*dev
,
1314 struct ceph_osd_req_op
*ops
;
1315 struct ceph_osd_client
*osdc
= &dev
->client
->osdc
;
1316 struct ceph_osd_event
*event
;
1317 struct rbd_notify_info info
;
1318 int payload_len
= sizeof(u32
) + sizeof(u32
);
1321 ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1327 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1328 (void *)&info
, &event
);
1332 ops
[0].watch
.ver
= 1;
1333 ops
[0].watch
.flag
= 1;
1334 ops
[0].watch
.cookie
= event
->cookie
;
1335 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1336 ops
[0].watch
.timeout
= 12;
1338 ret
= rbd_req_sync_op(dev
, NULL
,
1341 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1343 1, obj
, 0, 0, NULL
, NULL
, NULL
);
1347 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1348 dout("ceph_osdc_wait_event returned %d\n", ret
);
1349 rbd_destroy_ops(ops
);
1353 ceph_osdc_cancel_event(event
);
1355 rbd_destroy_ops(ops
);
1360 * Request sync osd read
1362 static int rbd_req_sync_exec(struct rbd_device
*dev
,
1370 struct ceph_osd_req_op
*ops
;
1371 int cls_len
= strlen(cls
);
1372 int method_len
= strlen(method
);
1373 int ret
= rbd_create_rw_ops(&ops
, 1, CEPH_OSD_OP_CALL
,
1374 cls_len
+ method_len
+ len
);
1378 ops
[0].cls
.class_name
= cls
;
1379 ops
[0].cls
.class_len
= (__u8
)cls_len
;
1380 ops
[0].cls
.method_name
= method
;
1381 ops
[0].cls
.method_len
= (__u8
)method_len
;
1382 ops
[0].cls
.argc
= 0;
1383 ops
[0].cls
.indata
= data
;
1384 ops
[0].cls
.indata_len
= len
;
1386 ret
= rbd_req_sync_op(dev
, NULL
,
1389 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1391 1, obj
, 0, 0, NULL
, NULL
, ver
);
1393 rbd_destroy_ops(ops
);
1395 dout("cls_exec returned %d\n", ret
);
1399 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1401 struct rbd_req_coll
*coll
=
1402 kzalloc(sizeof(struct rbd_req_coll
) +
1403 sizeof(struct rbd_req_status
) * num_reqs
,
1408 coll
->total
= num_reqs
;
1409 kref_init(&coll
->kref
);
1414 * block device queue callback
1416 static void rbd_rq_fn(struct request_queue
*q
)
1418 struct rbd_device
*rbd_dev
= q
->queuedata
;
1420 struct bio_pair
*bp
= NULL
;
1422 rq
= blk_fetch_request(q
);
1426 struct bio
*rq_bio
, *next_bio
= NULL
;
1428 int size
, op_size
= 0;
1430 int num_segs
, cur_seg
= 0;
1431 struct rbd_req_coll
*coll
;
1433 /* peek at request from block layer */
1437 dout("fetched request\n");
1439 /* filter out block requests we don't understand */
1440 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1441 __blk_end_request_all(rq
, 0);
1445 /* deduce our operation (read, write) */
1446 do_write
= (rq_data_dir(rq
) == WRITE
);
1448 size
= blk_rq_bytes(rq
);
1449 ofs
= blk_rq_pos(rq
) * 512ULL;
1451 if (do_write
&& rbd_dev
->read_only
) {
1452 __blk_end_request_all(rq
, -EROFS
);
1456 spin_unlock_irq(q
->queue_lock
);
1458 dout("%s 0x%x bytes at 0x%llx\n",
1459 do_write
? "write" : "read",
1460 size
, blk_rq_pos(rq
) * 512ULL);
1462 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1463 coll
= rbd_alloc_coll(num_segs
);
1465 spin_lock_irq(q
->queue_lock
);
1466 __blk_end_request_all(rq
, -ENOMEM
);
1471 /* a bio clone to be passed down to OSD req */
1472 dout("rq->bio->bi_vcnt=%d\n", rq
->bio
->bi_vcnt
);
1473 op_size
= rbd_get_segment(&rbd_dev
->header
,
1474 rbd_dev
->header
.block_name
,
1477 kref_get(&coll
->kref
);
1478 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1479 op_size
, GFP_ATOMIC
);
1481 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1487 /* init OSD command: write or read */
1489 rbd_req_write(rq
, rbd_dev
,
1490 rbd_dev
->header
.snapc
,
1495 rbd_req_read(rq
, rbd_dev
,
1496 cur_snap_id(rbd_dev
),
1508 kref_put(&coll
->kref
, rbd_coll_release
);
1511 bio_pair_release(bp
);
1512 spin_lock_irq(q
->queue_lock
);
1514 rq
= blk_fetch_request(q
);
1519 * a queue callback. Makes sure that we don't create a bio that spans across
1520 * multiple osd objects. One exception would be with a single page bios,
1521 * which we handle later at bio_chain_clone
1523 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1524 struct bio_vec
*bvec
)
1526 struct rbd_device
*rbd_dev
= q
->queuedata
;
1527 unsigned int chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- 9);
1528 sector_t sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1529 unsigned int bio_sectors
= bmd
->bi_size
>> 9;
1532 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1533 + bio_sectors
)) << 9;
1535 max
= 0; /* bio_add cannot handle a negative return */
1536 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1537 return bvec
->bv_len
;
1541 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1543 struct gendisk
*disk
= rbd_dev
->disk
;
1548 rbd_header_free(&rbd_dev
->header
);
1550 if (disk
->flags
& GENHD_FL_UP
)
1553 blk_cleanup_queue(disk
->queue
);
1558 * reload the ondisk the header
1560 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1561 struct rbd_image_header
*header
)
1564 struct rbd_image_header_ondisk
*dh
;
1566 u64 snap_names_len
= 0;
1570 int len
= sizeof(*dh
) +
1571 snap_count
* sizeof(struct rbd_image_snap_ondisk
) +
1575 dh
= kmalloc(len
, GFP_KERNEL
);
1579 rc
= rbd_req_sync_read(rbd_dev
,
1581 rbd_dev
->obj_md_name
,
1587 rc
= rbd_header_from_disk(header
, dh
, snap_count
, GFP_KERNEL
);
1590 pr_warning("unrecognized header format"
1591 " for image %s", rbd_dev
->obj
);
1596 if (snap_count
!= header
->total_snaps
) {
1597 snap_count
= header
->total_snaps
;
1598 snap_names_len
= header
->snap_names_len
;
1599 rbd_header_free(header
);
1605 header
->obj_version
= ver
;
1615 static int rbd_header_add_snap(struct rbd_device
*dev
,
1616 const char *snap_name
,
1619 int name_len
= strlen(snap_name
);
1625 /* we should create a snapshot only if we're pointing at the head */
1629 ret
= ceph_monc_create_snapid(&dev
->client
->monc
, dev
->poolid
,
1631 dout("created snapid=%lld\n", new_snapid
);
1635 data
= kmalloc(name_len
+ 16, gfp_flags
);
1640 e
= data
+ name_len
+ 16;
1642 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1643 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
1645 ret
= rbd_req_sync_exec(dev
, dev
->obj_md_name
, "rbd", "snap_add",
1646 data
, p
- data
, &ver
);
1653 dev
->header
.snapc
->seq
= new_snapid
;
1660 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1662 struct rbd_snap
*snap
;
1664 while (!list_empty(&rbd_dev
->snaps
)) {
1665 snap
= list_first_entry(&rbd_dev
->snaps
, struct rbd_snap
, node
);
1666 __rbd_remove_snap_dev(rbd_dev
, snap
);
1671 * only read the first part of the ondisk header, without the snaps info
1673 static int __rbd_update_snaps(struct rbd_device
*rbd_dev
)
1676 struct rbd_image_header h
;
1680 ret
= rbd_read_header(rbd_dev
, &h
);
1685 set_capacity(rbd_dev
->disk
, h
.image_size
/ 512ULL);
1687 down_write(&rbd_dev
->header
.snap_rwsem
);
1689 snap_seq
= rbd_dev
->header
.snapc
->seq
;
1690 if (rbd_dev
->header
.total_snaps
&&
1691 rbd_dev
->header
.snapc
->snaps
[0] == snap_seq
)
1692 /* pointing at the head, will need to follow that
1696 kfree(rbd_dev
->header
.snapc
);
1697 kfree(rbd_dev
->header
.snap_names
);
1698 kfree(rbd_dev
->header
.snap_sizes
);
1700 rbd_dev
->header
.total_snaps
= h
.total_snaps
;
1701 rbd_dev
->header
.snapc
= h
.snapc
;
1702 rbd_dev
->header
.snap_names
= h
.snap_names
;
1703 rbd_dev
->header
.snap_names_len
= h
.snap_names_len
;
1704 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1706 rbd_dev
->header
.snapc
->seq
= rbd_dev
->header
.snapc
->snaps
[0];
1708 rbd_dev
->header
.snapc
->seq
= snap_seq
;
1710 ret
= __rbd_init_snaps_header(rbd_dev
);
1712 up_write(&rbd_dev
->header
.snap_rwsem
);
/*
 * Set up the gendisk and request queue for a freshly added rbd device,
 * then announce it to the block layer.
 *
 * NOTE(review): the error-check lines between the calls below were
 * elided in this extract; reconstructed per the usual goto-cleanup
 * pattern — confirm against the original source.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
/* sysfs: report the mapped image size in bytes */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%llu\n",
		       (unsigned long long)rbd_dev->header.image_size);
}
/* sysfs: report the block device major number assigned at add time */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
/* sysfs: report the ceph client instance id, formatted as "client<id>" */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
}
/* sysfs: report the rados pool the image lives in */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
/* sysfs: report the rbd image (object) name */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->obj);
}
/* sysfs: report the currently mapped snapshot name ("-" for the head) */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
/*
 * sysfs "refresh" store handler: re-read the image header and rebuild
 * the snapshot list under ctl_mutex.  Returns the write size on
 * success, or the negative error from __rbd_update_snaps().
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}
/* per-device sysfs attributes (exposed under /sys/bus/rbd/devices/<id>/);
 * read-only ones are backed by the *_show handlers above, the two
 * S_IWUSR entries by rbd_image_refresh and rbd_snap_add */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* attribute table wired into rbd_device_type via the groups pointer */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};
/* no-op release: rbd_dev teardown happens in rbd_dev_release() instead */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
/* device type for the main rbd device node; NOTE(review): the .name
 * initializer was elided in this extract — confirm against source */
static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
/* sysfs (snapshot device): report the snapshot's image size */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->size);
}
/* sysfs (snapshot device): report the snapshot id */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%lld\n", (long long)snap->id);
}
/* read-only per-snapshot sysfs attributes */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
/* attribute table for the per-snapshot device nodes */
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
/*
 * Release callback for a snapshot device: frees the rbd_snap and its
 * kstrdup'd name.  NOTE(review): the kfree lines were elided in this
 * extract — reconstructed; confirm against source.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* device type for per-snapshot nodes; release frees the rbd_snap */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
/*
 * Unlink a snapshot from the device's snap list and unregister its
 * sysfs device; the final put via rbd_snap_dev_release frees it.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
/*
 * Register the sysfs device for one snapshot under @parent, named
 * "snap_<name>".  Returns the device_register() result.
 */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				 struct rbd_snap *snap,
				 struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
/*
 * Allocate an rbd_snap for header snapshot index @i, fill it from the
 * in-memory header, and (if the parent device is already in sysfs)
 * register its snapshot device.  On success *snapp points at the new
 * snap; caller links it into rbd_dev->snaps.
 *
 * NOTE(review): error-path lines were elided in this extract and are
 * reconstructed here; also note the kstrdup() result is not checked —
 * confirm both against the original source.
 */
static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
			      int i, const char *name,
			      struct rbd_snap **snapp)
{
	int ret;
	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
	if (!snap)
		return -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					    &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}
	*snapp = snap;
	return 0;
err:
	kfree(snap->name);
	kfree(snap);
	return ret;
}
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points just past a NUL-terminated entry; returns a pointer to
 * the start of the preceding entry, or NULL when already at the first.
 * NOTE(review): the scan-backwards body was elided in this extract and
 * is reconstructed here — confirm against the original source.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	if (name < start + 2)
		return NULL;

	name -= 2;
	while (*name) {
		if (name == start)
			return start;
		name--;
	}
	return name + 1;
}
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * NOTE(review): several error checks and loop-control lines were elided
 * in this extract; reconstructed per the visible fragments — confirm
 * against the original source before relying on the exact control flow.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* names are packed back-to-back; walk them newest-first from the end */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
/* no-op release for the static root device below */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device that all rbd devices hang under in sysfs;
 * NOTE(review): the .init_name initializer was elided in this extract */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
/*
 * Register the rbd device (and every snapshot already on its list)
 * on the rbd bus, under ctl_mutex.
 *
 * NOTE(review): the error-handling lines between register calls were
 * elided in this extract; reconstructed — confirm against source.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					    &rbd_dev->dev);
		if (ret < 0)
			break;
	}

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	mutex_unlock(&ctl_mutex);
	return ret;
}
/* drop the device from sysfs; the final put runs rbd_dev_release() */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
/*
 * Establish a watch on the image header object so we get notified of
 * header changes.  -ERANGE means our cached header version is stale:
 * refresh the snapshot state under ctl_mutex and retry the watch.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
/*
 * sysfs bus "add" handler.  Input format (space separated):
 *   <mon addrs> <options> <pool name> <obj name> [snap name]
 * Allocates an rbd_device, assigns the next free id, connects the ceph
 * client, resolves the pool, registers the block device and sysfs node,
 * sets up the disk, and starts the header watch.
 *
 * NOTE(review): most error-check and label lines of this function were
 * elided in this extract; the goto-cleanup chain below is reconstructed
 * from the visible fragments — confirm against the original source.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	ssize_t rc = -ENOMEM;
	int irc, new_id = 0;
	struct list_head *tmp;
	char *mon_dev_name;
	char *options;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!mon_dev_name)
		goto err_out_mod;

	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!options)
		goto err_mon_dev;

	/* new rbd_device object */
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_opt;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);

	init_rwsem(&rbd_dev->header.snap_rwsem);

	/* generate unique id: find highest unique id, add one */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_for_each(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id >= new_id)
			new_id = rbd_dev->id + 1;
	}

	rbd_dev->id = new_id;

	/* add to global list */
	list_add_tail(&rbd_dev->node, &rbd_dev_list);

	/* parse add command */
	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
		   mon_dev_name, options, rbd_dev->pool_name,
		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
		rc = -EINVAL;
		goto err_out_slot;
	}

	/* snap name omitted: map the head (RBD_SNAP_HEAD_NAME is "-") */
	if (rbd_dev->snap_name[0] == 0)
		rbd_dev->snap_name[0] = '-';

	rbd_dev->obj_len = strlen(rbd_dev->obj);
	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
		 rbd_dev->obj, RBD_SUFFIX);

	/* initialize rest of new object */
	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
	if (rc < 0)
		goto err_out_slot;

	mutex_unlock(&ctl_mutex);

	/* pick the pool */
	osdc = &rbd_dev->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->poolid = rc;

	/* register our block device */
	irc = register_blkdev(0, rbd_dev->name);
	if (irc < 0) {
		rc = irc;
		goto err_out_client;
	}
	rbd_dev->major = irc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/* set up and announce blkdev mapping */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	kfree(mon_dev_name);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
err_out_slot:
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	kfree(rbd_dev);
err_out_opt:
	kfree(options);
err_mon_dev:
	kfree(mon_dev_name);
err_out_mod:
	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);
	return rc;
}
/*
 * Look up an rbd_device by id on the global device list.
 * Returns NULL when no device has that id.  Caller holds ctl_mutex.
 */
static struct rbd_device *__rbd_get_dev(unsigned long id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id == id)
			return rbd_dev;
	}
	return NULL;
}
/*
 * Final device release: tear down the header watch, drop the ceph
 * client reference, free the disk/queue, release the blkdev major,
 * free the rbd_device, and drop the module reference taken in
 * rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	if (rbd_dev->watch_request)
		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
/*
 * sysfs bus "remove" handler: parse a decimal device id, take it off
 * the global list, drop all its snapshot devices, and unregister it.
 * Returns count on success, -ENOENT for an unknown id.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	list_del_init(&rbd_dev->node);

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
/*
 * sysfs "create_snap" store handler: take a snapshot of the image with
 * the written name, refresh the in-memory snapshot state, and notify
 * other watchers of the header change.
 *
 * NOTE(review): snprintf(name, count, ...) copies at most count-1 bytes,
 * so the final input byte is dropped — this relies on sysfs writes
 * carrying a trailing newline; confirm intended.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
/* bus-level write-only control files: /sys/bus/rbd/{add,remove} */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	rbd_bus_type.bus_attrs = rbd_bus_attrs;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		return ret;

	ret = device_register(&rbd_root_dev);

	return ret;
}
/* mirror of rbd_sysfs_init(): drop the root device, then the bus */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
/* module entry point: set up the sysfs bus interface */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " DRV_NAME_LONG "\n");
	return 0;
}
/* module exit point: tear down the sysfs bus interface */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
MODULE_LICENSE("GPL");