Merge branches 'x86-alternatives-for-linus', 'x86-fpu-for-linus', 'x86-hwmon-for...
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / drivers / block / rbd.c
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34
35 #include <linux/kernel.h>
36 #include <linux/device.h>
37 #include <linux/module.h>
38 #include <linux/fs.h>
39 #include <linux/blkdev.h>
40
41 #include "rbd_types.h"
42
/* short and long driver names; DRV_NAME is also the gendisk name prefix */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */

/* size of rbd_device.obj_md_name (header object name buffer) */
#define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
/* size of rbd_device.pool_name */
#define RBD_MAX_POOL_NAME_LEN 64
/* size of rbd_device.snap_name */
#define RBD_MAX_SNAP_NAME_LEN 32
#define RBD_MAX_OPT_LEN 1024

/* snapshot name that selects the head (non-snapshot) image */
#define RBD_SNAP_HEAD_NAME "-"

/* size of rbd_device.name */
#define DEV_NAME_LEN 32
56
/*
 * block device image metadata (in-memory version)
 *
 * Filled in from the on-disk header by rbd_header_from_disk() and
 * released with rbd_header_free().
 */
struct rbd_image_header {
	u64 image_size;			/* image size, from the on-disk header */
	char block_name[32];		/* prefix used to build segment names */
	__u8 obj_order;			/* offset >> obj_order is the segment number */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* guards the snapshot fields below */
	struct ceph_snap_context *snapc; /* snapshot ids, kmalloc'ed */
	size_t snap_names_len;		/* bytes in snap_names */
	u64 snap_seq;
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* NUL-separated names, one per snapshot */
	u64 *snap_sizes;		/* image size at each snapshot */
};
75
/*
 * an instance of the client. multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;	/* underlying ceph client */
	struct kref kref;		/* one reference per device using it */
	struct list_head node;		/* entry in rbd_client_list */
};
84
/*
 * a single io request
 *
 * Allocated in rbd_do_request() and attached to the osd request
 * through r_priv.
 */
struct rbd_request {
	struct request *rq; /* blk layer request */
	struct bio *bio; /* cloned bio */
	struct page **pages; /* list of used pages */
	u64 len; /* length requested from the osd */
};
94
/*
 * a single snapshot of an image; linked on rbd_device.snaps
 */
struct rbd_snap {
	struct device dev;
	const char *name;
	size_t size;
	struct list_head node;	/* entry in rbd_device.snaps */
	u64 id;
};
102
/*
 * a single device
 */
struct rbd_device {
	int id; /* blkdev unique id */

	int major; /* blkdev assigned major */
	struct gendisk *disk; /* blkdev's gendisk and rq */
	struct request_queue *q;

	struct ceph_client *client; /* same as rbd_client->client */
	struct rbd_client *rbd_client; /* possibly shared client */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock; /* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	char snap_name[RBD_MAX_SNAP_NAME_LEN]; /* mapped snapshot name */
	u32 cur_snap; /* index+1 of current snapshot within snap context
			 0 - for the head */
	int read_only; /* set when a snapshot is mapped */

	struct list_head node;

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
140
141 static struct bus_type rbd_bus_type = {
142 .name = "rbd",
143 };
144
145 static spinlock_t node_lock; /* protects client get/put */
146
147 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
148 static LIST_HEAD(rbd_dev_list); /* devices */
149 static LIST_HEAD(rbd_client_list); /* clients */
150
151 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
152 static void rbd_dev_release(struct device *dev);
153 static ssize_t rbd_snap_rollback(struct device *dev,
154 struct device_attribute *attr,
155 const char *buf,
156 size_t size);
157 static ssize_t rbd_snap_add(struct device *dev,
158 struct device_attribute *attr,
159 const char *buf,
160 size_t count);
161 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
162 struct rbd_snap *snap);;
163
164
165 static struct rbd_device *dev_to_rbd(struct device *dev)
166 {
167 return container_of(dev, struct rbd_device, dev);
168 }
169
170 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
171 {
172 return get_device(&rbd_dev->dev);
173 }
174
175 static void rbd_put_dev(struct rbd_device *rbd_dev)
176 {
177 put_device(&rbd_dev->dev);
178 }
179
180 static int rbd_open(struct block_device *bdev, fmode_t mode)
181 {
182 struct gendisk *disk = bdev->bd_disk;
183 struct rbd_device *rbd_dev = disk->private_data;
184
185 rbd_get_dev(rbd_dev);
186
187 set_device_ro(bdev, rbd_dev->read_only);
188
189 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
190 return -EROFS;
191
192 return 0;
193 }
194
195 static int rbd_release(struct gendisk *disk, fmode_t mode)
196 {
197 struct rbd_device *rbd_dev = disk->private_data;
198
199 rbd_put_dev(rbd_dev);
200
201 return 0;
202 }
203
/* block device operations installed on every rbd gendisk (see rbd_init_disk) */
static const struct block_device_operations rbd_bd_ops = {
	.owner = THIS_MODULE,
	.open = rbd_open,
	.release = rbd_release,
};
209
210 /*
211 * Initialize an rbd client instance.
212 * We own *opt.
213 */
214 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
215 {
216 struct rbd_client *rbdc;
217 int ret = -ENOMEM;
218
219 dout("rbd_client_create\n");
220 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
221 if (!rbdc)
222 goto out_opt;
223
224 kref_init(&rbdc->kref);
225 INIT_LIST_HEAD(&rbdc->node);
226
227 rbdc->client = ceph_create_client(opt, rbdc);
228 if (IS_ERR(rbdc->client))
229 goto out_rbdc;
230 opt = NULL; /* Now rbdc->client is responsible for opt */
231
232 ret = ceph_open_session(rbdc->client);
233 if (ret < 0)
234 goto out_err;
235
236 spin_lock(&node_lock);
237 list_add_tail(&rbdc->node, &rbd_client_list);
238 spin_unlock(&node_lock);
239
240 dout("rbd_client_create created %p\n", rbdc);
241 return rbdc;
242
243 out_err:
244 ceph_destroy_client(rbdc->client);
245 out_rbdc:
246 kfree(rbdc);
247 out_opt:
248 if (opt)
249 ceph_destroy_options(opt);
250 return ERR_PTR(ret);
251 }
252
253 /*
254 * Find a ceph client with specific addr and configuration.
255 */
256 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
257 {
258 struct rbd_client *client_node;
259
260 if (opt->flags & CEPH_OPT_NOSHARE)
261 return NULL;
262
263 list_for_each_entry(client_node, &rbd_client_list, node)
264 if (ceph_compare_options(opt, client_node->client) == 0)
265 return client_node;
266 return NULL;
267 }
268
269 /*
270 * Get a ceph client with specific addr and configuration, if one does
271 * not exist create it.
272 */
273 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
274 char *options)
275 {
276 struct rbd_client *rbdc;
277 struct ceph_options *opt;
278 int ret;
279
280 ret = ceph_parse_options(&opt, options, mon_addr,
281 mon_addr + strlen(mon_addr), NULL, NULL);
282 if (ret < 0)
283 return ret;
284
285 spin_lock(&node_lock);
286 rbdc = __rbd_client_find(opt);
287 if (rbdc) {
288 ceph_destroy_options(opt);
289
290 /* using an existing client */
291 kref_get(&rbdc->kref);
292 rbd_dev->rbd_client = rbdc;
293 rbd_dev->client = rbdc->client;
294 spin_unlock(&node_lock);
295 return 0;
296 }
297 spin_unlock(&node_lock);
298
299 rbdc = rbd_client_create(opt);
300 if (IS_ERR(rbdc))
301 return PTR_ERR(rbdc);
302
303 rbd_dev->rbd_client = rbdc;
304 rbd_dev->client = rbdc->client;
305 return 0;
306 }
307
308 /*
309 * Destroy ceph client
310 */
311 static void rbd_client_release(struct kref *kref)
312 {
313 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
314
315 dout("rbd_release_client %p\n", rbdc);
316 spin_lock(&node_lock);
317 list_del(&rbdc->node);
318 spin_unlock(&node_lock);
319
320 ceph_destroy_client(rbdc->client);
321 kfree(rbdc);
322 }
323
324 /*
325 * Drop reference to ceph client node. If it's not referenced anymore, release
326 * it.
327 */
328 static void rbd_put_client(struct rbd_device *rbd_dev)
329 {
330 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
331 rbd_dev->rbd_client = NULL;
332 rbd_dev->client = NULL;
333 }
334
335
336 /*
337 * Create a new header structure, translate header format from the on-disk
338 * header.
339 */
340 static int rbd_header_from_disk(struct rbd_image_header *header,
341 struct rbd_image_header_ondisk *ondisk,
342 int allocated_snaps,
343 gfp_t gfp_flags)
344 {
345 int i;
346 u32 snap_count = le32_to_cpu(ondisk->snap_count);
347 int ret = -ENOMEM;
348
349 init_rwsem(&header->snap_rwsem);
350 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
351 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
352 snap_count *
353 sizeof(struct rbd_image_snap_ondisk),
354 gfp_flags);
355 if (!header->snapc)
356 return -ENOMEM;
357 if (snap_count) {
358 header->snap_names = kmalloc(header->snap_names_len,
359 GFP_KERNEL);
360 if (!header->snap_names)
361 goto err_snapc;
362 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
363 GFP_KERNEL);
364 if (!header->snap_sizes)
365 goto err_names;
366 } else {
367 header->snap_names = NULL;
368 header->snap_sizes = NULL;
369 }
370 memcpy(header->block_name, ondisk->block_name,
371 sizeof(ondisk->block_name));
372
373 header->image_size = le64_to_cpu(ondisk->image_size);
374 header->obj_order = ondisk->options.order;
375 header->crypt_type = ondisk->options.crypt_type;
376 header->comp_type = ondisk->options.comp_type;
377
378 atomic_set(&header->snapc->nref, 1);
379 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
380 header->snapc->num_snaps = snap_count;
381 header->total_snaps = snap_count;
382
383 if (snap_count &&
384 allocated_snaps == snap_count) {
385 for (i = 0; i < snap_count; i++) {
386 header->snapc->snaps[i] =
387 le64_to_cpu(ondisk->snaps[i].id);
388 header->snap_sizes[i] =
389 le64_to_cpu(ondisk->snaps[i].image_size);
390 }
391
392 /* copy snapshot names */
393 memcpy(header->snap_names, &ondisk->snaps[i],
394 header->snap_names_len);
395 }
396
397 return 0;
398
399 err_names:
400 kfree(header->snap_names);
401 err_snapc:
402 kfree(header->snapc);
403 return ret;
404 }
405
406 static int snap_index(struct rbd_image_header *header, int snap_num)
407 {
408 return header->total_snaps - snap_num;
409 }
410
411 static u64 cur_snap_id(struct rbd_device *rbd_dev)
412 {
413 struct rbd_image_header *header = &rbd_dev->header;
414
415 if (!rbd_dev->cur_snap)
416 return 0;
417
418 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
419 }
420
421 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
422 u64 *seq, u64 *size)
423 {
424 int i;
425 char *p = header->snap_names;
426
427 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
428 if (strcmp(snap_name, p) == 0)
429 break;
430 }
431 if (i == header->total_snaps)
432 return -ENOENT;
433 if (seq)
434 *seq = header->snapc->snaps[i];
435
436 if (size)
437 *size = header->snap_sizes[i];
438
439 return i;
440 }
441
442 static int rbd_header_set_snap(struct rbd_device *dev,
443 const char *snap_name,
444 u64 *size)
445 {
446 struct rbd_image_header *header = &dev->header;
447 struct ceph_snap_context *snapc = header->snapc;
448 int ret = -ENOENT;
449
450 down_write(&header->snap_rwsem);
451
452 if (!snap_name ||
453 !*snap_name ||
454 strcmp(snap_name, "-") == 0 ||
455 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
456 if (header->total_snaps)
457 snapc->seq = header->snap_seq;
458 else
459 snapc->seq = 0;
460 dev->cur_snap = 0;
461 dev->read_only = 0;
462 if (size)
463 *size = header->image_size;
464 } else {
465 ret = snap_by_name(header, snap_name, &snapc->seq, size);
466 if (ret < 0)
467 goto done;
468
469 dev->cur_snap = header->total_snaps - ret;
470 dev->read_only = 1;
471 }
472
473 ret = 0;
474 done:
475 up_write(&header->snap_rwsem);
476 return ret;
477 }
478
479 static void rbd_header_free(struct rbd_image_header *header)
480 {
481 kfree(header->snapc);
482 kfree(header->snap_names);
483 kfree(header->snap_sizes);
484 }
485
486 /*
487 * get the actual striped segment name, offset and length
488 */
489 static u64 rbd_get_segment(struct rbd_image_header *header,
490 const char *block_name,
491 u64 ofs, u64 len,
492 char *seg_name, u64 *segofs)
493 {
494 u64 seg = ofs >> header->obj_order;
495
496 if (seg_name)
497 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
498 "%s.%012llx", block_name, seg);
499
500 ofs = ofs & ((1 << header->obj_order) - 1);
501 len = min_t(u64, len, (1 << header->obj_order) - ofs);
502
503 if (segofs)
504 *segofs = ofs;
505
506 return len;
507 }
508
509 /*
510 * bio helpers
511 */
512
513 static void bio_chain_put(struct bio *chain)
514 {
515 struct bio *tmp;
516
517 while (chain) {
518 tmp = chain;
519 chain = chain->bi_next;
520 bio_put(tmp);
521 }
522 }
523
524 /*
525 * zeros a bio chain, starting at specific offset
526 */
527 static void zero_bio_chain(struct bio *chain, int start_ofs)
528 {
529 struct bio_vec *bv;
530 unsigned long flags;
531 void *buf;
532 int i;
533 int pos = 0;
534
535 while (chain) {
536 bio_for_each_segment(bv, chain, i) {
537 if (pos + bv->bv_len > start_ofs) {
538 int remainder = max(start_ofs - pos, 0);
539 buf = bvec_kmap_irq(bv, &flags);
540 memset(buf + remainder, 0,
541 bv->bv_len - remainder);
542 bvec_kunmap_irq(buf, &flags);
543 }
544 pos += bv->bv_len;
545 }
546
547 chain = chain->bi_next;
548 }
549 }
550
551 /*
552 * bio_chain_clone - clone a chain of bios up to a certain length.
553 * might return a bio_pair that will need to be released.
554 */
555 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
556 struct bio_pair **bp,
557 int len, gfp_t gfpmask)
558 {
559 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
560 int total = 0;
561
562 if (*bp) {
563 bio_pair_release(*bp);
564 *bp = NULL;
565 }
566
567 while (old_chain && (total < len)) {
568 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
569 if (!tmp)
570 goto err_out;
571
572 if (total + old_chain->bi_size > len) {
573 struct bio_pair *bp;
574
575 /*
576 * this split can only happen with a single paged bio,
577 * split_bio will BUG_ON if this is not the case
578 */
579 dout("bio_chain_clone split! total=%d remaining=%d"
580 "bi_size=%d\n",
581 (int)total, (int)len-total,
582 (int)old_chain->bi_size);
583
584 /* split the bio. We'll release it either in the next
585 call, or it will have to be released outside */
586 bp = bio_split(old_chain, (len - total) / 512ULL);
587 if (!bp)
588 goto err_out;
589
590 __bio_clone(tmp, &bp->bio1);
591
592 *next = &bp->bio2;
593 } else {
594 __bio_clone(tmp, old_chain);
595 *next = old_chain->bi_next;
596 }
597
598 tmp->bi_bdev = NULL;
599 gfpmask &= ~__GFP_WAIT;
600 tmp->bi_next = NULL;
601
602 if (!new_chain) {
603 new_chain = tail = tmp;
604 } else {
605 tail->bi_next = tmp;
606 tail = tmp;
607 }
608 old_chain = old_chain->bi_next;
609
610 total += tmp->bi_size;
611 }
612
613 BUG_ON(total < len);
614
615 if (tail)
616 tail->bi_next = NULL;
617
618 *old = old_chain;
619
620 return new_chain;
621
622 err_out:
623 dout("bio_chain_clone with err\n");
624 bio_chain_put(new_chain);
625 return NULL;
626 }
627
628 /*
629 * helpers for osd request op vectors.
630 */
631 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
632 int num_ops,
633 int opcode,
634 u32 payload_len)
635 {
636 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
637 GFP_NOIO);
638 if (!*ops)
639 return -ENOMEM;
640 (*ops)[0].op = opcode;
641 /*
642 * op extent offset and length will be set later on
643 * in calc_raw_layout()
644 */
645 (*ops)[0].payload_len = payload_len;
646 return 0;
647 }
648
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
653
654 /*
655 * Send ceph osd request
656 */
657 static int rbd_do_request(struct request *rq,
658 struct rbd_device *dev,
659 struct ceph_snap_context *snapc,
660 u64 snapid,
661 const char *obj, u64 ofs, u64 len,
662 struct bio *bio,
663 struct page **pages,
664 int num_pages,
665 int flags,
666 struct ceph_osd_req_op *ops,
667 int num_reply,
668 void (*rbd_cb)(struct ceph_osd_request *req,
669 struct ceph_msg *msg))
670 {
671 struct ceph_osd_request *req;
672 struct ceph_file_layout *layout;
673 int ret;
674 u64 bno;
675 struct timespec mtime = CURRENT_TIME;
676 struct rbd_request *req_data;
677 struct ceph_osd_request_head *reqhead;
678 struct rbd_image_header *header = &dev->header;
679
680 ret = -ENOMEM;
681 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
682 if (!req_data)
683 goto done;
684
685 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
686
687 down_read(&header->snap_rwsem);
688
689 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
690 snapc,
691 ops,
692 false,
693 GFP_NOIO, pages, bio);
694 if (IS_ERR(req)) {
695 up_read(&header->snap_rwsem);
696 ret = PTR_ERR(req);
697 goto done_pages;
698 }
699
700 req->r_callback = rbd_cb;
701
702 req_data->rq = rq;
703 req_data->bio = bio;
704 req_data->pages = pages;
705 req_data->len = len;
706
707 req->r_priv = req_data;
708
709 reqhead = req->r_request->front.iov_base;
710 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
711
712 strncpy(req->r_oid, obj, sizeof(req->r_oid));
713 req->r_oid_len = strlen(req->r_oid);
714
715 layout = &req->r_file_layout;
716 memset(layout, 0, sizeof(*layout));
717 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
718 layout->fl_stripe_count = cpu_to_le32(1);
719 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
720 layout->fl_pg_preferred = cpu_to_le32(-1);
721 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
722 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
723 ofs, &len, &bno, req, ops);
724
725 ceph_osdc_build_request(req, ofs, &len,
726 ops,
727 snapc,
728 &mtime,
729 req->r_oid, req->r_oid_len);
730 up_read(&header->snap_rwsem);
731
732 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
733 if (ret < 0)
734 goto done_err;
735
736 if (!rbd_cb) {
737 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
738 ceph_osdc_put_request(req);
739 }
740 return ret;
741
742 done_err:
743 bio_chain_put(req_data->bio);
744 ceph_osdc_put_request(req);
745 done_pages:
746 kfree(req_data);
747 done:
748 if (rq)
749 blk_end_request(rq, ret, len);
750 return ret;
751 }
752
753 /*
754 * Ceph osd op callback
755 */
756 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
757 {
758 struct rbd_request *req_data = req->r_priv;
759 struct ceph_osd_reply_head *replyhead;
760 struct ceph_osd_op *op;
761 __s32 rc;
762 u64 bytes;
763 int read_op;
764
765 /* parse reply */
766 replyhead = msg->front.iov_base;
767 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
768 op = (void *)(replyhead + 1);
769 rc = le32_to_cpu(replyhead->result);
770 bytes = le64_to_cpu(op->extent.length);
771 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
772
773 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
774
775 if (rc == -ENOENT && read_op) {
776 zero_bio_chain(req_data->bio, 0);
777 rc = 0;
778 } else if (rc == 0 && read_op && bytes < req_data->len) {
779 zero_bio_chain(req_data->bio, bytes);
780 bytes = req_data->len;
781 }
782
783 blk_end_request(req_data->rq, rc, bytes);
784
785 if (req_data->bio)
786 bio_chain_put(req_data->bio);
787
788 ceph_osdc_put_request(req);
789 kfree(req_data);
790 }
791
792 /*
793 * Do a synchronous ceph osd operation
794 */
795 static int rbd_req_sync_op(struct rbd_device *dev,
796 struct ceph_snap_context *snapc,
797 u64 snapid,
798 int opcode,
799 int flags,
800 struct ceph_osd_req_op *orig_ops,
801 int num_reply,
802 const char *obj,
803 u64 ofs, u64 len,
804 char *buf)
805 {
806 int ret;
807 struct page **pages;
808 int num_pages;
809 struct ceph_osd_req_op *ops = orig_ops;
810 u32 payload_len;
811
812 num_pages = calc_pages_for(ofs , len);
813 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
814 if (IS_ERR(pages))
815 return PTR_ERR(pages);
816
817 if (!orig_ops) {
818 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
819 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
820 if (ret < 0)
821 goto done;
822
823 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
824 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
825 if (ret < 0)
826 goto done_ops;
827 }
828 }
829
830 ret = rbd_do_request(NULL, dev, snapc, snapid,
831 obj, ofs, len, NULL,
832 pages, num_pages,
833 flags,
834 ops,
835 2,
836 NULL);
837 if (ret < 0)
838 goto done_ops;
839
840 if ((flags & CEPH_OSD_FLAG_READ) && buf)
841 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
842
843 done_ops:
844 if (!orig_ops)
845 rbd_destroy_ops(ops);
846 done:
847 ceph_release_page_vector(pages, num_pages);
848 return ret;
849 }
850
851 /*
852 * Do an asynchronous ceph osd operation
853 */
854 static int rbd_do_op(struct request *rq,
855 struct rbd_device *rbd_dev ,
856 struct ceph_snap_context *snapc,
857 u64 snapid,
858 int opcode, int flags, int num_reply,
859 u64 ofs, u64 len,
860 struct bio *bio)
861 {
862 char *seg_name;
863 u64 seg_ofs;
864 u64 seg_len;
865 int ret;
866 struct ceph_osd_req_op *ops;
867 u32 payload_len;
868
869 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
870 if (!seg_name)
871 return -ENOMEM;
872
873 seg_len = rbd_get_segment(&rbd_dev->header,
874 rbd_dev->header.block_name,
875 ofs, len,
876 seg_name, &seg_ofs);
877
878 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
879
880 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
881 if (ret < 0)
882 goto done;
883
884 /* we've taken care of segment sizes earlier when we
885 cloned the bios. We should never have a segment
886 truncated at this point */
887 BUG_ON(seg_len < len);
888
889 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
890 seg_name, seg_ofs, seg_len,
891 bio,
892 NULL, 0,
893 flags,
894 ops,
895 num_reply,
896 rbd_req_cb);
897 done:
898 kfree(seg_name);
899 return ret;
900 }
901
902 /*
903 * Request async osd write
904 */
905 static int rbd_req_write(struct request *rq,
906 struct rbd_device *rbd_dev,
907 struct ceph_snap_context *snapc,
908 u64 ofs, u64 len,
909 struct bio *bio)
910 {
911 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
912 CEPH_OSD_OP_WRITE,
913 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
914 2,
915 ofs, len, bio);
916 }
917
918 /*
919 * Request async osd read
920 */
921 static int rbd_req_read(struct request *rq,
922 struct rbd_device *rbd_dev,
923 u64 snapid,
924 u64 ofs, u64 len,
925 struct bio *bio)
926 {
927 return rbd_do_op(rq, rbd_dev, NULL,
928 (snapid ? snapid : CEPH_NOSNAP),
929 CEPH_OSD_OP_READ,
930 CEPH_OSD_FLAG_READ,
931 2,
932 ofs, len, bio);
933 }
934
935 /*
936 * Request sync osd read
937 */
938 static int rbd_req_sync_read(struct rbd_device *dev,
939 struct ceph_snap_context *snapc,
940 u64 snapid,
941 const char *obj,
942 u64 ofs, u64 len,
943 char *buf)
944 {
945 return rbd_req_sync_op(dev, NULL,
946 (snapid ? snapid : CEPH_NOSNAP),
947 CEPH_OSD_OP_READ,
948 CEPH_OSD_FLAG_READ,
949 NULL,
950 1, obj, ofs, len, buf);
951 }
952
953 /*
954 * Request sync osd read
955 */
956 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
957 u64 snapid,
958 const char *obj)
959 {
960 struct ceph_osd_req_op *ops;
961 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
962 if (ret < 0)
963 return ret;
964
965 ops[0].snap.snapid = snapid;
966
967 ret = rbd_req_sync_op(dev, NULL,
968 CEPH_NOSNAP,
969 0,
970 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
971 ops,
972 1, obj, 0, 0, NULL);
973
974 rbd_destroy_ops(ops);
975
976 if (ret < 0)
977 return ret;
978
979 return ret;
980 }
981
982 /*
983 * Request sync osd read
984 */
985 static int rbd_req_sync_exec(struct rbd_device *dev,
986 const char *obj,
987 const char *cls,
988 const char *method,
989 const char *data,
990 int len)
991 {
992 struct ceph_osd_req_op *ops;
993 int cls_len = strlen(cls);
994 int method_len = strlen(method);
995 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
996 cls_len + method_len + len);
997 if (ret < 0)
998 return ret;
999
1000 ops[0].cls.class_name = cls;
1001 ops[0].cls.class_len = (__u8)cls_len;
1002 ops[0].cls.method_name = method;
1003 ops[0].cls.method_len = (__u8)method_len;
1004 ops[0].cls.argc = 0;
1005 ops[0].cls.indata = data;
1006 ops[0].cls.indata_len = len;
1007
1008 ret = rbd_req_sync_op(dev, NULL,
1009 CEPH_NOSNAP,
1010 0,
1011 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1012 ops,
1013 1, obj, 0, 0, NULL);
1014
1015 rbd_destroy_ops(ops);
1016
1017 dout("cls_exec returned %d\n", ret);
1018 return ret;
1019 }
1020
/*
 * block device queue callback
 *
 * Drains the request queue.  Each filesystem request is cut into pieces
 * that stay within one segment object; for each piece the matching part
 * of the request's bio chain is cloned and an asynchronous osd read or
 * write is issued.  Non-filesystem requests are completed with 0 and
 * writes to a read-only device with -EROFS.
 *
 * The queue lock is dropped while a request is being submitted and
 * taken again before the next request is fetched.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		/* request extent in bytes; blk_rq_pos() is in 512-byte sectors */
		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* submission below runs without the queue lock */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			/* bytes of this request that fit in the current segment */
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* retake the lock the "next" path expects */
				spin_lock_irq(q->queue_lock);
				__blk_end_request_all(rq, -ENOMEM);
				goto next;
			}

			/* init OSD command: write or read */
			/* NOTE(review): the return values are not checked here */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio);

			size -= op_size;
			ofs += op_size;

			/* continue where bio_chain_clone() left off */
			rq_bio = next_bio;
		} while (size > 0);

		if (bp)
			bio_pair_release(bp);

		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1109
1110 /*
1111 * a queue callback. Makes sure that we don't create a bio that spans across
1112 * multiple osd objects. One exception would be with a single page bios,
1113 * which we handle later at bio_chain_clone
1114 */
1115 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1116 struct bio_vec *bvec)
1117 {
1118 struct rbd_device *rbd_dev = q->queuedata;
1119 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1120 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1121 unsigned int bio_sectors = bmd->bi_size >> 9;
1122 int max;
1123
1124 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1125 + bio_sectors)) << 9;
1126 if (max < 0)
1127 max = 0; /* bio_add cannot handle a negative return */
1128 if (max <= bvec->bv_len && bio_sectors == 0)
1129 return bvec->bv_len;
1130 return max;
1131 }
1132
1133 static void rbd_free_disk(struct rbd_device *rbd_dev)
1134 {
1135 struct gendisk *disk = rbd_dev->disk;
1136
1137 if (!disk)
1138 return;
1139
1140 rbd_header_free(&rbd_dev->header);
1141
1142 if (disk->flags & GENHD_FL_UP)
1143 del_gendisk(disk);
1144 if (disk->queue)
1145 blk_cleanup_queue(disk->queue);
1146 put_disk(disk);
1147 }
1148
1149 /*
1150 * reload the ondisk the header
1151 */
1152 static int rbd_read_header(struct rbd_device *rbd_dev,
1153 struct rbd_image_header *header)
1154 {
1155 ssize_t rc;
1156 struct rbd_image_header_ondisk *dh;
1157 int snap_count = 0;
1158 u64 snap_names_len = 0;
1159
1160 while (1) {
1161 int len = sizeof(*dh) +
1162 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1163 snap_names_len;
1164
1165 rc = -ENOMEM;
1166 dh = kmalloc(len, GFP_KERNEL);
1167 if (!dh)
1168 return -ENOMEM;
1169
1170 rc = rbd_req_sync_read(rbd_dev,
1171 NULL, CEPH_NOSNAP,
1172 rbd_dev->obj_md_name,
1173 0, len,
1174 (char *)dh);
1175 if (rc < 0)
1176 goto out_dh;
1177
1178 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1179 if (rc < 0)
1180 goto out_dh;
1181
1182 if (snap_count != header->total_snaps) {
1183 snap_count = header->total_snaps;
1184 snap_names_len = header->snap_names_len;
1185 rbd_header_free(header);
1186 kfree(dh);
1187 continue;
1188 }
1189 break;
1190 }
1191
1192 out_dh:
1193 kfree(dh);
1194 return rc;
1195 }
1196
1197 /*
1198 * create a snapshot
1199 */
1200 static int rbd_header_add_snap(struct rbd_device *dev,
1201 const char *snap_name,
1202 gfp_t gfp_flags)
1203 {
1204 int name_len = strlen(snap_name);
1205 u64 new_snapid;
1206 int ret;
1207 void *data, *data_start, *data_end;
1208
1209 /* we should create a snapshot only if we're pointing at the head */
1210 if (dev->cur_snap)
1211 return -EINVAL;
1212
1213 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1214 &new_snapid);
1215 dout("created snapid=%lld\n", new_snapid);
1216 if (ret < 0)
1217 return ret;
1218
1219 data = kmalloc(name_len + 16, gfp_flags);
1220 if (!data)
1221 return -ENOMEM;
1222
1223 data_start = data;
1224 data_end = data + name_len + 16;
1225
1226 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1227 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1228
1229 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1230 data_start, data - data_start);
1231
1232 kfree(data_start);
1233
1234 if (ret < 0)
1235 return ret;
1236
1237 dev->header.snapc->seq = new_snapid;
1238
1239 return 0;
1240 bad:
1241 return -ERANGE;
1242 }
1243
1244 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1245 {
1246 struct rbd_snap *snap;
1247
1248 while (!list_empty(&rbd_dev->snaps)) {
1249 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1250 __rbd_remove_snap_dev(rbd_dev, snap);
1251 }
1252 }
1253
1254 /*
1255 * only read the first part of the ondisk header, without the snaps info
1256 */
1257 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1258 {
1259 int ret;
1260 struct rbd_image_header h;
1261 u64 snap_seq;
1262
1263 ret = rbd_read_header(rbd_dev, &h);
1264 if (ret < 0)
1265 return ret;
1266
1267 down_write(&rbd_dev->header.snap_rwsem);
1268
1269 snap_seq = rbd_dev->header.snapc->seq;
1270
1271 kfree(rbd_dev->header.snapc);
1272 kfree(rbd_dev->header.snap_names);
1273 kfree(rbd_dev->header.snap_sizes);
1274
1275 rbd_dev->header.total_snaps = h.total_snaps;
1276 rbd_dev->header.snapc = h.snapc;
1277 rbd_dev->header.snap_names = h.snap_names;
1278 rbd_dev->header.snap_names_len = h.snap_names_len;
1279 rbd_dev->header.snap_sizes = h.snap_sizes;
1280 rbd_dev->header.snapc->seq = snap_seq;
1281
1282 ret = __rbd_init_snaps_header(rbd_dev);
1283
1284 up_write(&rbd_dev->header.snap_rwsem);
1285
1286 return ret;
1287 }
1288
1289 static int rbd_init_disk(struct rbd_device *rbd_dev)
1290 {
1291 struct gendisk *disk;
1292 struct request_queue *q;
1293 int rc;
1294 u64 total_size = 0;
1295
1296 /* contact OSD, request size info about the object being mapped */
1297 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1298 if (rc)
1299 return rc;
1300
1301 /* no need to lock here, as rbd_dev is not registered yet */
1302 rc = __rbd_init_snaps_header(rbd_dev);
1303 if (rc)
1304 return rc;
1305
1306 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1307 if (rc)
1308 return rc;
1309
1310 /* create gendisk info */
1311 rc = -ENOMEM;
1312 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1313 if (!disk)
1314 goto out;
1315
1316 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1317 disk->major = rbd_dev->major;
1318 disk->first_minor = 0;
1319 disk->fops = &rbd_bd_ops;
1320 disk->private_data = rbd_dev;
1321
1322 /* init rq */
1323 rc = -ENOMEM;
1324 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1325 if (!q)
1326 goto out_disk;
1327 blk_queue_merge_bvec(q, rbd_merge_bvec);
1328 disk->queue = q;
1329
1330 q->queuedata = rbd_dev;
1331
1332 rbd_dev->disk = disk;
1333 rbd_dev->q = q;
1334
1335 /* finally, announce the disk to the world */
1336 set_capacity(disk, total_size / 512ULL);
1337 add_disk(disk);
1338
1339 pr_info("%s: added with size 0x%llx\n",
1340 disk->disk_name, (unsigned long long)total_size);
1341 return 0;
1342
1343 out_disk:
1344 put_disk(disk);
1345 out:
1346 return rc;
1347 }
1348
1349 /*
1350 sysfs
1351 */
1352
1353 static ssize_t rbd_size_show(struct device *dev,
1354 struct device_attribute *attr, char *buf)
1355 {
1356 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1357
1358 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1359 }
1360
1361 static ssize_t rbd_major_show(struct device *dev,
1362 struct device_attribute *attr, char *buf)
1363 {
1364 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1365
1366 return sprintf(buf, "%d\n", rbd_dev->major);
1367 }
1368
1369 static ssize_t rbd_client_id_show(struct device *dev,
1370 struct device_attribute *attr, char *buf)
1371 {
1372 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1373
1374 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1375 }
1376
1377 static ssize_t rbd_pool_show(struct device *dev,
1378 struct device_attribute *attr, char *buf)
1379 {
1380 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1381
1382 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1383 }
1384
1385 static ssize_t rbd_name_show(struct device *dev,
1386 struct device_attribute *attr, char *buf)
1387 {
1388 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1389
1390 return sprintf(buf, "%s\n", rbd_dev->obj);
1391 }
1392
1393 static ssize_t rbd_snap_show(struct device *dev,
1394 struct device_attribute *attr,
1395 char *buf)
1396 {
1397 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1398
1399 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1400 }
1401
1402 static ssize_t rbd_image_refresh(struct device *dev,
1403 struct device_attribute *attr,
1404 const char *buf,
1405 size_t size)
1406 {
1407 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1408 int rc;
1409 int ret = size;
1410
1411 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1412
1413 rc = __rbd_update_snaps(rbd_dev);
1414 if (rc < 0)
1415 ret = rc;
1416
1417 mutex_unlock(&ctl_mutex);
1418 return ret;
1419 }
1420
1421 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1422 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1423 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1424 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1425 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1426 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1427 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1428 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1429 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1430
1431 static struct attribute *rbd_attrs[] = {
1432 &dev_attr_size.attr,
1433 &dev_attr_major.attr,
1434 &dev_attr_client_id.attr,
1435 &dev_attr_pool.attr,
1436 &dev_attr_name.attr,
1437 &dev_attr_current_snap.attr,
1438 &dev_attr_refresh.attr,
1439 &dev_attr_create_snap.attr,
1440 &dev_attr_rollback_snap.attr,
1441 NULL
1442 };
1443
1444 static struct attribute_group rbd_attr_group = {
1445 .attrs = rbd_attrs,
1446 };
1447
1448 static const struct attribute_group *rbd_attr_groups[] = {
1449 &rbd_attr_group,
1450 NULL
1451 };
1452
1453 static void rbd_sysfs_dev_release(struct device *dev)
1454 {
1455 }
1456
1457 static struct device_type rbd_device_type = {
1458 .name = "rbd",
1459 .groups = rbd_attr_groups,
1460 .release = rbd_sysfs_dev_release,
1461 };
1462
1463
1464 /*
1465 sysfs - snapshots
1466 */
1467
1468 static ssize_t rbd_snap_size_show(struct device *dev,
1469 struct device_attribute *attr,
1470 char *buf)
1471 {
1472 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1473
1474 return sprintf(buf, "%lld\n", (long long)snap->size);
1475 }
1476
1477 static ssize_t rbd_snap_id_show(struct device *dev,
1478 struct device_attribute *attr,
1479 char *buf)
1480 {
1481 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1482
1483 return sprintf(buf, "%lld\n", (long long)snap->id);
1484 }
1485
1486 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1487 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1488
1489 static struct attribute *rbd_snap_attrs[] = {
1490 &dev_attr_snap_size.attr,
1491 &dev_attr_snap_id.attr,
1492 NULL,
1493 };
1494
1495 static struct attribute_group rbd_snap_attr_group = {
1496 .attrs = rbd_snap_attrs,
1497 };
1498
1499 static void rbd_snap_dev_release(struct device *dev)
1500 {
1501 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1502 kfree(snap->name);
1503 kfree(snap);
1504 }
1505
1506 static const struct attribute_group *rbd_snap_attr_groups[] = {
1507 &rbd_snap_attr_group,
1508 NULL
1509 };
1510
1511 static struct device_type rbd_snap_device_type = {
1512 .groups = rbd_snap_attr_groups,
1513 .release = rbd_snap_dev_release,
1514 };
1515
1516 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1517 struct rbd_snap *snap)
1518 {
1519 list_del(&snap->node);
1520 device_unregister(&snap->dev);
1521 }
1522
1523 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1524 struct rbd_snap *snap,
1525 struct device *parent)
1526 {
1527 struct device *dev = &snap->dev;
1528 int ret;
1529
1530 dev->type = &rbd_snap_device_type;
1531 dev->parent = parent;
1532 dev->release = rbd_snap_dev_release;
1533 dev_set_name(dev, "snap_%s", snap->name);
1534 ret = device_register(dev);
1535
1536 return ret;
1537 }
1538
1539 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1540 int i, const char *name,
1541 struct rbd_snap **snapp)
1542 {
1543 int ret;
1544 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1545 if (!snap)
1546 return -ENOMEM;
1547 snap->name = kstrdup(name, GFP_KERNEL);
1548 snap->size = rbd_dev->header.snap_sizes[i];
1549 snap->id = rbd_dev->header.snapc->snaps[i];
1550 if (device_is_registered(&rbd_dev->dev)) {
1551 ret = rbd_register_snap_dev(rbd_dev, snap,
1552 &rbd_dev->dev);
1553 if (ret < 0)
1554 goto err;
1555 }
1556 *snapp = snap;
1557 return 0;
1558 err:
1559 kfree(snap->name);
1560 kfree(snap);
1561 return ret;
1562 }
1563
/*
 * Step backwards through a list of NUL-terminated strings packed one
 * after another.
 *
 * @name points at the start of some entry (or just past the last entry's
 * terminator); @start is the first byte of the list.  Returns the start of
 * the entry that precedes @name, or NULL when fewer than two bytes lie
 * between @start and @name, i.e. there is no previous entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (name < start + 2)
		return NULL;

	/*
	 * name[-1] is the previous entry's terminator, so begin at name[-2]
	 * and walk back until the terminator before that is found.
	 */
	for (cursor = name - 2; *cursor != '\0'; cursor--) {
		/* ran into the front of the list: first entry */
		if (cursor == start)
			return start;
	}

	return cursor + 1;
}
1580
1581 /*
 * Bring rbd_dev->snaps (the list of struct rbd_snap) in line with the
 * snapshot ids and names currently held in rbd_dev->header.
 *
 * The header holds the snapshots in reverse order (from newest to oldest),
 * so the header arrays are indexed from total_snaps downwards and the
 * existing list is walked backwards; both are thereby visited from older
 * to newer.  Going in that direction avoids a duplicate snapshot name
 * while the update is in progress (e.g. a snapshot was removed and a new
 * one with the same name was created).
 *
 * For every old list entry:
 *   - header exhausted, or old id below the current header id: the old
 *     snapshot no longer exists and is removed;
 *   - ids equal: the snapshot is kept and the header cursor advances;
 *   - otherwise header entries are added as new snapshots until one with
 *     an id not below the old entry's id is seen.
 * Header entries left over after the walk are added as new snapshots.
 *
 * Called with header.snap_rwsem held for write by __rbd_update_snaps(),
 * and without it by rbd_init_disk() before the device is published.
 *
 * Returns 0 on success, -EINVAL (after WARN_ON) if the name cursor runs
 * out before the snapshot count does, or the error returned by
 * __rbd_add_snap_dev().  Snapshots added before a failure stay on the
 * list.
 *
 * NOTE(review): the inner loop reads snaps[i] while every other access
 * pairs index i with snaps[i - 1]; with i == total_snaps that looks like
 * a read one element past the counted entries -- confirm whether
 * snaps[i - 1] was intended.
 * NOTE(review): the inner loop uses 'name' before stepping it back,
 * whereas the trailing loop steps it back first; the name handed to
 * __rbd_add_snap_dev() there may not belong to index i - 1 -- confirm.
 */
1589 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1590 {
1591 const char *name, *first_name;
1592 int i = rbd_dev->header.total_snaps;
1593 struct rbd_snap *snap, *old_snap = NULL;
1594 int ret;
1595 struct list_head *p, *n;
1596
/* 'name' starts just past the end of the packed name list */
1597 first_name = rbd_dev->header.snap_names;
1598 name = first_name + rbd_dev->header.snap_names_len;
1599
/* walk the existing snapshot list backwards; entries may be unlinked */
1600 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1601 u64 cur_id;
1602
1603 old_snap = list_entry(p, struct rbd_snap, node);
1604
/* cur_id stays unset when i == 0; the !i test below guards its use */
1605 if (i)
1606 cur_id = rbd_dev->header.snapc->snaps[i - 1];
1607
1608 if (!i || old_snap->id < cur_id) {
1609 /* old_snap->id was skipped, thus was removed */
1610 __rbd_remove_snap_dev(rbd_dev, old_snap);
1611 continue;
1612 }
1613 if (old_snap->id == cur_id) {
1614 /* we have this snapshot already */
1615 i--;
1616 name = rbd_prev_snap_name(name, first_name);
1617 continue;
1618 }
/* old_snap->id > cur_id: header entries to be added as new snapshots */
1619 for (; i > 0;
1620 i--, name = rbd_prev_snap_name(name, first_name)) {
1621 if (!name) {
1622 WARN_ON(1);
1623 return -EINVAL;
1624 }
/* NOTE(review): index i here, i - 1 elsewhere -- see header comment */
1625 cur_id = rbd_dev->header.snapc->snaps[i];
1626 /* snapshot removal? handle it above */
1627 if (cur_id >= old_snap->id)
1628 break;
1629 /* a new snapshot */
1630 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1631 if (ret < 0)
1632 return ret;
1633
1634 /* note that we add it backward so using n and not p */
1635 list_add(&snap->node, n);
1636 p = &snap->node;
1637 }
1638 }
1639 /* we're done going over the old snap list, just add what's left */
1640 for (; i > 0; i--) {
1641 name = rbd_prev_snap_name(name, first_name);
1642 if (!name) {
1643 WARN_ON(1);
1644 return -EINVAL;
1645 }
1646 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1647 if (ret < 0)
1648 return ret;
/* added at the list head, so the newest snapshot ends up first */
1649 list_add(&snap->node, &rbd_dev->snaps);
1650 }
1651
1652 return 0;
1653 }
1654
1655
1656 static void rbd_root_dev_release(struct device *dev)
1657 {
1658 }
1659
1660 static struct device rbd_root_dev = {
1661 .init_name = "rbd",
1662 .release = rbd_root_dev_release,
1663 };
1664
1665 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1666 {
1667 int ret = -ENOMEM;
1668 struct device *dev;
1669 struct rbd_snap *snap;
1670
1671 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1672 dev = &rbd_dev->dev;
1673
1674 dev->bus = &rbd_bus_type;
1675 dev->type = &rbd_device_type;
1676 dev->parent = &rbd_root_dev;
1677 dev->release = rbd_dev_release;
1678 dev_set_name(dev, "%d", rbd_dev->id);
1679 ret = device_register(dev);
1680 if (ret < 0)
1681 goto done_free;
1682
1683 list_for_each_entry(snap, &rbd_dev->snaps, node) {
1684 ret = rbd_register_snap_dev(rbd_dev, snap,
1685 &rbd_dev->dev);
1686 if (ret < 0)
1687 break;
1688 }
1689
1690 mutex_unlock(&ctl_mutex);
1691 return 0;
1692 done_free:
1693 mutex_unlock(&ctl_mutex);
1694 return ret;
1695 }
1696
1697 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1698 {
1699 device_unregister(&rbd_dev->dev);
1700 }
1701
1702 static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
1703 {
1704 struct ceph_osd_client *osdc;
1705 struct rbd_device *rbd_dev;
1706 ssize_t rc = -ENOMEM;
1707 int irc, new_id = 0;
1708 struct list_head *tmp;
1709 char *mon_dev_name;
1710 char *options;
1711
1712 if (!try_module_get(THIS_MODULE))
1713 return -ENODEV;
1714
1715 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1716 if (!mon_dev_name)
1717 goto err_out_mod;
1718
1719 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1720 if (!options)
1721 goto err_mon_dev;
1722
1723 /* new rbd_device object */
1724 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1725 if (!rbd_dev)
1726 goto err_out_opt;
1727
1728 /* static rbd_device initialization */
1729 spin_lock_init(&rbd_dev->lock);
1730 INIT_LIST_HEAD(&rbd_dev->node);
1731 INIT_LIST_HEAD(&rbd_dev->snaps);
1732
1733 /* generate unique id: find highest unique id, add one */
1734 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1735
1736 list_for_each(tmp, &rbd_dev_list) {
1737 struct rbd_device *rbd_dev;
1738
1739 rbd_dev = list_entry(tmp, struct rbd_device, node);
1740 if (rbd_dev->id >= new_id)
1741 new_id = rbd_dev->id + 1;
1742 }
1743
1744 rbd_dev->id = new_id;
1745
1746 /* add to global list */
1747 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1748
1749 /* parse add command */
1750 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1751 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1752 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1753 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1754 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1755 mon_dev_name, options, rbd_dev->pool_name,
1756 rbd_dev->obj, rbd_dev->snap_name) < 4) {
1757 rc = -EINVAL;
1758 goto err_out_slot;
1759 }
1760
1761 if (rbd_dev->snap_name[0] == 0)
1762 rbd_dev->snap_name[0] = '-';
1763
1764 rbd_dev->obj_len = strlen(rbd_dev->obj);
1765 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1766 rbd_dev->obj, RBD_SUFFIX);
1767
1768 /* initialize rest of new object */
1769 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1770 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1771 if (rc < 0)
1772 goto err_out_slot;
1773
1774 mutex_unlock(&ctl_mutex);
1775
1776 /* pick the pool */
1777 osdc = &rbd_dev->client->osdc;
1778 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1779 if (rc < 0)
1780 goto err_out_client;
1781 rbd_dev->poolid = rc;
1782
1783 /* register our block device */
1784 irc = register_blkdev(0, rbd_dev->name);
1785 if (irc < 0) {
1786 rc = irc;
1787 goto err_out_client;
1788 }
1789 rbd_dev->major = irc;
1790
1791 rc = rbd_bus_add_dev(rbd_dev);
1792 if (rc)
1793 goto err_out_disk;
1794 /* set up and announce blkdev mapping */
1795 rc = rbd_init_disk(rbd_dev);
1796 if (rc)
1797 goto err_out_blkdev;
1798
1799 return count;
1800
1801 err_out_blkdev:
1802 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1803 err_out_disk:
1804 rbd_free_disk(rbd_dev);
1805 err_out_client:
1806 rbd_put_client(rbd_dev);
1807 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1808 err_out_slot:
1809 list_del_init(&rbd_dev->node);
1810 mutex_unlock(&ctl_mutex);
1811
1812 kfree(rbd_dev);
1813 err_out_opt:
1814 kfree(options);
1815 err_mon_dev:
1816 kfree(mon_dev_name);
1817 err_out_mod:
1818 dout("Error adding device %s\n", buf);
1819 module_put(THIS_MODULE);
1820 return rc;
1821 }
1822
1823 static struct rbd_device *__rbd_get_dev(unsigned long id)
1824 {
1825 struct list_head *tmp;
1826 struct rbd_device *rbd_dev;
1827
1828 list_for_each(tmp, &rbd_dev_list) {
1829 rbd_dev = list_entry(tmp, struct rbd_device, node);
1830 if (rbd_dev->id == id)
1831 return rbd_dev;
1832 }
1833 return NULL;
1834 }
1835
1836 static void rbd_dev_release(struct device *dev)
1837 {
1838 struct rbd_device *rbd_dev =
1839 container_of(dev, struct rbd_device, dev);
1840
1841 rbd_put_client(rbd_dev);
1842
1843 /* clean up and free blkdev */
1844 rbd_free_disk(rbd_dev);
1845 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1846 kfree(rbd_dev);
1847
1848 /* release module ref */
1849 module_put(THIS_MODULE);
1850 }
1851
1852 static ssize_t rbd_remove(struct bus_type *bus,
1853 const char *buf,
1854 size_t count)
1855 {
1856 struct rbd_device *rbd_dev = NULL;
1857 int target_id, rc;
1858 unsigned long ul;
1859 int ret = count;
1860
1861 rc = strict_strtoul(buf, 10, &ul);
1862 if (rc)
1863 return rc;
1864
1865 /* convert to int; abort if we lost anything in the conversion */
1866 target_id = (int) ul;
1867 if (target_id != ul)
1868 return -EINVAL;
1869
1870 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1871
1872 rbd_dev = __rbd_get_dev(target_id);
1873 if (!rbd_dev) {
1874 ret = -ENOENT;
1875 goto done;
1876 }
1877
1878 list_del_init(&rbd_dev->node);
1879
1880 __rbd_remove_all_snaps(rbd_dev);
1881 rbd_bus_del_dev(rbd_dev);
1882
1883 done:
1884 mutex_unlock(&ctl_mutex);
1885 return ret;
1886 }
1887
1888 static ssize_t rbd_snap_add(struct device *dev,
1889 struct device_attribute *attr,
1890 const char *buf,
1891 size_t count)
1892 {
1893 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1894 int ret;
1895 char *name = kmalloc(count + 1, GFP_KERNEL);
1896 if (!name)
1897 return -ENOMEM;
1898
1899 snprintf(name, count, "%s", buf);
1900
1901 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1902
1903 ret = rbd_header_add_snap(rbd_dev,
1904 name, GFP_KERNEL);
1905 if (ret < 0)
1906 goto done_unlock;
1907
1908 ret = __rbd_update_snaps(rbd_dev);
1909 if (ret < 0)
1910 goto done_unlock;
1911
1912 ret = count;
1913 done_unlock:
1914 mutex_unlock(&ctl_mutex);
1915 kfree(name);
1916 return ret;
1917 }
1918
1919 static ssize_t rbd_snap_rollback(struct device *dev,
1920 struct device_attribute *attr,
1921 const char *buf,
1922 size_t count)
1923 {
1924 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1925 int ret;
1926 u64 snapid;
1927 u64 cur_ofs;
1928 char *seg_name = NULL;
1929 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
1930 ret = -ENOMEM;
1931 if (!snap_name)
1932 return ret;
1933
1934 /* parse snaps add command */
1935 snprintf(snap_name, count, "%s", buf);
1936 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1937 if (!seg_name)
1938 goto done;
1939
1940 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1941
1942 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1943 if (ret < 0)
1944 goto done_unlock;
1945
1946 dout("snapid=%lld\n", snapid);
1947
1948 cur_ofs = 0;
1949 while (cur_ofs < rbd_dev->header.image_size) {
1950 cur_ofs += rbd_get_segment(&rbd_dev->header,
1951 rbd_dev->obj,
1952 cur_ofs, (u64)-1,
1953 seg_name, NULL);
1954 dout("seg_name=%s\n", seg_name);
1955
1956 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1957 if (ret < 0)
1958 pr_warning("could not roll back obj %s err=%d\n",
1959 seg_name, ret);
1960 }
1961
1962 ret = __rbd_update_snaps(rbd_dev);
1963 if (ret < 0)
1964 goto done_unlock;
1965
1966 ret = count;
1967
1968 done_unlock:
1969 mutex_unlock(&ctl_mutex);
1970 done:
1971 kfree(seg_name);
1972 kfree(snap_name);
1973
1974 return ret;
1975 }
1976
1977 static struct bus_attribute rbd_bus_attrs[] = {
1978 __ATTR(add, S_IWUSR, NULL, rbd_add),
1979 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
1980 __ATTR_NULL
1981 };
1982
1983 /*
1984 * create control files in sysfs
1985 * /sys/bus/rbd/...
1986 */
1987 static int rbd_sysfs_init(void)
1988 {
1989 int ret;
1990
1991 rbd_bus_type.bus_attrs = rbd_bus_attrs;
1992
1993 ret = bus_register(&rbd_bus_type);
1994 if (ret < 0)
1995 return ret;
1996
1997 ret = device_register(&rbd_root_dev);
1998
1999 return ret;
2000 }
2001
2002 static void rbd_sysfs_cleanup(void)
2003 {
2004 device_unregister(&rbd_root_dev);
2005 bus_unregister(&rbd_bus_type);
2006 }
2007
2008 int __init rbd_init(void)
2009 {
2010 int rc;
2011
2012 rc = rbd_sysfs_init();
2013 if (rc)
2014 return rc;
2015 spin_lock_init(&node_lock);
2016 pr_info("loaded " DRV_NAME_LONG "\n");
2017 return 0;
2018 }
2019
2020 void __exit rbd_exit(void)
2021 {
2022 rbd_sysfs_cleanup();
2023 }
2024
2025 module_init(rbd_init);
2026 module_exit(rbd_exit);
2027
2028 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2029 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2030 MODULE_DESCRIPTION("rados block device");
2031
2032 /* following authorship retained from original osdblk.c */
2033 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2034
2035 MODULE_LICENSE("GPL");