/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
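
/*
 * As a quick orientation (an illustrative sketch, not the normative
 * format -- see the ABI document above): images are mapped and
 * unmapped through the rbd bus attributes, roughly
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=<key> rbd foo" \
 *		> /sys/bus/rbd/add
 *	# echo 0 > /sys/bus/rbd/remove
 *
 * where the monitor address, pool ("rbd"), image ("foo") and key are
 * placeholders, an optional trailing snapshot name maps a snapshot
 * read-only, and the id written to "remove" is the one the driver
 * assigned at "add" time.
 */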

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8) (~0U))
#define	U16_MAX	((u16) (~0U))
#define	U32_MAX	((u32) (~0U))
#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
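
/*
 * A note on the width formula above (our own back-of-the-envelope
 * check, not part of the original): each byte contributes just under
 * 2.5 decimal digits (log10(256) ~ 2.41), so (5 * sizeof (int)) / 2
 * over-approximates the digit count and the + 1 leaves room for a
 * sign.  For a 4-byte int this gives 11, covering the 10 digits of
 * 2^32 - 1 plus a '-'.
 */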

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};
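
/*
 * A filled-in example (hypothetical values, for illustration only):
 * mapping image "foo" at its head in pool "rbd" might produce
 *
 *	{ .pool_id = 2, .pool_name = "rbd",
 *	  .image_id = "1014f2ae8944a", .image_name = "foo",
 *	  .snap_id = CEPH_NOSNAP, .snap_name = "-" }
 *
 * with the ids looked up from the supplied names as described above.
 */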

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
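
/*
 * To make the fan-out concrete (an illustrative example assuming the
 * default 4 MiB object size): a 6 MiB write at image offset 3 MiB
 * becomes one rbd_img_request carrying three rbd_obj_requests --
 * 1 MiB at the tail of object 0, all of object 1, and 1 MiB at the
 * head of object 2 -- linked on obj_requests and retired in order
 * via next_completion.
 */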

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
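
/*
 * These tokens arrive one at a time: the option string supplied
 * through /sys/bus/rbd/add is split up, and anything libceph does
 * not recognize itself is handed to parse_rbd_opts_token() as the
 * extra-option callback -- so, roughly, an option string such as
 * "name=admin,ro" leaves rbd_opts->read_only set.  (A sketch of the
 * flow, not a normative description.)
 */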

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to drop the client from the
 * list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
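
/*
 * A worked example for the helpers above (our own illustration):
 * with obj_order 22 (4 MiB objects) and object_prefix "rb.0.1234",
 * image offset 0x500000 lies in segment 1, so rbd_segment_name()
 * produces "rb.0.1234.000000000001" and rbd_segment_offset()
 * returns 0x100000; a 4 MiB request starting there is clipped by
 * rbd_segment_length() to the 3 MiB remaining in that segment.
 */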

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}
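
/*
 * Typical usage (a sketch of the pattern used later in this file):
 * build the op, wrap it in an osd request -- which copies the op's
 * contents -- then free the caller's copy right away:
 *
 *	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
 *	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
 *						obj_request, op);
 *	rbd_osd_req_op_destroy(op);
 */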

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	BUG_ON(obj_request->type != OBJ_REQUEST_BIO);
	if (obj_request->result == -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
		obj_request->xferred = obj_request->length;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.
	 * Our xferred value is the number of bytes transferred
	 * back.  Set it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	WARN_ON(osd_req->r_num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);
	opcode = osd_req->r_request_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
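
/*
 * A note on the completion scheme above (our own summary): OSD
 * replies can arrive out of order, but blk_end_request() must
 * consume bytes in order.  If, say, object request 2 completes
 * before request 1, its callback bails at the which !=
 * next_completion check; when request 1 later completes, the loop
 * sweeps forward over every already-done request and retires both.
 */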

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	obj_request->callback = rbd_obj_request_put;
	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (!op)
		goto out_cancel;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct ceph_osd_req_op *op;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations but they
	 * don't involve object data (so no offset or length).
	 * The result should be placed into the inbound buffer
	 * provided.  They also supply outbound data--parameters for
	 * the object method.  Currently if this is present it will
	 * be a snapshot id.
	 */
1928 page_count = (u32) calc_pages_for(0, inbound_size);
1929 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1930 if (IS_ERR(pages))
1931 return PTR_ERR(pages);
1932
1933 ret = -ENOMEM;
1934 obj_request = rbd_obj_request_create(object_name, 0, 0,
1935 OBJ_REQUEST_PAGES);
1936 if (!obj_request)
1937 goto out;
1938
1939 obj_request->pages = pages;
1940 obj_request->page_count = page_count;
1941
1942 op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1943 method_name, outbound, outbound_size);
1944 if (!op)
1945 goto out;
1946 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1947 obj_request, op);
1948 rbd_osd_req_op_destroy(op);
1949 if (!obj_request->osd_req)
1950 goto out;
1951
1952 osdc = &rbd_dev->rbd_client->client->osdc;
1953 ret = rbd_obj_request_submit(osdc, obj_request);
1954 if (ret)
1955 goto out;
1956 ret = rbd_obj_request_wait(obj_request);
1957 if (ret)
1958 goto out;
1959
1960 ret = obj_request->result;
1961 if (ret < 0)
1962 goto out;
1963 ret = 0;
1964 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1965 if (version)
1966 *version = obj_request->version;
1967 out:
1968 if (obj_request)
1969 rbd_obj_request_put(obj_request);
1970 else
1971 ceph_release_page_vector(pages, page_count);
1972
1973 return ret;
1974 }
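/*
 * Usage sketch (illustrative only): this mirrors the "get_size" class
 * method call made later in this file.  The outbound buffer carries
 * the encoded method parameters (here a snapshot id) and the inbound
 * buffer receives the encoded reply.
 */
#if 0	/* example only */
	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
#endif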
1975
1976 static void rbd_request_fn(struct request_queue *q)
1977 __releases(q->queue_lock) __acquires(q->queue_lock)
1978 {
1979 struct rbd_device *rbd_dev = q->queuedata;
1980 bool read_only = rbd_dev->mapping.read_only;
1981 struct request *rq;
1982 int result;
1983
1984 while ((rq = blk_fetch_request(q))) {
1985 bool write_request = rq_data_dir(rq) == WRITE;
1986 struct rbd_img_request *img_request;
1987 u64 offset;
1988 u64 length;
1989
1990 /* Ignore any non-FS requests that filter through. */
1991
1992 if (rq->cmd_type != REQ_TYPE_FS) {
1993 dout("%s: non-fs request type %d\n", __func__,
1994 (int) rq->cmd_type);
1995 __blk_end_request_all(rq, 0);
1996 continue;
1997 }
1998
1999 /* Ignore/skip any zero-length requests */
2000
2001 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2002 length = (u64) blk_rq_bytes(rq);
2003
2004 if (!length) {
2005 dout("%s: zero-length request\n", __func__);
2006 __blk_end_request_all(rq, 0);
2007 continue;
2008 }
2009
2010 spin_unlock_irq(q->queue_lock);
2011
2012 /* Disallow writes to a read-only device */
2013
2014 if (write_request) {
2015 result = -EROFS;
2016 if (read_only)
2017 goto end_request;
2018 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2019 }
2020
2021 /*
2022 * Quit early if the mapped snapshot no longer
2023 * exists. It's still possible the snapshot will
2024 * have disappeared by the time our request arrives
2025 * at the osd, but there's no sense in sending it if
2026 * we already know.
2027 */
2028 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2029			dout("request for non-existent snapshot\n");
2030 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2031 result = -ENXIO;
2032 goto end_request;
2033 }
2034
2035 result = -EINVAL;
2036 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2037 goto end_request; /* Shouldn't happen */
2038
2039 result = -ENOMEM;
2040 img_request = rbd_img_request_create(rbd_dev, offset, length,
2041 write_request);
2042 if (!img_request)
2043 goto end_request;
2044
2045 img_request->rq = rq;
2046
2047 result = rbd_img_request_fill_bio(img_request, rq->bio);
2048 if (!result)
2049 result = rbd_img_request_submit(img_request);
2050 if (result)
2051 rbd_img_request_put(img_request);
2052 end_request:
2053 spin_lock_irq(q->queue_lock);
2054 if (result < 0) {
2055 rbd_warn(rbd_dev, "obj_request %s result %d\n",
2056 write_request ? "write" : "read", result);
2057 __blk_end_request_all(rq, result);
2058 }
2059 }
2060 }
2061
2062 /*
2063 * a queue callback. Makes sure that we don't create a bio that spans across
2064 * multiple osd objects. One exception would be with a single page bios,
2065 * which we handle later at bio_chain_clone_range()
2066 */
2067 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2068 struct bio_vec *bvec)
2069 {
2070 struct rbd_device *rbd_dev = q->queuedata;
2071 sector_t sector_offset;
2072 sector_t sectors_per_obj;
2073 sector_t obj_sector_offset;
2074 int ret;
2075
2076 /*
2077	 * Find how far into its rbd object the bio's starting sector
2078	 * falls.  The sector is partition-relative, so first convert
2079	 * it to an offset relative to the enclosing (whole) device.
2080 */
2081 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2082 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2083 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2084
2085 /*
2086 * Compute the number of bytes from that offset to the end
2087 * of the object. Account for what's already used by the bio.
2088 */
2089 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2090 if (ret > bmd->bi_size)
2091 ret -= bmd->bi_size;
2092 else
2093 ret = 0;
2094
2095 /*
2096 * Don't send back more than was asked for. And if the bio
2097 * was empty, let the whole thing through because: "Note
2098 * that a block device *must* allow a single page to be
2099 * added to an empty bio."
2100 */
2101 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2102 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2103 ret = (int) bvec->bv_len;
2104
2105 return ret;
2106 }
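/*
 * A standalone restatement of the boundary arithmetic above (a sketch
 * with hypothetical inputs; the helper name is ours): with obj_order
 * 22 (4 MB objects) an object spans 8192 sectors, so a bio starting
 * at device sector 8000 with 64 KB already queued has
 * (8192 - 8000) * 512 - 65536 = 32768 bytes of room left.
 */
#if 0	/* example only */
static int example_bytes_to_object_boundary(u8 obj_order,
					sector_t device_sector,
					unsigned int queued_bytes)
{
	sector_t sectors_per_obj = 1 << (obj_order - SECTOR_SHIFT);
	sector_t into_obj = device_sector & (sectors_per_obj - 1);
	int room = (int) (sectors_per_obj - into_obj) << SECTOR_SHIFT;

	return room > (int) queued_bytes ? room - (int) queued_bytes : 0;
}
#endif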
2107
2108 static void rbd_free_disk(struct rbd_device *rbd_dev)
2109 {
2110 struct gendisk *disk = rbd_dev->disk;
2111
2112 if (!disk)
2113 return;
2114
2115 if (disk->flags & GENHD_FL_UP)
2116 del_gendisk(disk);
2117 if (disk->queue)
2118 blk_cleanup_queue(disk->queue);
2119 put_disk(disk);
2120 }
2121
2122 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2123 const char *object_name,
2124 u64 offset, u64 length,
2125 char *buf, u64 *version)
2127 {
2128 struct ceph_osd_req_op *op;
2129 struct rbd_obj_request *obj_request;
2130 struct ceph_osd_client *osdc;
2131 struct page **pages = NULL;
2132 u32 page_count;
2133 size_t size;
2134 int ret;
2135
2136 page_count = (u32) calc_pages_for(offset, length);
2137 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2138	if (IS_ERR(pages))
2139		return PTR_ERR(pages);
2140
2141 ret = -ENOMEM;
2142 obj_request = rbd_obj_request_create(object_name, offset, length,
2143 OBJ_REQUEST_PAGES);
2144 if (!obj_request)
2145 goto out;
2146
2147 obj_request->pages = pages;
2148 obj_request->page_count = page_count;
2149
2150 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2151 if (!op)
2152 goto out;
2153 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2154 obj_request, op);
2155 rbd_osd_req_op_destroy(op);
2156 if (!obj_request->osd_req)
2157 goto out;
2158
2159 osdc = &rbd_dev->rbd_client->client->osdc;
2160 ret = rbd_obj_request_submit(osdc, obj_request);
2161 if (ret)
2162 goto out;
2163 ret = rbd_obj_request_wait(obj_request);
2164 if (ret)
2165 goto out;
2166
2167 ret = obj_request->result;
2168 if (ret < 0)
2169 goto out;
2170
2171 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2172 size = (size_t) obj_request->xferred;
2173 ceph_copy_from_page_vector(pages, buf, 0, size);
2174 rbd_assert(size <= (size_t) INT_MAX);
2175 ret = (int) size;
2176 if (version)
2177 *version = obj_request->version;
2178 out:
2179 if (obj_request)
2180 rbd_obj_request_put(obj_request);
2181 else
2182 ceph_release_page_vector(pages, page_count);
2183
2184 return ret;
2185 }
2186
2187 /*
2188 * Read the complete header for the given rbd device.
2189 *
2190 * Returns a pointer to a dynamically-allocated buffer containing
2191 * the complete and validated header. Caller can pass the address
2192 * of a variable that will be filled in with the version of the
2193 * header object at the time it was read.
2194 *
2195 * Returns a pointer-coded errno if a failure occurs.
2196 */
2197 static struct rbd_image_header_ondisk *
2198 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2199 {
2200 struct rbd_image_header_ondisk *ondisk = NULL;
2201 u32 snap_count = 0;
2202 u64 names_size = 0;
2203 u32 want_count;
2204 int ret;
2205
2206 /*
2207 * The complete header will include an array of its 64-bit
2208 * snapshot ids, followed by the names of those snapshots as
2209 * a contiguous block of NUL-terminated strings. Note that
2210 * the number of snapshots could change by the time we read
2211 * it in, in which case we re-read it.
2212 */
2213 do {
2214 size_t size;
2215
2216 kfree(ondisk);
2217
2218 size = sizeof (*ondisk);
2219 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2220 size += names_size;
2221 ondisk = kmalloc(size, GFP_KERNEL);
2222 if (!ondisk)
2223 return ERR_PTR(-ENOMEM);
2224
2225 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2226 0, size,
2227 (char *) ondisk, version);
2228 if (ret < 0)
2229 goto out_err;
2230 if (WARN_ON((size_t) ret < size)) {
2231 ret = -ENXIO;
2232			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2233 size, ret);
2234 goto out_err;
2235 }
2236 if (!rbd_dev_ondisk_valid(ondisk)) {
2237 ret = -ENXIO;
2238 rbd_warn(rbd_dev, "invalid header");
2239 goto out_err;
2240 }
2241
2242 names_size = le64_to_cpu(ondisk->snap_names_len);
2243 want_count = snap_count;
2244 snap_count = le32_to_cpu(ondisk->snap_count);
2245 } while (snap_count != want_count);
2246
2247 return ondisk;
2248
2249 out_err:
2250 kfree(ondisk);
2251
2252 return ERR_PTR(ret);
2253 }
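/*
 * Worked example of the sizing loop above (hypothetical numbers): the
 * first pass uses snap_count = 0 and names_size = 0, reading only the
 * fixed part of the header.  If that read reports 3 snapshots whose
 * names total 25 bytes, the second pass allocates
 *	sizeof (*ondisk) + 3 * sizeof (struct rbd_image_snap_ondisk) + 25
 * bytes and reads again; the loop ends when two consecutive reads
 * agree on the snapshot count.
 */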
2254
2255 /*
2256  * reload the on-disk header
2257 */
2258 static int rbd_read_header(struct rbd_device *rbd_dev,
2259 struct rbd_image_header *header)
2260 {
2261 struct rbd_image_header_ondisk *ondisk;
2262 u64 ver = 0;
2263 int ret;
2264
2265 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2266 if (IS_ERR(ondisk))
2267 return PTR_ERR(ondisk);
2268 ret = rbd_header_from_disk(header, ondisk);
2269 if (ret >= 0)
2270 header->obj_version = ver;
2271 kfree(ondisk);
2272
2273 return ret;
2274 }
2275
2276 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2277 {
2278 struct rbd_snap *snap;
2279 struct rbd_snap *next;
2280
2281 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2282 rbd_remove_snap_dev(snap);
2283 }
2284
2285 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2286 {
2287 sector_t size;
2288
2289 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2290 return;
2291
2292 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2293 dout("setting size to %llu sectors", (unsigned long long) size);
2294 rbd_dev->mapping.size = (u64) size;
2295 set_capacity(rbd_dev->disk, size);
2296 }
2297
2298 /*
2299 * only read the first part of the ondisk header, without the snaps info
2300 */
2301 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2302 {
2303 int ret;
2304 struct rbd_image_header h;
2305
2306 ret = rbd_read_header(rbd_dev, &h);
2307 if (ret < 0)
2308 return ret;
2309
2310 down_write(&rbd_dev->header_rwsem);
2311
2312 /* Update image size, and check for resize of mapped image */
2313 rbd_dev->header.image_size = h.image_size;
2314 rbd_update_mapping_size(rbd_dev);
2315
2316 /* rbd_dev->header.object_prefix shouldn't change */
2317 kfree(rbd_dev->header.snap_sizes);
2318 kfree(rbd_dev->header.snap_names);
2319 /* osd requests may still refer to snapc */
2320 ceph_put_snap_context(rbd_dev->header.snapc);
2321
2322 if (hver)
2323 *hver = h.obj_version;
2324 rbd_dev->header.obj_version = h.obj_version;
2325 rbd_dev->header.image_size = h.image_size;
2326 rbd_dev->header.snapc = h.snapc;
2327 rbd_dev->header.snap_names = h.snap_names;
2328 rbd_dev->header.snap_sizes = h.snap_sizes;
2329 /* Free the extra copy of the object prefix */
2330 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2331 kfree(h.object_prefix);
2332
2333 ret = rbd_dev_snaps_update(rbd_dev);
2334 if (!ret)
2335 ret = rbd_dev_snaps_register(rbd_dev);
2336
2337 up_write(&rbd_dev->header_rwsem);
2338
2339 return ret;
2340 }
2341
2342 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2343 {
2344 int ret;
2345
2346 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2347 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2348 if (rbd_dev->image_format == 1)
2349 ret = rbd_dev_v1_refresh(rbd_dev, hver);
2350 else
2351 ret = rbd_dev_v2_refresh(rbd_dev, hver);
2352 mutex_unlock(&ctl_mutex);
2353
2354 return ret;
2355 }
2356
2357 static int rbd_init_disk(struct rbd_device *rbd_dev)
2358 {
2359 struct gendisk *disk;
2360 struct request_queue *q;
2361 u64 segment_size;
2362
2363 /* create gendisk info */
2364 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2365 if (!disk)
2366 return -ENOMEM;
2367
2368 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2369 rbd_dev->dev_id);
2370 disk->major = rbd_dev->major;
2371 disk->first_minor = 0;
2372 disk->fops = &rbd_bd_ops;
2373 disk->private_data = rbd_dev;
2374
2375 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2376 if (!q)
2377 goto out_disk;
2378
2379 /* We use the default size, but let's be explicit about it. */
2380 blk_queue_physical_block_size(q, SECTOR_SIZE);
2381
2382 /* set io sizes to object size */
2383 segment_size = rbd_obj_bytes(&rbd_dev->header);
2384 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2385 blk_queue_max_segment_size(q, segment_size);
2386 blk_queue_io_min(q, segment_size);
2387 blk_queue_io_opt(q, segment_size);
2388
2389 blk_queue_merge_bvec(q, rbd_merge_bvec);
2390 disk->queue = q;
2391
2392 q->queuedata = rbd_dev;
2393
2394 rbd_dev->disk = disk;
2395
2396 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2397
2398 return 0;
2399 out_disk:
2400 put_disk(disk);
2401
2402 return -ENOMEM;
2403 }
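/*
 * Worked example for the limits above (hypothetical values): an image
 * with the default object order of 22 has rbd_obj_bytes() == 4 MB, so
 * max_hw_sectors becomes 4 MB / 512 = 8192 sectors and io_min/io_opt
 * are both 4 MB, steering callers toward object-aligned I/O.
 */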
2404
2405 /*
2406 sysfs
2407 */
2408
2409 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2410 {
2411 return container_of(dev, struct rbd_device, dev);
2412 }
2413
2414 static ssize_t rbd_size_show(struct device *dev,
2415 struct device_attribute *attr, char *buf)
2416 {
2417 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2418 sector_t size;
2419
2420 down_read(&rbd_dev->header_rwsem);
2421 size = get_capacity(rbd_dev->disk);
2422 up_read(&rbd_dev->header_rwsem);
2423
2424 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2425 }
2426
2427 /*
2428 * Note this shows the features for whatever's mapped, which is not
2429 * necessarily the base image.
2430 */
2431 static ssize_t rbd_features_show(struct device *dev,
2432 struct device_attribute *attr, char *buf)
2433 {
2434 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2435
2436 return sprintf(buf, "0x%016llx\n",
2437 (unsigned long long) rbd_dev->mapping.features);
2438 }
2439
2440 static ssize_t rbd_major_show(struct device *dev,
2441 struct device_attribute *attr, char *buf)
2442 {
2443 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2444
2445 return sprintf(buf, "%d\n", rbd_dev->major);
2446 }
2447
2448 static ssize_t rbd_client_id_show(struct device *dev,
2449 struct device_attribute *attr, char *buf)
2450 {
2451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2452
2453 return sprintf(buf, "client%lld\n",
2454 ceph_client_id(rbd_dev->rbd_client->client));
2455 }
2456
2457 static ssize_t rbd_pool_show(struct device *dev,
2458 struct device_attribute *attr, char *buf)
2459 {
2460 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2461
2462 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2463 }
2464
2465 static ssize_t rbd_pool_id_show(struct device *dev,
2466 struct device_attribute *attr, char *buf)
2467 {
2468 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470 return sprintf(buf, "%llu\n",
2471 (unsigned long long) rbd_dev->spec->pool_id);
2472 }
2473
2474 static ssize_t rbd_name_show(struct device *dev,
2475 struct device_attribute *attr, char *buf)
2476 {
2477 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2478
2479 if (rbd_dev->spec->image_name)
2480 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2481
2482 return sprintf(buf, "(unknown)\n");
2483 }
2484
2485 static ssize_t rbd_image_id_show(struct device *dev,
2486 struct device_attribute *attr, char *buf)
2487 {
2488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2489
2490 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2491 }
2492
2493 /*
2494 * Shows the name of the currently-mapped snapshot (or
2495 * RBD_SNAP_HEAD_NAME for the base image).
2496 */
2497 static ssize_t rbd_snap_show(struct device *dev,
2498 struct device_attribute *attr,
2499 char *buf)
2500 {
2501 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2502
2503 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2504 }
2505
2506 /*
2507 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2508 * for the parent image. If there is no parent, simply shows
2509 * "(no parent image)".
2510 */
2511 static ssize_t rbd_parent_show(struct device *dev,
2512 struct device_attribute *attr,
2513 char *buf)
2514 {
2515 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2516 struct rbd_spec *spec = rbd_dev->parent_spec;
2517 int count;
2518 char *bufp = buf;
2519
2520 if (!spec)
2521 return sprintf(buf, "(no parent image)\n");
2522
2523 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2524 (unsigned long long) spec->pool_id, spec->pool_name);
2525 if (count < 0)
2526 return count;
2527 bufp += count;
2528
2529 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2530 spec->image_name ? spec->image_name : "(unknown)");
2531 if (count < 0)
2532 return count;
2533 bufp += count;
2534
2535 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2536 (unsigned long long) spec->snap_id, spec->snap_name);
2537 if (count < 0)
2538 return count;
2539 bufp += count;
2540
2541 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2542 if (count < 0)
2543 return count;
2544 bufp += count;
2545
2546 return (ssize_t) (bufp - buf);
2547 }
2548
2549 static ssize_t rbd_image_refresh(struct device *dev,
2550 struct device_attribute *attr,
2551 const char *buf,
2552 size_t size)
2553 {
2554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2555 int ret;
2556
2557 ret = rbd_dev_refresh(rbd_dev, NULL);
2558
2559 return ret < 0 ? ret : size;
2560 }
2561
2562 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2563 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2564 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2565 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2566 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2567 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2568 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2569 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2570 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2571 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2572 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2573
2574 static struct attribute *rbd_attrs[] = {
2575 &dev_attr_size.attr,
2576 &dev_attr_features.attr,
2577 &dev_attr_major.attr,
2578 &dev_attr_client_id.attr,
2579 &dev_attr_pool.attr,
2580 &dev_attr_pool_id.attr,
2581 &dev_attr_name.attr,
2582 &dev_attr_image_id.attr,
2583 &dev_attr_current_snap.attr,
2584 &dev_attr_parent.attr,
2585 &dev_attr_refresh.attr,
2586 NULL
2587 };
2588
2589 static struct attribute_group rbd_attr_group = {
2590 .attrs = rbd_attrs,
2591 };
2592
2593 static const struct attribute_group *rbd_attr_groups[] = {
2594 &rbd_attr_group,
2595 NULL
2596 };
2597
2598 static void rbd_sysfs_dev_release(struct device *dev)
2599 {
2600 }
2601
2602 static struct device_type rbd_device_type = {
2603 .name = "rbd",
2604 .groups = rbd_attr_groups,
2605 .release = rbd_sysfs_dev_release,
2606 };
2607
2608
2609 /*
2610 sysfs - snapshots
2611 */
2612
2613 static ssize_t rbd_snap_size_show(struct device *dev,
2614 struct device_attribute *attr,
2615 char *buf)
2616 {
2617 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2618
2619 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2620 }
2621
2622 static ssize_t rbd_snap_id_show(struct device *dev,
2623 struct device_attribute *attr,
2624 char *buf)
2625 {
2626 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2627
2628 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2629 }
2630
2631 static ssize_t rbd_snap_features_show(struct device *dev,
2632 struct device_attribute *attr,
2633 char *buf)
2634 {
2635 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2636
2637 return sprintf(buf, "0x%016llx\n",
2638 (unsigned long long) snap->features);
2639 }
2640
2641 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2642 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2643 static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2644
2645 static struct attribute *rbd_snap_attrs[] = {
2646 &dev_attr_snap_size.attr,
2647 &dev_attr_snap_id.attr,
2648 &dev_attr_snap_features.attr,
2649 NULL,
2650 };
2651
2652 static struct attribute_group rbd_snap_attr_group = {
2653 .attrs = rbd_snap_attrs,
2654 };
2655
2656 static void rbd_snap_dev_release(struct device *dev)
2657 {
2658 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2659 kfree(snap->name);
2660 kfree(snap);
2661 }
2662
2663 static const struct attribute_group *rbd_snap_attr_groups[] = {
2664 &rbd_snap_attr_group,
2665 NULL
2666 };
2667
2668 static struct device_type rbd_snap_device_type = {
2669 .groups = rbd_snap_attr_groups,
2670 .release = rbd_snap_dev_release,
2671 };
2672
2673 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2674 {
2675 kref_get(&spec->kref);
2676
2677 return spec;
2678 }
2679
2680 static void rbd_spec_free(struct kref *kref);
2681 static void rbd_spec_put(struct rbd_spec *spec)
2682 {
2683 if (spec)
2684 kref_put(&spec->kref, rbd_spec_free);
2685 }
2686
2687 static struct rbd_spec *rbd_spec_alloc(void)
2688 {
2689 struct rbd_spec *spec;
2690
2691 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2692 if (!spec)
2693 return NULL;
2694 kref_init(&spec->kref);
2695
2696 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */
2697
2698 return spec;
2699 }
2700
2701 static void rbd_spec_free(struct kref *kref)
2702 {
2703 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2704
2705 kfree(spec->pool_name);
2706 kfree(spec->image_id);
2707 kfree(spec->image_name);
2708 kfree(spec->snap_name);
2709 kfree(spec);
2710 }
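/*
 * Reference-count lifecycle sketch (illustrative only, not an actual
 * call sequence from this driver):
 */
#if 0	/* example only */
	struct rbd_spec *spec = rbd_spec_alloc();	/* refcount 1 */

	rbd_dev->spec = rbd_spec_get(spec);		/* refcount 2 */
	rbd_spec_put(spec);				/* refcount 1 */
	rbd_spec_put(rbd_dev->spec);			/* rbd_spec_free() runs */
#endif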
2711
2712 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2713 struct rbd_spec *spec)
2714 {
2715 struct rbd_device *rbd_dev;
2716
2717 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2718 if (!rbd_dev)
2719 return NULL;
2720
2721 spin_lock_init(&rbd_dev->lock);
2722 rbd_dev->flags = 0;
2723 INIT_LIST_HEAD(&rbd_dev->node);
2724 INIT_LIST_HEAD(&rbd_dev->snaps);
2725 init_rwsem(&rbd_dev->header_rwsem);
2726
2727 rbd_dev->spec = spec;
2728 rbd_dev->rbd_client = rbdc;
2729
2730 /* Initialize the layout used for all rbd requests */
2731
2732 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2733 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2734 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2735 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2736
2737 return rbd_dev;
2738 }
2739
2740 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2741 {
2742 rbd_spec_put(rbd_dev->parent_spec);
2743 kfree(rbd_dev->header_name);
2744 rbd_put_client(rbd_dev->rbd_client);
2745 rbd_spec_put(rbd_dev->spec);
2746 kfree(rbd_dev);
2747 }
2748
2749 static bool rbd_snap_registered(struct rbd_snap *snap)
2750 {
2751 bool ret = snap->dev.type == &rbd_snap_device_type;
2752 bool reg = device_is_registered(&snap->dev);
2753
2754 rbd_assert(!ret ^ reg);
2755
2756 return ret;
2757 }
2758
2759 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2760 {
2761 list_del(&snap->node);
2762 if (device_is_registered(&snap->dev))
2763 device_unregister(&snap->dev);
2764 }
2765
2766 static int rbd_register_snap_dev(struct rbd_snap *snap,
2767 struct device *parent)
2768 {
2769 struct device *dev = &snap->dev;
2770 int ret;
2771
2772 dev->type = &rbd_snap_device_type;
2773 dev->parent = parent;
2774 dev->release = rbd_snap_dev_release;
2775 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2776 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2777
2778 ret = device_register(dev);
2779
2780 return ret;
2781 }
2782
2783 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2784 const char *snap_name,
2785 u64 snap_id, u64 snap_size,
2786 u64 snap_features)
2787 {
2788 struct rbd_snap *snap;
2789 int ret;
2790
2791 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2792 if (!snap)
2793 return ERR_PTR(-ENOMEM);
2794
2795 ret = -ENOMEM;
2796 snap->name = kstrdup(snap_name, GFP_KERNEL);
2797 if (!snap->name)
2798 goto err;
2799
2800 snap->id = snap_id;
2801 snap->size = snap_size;
2802 snap->features = snap_features;
2803
2804 return snap;
2805
2806 err:
2807 kfree(snap->name);
2808 kfree(snap);
2809
2810 return ERR_PTR(ret);
2811 }
2812
2813 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2814 u64 *snap_size, u64 *snap_features)
2815 {
2816 char *snap_name;
2817
2818 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2819
2820 *snap_size = rbd_dev->header.snap_sizes[which];
2821 *snap_features = 0; /* No features for v1 */
2822
2823 /* Skip over names until we find the one we are looking for */
2824
2825 snap_name = rbd_dev->header.snap_names;
2826 while (which--)
2827 snap_name += strlen(snap_name) + 1;
2828
2829 return snap_name;
2830 }
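/*
 * The name table walked above is one buffer of consecutive
 * NUL-terminated strings.  A standalone sketch with hypothetical
 * data:
 */
#if 0	/* example only */
	char names[] = "first\0second\0third";	/* snap_names layout */
	char *name = names;
	u32 which = 2;				/* want the third name */

	while (which--)
		name += strlen(name) + 1;	/* name now -> "third" */
#endif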
2831
2832 /*
2833 * Get the size and object order for an image snapshot, or if
2834 * snap_id is CEPH_NOSNAP, gets this information for the base
2835 * image.
2836 */
2837 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2838 u8 *order, u64 *snap_size)
2839 {
2840 __le64 snapid = cpu_to_le64(snap_id);
2841 int ret;
2842 struct {
2843 u8 order;
2844 __le64 size;
2845 } __attribute__ ((packed)) size_buf = { 0 };
2846
2847 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2848 "rbd", "get_size",
2849 (char *) &snapid, sizeof (snapid),
2850 (char *) &size_buf, sizeof (size_buf), NULL);
2851 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2852 if (ret < 0)
2853 return ret;
2854
2855 *order = size_buf.order;
2856 *snap_size = le64_to_cpu(size_buf.size);
2857
2858 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2859 (unsigned long long) snap_id, (unsigned int) *order,
2860 (unsigned long long) *snap_size);
2861
2862 return 0;
2863 }
2864
2865 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2866 {
2867 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2868 &rbd_dev->header.obj_order,
2869 &rbd_dev->header.image_size);
2870 }
2871
2872 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2873 {
2874 void *reply_buf;
2875 int ret;
2876 void *p;
2877
2878 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2879 if (!reply_buf)
2880 return -ENOMEM;
2881
2882 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2883 "rbd", "get_object_prefix",
2884 NULL, 0,
2885 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2886 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2887 if (ret < 0)
2888 goto out;
2889
2890 p = reply_buf;
2891 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2892 p + RBD_OBJ_PREFIX_LEN_MAX,
2893 NULL, GFP_NOIO);
2894
2895 if (IS_ERR(rbd_dev->header.object_prefix)) {
2896 ret = PTR_ERR(rbd_dev->header.object_prefix);
2897 rbd_dev->header.object_prefix = NULL;
2898 } else {
2899 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2900 }
2901
2902 out:
2903 kfree(reply_buf);
2904
2905 return ret;
2906 }
2907
2908 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2909 u64 *snap_features)
2910 {
2911 __le64 snapid = cpu_to_le64(snap_id);
2912 struct {
2913 __le64 features;
2914 __le64 incompat;
2915 } features_buf = { 0 };
2916 u64 incompat;
2917 int ret;
2918
2919 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2920 "rbd", "get_features",
2921 (char *) &snapid, sizeof (snapid),
2922 (char *) &features_buf, sizeof (features_buf),
2923 NULL);
2924 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2925 if (ret < 0)
2926 return ret;
2927
2928 incompat = le64_to_cpu(features_buf.incompat);
2929 if (incompat & ~RBD_FEATURES_ALL)
2930 return -ENXIO;
2931
2932 *snap_features = le64_to_cpu(features_buf.features);
2933
2934 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2935 (unsigned long long) snap_id,
2936 (unsigned long long) *snap_features,
2937 (unsigned long long) le64_to_cpu(features_buf.incompat));
2938
2939 return 0;
2940 }
2941
2942 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2943 {
2944 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2945 &rbd_dev->header.features);
2946 }
2947
2948 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2949 {
2950 struct rbd_spec *parent_spec;
2951 size_t size;
2952 void *reply_buf = NULL;
2953 __le64 snapid;
2954 void *p;
2955 void *end;
2956 char *image_id;
2957 u64 overlap;
2958 int ret;
2959
2960 parent_spec = rbd_spec_alloc();
2961 if (!parent_spec)
2962 return -ENOMEM;
2963
2964 size = sizeof (__le64) + /* pool_id */
2965 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2966 sizeof (__le64) + /* snap_id */
2967 sizeof (__le64); /* overlap */
2968 reply_buf = kmalloc(size, GFP_KERNEL);
2969 if (!reply_buf) {
2970 ret = -ENOMEM;
2971 goto out_err;
2972 }
2973
2974 snapid = cpu_to_le64(CEPH_NOSNAP);
2975 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2976 "rbd", "get_parent",
2977 (char *) &snapid, sizeof (snapid),
2978 (char *) reply_buf, size, NULL);
2979 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2980 if (ret < 0)
2981 goto out_err;
2982
2983 ret = -ERANGE;
2984 p = reply_buf;
2985 end = (char *) reply_buf + size;
2986 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2987 if (parent_spec->pool_id == CEPH_NOPOOL)
2988 goto out; /* No parent? No problem. */
2989
2990 /* The ceph file layout needs to fit pool id in 32 bits */
2991
2992 ret = -EIO;
2993 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2994 goto out;
2995
2996 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2997 if (IS_ERR(image_id)) {
2998 ret = PTR_ERR(image_id);
2999 goto out_err;
3000 }
3001 parent_spec->image_id = image_id;
3002 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3003 ceph_decode_64_safe(&p, end, overlap, out_err);
3004
3005 rbd_dev->parent_overlap = overlap;
3006 rbd_dev->parent_spec = parent_spec;
3007 parent_spec = NULL; /* rbd_dev now owns this */
3008 out:
3009 ret = 0;
3010 out_err:
3011 kfree(reply_buf);
3012 rbd_spec_put(parent_spec);
3013
3014 return ret;
3015 }
3016
3017 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3018 {
3019 size_t image_id_size;
3020 char *image_id;
3021 void *p;
3022 void *end;
3023 size_t size;
3024 void *reply_buf = NULL;
3025 size_t len = 0;
3026 char *image_name = NULL;
3027 int ret;
3028
3029 rbd_assert(!rbd_dev->spec->image_name);
3030
3031 len = strlen(rbd_dev->spec->image_id);
3032 image_id_size = sizeof (__le32) + len;
3033 image_id = kmalloc(image_id_size, GFP_KERNEL);
3034 if (!image_id)
3035 return NULL;
3036
3037 p = image_id;
3038 end = (char *) image_id + image_id_size;
3039 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3040
3041 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3042 reply_buf = kmalloc(size, GFP_KERNEL);
3043 if (!reply_buf)
3044 goto out;
3045
3046 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3047 "rbd", "dir_get_name",
3048 image_id, image_id_size,
3049 (char *) reply_buf, size, NULL);
3050 if (ret < 0)
3051 goto out;
3052 p = reply_buf;
3053 end = (char *) reply_buf + size;
3054 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3055 if (IS_ERR(image_name))
3056 image_name = NULL;
3057 else
3058 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3059 out:
3060 kfree(reply_buf);
3061 kfree(image_id);
3062
3063 return image_name;
3064 }
3065
3066 /*
3067 * When a parent image gets probed, we only have the pool, image,
3068 * and snapshot ids but not the names of any of them. This call
3069 * is made later to fill in those names. It has to be done after
3070 * rbd_dev_snaps_update() has completed because some of the
3071 * information (in particular, snapshot name) is not available
3072 * until then.
3073 */
3074 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3075 {
3076 struct ceph_osd_client *osdc;
3077 const char *name;
3078 void *reply_buf = NULL;
3079 int ret;
3080
3081 if (rbd_dev->spec->pool_name)
3082 return 0; /* Already have the names */
3083
3084 /* Look up the pool name */
3085
3086 osdc = &rbd_dev->rbd_client->client->osdc;
3087 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3088 if (!name) {
3089 rbd_warn(rbd_dev, "there is no pool with id %llu",
3090 rbd_dev->spec->pool_id); /* Really a BUG() */
3091 return -EIO;
3092 }
3093
3094 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3095 if (!rbd_dev->spec->pool_name)
3096 return -ENOMEM;
3097
3098 /* Fetch the image name; tolerate failure here */
3099
3100 name = rbd_dev_image_name(rbd_dev);
3101 if (name)
3102 rbd_dev->spec->image_name = (char *) name;
3103 else
3104 rbd_warn(rbd_dev, "unable to get image name");
3105
3106 /* Look up the snapshot name. */
3107
3108 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3109 if (!name) {
3110 rbd_warn(rbd_dev, "no snapshot with id %llu",
3111 rbd_dev->spec->snap_id); /* Really a BUG() */
3112 ret = -EIO;
3113 goto out_err;
3114 }
3115 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3116	if (!rbd_dev->spec->snap_name) {
3117		ret = -ENOMEM;
		goto out_err;
	}
3118
3119 return 0;
3120 out_err:
3121 kfree(reply_buf);
3122 kfree(rbd_dev->spec->pool_name);
3123 rbd_dev->spec->pool_name = NULL;
3124
3125 return ret;
3126 }
3127
3128 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3129 {
3130 size_t size;
3131 int ret;
3132 void *reply_buf;
3133 void *p;
3134 void *end;
3135 u64 seq;
3136 u32 snap_count;
3137 struct ceph_snap_context *snapc;
3138 u32 i;
3139
3140 /*
3141 * We'll need room for the seq value (maximum snapshot id),
3142 * snapshot count, and array of that many snapshot ids.
3143 * For now we have a fixed upper limit on the number we're
3144 * prepared to receive.
3145 */
3146 size = sizeof (__le64) + sizeof (__le32) +
3147 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3148 reply_buf = kzalloc(size, GFP_KERNEL);
3149 if (!reply_buf)
3150 return -ENOMEM;
3151
3152 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3153 "rbd", "get_snapcontext",
3154 NULL, 0,
3155 reply_buf, size, ver);
3156 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3157 if (ret < 0)
3158 goto out;
3159
3160 ret = -ERANGE;
3161 p = reply_buf;
3162 end = (char *) reply_buf + size;
3163 ceph_decode_64_safe(&p, end, seq, out);
3164 ceph_decode_32_safe(&p, end, snap_count, out);
3165
3166 /*
3167 * Make sure the reported number of snapshot ids wouldn't go
3168 * beyond the end of our buffer. But before checking that,
3169 * make sure the computed size of the snapshot context we
3170 * allocate is representable in a size_t.
3171 */
3172 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3173 / sizeof (u64)) {
3174 ret = -EINVAL;
3175 goto out;
3176 }
3177 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3178 goto out;
3179
3180 size = sizeof (struct ceph_snap_context) +
3181 snap_count * sizeof (snapc->snaps[0]);
3182 snapc = kmalloc(size, GFP_KERNEL);
3183 if (!snapc) {
3184 ret = -ENOMEM;
3185 goto out;
3186 }
3187
3188 atomic_set(&snapc->nref, 1);
3189 snapc->seq = seq;
3190 snapc->num_snaps = snap_count;
3191 for (i = 0; i < snap_count; i++)
3192 snapc->snaps[i] = ceph_decode_64(&p);
3193
3194 rbd_dev->header.snapc = snapc;
3195
3196	dout(" snap context seq = %llu, snap_count = %u\n",
3197		(unsigned long long) seq, (unsigned int) snap_count);
	ret = 0;
3198
3199 out:
3200	kfree(reply_buf);
3201
3202	return ret;
3203 }
3204
3205 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3206 {
3207 size_t size;
3208 void *reply_buf;
3209 __le64 snap_id;
3210 int ret;
3211 void *p;
3212 void *end;
3213 char *snap_name;
3214
3215 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3216 reply_buf = kmalloc(size, GFP_KERNEL);
3217 if (!reply_buf)
3218 return ERR_PTR(-ENOMEM);
3219
3220 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3221 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3222 "rbd", "get_snapshot_name",
3223 (char *) &snap_id, sizeof (snap_id),
3224 reply_buf, size, NULL);
3225 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3226 if (ret < 0)
3227 goto out;
3228
3229 p = reply_buf;
3230 end = (char *) reply_buf + size;
3231 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3232 if (IS_ERR(snap_name)) {
3233 ret = PTR_ERR(snap_name);
3234 goto out;
3235 } else {
3236 dout(" snap_id 0x%016llx snap_name = %s\n",
3237 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3238 }
3239 kfree(reply_buf);
3240
3241 return snap_name;
3242 out:
3243 kfree(reply_buf);
3244
3245 return ERR_PTR(ret);
3246 }
3247
3248 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3249 u64 *snap_size, u64 *snap_features)
3250 {
3251 u64 snap_id;
3252 u8 order;
3253 int ret;
3254
3255 snap_id = rbd_dev->header.snapc->snaps[which];
3256 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3257 if (ret)
3258 return ERR_PTR(ret);
3259 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3260 if (ret)
3261 return ERR_PTR(ret);
3262
3263 return rbd_dev_v2_snap_name(rbd_dev, which);
3264 }
3265
3266 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3267 u64 *snap_size, u64 *snap_features)
3268 {
3269 if (rbd_dev->image_format == 1)
3270 return rbd_dev_v1_snap_info(rbd_dev, which,
3271 snap_size, snap_features);
3272 if (rbd_dev->image_format == 2)
3273 return rbd_dev_v2_snap_info(rbd_dev, which,
3274 snap_size, snap_features);
3275 return ERR_PTR(-EINVAL);
3276 }
3277
3278 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3279 {
3280 int ret;
3281 __u8 obj_order;
3282
3283 down_write(&rbd_dev->header_rwsem);
3284
3285 /* Grab old order first, to see if it changes */
3286
3287	obj_order = rbd_dev->header.obj_order;
3288 ret = rbd_dev_v2_image_size(rbd_dev);
3289 if (ret)
3290 goto out;
3291 if (rbd_dev->header.obj_order != obj_order) {
3292 ret = -EIO;
3293 goto out;
3294 }
3295 rbd_update_mapping_size(rbd_dev);
3296
3297 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3298 dout("rbd_dev_v2_snap_context returned %d\n", ret);
3299 if (ret)
3300 goto out;
3301 ret = rbd_dev_snaps_update(rbd_dev);
3302 dout("rbd_dev_snaps_update returned %d\n", ret);
3303 if (ret)
3304 goto out;
3305 ret = rbd_dev_snaps_register(rbd_dev);
3306 dout("rbd_dev_snaps_register returned %d\n", ret);
3307 out:
3308 up_write(&rbd_dev->header_rwsem);
3309
3310 return ret;
3311 }
3312
3313 /*
3314 * Scan the rbd device's current snapshot list and compare it to the
3315 * newly-received snapshot context. Remove any existing snapshots
3316 * not present in the new snapshot context. Add a new snapshot for
3317  * any snapshots in the snapshot context not in the current list.
3318 * And verify there are no changes to snapshots we already know
3319 * about.
3320 *
3321 * Assumes the snapshots in the snapshot context are sorted by
3322 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3323 * are also maintained in that order.)
3324 */
3325 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3326 {
3327 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3328 const u32 snap_count = snapc->num_snaps;
3329 struct list_head *head = &rbd_dev->snaps;
3330 struct list_head *links = head->next;
3331 u32 index = 0;
3332
3333 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3334 while (index < snap_count || links != head) {
3335 u64 snap_id;
3336 struct rbd_snap *snap;
3337 char *snap_name;
3338 u64 snap_size = 0;
3339 u64 snap_features = 0;
3340
3341 snap_id = index < snap_count ? snapc->snaps[index]
3342 : CEPH_NOSNAP;
3343 snap = links != head ? list_entry(links, struct rbd_snap, node)
3344 : NULL;
3345 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3346
3347 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3348 struct list_head *next = links->next;
3349
3350 /*
3351 * A previously-existing snapshot is not in
3352 * the new snap context.
3353 *
3354 * If the now missing snapshot is the one the
3355 * image is mapped to, clear its exists flag
3356 * so we can avoid sending any more requests
3357 * to it.
3358 */
3359 if (rbd_dev->spec->snap_id == snap->id)
3360 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3361 rbd_remove_snap_dev(snap);
3362 dout("%ssnap id %llu has been removed\n",
3363 rbd_dev->spec->snap_id == snap->id ?
3364 "mapped " : "",
3365 (unsigned long long) snap->id);
3366
3367 /* Done with this list entry; advance */
3368
3369 links = next;
3370 continue;
3371 }
3372
3373 snap_name = rbd_dev_snap_info(rbd_dev, index,
3374 &snap_size, &snap_features);
3375 if (IS_ERR(snap_name))
3376 return PTR_ERR(snap_name);
3377
3378		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3379 (unsigned long long) snap_id);
3380 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3381 struct rbd_snap *new_snap;
3382
3383 /* We haven't seen this snapshot before */
3384
3385 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3386 snap_id, snap_size, snap_features);
3387 if (IS_ERR(new_snap)) {
3388 int err = PTR_ERR(new_snap);
3389
3390 dout(" failed to add dev, error %d\n", err);
3391
3392 return err;
3393 }
3394
3395 /* New goes before existing, or at end of list */
3396
3397			dout(" added dev%s\n", snap ? "" : " at end");
3398 if (snap)
3399 list_add_tail(&new_snap->node, &snap->node);
3400 else
3401 list_add_tail(&new_snap->node, head);
3402 } else {
3403 /* Already have this one */
3404
3405 dout(" already present\n");
3406
3407 rbd_assert(snap->size == snap_size);
3408 rbd_assert(!strcmp(snap->name, snap_name));
3409 rbd_assert(snap->features == snap_features);
3410
3411 /* Done with this list entry; advance */
3412
3413 links = links->next;
3414 }
3415
3416 /* Advance to the next entry in the snapshot context */
3417
3418 index++;
3419 }
3420 dout("%s: done\n", __func__);
3421
3422 return 0;
3423 }
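/*
 * Merge sketch (hypothetical ids; both sequences sorted highest id
 * first): with context ids {12, 8, 5} and an existing list {12, 7, 5},
 * the walk above keeps 12 (present in both), inserts 8 ahead of 7,
 * then removes 7 (absent from the context), and keeps 5, all in a
 * single pass with no re-sorting.
 */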
3424
3425 /*
3426 * Scan the list of snapshots and register the devices for any that
3427 * have not already been registered.
3428 */
3429 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3430 {
3431 struct rbd_snap *snap;
3432 int ret = 0;
3433
3434 dout("%s:\n", __func__);
3435 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3436 return -EIO;
3437
3438 list_for_each_entry(snap, &rbd_dev->snaps, node) {
3439 if (!rbd_snap_registered(snap)) {
3440 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3441 if (ret < 0)
3442 break;
3443 }
3444 }
3445 dout("%s: returning %d\n", __func__, ret);
3446
3447 return ret;
3448 }
3449
3450 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3451 {
3452 struct device *dev;
3453 int ret;
3454
3455 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3456
3457 dev = &rbd_dev->dev;
3458 dev->bus = &rbd_bus_type;
3459 dev->type = &rbd_device_type;
3460 dev->parent = &rbd_root_dev;
3461 dev->release = rbd_dev_release;
3462 dev_set_name(dev, "%d", rbd_dev->dev_id);
3463 ret = device_register(dev);
3464
3465 mutex_unlock(&ctl_mutex);
3466
3467 return ret;
3468 }
3469
3470 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3471 {
3472 device_unregister(&rbd_dev->dev);
3473 }
3474
3475 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3476
3477 /*
3478 * Get a unique rbd identifier for the given new rbd_dev, and add
3479 * the rbd_dev to the global list. The minimum rbd id is 1.
3480 */
3481 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3482 {
3483 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3484
3485 spin_lock(&rbd_dev_list_lock);
3486 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3487 spin_unlock(&rbd_dev_list_lock);
3488 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3489 (unsigned long long) rbd_dev->dev_id);
3490 }
3491
3492 /*
3493 * Remove an rbd_dev from the global list, and record that its
3494 * identifier is no longer in use.
3495 */
3496 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3497 {
3498 struct list_head *tmp;
3499 int rbd_id = rbd_dev->dev_id;
3500 int max_id;
3501
3502 rbd_assert(rbd_id > 0);
3503
3504 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3505 (unsigned long long) rbd_dev->dev_id);
3506 spin_lock(&rbd_dev_list_lock);
3507 list_del_init(&rbd_dev->node);
3508
3509 /*
3510 * If the id being "put" is not the current maximum, there
3511 * is nothing special we need to do.
3512 */
3513 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3514 spin_unlock(&rbd_dev_list_lock);
3515 return;
3516 }
3517
3518 /*
3519 * We need to update the current maximum id. Search the
3520 * list to find out what it is. We're more likely to find
3521 * the maximum at the end, so search the list backward.
3522 */
3523 max_id = 0;
3524 list_for_each_prev(tmp, &rbd_dev_list) {
3525 struct rbd_device *rbd_dev;
3526
3527 rbd_dev = list_entry(tmp, struct rbd_device, node);
3528 if (rbd_dev->dev_id > max_id)
3529 max_id = rbd_dev->dev_id;
3530 }
3531 spin_unlock(&rbd_dev_list_lock);
3532
3533 /*
3534 * The max id could have been updated by rbd_dev_id_get(), in
3535 * which case it now accurately reflects the new maximum.
3536 * Be careful not to overwrite the maximum value in that
3537 * case.
3538 */
3539 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3540 dout(" max dev id has been reset\n");
3541 }
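/*
 * Id reuse sketch (hypothetical sequence): mapping three images
 * yields ids 1, 2, 3 and rbd_dev_id_max == 3.  Unmapping id 3 rescans
 * the list, finds a new maximum of 2, and cmpxchg(3 -> 2) succeeds,
 * so the next mapping gets id 3 again.  Unmapping id 1 instead leaves
 * the maximum untouched.
 */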
3542
3543 /*
3544 * Skips over white space at *buf, and updates *buf to point to the
3545 * first found non-space character (if any). Returns the length of
3546 * the token (string of non-white space characters) found. Note
3547 * that *buf must be terminated with '\0'.
3548 */
3549 static inline size_t next_token(const char **buf)
3550 {
3551 /*
3552 * These are the characters that produce nonzero for
3553 * isspace() in the "C" and "POSIX" locales.
3554 */
3555 const char *spaces = " \f\n\r\t\v";
3556
3557 *buf += strspn(*buf, spaces); /* Find start of token */
3558
3559 return strcspn(*buf, spaces); /* Return token length */
3560 }
3561
3562 /*
3563 * Finds the next token in *buf, and if the provided token buffer is
3564 * big enough, copies the found token into it. The result, if
3565 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3566 * must be terminated with '\0' on entry.
3567 *
3568 * Returns the length of the token found (not including the '\0').
3569 * Return value will be 0 if no token is found, and it will be >=
3570 * token_size if the token would not fit.
3571 *
3572 * The *buf pointer will be updated to point beyond the end of the
3573 * found token. Note that this occurs even if the token buffer is
3574 * too small to hold it.
3575 */
3576 static inline size_t copy_token(const char **buf,
3577 char *token,
3578 size_t token_size)
3579 {
3580 size_t len;
3581
3582 len = next_token(buf);
3583 if (len < token_size) {
3584 memcpy(token, *buf, len);
3585 *(token + len) = '\0';
3586 }
3587 *buf += len;
3588
3589 return len;
3590 }
3591
3592 /*
3593 * Finds the next token in *buf, dynamically allocates a buffer big
3594 * enough to hold a copy of it, and copies the token into the new
3595 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3596 * that a duplicate buffer is created even for a zero-length token.
3597 *
3598 * Returns a pointer to the newly-allocated duplicate, or a null
3599 * pointer if memory for the duplicate was not available. If
3600 * the lenp argument is a non-null pointer, the length of the token
3601 * (not including the '\0') is returned in *lenp.
3602 *
3603 * If successful, the *buf pointer will be updated to point beyond
3604 * the end of the found token.
3605 *
3606 * Note: uses GFP_KERNEL for allocation.
3607 */
3608 static inline char *dup_token(const char **buf, size_t *lenp)
3609 {
3610 char *dup;
3611 size_t len;
3612
3613 len = next_token(buf);
3614 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3615 if (!dup)
3616 return NULL;
3617 *(dup + len) = '\0';
3618 *buf += len;
3619
3620 if (lenp)
3621 *lenp = len;
3622
3623 return dup;
3624 }
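/*
 * Tokenizer sketch (illustrative only, hypothetical input) showing
 * how next_token() and dup_token() cooperate when parsing:
 */
#if 0	/* example only */
	const char *buf = "  1.2.3.4:6789 rbd myimage";
	size_t len;
	char *pool;

	len = next_token(&buf);		/* buf -> "1.2.3.4:6789...", len = 12 */
	buf += len;			/* skip past the monitor address */
	pool = dup_token(&buf, &len);	/* pool = "rbd", len = 3 */
	kfree(pool);
#endif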
3625
3626 /*
3627 * Parse the options provided for an "rbd add" (i.e., rbd image
3628 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3629 * and the data written is passed here via a NUL-terminated buffer.
3630 * Returns 0 if successful or an error code otherwise.
3631 *
3632 * The information extracted from these options is recorded in
3633 * the other parameters which return dynamically-allocated
3634 * structures:
3635 * ceph_opts
3636 * The address of a pointer that will refer to a ceph options
3637 * structure. Caller must release the returned pointer using
3638 * ceph_destroy_options() when it is no longer needed.
3639 * rbd_opts
3640 * Address of an rbd options pointer. Fully initialized by
3641 * this function; caller must release with kfree().
3642 * spec
3643 * Address of an rbd image specification pointer. Fully
3644 * initialized by this function based on parsed options.
3645 * Caller must release with rbd_spec_put().
3646 *
3647 * The options passed take this form:
3648  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3649 * where:
3650 * <mon_addrs>
3651 * A comma-separated list of one or more monitor addresses.
3652 * A monitor address is an ip address, optionally followed
3653 * by a port number (separated by a colon).
3654 * I.e.: ip1[:port1][,ip2[:port2]...]
3655 * <options>
3656 * A comma-separated list of ceph and/or rbd options.
3657 * <pool_name>
3658 * The name of the rados pool containing the rbd image.
3659 * <image_name>
3660 * The name of the image in that pool to map.
3661  *  <snap_name>
3662  *	An optional snapshot name.  If provided, the mapping will
3663  *	present data from the image as of the time that snapshot
3664  *	was created.  The image head is used if no snapshot name is
3665  *	provided.  Snapshot mappings are always read-only.
3666 */
3667 static int rbd_add_parse_args(const char *buf,
3668 struct ceph_options **ceph_opts,
3669 struct rbd_options **opts,
3670 struct rbd_spec **rbd_spec)
3671 {
3672 size_t len;
3673 char *options;
3674 const char *mon_addrs;
3675 size_t mon_addrs_size;
3676 struct rbd_spec *spec = NULL;
3677 struct rbd_options *rbd_opts = NULL;
3678 struct ceph_options *copts;
3679 int ret;
3680
3681 /* The first four tokens are required */
3682
3683 len = next_token(&buf);
3684 if (!len) {
3685 rbd_warn(NULL, "no monitor address(es) provided");
3686 return -EINVAL;
3687 }
3688 mon_addrs = buf;
3689 mon_addrs_size = len + 1;
3690 buf += len;
3691
3692 ret = -EINVAL;
3693 options = dup_token(&buf, NULL);
3694 if (!options)
3695 return -ENOMEM;
3696 if (!*options) {
3697 rbd_warn(NULL, "no options provided");
3698 goto out_err;
3699 }
3700
3701 spec = rbd_spec_alloc();
3702 if (!spec)
3703 goto out_mem;
3704
3705 spec->pool_name = dup_token(&buf, NULL);
3706 if (!spec->pool_name)
3707 goto out_mem;
3708 if (!*spec->pool_name) {
3709 rbd_warn(NULL, "no pool name provided");
3710 goto out_err;
3711 }
3712
3713 spec->image_name = dup_token(&buf, NULL);
3714 if (!spec->image_name)
3715 goto out_mem;
3716 if (!*spec->image_name) {
3717 rbd_warn(NULL, "no image name provided");
3718 goto out_err;
3719 }
3720
3721 /*
3722 * Snapshot name is optional; default is to use "-"
3723 * (indicating the head/no snapshot).
3724 */
3725 len = next_token(&buf);
3726 if (!len) {
3727 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3728 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3729 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3730 ret = -ENAMETOOLONG;
3731 goto out_err;
3732 }
3733 spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3734 if (!spec->snap_name)
3735 goto out_mem;
3736 *(spec->snap_name + len) = '\0';
3737
3738 /* Initialize all rbd options to the defaults */
3739
3740 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3741 if (!rbd_opts)
3742 goto out_mem;
3743
3744 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3745
3746 copts = ceph_parse_options(options, mon_addrs,
3747 mon_addrs + mon_addrs_size - 1,
3748 parse_rbd_opts_token, rbd_opts);
3749 if (IS_ERR(copts)) {
3750 ret = PTR_ERR(copts);
3751 goto out_err;
3752 }
3753 kfree(options);
3754
3755 *ceph_opts = copts;
3756 *opts = rbd_opts;
3757 *rbd_spec = spec;
3758
3759 return 0;
3760 out_mem:
3761 ret = -ENOMEM;
3762 out_err:
3763 kfree(rbd_opts);
3764 rbd_spec_put(spec);
3765 kfree(options);
3766
3767 return ret;
3768 }
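/*
 * Sketch of a well-formed "rbd add" buffer (hypothetical names) as it
 * would arrive from a write to /sys/bus/rbd/add and be parsed above:
 */
#if 0	/* example only */
	char buf[128];

	snprintf(buf, sizeof (buf), "%s %s %s %s %s",
		"1.2.3.4:6789",		/* <mon_addrs> */
		"name=admin",		/* <options> */
		"rbd",			/* <pool_name> */
		"myimage",		/* <image_name> */
		"mysnap");		/* optional snapshot name */
#endif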
3769
3770 /*
3771 * An rbd format 2 image has a unique identifier, distinct from the
3772 * name given to it by the user. Internally, that identifier is
3773 * what's used to specify the names of objects related to the image.
3774 *
3775 * A special "rbd id" object is used to map an rbd image name to its
3776 * id. If that object doesn't exist, then there is no v2 rbd image
3777 * with the supplied name.
3778 *
3779 * This function will record the given rbd_dev's image_id field if
3780 * it can be determined, and in that case will return 0. If any
3781 * errors occur a negative errno will be returned and the rbd_dev's
3782 * image_id field will be unchanged (and should be NULL).
3783 */
3784 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3785 {
3786 int ret;
3787 size_t size;
3788 char *object_name;
3789 void *response;
3790 void *p;
3791
3792 /*
3793 * When probing a parent image, the image id is already
3794 * known (and the image name likely is not). There's no
3795 * need to fetch the image id again in this case.
3796 */
3797 if (rbd_dev->spec->image_id)
3798 return 0;
3799
3800 /*
3801 * First, see if the format 2 image id file exists, and if
3802 * so, get the image's persistent id from it.
3803 */
3804 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3805 object_name = kmalloc(size, GFP_NOIO);
3806 if (!object_name)
3807 return -ENOMEM;
3808 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3809 dout("rbd id object name is %s\n", object_name);
3810
3811 /* Response will be an encoded string, which includes a length */
3812
3813 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3814 response = kzalloc(size, GFP_NOIO);
3815 if (!response) {
3816 ret = -ENOMEM;
3817 goto out;
3818 }
3819
3820 ret = rbd_obj_method_sync(rbd_dev, object_name,
3821 "rbd", "get_id",
3822 NULL, 0,
3823 response, RBD_IMAGE_ID_LEN_MAX, NULL);
3824 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3825 if (ret < 0)
3826 goto out;
3827
3828 p = response;
3829 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3830 p + RBD_IMAGE_ID_LEN_MAX,
3831 NULL, GFP_NOIO);
3832 if (IS_ERR(rbd_dev->spec->image_id)) {
3833 ret = PTR_ERR(rbd_dev->spec->image_id);
3834 rbd_dev->spec->image_id = NULL;
3835 } else {
3836 dout("image_id is %s\n", rbd_dev->spec->image_id);
3837 }
3838 out:
3839 kfree(response);
3840 kfree(object_name);
3841
3842 return ret;
3843 }
3844
3845 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3846 {
3847 int ret;
3848 size_t size;
3849
3850 /* Version 1 images have no id; empty string is used */
3851
3852 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3853 if (!rbd_dev->spec->image_id)
3854 return -ENOMEM;
3855
3856 /* Record the header object name for this rbd image. */
3857
3858 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3859 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3860 if (!rbd_dev->header_name) {
3861 ret = -ENOMEM;
3862 goto out_err;
3863 }
3864 sprintf(rbd_dev->header_name, "%s%s",
3865 rbd_dev->spec->image_name, RBD_SUFFIX);
3866
3867 /* Populate rbd image metadata */
3868
3869 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3870 if (ret < 0)
3871 goto out_err;
3872
3873 /* Version 1 images have no parent (no layering) */
3874
3875 rbd_dev->parent_spec = NULL;
3876 rbd_dev->parent_overlap = 0;
3877
3878 rbd_dev->image_format = 1;
3879
3880 dout("discovered version 1 image, header name is %s\n",
3881 rbd_dev->header_name);
3882
3883 return 0;
3884
3885 out_err:
3886 kfree(rbd_dev->header_name);
3887 rbd_dev->header_name = NULL;
3888 kfree(rbd_dev->spec->image_id);
3889 rbd_dev->spec->image_id = NULL;
3890
3891 return ret;
3892 }
3893
3894 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3895 {
3896 size_t size;
3897 int ret;
3898 u64 ver = 0;
3899
3900 /*
3901 * Image id was filled in by the caller. Record the header
3902 * object name for this rbd image.
3903 */
3904 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3905 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3906 if (!rbd_dev->header_name)
3907 return -ENOMEM;
3908 sprintf(rbd_dev->header_name, "%s%s",
3909 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

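	/*
	 * Passing 0 asks the block layer to dynamically allocate an
	 * otherwise unused major number, which is the (non-negative)
	 * return value on success.
	 */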
	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

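	/*
	 * Start watching the header object for changes; the second
	 * argument selects setup (1) here and teardown (0) in
	 * rbd_dev_release() below.
	 */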
	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready. Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up the rest of the rbd_dev state */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device. For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object. A format 1 image has
	 * no such object, so a failure here (nominally ENOENT, though
	 * any error currently takes this path) makes us fall back to
	 * a format 1 probe.
	 */
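	/*
	 * Illustrative probe order for an image named "foo" (object
	 * names assume the RBD_ID_PREFIX/RBD_SUFFIX values from
	 * rbd_types.h): look for "rbd_id.foo" first (format 2), and
	 * fall back to "foo.rbd" (format 1) if that fails.
	 */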
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}

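/*
 * Handle a write to /sys/bus/rbd/add. A sketch of the expected input
 * (per Documentation/ABI/testing/sysfs-bus-rbd; the monitor address,
 * pool, and image names below are made-up examples):
 *
 *   $ echo "192.168.0.1:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * i.e. "<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]".
 */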
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev) {
		/* rc still holds the non-negative pool id; reset it */
		rc = -ENOMEM;
		goto err_out_client;
	}
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}

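/*
 * Look up a mapped device by id. Note the list lock is dropped
 * before returning; the caller (rbd_remove(), below) relies on
 * holding ctl_mutex to keep the device from going away underneath
 * it.
 */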
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

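	/* Tear down the header object watch set up at probe time. */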
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

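/*
 * Handle a write to /sys/bus/rbd/remove. The input is the numeric
 * device id assigned at add time, e.g. (made-up id):
 *
 *   $ echo 1 > /sys/bus/rbd/remove
 */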
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

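	/*
	 * Refuse to remove a device that is still open; otherwise set
	 * the REMOVING flag so no new opens can succeed meanwhile.
	 */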
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
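/*
 * (Currently that means the "add" and "remove" bus attributes,
 * handled by rbd_add() and rbd_remove() above.)
 */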
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");