b9371f0b9532f293ad9eb702d4a52850813f3dba
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / drivers / block / rbd.c
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* room for the image object name plus the RBD_SUFFIX appended for the
 * header ("md") object */
#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* user-visible snapshot name meaning "no snapshot, use the head" */
#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32	/* size of rbd_device->name, e.g. "rbd3" */

/* default for the notify_timeout mount option */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* size in bytes of the head image */
	char block_name[32];		/* prefix of per-segment object names
					   ("<block_name>.<seg#>") */
	__u8 obj_order;			/* log2 of the object/segment size */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* protects the snapshot state below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;		/* total bytes in snap_names */
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;		/* NUL-separated snapshot names, same
					   order as snapc->snaps[] */
	u64 *snap_sizes;		/* image size at each snapshot */

	u64 obj_version;		/* header object version, used as the
					   'ver' in watch/notify-ack ops */
};
80
/* per-client mount options (see rbdopt_tokens) */
struct rbd_options {
	int notify_timeout;	/* notify_timeout=%d, default
				   RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84
/*
 * an instance of the client. multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry in rbd_client_list,
					   protected by node_lock */
};
94
struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* this request's slot in coll->status[] */
	struct rbd_req_coll *coll;	/* owning collection, NULL if standalone */
};
108
/* completion state of one sub-request within a collection */
struct rbd_req_status {
	int done;	/* non-zero once this sub-request has completed */
	int rc;		/* its result code */
	u64 bytes;	/* bytes to report to blk_end_request */
};
114
115 /*
116 * a collection of requests
117 */
118 struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
123 };
124
/* in-memory representation of one snapshot; also a sysfs object */
struct rbd_snap {
	struct device dev;	/* sysfs node */
	const char *name;
	size_t size;		/* image size at this snapshot */
	struct list_head node;	/* entry in rbd_device->snaps */
	u64 id;
};
132
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct ceph_client *client;	/* cached copy of rbd_client->client */
	struct rbd_client *rbd_client;	/* possibly shared; see rbd_get_client() */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* header object name */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	struct ceph_osd_event *watch_event;	/* watch on the header object */
	struct ceph_osd_request *watch_request;	/* the lingering watch request */

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* forced non-zero when mapped at a snapshot */

	struct list_head node;	/* presumably an entry in rbd_dev_list —
				   the list add is outside this view */

	/* list of snapshots (struct rbd_snap.node) */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
173
/* all rbd devices hang off this bus in sysfs */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};

static DEFINE_SPINLOCK(node_lock);	/* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */

/* forward declarations for routines defined later in the file */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);
192
193
194 static struct rbd_device *dev_to_rbd(struct device *dev)
195 {
196 return container_of(dev, struct rbd_device, dev);
197 }
198
199 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200 {
201 return get_device(&rbd_dev->dev);
202 }
203
204 static void rbd_put_dev(struct rbd_device *rbd_dev)
205 {
206 put_device(&rbd_dev->dev);
207 }
208
209 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210
211 static int rbd_open(struct block_device *bdev, fmode_t mode)
212 {
213 struct gendisk *disk = bdev->bd_disk;
214 struct rbd_device *rbd_dev = disk->private_data;
215
216 rbd_get_dev(rbd_dev);
217
218 set_device_ro(bdev, rbd_dev->read_only);
219
220 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
221 return -EROFS;
222
223 return 0;
224 }
225
226 static int rbd_release(struct gendisk *disk, fmode_t mode)
227 {
228 struct rbd_device *rbd_dev = disk->private_data;
229
230 rbd_put_dev(rbd_dev);
231
232 return 0;
233 }
234
/* block-device entry points; no ioctl/getgeo are implemented here */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
240
241 /*
242 * Initialize an rbd client instance.
243 * We own *opt.
244 */
245 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
246 struct rbd_options *rbd_opts)
247 {
248 struct rbd_client *rbdc;
249 int ret = -ENOMEM;
250
251 dout("rbd_client_create\n");
252 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
253 if (!rbdc)
254 goto out_opt;
255
256 kref_init(&rbdc->kref);
257 INIT_LIST_HEAD(&rbdc->node);
258
259 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260 if (IS_ERR(rbdc->client))
261 goto out_rbdc;
262 opt = NULL; /* Now rbdc->client is responsible for opt */
263
264 ret = ceph_open_session(rbdc->client);
265 if (ret < 0)
266 goto out_err;
267
268 rbdc->rbd_opts = rbd_opts;
269
270 spin_lock(&node_lock);
271 list_add_tail(&rbdc->node, &rbd_client_list);
272 spin_unlock(&node_lock);
273
274 dout("rbd_client_create created %p\n", rbdc);
275 return rbdc;
276
277 out_err:
278 ceph_destroy_client(rbdc->client);
279 out_rbdc:
280 kfree(rbdc);
281 out_opt:
282 if (opt)
283 ceph_destroy_options(opt);
284 return ERR_PTR(ret);
285 }
286
287 /*
288 * Find a ceph client with specific addr and configuration.
289 */
290 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
291 {
292 struct rbd_client *client_node;
293
294 if (opt->flags & CEPH_OPT_NOSHARE)
295 return NULL;
296
297 list_for_each_entry(client_node, &rbd_client_list, node)
298 if (ceph_compare_options(opt, client_node->client) == 0)
299 return client_node;
300 return NULL;
301 }
302
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument
 * (none defined yet).  Parsed by parse_rbd_opts_token().
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
320
321 static int parse_rbd_opts_token(char *c, void *private)
322 {
323 struct rbd_options *rbdopt = private;
324 substring_t argstr[MAX_OPT_ARGS];
325 int token, intval, ret;
326
327 token = match_token(c, rbdopt_tokens, argstr);
328 if (token < 0)
329 return -EINVAL;
330
331 if (token < Opt_last_int) {
332 ret = match_int(&argstr[0], &intval);
333 if (ret < 0) {
334 pr_err("bad mount option arg (not int) "
335 "at '%s'\n", c);
336 return ret;
337 }
338 dout("got int token %d val %d\n", token, intval);
339 } else if (token > Opt_last_int && token < Opt_last_string) {
340 dout("got string token %d val %s\n", token,
341 argstr[0].from);
342 } else {
343 dout("got token %d\n", token);
344 }
345
346 switch (token) {
347 case Opt_notify_timeout:
348 rbdopt->notify_timeout = intval;
349 break;
350 default:
351 BUG_ON(token);
352 }
353 return 0;
354 }
355
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client and rbd_dev->client are set and the
 * client holds a reference.  'opt' is consumed on every path: destroyed
 * when an existing client is reused, otherwise owned by
 * rbd_client_create().  rbd_opts is freed here on failure or reuse.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ret = ceph_parse_options(&opt, options, mon_addr,
				 mon_addr + strlen(mon_addr),
				 parse_rbd_opts_token, rbd_opts);
	if (ret < 0)
		goto done_err;

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		rbd_dev->client = rbdc->client;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	/* NOTE(review): node_lock is dropped before creating the new
	 * client, so two concurrent mounts can race past the find and
	 * each create a client; they simply won't be shared */
	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	rbd_dev->client = rbdc->client;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
408
/*
 * Destroy ceph client
 *
 * Caller must hold node_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* unlink from rbd_client_list — the reason node_lock is required */
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
425
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 *
 * node_lock is held across the kref_put because the release callback
 * (rbd_client_release) unlinks the client from rbd_client_list.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	spin_lock(&node_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&node_lock);
	/* clear the (possibly dangling) pointers on the device */
	rbd_dev->rbd_client = NULL;
	rbd_dev->client = NULL;
}
438
439 /*
440 * Destroy requests collection
441 */
442 static void rbd_coll_release(struct kref *kref)
443 {
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
446
447 dout("rbd_coll_release %p\n", coll);
448 kfree(coll);
449 }
450
451 /*
452 * Create a new header structure, translate header format from the on-disk
453 * header.
454 */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
457 int allocated_snaps,
458 gfp_t gfp_flags)
459 {
460 int i;
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 int ret = -ENOMEM;
463
464 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
465 return -ENXIO;
466
467 init_rwsem(&header->snap_rwsem);
468 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470 snap_count * sizeof (*ondisk),
471 gfp_flags);
472 if (!header->snapc)
473 return -ENOMEM;
474 if (snap_count) {
475 header->snap_names = kmalloc(header->snap_names_len,
476 GFP_KERNEL);
477 if (!header->snap_names)
478 goto err_snapc;
479 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
480 GFP_KERNEL);
481 if (!header->snap_sizes)
482 goto err_names;
483 } else {
484 header->snap_names = NULL;
485 header->snap_sizes = NULL;
486 }
487 memcpy(header->block_name, ondisk->block_name,
488 sizeof(ondisk->block_name));
489
490 header->image_size = le64_to_cpu(ondisk->image_size);
491 header->obj_order = ondisk->options.order;
492 header->crypt_type = ondisk->options.crypt_type;
493 header->comp_type = ondisk->options.comp_type;
494
495 atomic_set(&header->snapc->nref, 1);
496 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
497 header->snapc->num_snaps = snap_count;
498 header->total_snaps = snap_count;
499
500 if (snap_count && allocated_snaps == snap_count) {
501 for (i = 0; i < snap_count; i++) {
502 header->snapc->snaps[i] =
503 le64_to_cpu(ondisk->snaps[i].id);
504 header->snap_sizes[i] =
505 le64_to_cpu(ondisk->snaps[i].image_size);
506 }
507
508 /* copy snapshot names */
509 memcpy(header->snap_names, &ondisk->snaps[i],
510 header->snap_names_len);
511 }
512
513 return 0;
514
515 err_names:
516 kfree(header->snap_names);
517 err_snapc:
518 kfree(header->snapc);
519 return ret;
520 }
521
522 static int snap_index(struct rbd_image_header *header, int snap_num)
523 {
524 return header->total_snaps - snap_num;
525 }
526
527 static u64 cur_snap_id(struct rbd_device *rbd_dev)
528 {
529 struct rbd_image_header *header = &rbd_dev->header;
530
531 if (!rbd_dev->cur_snap)
532 return 0;
533
534 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
535 }
536
537 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
538 u64 *seq, u64 *size)
539 {
540 int i;
541 char *p = header->snap_names;
542
543 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
544 if (strcmp(snap_name, p) == 0)
545 break;
546 }
547 if (i == header->total_snaps)
548 return -ENOENT;
549 if (seq)
550 *seq = header->snapc->snaps[i];
551
552 if (size)
553 *size = header->snap_sizes[i];
554
555 return i;
556 }
557
/*
 * Point the device at the named snapshot, or at the head when
 * snap_name is NULL/empty/"-".  Updates snapc->seq, cur_snap and
 * read_only, and optionally returns the image size for the selection.
 *
 * Returns 0 on success, -ENOENT if the snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev,
			       const char *snap_name,
			       u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	down_write(&header->snap_rwsem);

	if (!snap_name ||
	    !*snap_name ||
	    strcmp(snap_name, "-") == 0 ||
	    strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
		/* map the head: writable, seq comes from the header */
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap is stored as index+1 counted from the newest */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}
594
595 static void rbd_header_free(struct rbd_image_header *header)
596 {
597 kfree(header->snapc);
598 kfree(header->snap_names);
599 kfree(header->snap_sizes);
600 }
601
602 /*
603 * get the actual striped segment name, offset and length
604 */
605 static u64 rbd_get_segment(struct rbd_image_header *header,
606 const char *block_name,
607 u64 ofs, u64 len,
608 char *seg_name, u64 *segofs)
609 {
610 u64 seg = ofs >> header->obj_order;
611
612 if (seg_name)
613 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
614 "%s.%012llx", block_name, seg);
615
616 ofs = ofs & ((1 << header->obj_order) - 1);
617 len = min_t(u64, len, (1 << header->obj_order) - ofs);
618
619 if (segofs)
620 *segofs = ofs;
621
622 return len;
623 }
624
625 static int rbd_get_num_segments(struct rbd_image_header *header,
626 u64 ofs, u64 len)
627 {
628 u64 start_seg = ofs >> header->obj_order;
629 u64 end_seg = (ofs + len - 1) >> header->obj_order;
630 return end_seg - start_seg + 1;
631 }
632
633 /*
634 * returns the size of an object in the image
635 */
636 static u64 rbd_obj_bytes(struct rbd_image_header *header)
637 {
638 return 1 << header->obj_order;
639 }
640
641 /*
642 * bio helpers
643 */
644
645 static void bio_chain_put(struct bio *chain)
646 {
647 struct bio *tmp;
648
649 while (chain) {
650 tmp = chain;
651 chain = chain->bi_next;
652 bio_put(tmp);
653 }
654 }
655
/*
 * zeros a bio chain, starting at specific offset
 *
 * Used to blank the tail of a read that came back short or hit a
 * nonexistent object (see rbd_req_cb()).
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs or the segment start,
				 * whichever is later, to the segment end */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
682
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until @len bytes are covered.  On return *old
 * points at the first un-consumed original bio, *next at where the
 * next clone pass should resume (either the remainder of a split bio
 * or the following original bio).  Returns the new chain, or NULL on
 * allocation/split failure (any partial chain is released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any bio_pair left over from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* NOTE(review): __GFP_WAIT is stripped after the first clone
		 * — presumably to avoid blocking mid-chain; confirm intent */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* the caller guarantees the chain covers at least len bytes */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
759
760 /*
761 * helpers for osd request op vectors.
762 */
763 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
764 int num_ops,
765 int opcode,
766 u32 payload_len)
767 {
768 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
769 GFP_NOIO);
770 if (!*ops)
771 return -ENOMEM;
772 (*ops)[0].op = opcode;
773 /*
774 * op extent offset and length will be set later on
775 * in calc_raw_layout()
776 */
777 (*ops)[0].payload_len = payload_len;
778 return 0;
779 }
780
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
785
/*
 * Record completion of sub-request @index and end as many consecutive
 * finished portions of the block request as possible; sub-requests must
 * complete toward the block layer in order.  With no collection the
 * whole request is ended at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* the queue lock serializes updates to the collection status */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* advance over the run of consecutively finished sub-requests */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed sub-request drops its collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
823
824 static void rbd_coll_end_req(struct rbd_request *req,
825 int ret, u64 len)
826 {
827 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
828 }
829
830 /*
831 * Send ceph osd request
832 */
833 static int rbd_do_request(struct request *rq,
834 struct rbd_device *dev,
835 struct ceph_snap_context *snapc,
836 u64 snapid,
837 const char *obj, u64 ofs, u64 len,
838 struct bio *bio,
839 struct page **pages,
840 int num_pages,
841 int flags,
842 struct ceph_osd_req_op *ops,
843 int num_reply,
844 struct rbd_req_coll *coll,
845 int coll_index,
846 void (*rbd_cb)(struct ceph_osd_request *req,
847 struct ceph_msg *msg),
848 struct ceph_osd_request **linger_req,
849 u64 *ver)
850 {
851 struct ceph_osd_request *req;
852 struct ceph_file_layout *layout;
853 int ret;
854 u64 bno;
855 struct timespec mtime = CURRENT_TIME;
856 struct rbd_request *req_data;
857 struct ceph_osd_request_head *reqhead;
858 struct rbd_image_header *header = &dev->header;
859
860 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
861 if (!req_data) {
862 if (coll)
863 rbd_coll_end_req_index(rq, coll, coll_index,
864 -ENOMEM, len);
865 return -ENOMEM;
866 }
867
868 if (coll) {
869 req_data->coll = coll;
870 req_data->coll_index = coll_index;
871 }
872
873 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
874
875 down_read(&header->snap_rwsem);
876
877 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
878 snapc,
879 ops,
880 false,
881 GFP_NOIO, pages, bio);
882 if (!req) {
883 up_read(&header->snap_rwsem);
884 ret = -ENOMEM;
885 goto done_pages;
886 }
887
888 req->r_callback = rbd_cb;
889
890 req_data->rq = rq;
891 req_data->bio = bio;
892 req_data->pages = pages;
893 req_data->len = len;
894
895 req->r_priv = req_data;
896
897 reqhead = req->r_request->front.iov_base;
898 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
899
900 strncpy(req->r_oid, obj, sizeof(req->r_oid));
901 req->r_oid_len = strlen(req->r_oid);
902
903 layout = &req->r_file_layout;
904 memset(layout, 0, sizeof(*layout));
905 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
906 layout->fl_stripe_count = cpu_to_le32(1);
907 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
908 layout->fl_pg_preferred = cpu_to_le32(-1);
909 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
910 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
911 ofs, &len, &bno, req, ops);
912
913 ceph_osdc_build_request(req, ofs, &len,
914 ops,
915 snapc,
916 &mtime,
917 req->r_oid, req->r_oid_len);
918 up_read(&header->snap_rwsem);
919
920 if (linger_req) {
921 ceph_osdc_set_request_linger(&dev->client->osdc, req);
922 *linger_req = req;
923 }
924
925 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
926 if (ret < 0)
927 goto done_err;
928
929 if (!rbd_cb) {
930 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
931 if (ver)
932 *ver = le64_to_cpu(req->r_reassert_version.version);
933 dout("reassert_ver=%lld\n",
934 le64_to_cpu(req->r_reassert_version.version));
935 ceph_osdc_put_request(req);
936 }
937 return ret;
938
939 done_err:
940 bio_chain_put(req_data->bio);
941 ceph_osdc_put_request(req);
942 done_pages:
943 rbd_coll_end_req(req_data, ret, len);
944 kfree(req_data);
945 return ret;
946 }
947
/*
 * Ceph osd op callback
 *
 * Completion for async requests from rbd_do_request(): decode the
 * reply, zero-fill reads that were short or hit a missing object,
 * complete the collection slot and release the request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* ops follow the reply head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* read of a nonexistent object: report zeroes, success */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the remainder and report full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
986
/* Completion for fire-and-forget requests: just drop the request. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
991
/*
 * Do a synchronous ceph osd operation
 *
 * When orig_ops is NULL a single op of @opcode is built here and @buf
 * is staged through a temporary page vector: copied in before a write,
 * copied out (up to the returned byte count) after a read.  When the
 * caller supplies orig_ops, buf/pages staging for writes is skipped.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;	/* may be replaced below */
	u32 payload_len;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,	/* no callback: rbd_do_request waits */
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* for reads, ret is the number of bytes the osd returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	/* only free ops we built ourselves, never the caller's */
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1054
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image-relative extent [ofs, ofs+len) onto its containing
 * object (segment) and issues the request with rbd_req_cb completion.
 * The extent must not cross an object boundary — the bios were already
 * split per segment by the caller.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* writes carry the data as payload; reads carry none */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1110
/*
 * Request async osd write
 *
 * Writes always target the head (CEPH_NOSNAP) and request an on-disk
 * ack (CEPH_OSD_FLAG_ONDISK).
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1128
1129 /*
1130 * Request async osd read
1131 */
1132 static int rbd_req_read(struct request *rq,
1133 struct rbd_device *rbd_dev,
1134 u64 snapid,
1135 u64 ofs, u64 len,
1136 struct bio *bio,
1137 struct rbd_req_coll *coll,
1138 int coll_index)
1139 {
1140 return rbd_do_op(rq, rbd_dev, NULL,
1141 (snapid ? snapid : CEPH_NOSNAP),
1142 CEPH_OSD_OP_READ,
1143 CEPH_OSD_FLAG_READ,
1144 2,
1145 ofs, len, bio, coll, coll_index);
1146 }
1147
1148 /*
1149 * Request sync osd read
1150 */
1151 static int rbd_req_sync_read(struct rbd_device *dev,
1152 struct ceph_snap_context *snapc,
1153 u64 snapid,
1154 const char *obj,
1155 u64 ofs, u64 len,
1156 char *buf,
1157 u64 *ver)
1158 {
1159 return rbd_req_sync_op(dev, NULL,
1160 (snapid ? snapid : CEPH_NOSNAP),
1161 CEPH_OSD_OP_READ,
1162 CEPH_OSD_FLAG_READ,
1163 NULL,
1164 1, obj, ofs, len, buf, NULL, ver);
1165 }
1166
/*
 * Acknowledge a notification on @obj (async; the completion callback
 * just drops the request).
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/* NOTE(review): unlike .ver, cookie is stored without cpu_to_le64 —
	 * verify the osd client byte-swaps it when encoding the op */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1199
/*
 * Watch callback for the header object: re-read the snapshot state and
 * ack the notification.  'data' is the rbd_device registered in
 * rbd_req_sync_watch().
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	/* SINGLE_DEPTH_NESTING: lockdep annotation — ctl_mutex may already
	 * be held by the control path that triggered the notification */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even if the update failed, so the osd can make progress */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1219
/*
 * Register a watch on @obj (the header object) so header changes are
 * delivered to rbd_watch_cb().  The watch request lingers; it is
 * stored in dev->watch_request and torn down by rbd_req_sync_unwatch().
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 = establish (0 removes, see unwatch) */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1264
/*
 * Request sync osd unwatch
 *
 * Remove the watch established by rbd_req_sync_watch() (flag = 0,
 * same cookie) and cancel the associated event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = remove the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	/* tear down the event even if the unwatch op failed */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1293
/* context passed (by address) to rbd_notify_cb via the notify event */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1297
1298 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1299 {
1300 struct rbd_device *dev = (struct rbd_device *)data;
1301 if (!dev)
1302 return;
1303
1304 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1305 notify_id, (int)opcode);
1306 }
1307
1308 /*
1309 * Request sync osd notify
1310 */
1311 static int rbd_req_sync_notify(struct rbd_device *dev,
1312 const char *obj)
1313 {
1314 struct ceph_osd_req_op *ops;
1315 struct ceph_osd_client *osdc = &dev->client->osdc;
1316 struct ceph_osd_event *event;
1317 struct rbd_notify_info info;
1318 int payload_len = sizeof(u32) + sizeof(u32);
1319 int ret;
1320
1321 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1322 if (ret < 0)
1323 return ret;
1324
1325 info.dev = dev;
1326
1327 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1328 (void *)&info, &event);
1329 if (ret < 0)
1330 goto fail;
1331
1332 ops[0].watch.ver = 1;
1333 ops[0].watch.flag = 1;
1334 ops[0].watch.cookie = event->cookie;
1335 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1336 ops[0].watch.timeout = 12;
1337
1338 ret = rbd_req_sync_op(dev, NULL,
1339 CEPH_NOSNAP,
1340 0,
1341 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1342 ops,
1343 1, obj, 0, 0, NULL, NULL, NULL);
1344 if (ret < 0)
1345 goto fail_event;
1346
1347 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1348 dout("ceph_osdc_wait_event returned %d\n", ret);
1349 rbd_destroy_ops(ops);
1350 return 0;
1351
1352 fail_event:
1353 ceph_osdc_cancel_event(event);
1354 fail:
1355 rbd_destroy_ops(ops);
1356 return ret;
1357 }
1358
1359 /*
1360 * Request sync osd read
1361 */
1362 static int rbd_req_sync_exec(struct rbd_device *dev,
1363 const char *obj,
1364 const char *cls,
1365 const char *method,
1366 const char *data,
1367 int len,
1368 u64 *ver)
1369 {
1370 struct ceph_osd_req_op *ops;
1371 int cls_len = strlen(cls);
1372 int method_len = strlen(method);
1373 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1374 cls_len + method_len + len);
1375 if (ret < 0)
1376 return ret;
1377
1378 ops[0].cls.class_name = cls;
1379 ops[0].cls.class_len = (__u8)cls_len;
1380 ops[0].cls.method_name = method;
1381 ops[0].cls.method_len = (__u8)method_len;
1382 ops[0].cls.argc = 0;
1383 ops[0].cls.indata = data;
1384 ops[0].cls.indata_len = len;
1385
1386 ret = rbd_req_sync_op(dev, NULL,
1387 CEPH_NOSNAP,
1388 0,
1389 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1390 ops,
1391 1, obj, 0, 0, NULL, NULL, ver);
1392
1393 rbd_destroy_ops(ops);
1394
1395 dout("cls_exec returned %d\n", ret);
1396 return ret;
1397 }
1398
1399 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1400 {
1401 struct rbd_req_coll *coll =
1402 kzalloc(sizeof(struct rbd_req_coll) +
1403 sizeof(struct rbd_req_status) * num_reqs,
1404 GFP_ATOMIC);
1405
1406 if (!coll)
1407 return NULL;
1408 coll->total = num_reqs;
1409 kref_init(&coll->kref);
1410 return coll;
1411 }
1412
1413 /*
1414 * block device queue callback
1415 */
1416 static void rbd_rq_fn(struct request_queue *q)
1417 {
1418 struct rbd_device *rbd_dev = q->queuedata;
1419 struct request *rq;
1420 struct bio_pair *bp = NULL;
1421
1422 rq = blk_fetch_request(q);
1423
1424 while (1) {
1425 struct bio *bio;
1426 struct bio *rq_bio, *next_bio = NULL;
1427 bool do_write;
1428 int size, op_size = 0;
1429 u64 ofs;
1430 int num_segs, cur_seg = 0;
1431 struct rbd_req_coll *coll;
1432
1433 /* peek at request from block layer */
1434 if (!rq)
1435 break;
1436
1437 dout("fetched request\n");
1438
1439 /* filter out block requests we don't understand */
1440 if ((rq->cmd_type != REQ_TYPE_FS)) {
1441 __blk_end_request_all(rq, 0);
1442 goto next;
1443 }
1444
1445 /* deduce our operation (read, write) */
1446 do_write = (rq_data_dir(rq) == WRITE);
1447
1448 size = blk_rq_bytes(rq);
1449 ofs = blk_rq_pos(rq) * 512ULL;
1450 rq_bio = rq->bio;
1451 if (do_write && rbd_dev->read_only) {
1452 __blk_end_request_all(rq, -EROFS);
1453 goto next;
1454 }
1455
1456 spin_unlock_irq(q->queue_lock);
1457
1458 dout("%s 0x%x bytes at 0x%llx\n",
1459 do_write ? "write" : "read",
1460 size, blk_rq_pos(rq) * 512ULL);
1461
1462 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1463 coll = rbd_alloc_coll(num_segs);
1464 if (!coll) {
1465 spin_lock_irq(q->queue_lock);
1466 __blk_end_request_all(rq, -ENOMEM);
1467 goto next;
1468 }
1469
1470 do {
1471 /* a bio clone to be passed down to OSD req */
1472 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1473 op_size = rbd_get_segment(&rbd_dev->header,
1474 rbd_dev->header.block_name,
1475 ofs, size,
1476 NULL, NULL);
1477 kref_get(&coll->kref);
1478 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1479 op_size, GFP_ATOMIC);
1480 if (!bio) {
1481 rbd_coll_end_req_index(rq, coll, cur_seg,
1482 -ENOMEM, op_size);
1483 goto next_seg;
1484 }
1485
1486
1487 /* init OSD command: write or read */
1488 if (do_write)
1489 rbd_req_write(rq, rbd_dev,
1490 rbd_dev->header.snapc,
1491 ofs,
1492 op_size, bio,
1493 coll, cur_seg);
1494 else
1495 rbd_req_read(rq, rbd_dev,
1496 cur_snap_id(rbd_dev),
1497 ofs,
1498 op_size, bio,
1499 coll, cur_seg);
1500
1501 next_seg:
1502 size -= op_size;
1503 ofs += op_size;
1504
1505 cur_seg++;
1506 rq_bio = next_bio;
1507 } while (size > 0);
1508 kref_put(&coll->kref, rbd_coll_release);
1509
1510 if (bp)
1511 bio_pair_release(bp);
1512 spin_lock_irq(q->queue_lock);
1513 next:
1514 rq = blk_fetch_request(q);
1515 }
1516 }
1517
1518 /*
1519 * a queue callback. Makes sure that we don't create a bio that spans across
1520 * multiple osd objects. One exception would be with a single page bios,
1521 * which we handle later at bio_chain_clone
1522 */
1523 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1524 struct bio_vec *bvec)
1525 {
1526 struct rbd_device *rbd_dev = q->queuedata;
1527 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1528 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1529 unsigned int bio_sectors = bmd->bi_size >> 9;
1530 int max;
1531
1532 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1533 + bio_sectors)) << 9;
1534 if (max < 0)
1535 max = 0; /* bio_add cannot handle a negative return */
1536 if (max <= bvec->bv_len && bio_sectors == 0)
1537 return bvec->bv_len;
1538 return max;
1539 }
1540
/*
 * Release the gendisk, its request queue and the in-memory image
 * header.  Safe to call when no disk was ever allocated (disk NULL).
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	/* the header's snapshot state is torn down with the disk */
	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1556
1557 /*
1558 * reload the ondisk the header
1559 */
1560 static int rbd_read_header(struct rbd_device *rbd_dev,
1561 struct rbd_image_header *header)
1562 {
1563 ssize_t rc;
1564 struct rbd_image_header_ondisk *dh;
1565 int snap_count = 0;
1566 u64 snap_names_len = 0;
1567 u64 ver;
1568
1569 while (1) {
1570 int len = sizeof(*dh) +
1571 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1572 snap_names_len;
1573
1574 rc = -ENOMEM;
1575 dh = kmalloc(len, GFP_KERNEL);
1576 if (!dh)
1577 return -ENOMEM;
1578
1579 rc = rbd_req_sync_read(rbd_dev,
1580 NULL, CEPH_NOSNAP,
1581 rbd_dev->obj_md_name,
1582 0, len,
1583 (char *)dh, &ver);
1584 if (rc < 0)
1585 goto out_dh;
1586
1587 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1588 if (rc < 0) {
1589 if (rc == -ENXIO) {
1590 pr_warning("unrecognized header format"
1591 " for image %s", rbd_dev->obj);
1592 }
1593 goto out_dh;
1594 }
1595
1596 if (snap_count != header->total_snaps) {
1597 snap_count = header->total_snaps;
1598 snap_names_len = header->snap_names_len;
1599 rbd_header_free(header);
1600 kfree(dh);
1601 continue;
1602 }
1603 break;
1604 }
1605 header->obj_version = ver;
1606
1607 out_dh:
1608 kfree(dh);
1609 return rc;
1610 }
1611
1612 /*
1613 * create a snapshot
1614 */
1615 static int rbd_header_add_snap(struct rbd_device *dev,
1616 const char *snap_name,
1617 gfp_t gfp_flags)
1618 {
1619 int name_len = strlen(snap_name);
1620 u64 new_snapid;
1621 int ret;
1622 void *data, *p, *e;
1623 u64 ver;
1624
1625 /* we should create a snapshot only if we're pointing at the head */
1626 if (dev->cur_snap)
1627 return -EINVAL;
1628
1629 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1630 &new_snapid);
1631 dout("created snapid=%lld\n", new_snapid);
1632 if (ret < 0)
1633 return ret;
1634
1635 data = kmalloc(name_len + 16, gfp_flags);
1636 if (!data)
1637 return -ENOMEM;
1638
1639 p = data;
1640 e = data + name_len + 16;
1641
1642 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1643 ceph_encode_64_safe(&p, e, new_snapid, bad);
1644
1645 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1646 data, p - data, &ver);
1647
1648 kfree(data);
1649
1650 if (ret < 0)
1651 return ret;
1652
1653 dev->header.snapc->seq = new_snapid;
1654
1655 return 0;
1656 bad:
1657 return -ERANGE;
1658 }
1659
1660 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1661 {
1662 struct rbd_snap *snap;
1663
1664 while (!list_empty(&rbd_dev->snaps)) {
1665 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1666 __rbd_remove_snap_dev(rbd_dev, snap);
1667 }
1668 }
1669
1670 /*
1671 * only read the first part of the ondisk header, without the snaps info
1672 */
1673 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1674 {
1675 int ret;
1676 struct rbd_image_header h;
1677 u64 snap_seq;
1678 int follow_seq = 0;
1679
1680 ret = rbd_read_header(rbd_dev, &h);
1681 if (ret < 0)
1682 return ret;
1683
1684 /* resized? */
1685 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1686
1687 down_write(&rbd_dev->header.snap_rwsem);
1688
1689 snap_seq = rbd_dev->header.snapc->seq;
1690 if (rbd_dev->header.total_snaps &&
1691 rbd_dev->header.snapc->snaps[0] == snap_seq)
1692 /* pointing at the head, will need to follow that
1693 if head moves */
1694 follow_seq = 1;
1695
1696 kfree(rbd_dev->header.snapc);
1697 kfree(rbd_dev->header.snap_names);
1698 kfree(rbd_dev->header.snap_sizes);
1699
1700 rbd_dev->header.total_snaps = h.total_snaps;
1701 rbd_dev->header.snapc = h.snapc;
1702 rbd_dev->header.snap_names = h.snap_names;
1703 rbd_dev->header.snap_names_len = h.snap_names_len;
1704 rbd_dev->header.snap_sizes = h.snap_sizes;
1705 if (follow_seq)
1706 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1707 else
1708 rbd_dev->header.snapc->seq = snap_seq;
1709
1710 ret = __rbd_init_snaps_header(rbd_dev);
1711
1712 up_write(&rbd_dev->header.snap_rwsem);
1713
1714 return ret;
1715 }
1716
/*
 * Read the image header, build the snapshot list, select the mapped
 * snapshot (or head), then allocate the gendisk + request queue and
 * announce the disk.  Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* pick the mapped snapshot and learn its size */
	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	/*
	 * NOTE(review): the header read above is not freed on these error
	 * paths — verify a caller's cleanup (rbd_free_disk/rbd_dev_release)
	 * covers it, or a leak results.
	 */
	return rc;
}
1784
1785 /*
1786 sysfs
1787 */
1788
1789 static ssize_t rbd_size_show(struct device *dev,
1790 struct device_attribute *attr, char *buf)
1791 {
1792 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1793
1794 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1795 }
1796
1797 static ssize_t rbd_major_show(struct device *dev,
1798 struct device_attribute *attr, char *buf)
1799 {
1800 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1801
1802 return sprintf(buf, "%d\n", rbd_dev->major);
1803 }
1804
1805 static ssize_t rbd_client_id_show(struct device *dev,
1806 struct device_attribute *attr, char *buf)
1807 {
1808 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1809
1810 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1811 }
1812
1813 static ssize_t rbd_pool_show(struct device *dev,
1814 struct device_attribute *attr, char *buf)
1815 {
1816 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1817
1818 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1819 }
1820
1821 static ssize_t rbd_name_show(struct device *dev,
1822 struct device_attribute *attr, char *buf)
1823 {
1824 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1825
1826 return sprintf(buf, "%s\n", rbd_dev->obj);
1827 }
1828
1829 static ssize_t rbd_snap_show(struct device *dev,
1830 struct device_attribute *attr,
1831 char *buf)
1832 {
1833 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1834
1835 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1836 }
1837
1838 static ssize_t rbd_image_refresh(struct device *dev,
1839 struct device_attribute *attr,
1840 const char *buf,
1841 size_t size)
1842 {
1843 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1844 int rc;
1845 int ret = size;
1846
1847 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1848
1849 rc = __rbd_update_snaps(rbd_dev);
1850 if (rc < 0)
1851 ret = rc;
1852
1853 mutex_unlock(&ctl_mutex);
1854 return ret;
1855 }
1856
/* per-device sysfs attributes, under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* no-op: rbd_bus_add_dev() installs rbd_dev_release as dev->release */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name = "rbd",
	.groups = rbd_attr_groups,
	.release = rbd_sysfs_dev_release,
};
1896
1897
1898 /*
1899 sysfs - snapshots
1900 */
1901
1902 static ssize_t rbd_snap_size_show(struct device *dev,
1903 struct device_attribute *attr,
1904 char *buf)
1905 {
1906 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1907
1908 return sprintf(buf, "%lld\n", (long long)snap->size);
1909 }
1910
1911 static ssize_t rbd_snap_id_show(struct device *dev,
1912 struct device_attribute *attr,
1913 char *buf)
1914 {
1915 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1916
1917 return sprintf(buf, "%lld\n", (long long)snap->id);
1918 }
1919
/* per-snapshot sysfs attributes, under the parent rbd device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* release callback: the rbd_snap is embedded, so free it (and its name) */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
};
1949
/*
 * Unlink @snap from the device's list and unregister its sysfs device;
 * the release callback (rbd_snap_dev_release) then frees the rbd_snap.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1956
1957 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1958 struct rbd_snap *snap,
1959 struct device *parent)
1960 {
1961 struct device *dev = &snap->dev;
1962 int ret;
1963
1964 dev->type = &rbd_snap_device_type;
1965 dev->parent = parent;
1966 dev->release = rbd_snap_dev_release;
1967 dev_set_name(dev, "snap_%s", snap->name);
1968 ret = device_register(dev);
1969
1970 return ret;
1971 }
1972
1973 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1974 int i, const char *name,
1975 struct rbd_snap **snapp)
1976 {
1977 int ret;
1978 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1979 if (!snap)
1980 return -ENOMEM;
1981 snap->name = kstrdup(name, GFP_KERNEL);
1982 snap->size = rbd_dev->header.snap_sizes[i];
1983 snap->id = rbd_dev->header.snapc->snaps[i];
1984 if (device_is_registered(&rbd_dev->dev)) {
1985 ret = rbd_register_snap_dev(rbd_dev, snap,
1986 &rbd_dev->dev);
1987 if (ret < 0)
1988 goto err;
1989 }
1990 *snapp = snap;
1991 return 0;
1992 err:
1993 kfree(snap->name);
1994 kfree(snap);
1995 return ret;
1996 }
1997
1998 /*
1999 * search for the previous snap in a null delimited string list
2000 */
2001 const char *rbd_prev_snap_name(const char *name, const char *start)
2002 {
2003 if (name < start + 2)
2004 return NULL;
2005
2006 name -= 2;
2007 while (*name) {
2008 if (name == start)
2009 return start;
2010 name--;
2011 }
2012 return name + 1;
2013 }
2014
2015 /*
2016 * compare the old list of snapshots that we have to what's in the header
2017 * and update it accordingly. Note that the header holds the snapshots
2018 * in a reverse order (from newest to oldest) and we need to go from
2019 * older to new so that we don't get a duplicate snap name when
2020 * doing the process (e.g., removed snapshot and recreated a new
2021 * one with the same name.
2022 */
2023 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2024 {
2025 const char *name, *first_name;
2026 int i = rbd_dev->header.total_snaps;
2027 struct rbd_snap *snap, *old_snap = NULL;
2028 int ret;
2029 struct list_head *p, *n;
2030
2031 first_name = rbd_dev->header.snap_names;
2032 name = first_name + rbd_dev->header.snap_names_len;
2033
2034 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2035 u64 cur_id;
2036
2037 old_snap = list_entry(p, struct rbd_snap, node);
2038
2039 if (i)
2040 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2041
2042 if (!i || old_snap->id < cur_id) {
2043 /* old_snap->id was skipped, thus was removed */
2044 __rbd_remove_snap_dev(rbd_dev, old_snap);
2045 continue;
2046 }
2047 if (old_snap->id == cur_id) {
2048 /* we have this snapshot already */
2049 i--;
2050 name = rbd_prev_snap_name(name, first_name);
2051 continue;
2052 }
2053 for (; i > 0;
2054 i--, name = rbd_prev_snap_name(name, first_name)) {
2055 if (!name) {
2056 WARN_ON(1);
2057 return -EINVAL;
2058 }
2059 cur_id = rbd_dev->header.snapc->snaps[i];
2060 /* snapshot removal? handle it above */
2061 if (cur_id >= old_snap->id)
2062 break;
2063 /* a new snapshot */
2064 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2065 if (ret < 0)
2066 return ret;
2067
2068 /* note that we add it backward so using n and not p */
2069 list_add(&snap->node, n);
2070 p = &snap->node;
2071 }
2072 }
2073 /* we're done going over the old snap list, just add what's left */
2074 for (; i > 0; i--) {
2075 name = rbd_prev_snap_name(name, first_name);
2076 if (!name) {
2077 WARN_ON(1);
2078 return -EINVAL;
2079 }
2080 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2081 if (ret < 0)
2082 return ret;
2083 list_add(&snap->node, &rbd_dev->snaps);
2084 }
2085
2086 return 0;
2087 }
2088
2089
/* no-op release: rbd_root_dev below is a static object, never freed */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device for all rbd devices (/sys/devices/rbd) */
static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
2098
/*
 * Register the rbd device, plus any snapshots discovered so far, on
 * the rbd bus.  ctl_mutex guards the registration.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		/*
		 * NOTE(review): a snap registration failure only stops the
		 * loop — the function still returns 0.  Confirm this
		 * best-effort behavior is intended.
		 */
		if (ret < 0)
			break;
	}

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	/* historical label name: nothing is freed here, only unlocked */
	mutex_unlock(&ctl_mutex);
	return ret;
}
2130
/*
 * Drop the device from the bus; when the last reference goes away the
 * release callback (rbd_dev_release) performs the actual teardown.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2135
/*
 * Establish the watch on the header object, refreshing the snapshot
 * context and retrying while the OSD returns -ERANGE.
 * NOTE(review): -ERANGE presumably indicates a stale obj_version —
 * confirm against rbd_req_sync_watch()/the osd watch semantics.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2154
2155 static ssize_t rbd_add(struct bus_type *bus,
2156 const char *buf,
2157 size_t count)
2158 {
2159 struct ceph_osd_client *osdc;
2160 struct rbd_device *rbd_dev;
2161 ssize_t rc = -ENOMEM;
2162 int irc, new_id = 0;
2163 struct list_head *tmp;
2164 char *mon_dev_name;
2165 char *options;
2166
2167 if (!try_module_get(THIS_MODULE))
2168 return -ENODEV;
2169
2170 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2171 if (!mon_dev_name)
2172 goto err_out_mod;
2173
2174 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2175 if (!options)
2176 goto err_mon_dev;
2177
2178 /* new rbd_device object */
2179 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2180 if (!rbd_dev)
2181 goto err_out_opt;
2182
2183 /* static rbd_device initialization */
2184 spin_lock_init(&rbd_dev->lock);
2185 INIT_LIST_HEAD(&rbd_dev->node);
2186 INIT_LIST_HEAD(&rbd_dev->snaps);
2187
2188 init_rwsem(&rbd_dev->header.snap_rwsem);
2189
2190 /* generate unique id: find highest unique id, add one */
2191 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2192
2193 list_for_each(tmp, &rbd_dev_list) {
2194 struct rbd_device *rbd_dev;
2195
2196 rbd_dev = list_entry(tmp, struct rbd_device, node);
2197 if (rbd_dev->id >= new_id)
2198 new_id = rbd_dev->id + 1;
2199 }
2200
2201 rbd_dev->id = new_id;
2202
2203 /* add to global list */
2204 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2205
2206 /* parse add command */
2207 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2208 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2209 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2210 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2211 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2212 mon_dev_name, options, rbd_dev->pool_name,
2213 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2214 rc = -EINVAL;
2215 goto err_out_slot;
2216 }
2217
2218 if (rbd_dev->snap_name[0] == 0)
2219 rbd_dev->snap_name[0] = '-';
2220
2221 rbd_dev->obj_len = strlen(rbd_dev->obj);
2222 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2223 rbd_dev->obj, RBD_SUFFIX);
2224
2225 /* initialize rest of new object */
2226 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2227 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2228 if (rc < 0)
2229 goto err_out_slot;
2230
2231 mutex_unlock(&ctl_mutex);
2232
2233 /* pick the pool */
2234 osdc = &rbd_dev->client->osdc;
2235 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2236 if (rc < 0)
2237 goto err_out_client;
2238 rbd_dev->poolid = rc;
2239
2240 /* register our block device */
2241 irc = register_blkdev(0, rbd_dev->name);
2242 if (irc < 0) {
2243 rc = irc;
2244 goto err_out_client;
2245 }
2246 rbd_dev->major = irc;
2247
2248 rc = rbd_bus_add_dev(rbd_dev);
2249 if (rc)
2250 goto err_out_blkdev;
2251
2252 /* set up and announce blkdev mapping */
2253 rc = rbd_init_disk(rbd_dev);
2254 if (rc)
2255 goto err_out_bus;
2256
2257 rc = rbd_init_watch_dev(rbd_dev);
2258 if (rc)
2259 goto err_out_bus;
2260
2261 return count;
2262
2263 err_out_bus:
2264 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2265 list_del_init(&rbd_dev->node);
2266 mutex_unlock(&ctl_mutex);
2267
2268 /* this will also clean up rest of rbd_dev stuff */
2269
2270 rbd_bus_del_dev(rbd_dev);
2271 kfree(options);
2272 kfree(mon_dev_name);
2273 return rc;
2274
2275 err_out_blkdev:
2276 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2277 err_out_client:
2278 rbd_put_client(rbd_dev);
2279 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2280 err_out_slot:
2281 list_del_init(&rbd_dev->node);
2282 mutex_unlock(&ctl_mutex);
2283
2284 kfree(rbd_dev);
2285 err_out_opt:
2286 kfree(options);
2287 err_mon_dev:
2288 kfree(mon_dev_name);
2289 err_out_mod:
2290 dout("Error adding device %s\n", buf);
2291 module_put(THIS_MODULE);
2292 return rc;
2293 }
2294
2295 static struct rbd_device *__rbd_get_dev(unsigned long id)
2296 {
2297 struct list_head *tmp;
2298 struct rbd_device *rbd_dev;
2299
2300 list_for_each(tmp, &rbd_dev_list) {
2301 rbd_dev = list_entry(tmp, struct rbd_device, node);
2302 if (rbd_dev->id == id)
2303 return rbd_dev;
2304 }
2305 return NULL;
2306 }
2307
/*
 * Device-model release callback, run once the last reference to the
 * rbd device is dropped (after rbd_bus_del_dev()).  Tears down the
 * watch, the ceph client, the disk and the blkdev registration, then
 * drops the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	/* stop the lingering watch request before sending the unwatch */
	if (rbd_dev->watch_request)
		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
						    rbd_dev->watch_request);
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2329
/*
 * sysfs "remove" handler: parse a device id from @buf and tear that
 * device down.  The heavy lifting happens in rbd_dev_release() when
 * device_unregister() drops the last reference.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* unlink first so no new lookups can find the dying device */
	list_del_init(&rbd_dev->node);

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2365
2366 static ssize_t rbd_snap_add(struct device *dev,
2367 struct device_attribute *attr,
2368 const char *buf,
2369 size_t count)
2370 {
2371 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2372 int ret;
2373 char *name = kmalloc(count + 1, GFP_KERNEL);
2374 if (!name)
2375 return -ENOMEM;
2376
2377 snprintf(name, count, "%s", buf);
2378
2379 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2380
2381 ret = rbd_header_add_snap(rbd_dev,
2382 name, GFP_KERNEL);
2383 if (ret < 0)
2384 goto err_unlock;
2385
2386 ret = __rbd_update_snaps(rbd_dev);
2387 if (ret < 0)
2388 goto err_unlock;
2389
2390 /* shouldn't hold ctl_mutex when notifying.. notify might
2391 trigger a watch callback that would need to get that mutex */
2392 mutex_unlock(&ctl_mutex);
2393
2394 /* make a best effort, don't error if failed */
2395 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2396
2397 ret = count;
2398 kfree(name);
2399 return ret;
2400
2401 err_unlock:
2402 mutex_unlock(&ctl_mutex);
2403 kfree(name);
2404 return ret;
2405 }
2406
/* bus-level control files: /sys/bus/rbd/{add,remove} */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2412
2413 /*
2414 * create control files in sysfs
2415 * /sys/bus/rbd/...
2416 */
2417 static int rbd_sysfs_init(void)
2418 {
2419 int ret;
2420
2421 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2422
2423 ret = bus_register(&rbd_bus_type);
2424 if (ret < 0)
2425 return ret;
2426
2427 ret = device_register(&rbd_root_dev);
2428
2429 return ret;
2430 }
2431
/* remove the sysfs control files, reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2437
2438 int __init rbd_init(void)
2439 {
2440 int rc;
2441
2442 rc = rbd_sysfs_init();
2443 if (rc)
2444 return rc;
2445 pr_info("loaded " DRV_NAME_LONG "\n");
2446 return 0;
2447 }
2448
/* module exit point: tear down the sysfs bus and control files */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2453
module_init(rbd_init);
module_exit(rbd_exit);

/* module metadata, visible via modinfo */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");