drivers/block/rbd.c (mt8127/android_kernel_alcatel_ttab, Linux v3.10.66)
1
2 /*
3 rbd.c -- Export ceph rados objects as a Linux block device
4
5
6 based on drivers/block/osdblk.c:
7
8 Copyright 2009 Red Hat, Inc.
9
10 This program is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; see the file COPYING. If not, write to
21 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25 For usage instructions, please refer to:
26
27 Documentation/ABI/testing/sysfs-bus-rbd
28
29 */
30
31 #include <linux/ceph/libceph.h>
32 #include <linux/ceph/osd_client.h>
33 #include <linux/ceph/mon_client.h>
34 #include <linux/ceph/decode.h>
35 #include <linux/parser.h>
36 #include <linux/bsearch.h>
37
38 #include <linux/kernel.h>
39 #include <linux/device.h>
40 #include <linux/module.h>
41 #include <linux/fs.h>
42 #include <linux/blkdev.h>
43 #include <linux/slab.h>
44
45 #include "rbd_types.h"
46
47 #define RBD_DEBUG /* Activate rbd_assert() calls */
48
49 /*
50 * The basic unit of block I/O is a sector. It is interpreted in a
51 * number of contexts in Linux (blk, bio, genhd), but the default is
52 * universally 512 bytes. These symbols are just slightly more
53 * meaningful than the bare numbers they represent.
54 */
55 #define SECTOR_SHIFT 9
56 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
57
58 /*
59 * Increment the given counter and return its updated value.
60 * If the counter is already 0 it will not be incremented.
61 * If the counter is already at its maximum value returns
62 * -EINVAL without updating it.
63 */
64 static int atomic_inc_return_safe(atomic_t *v)
65 {
66 unsigned int counter;
67
68 counter = (unsigned int)__atomic_add_unless(v, 1, 0);
69 if (counter <= (unsigned int)INT_MAX)
70 return (int)counter;
71
72 atomic_dec(v);
73
74 return -EINVAL;
75 }
76
77 /* Decrement the counter. Return the resulting value, or -EINVAL */
78 static int atomic_dec_return_safe(atomic_t *v)
79 {
80 int counter;
81
82 counter = atomic_dec_return(v);
83 if (counter >= 0)
84 return counter;
85
86 atomic_inc(v);
87
88 return -EINVAL;
89 }
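/*
 * A note on these helpers: they are used further down to manage
 * rbd_dev->parent_ref.  Returning -EINVAL on overflow/underflow
 * instead of silently wrapping lets rbd_dev_parent_get() and
 * rbd_dev_parent_put() warn when the parent reference count has
 * been corrupted.
 */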
90
91 #define RBD_DRV_NAME "rbd"
92 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
93
94 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
95
96 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
97 #define RBD_MAX_SNAP_NAME_LEN \
98 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
99
100 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
101
102 #define RBD_SNAP_HEAD_NAME "-"
103
104 #define BAD_SNAP_INDEX U32_MAX /* invalid index into snap array */
105
106 /* This allows a single page to hold an image name sent by OSD */
107 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
108 #define RBD_IMAGE_ID_LEN_MAX 64
109
110 #define RBD_OBJ_PREFIX_LEN_MAX 64
111
112 /* Feature bits */
113
114 #define RBD_FEATURE_LAYERING (1<<0)
115 #define RBD_FEATURE_STRIPINGV2 (1<<1)
116 #define RBD_FEATURES_ALL \
117 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
118
119 /* Features supported by this (client software) implementation. */
120
121 #define RBD_FEATURES_SUPPORTED (RBD_FEATURES_ALL)
122
123 /*
124 * An RBD device name will be "rbd#", where the "rbd" comes from
125 * RBD_DRV_NAME above, and # is a unique integer identifier.
126 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
127 * enough to hold all possible device names.
128 */
129 #define DEV_NAME_LEN 32
130 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
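/*
 * The width formula allows 2.5 decimal digits per byte, a safe upper
 * bound since 2^8 = 256 < 10^2.5 (about 316).  For example, with a
 * 4-byte int it evaluates to (5 * 4) / 2 + 1 = 11 characters, enough
 * for the ten digits of INT_MAX plus a sign.
 */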
131
132 /*
133 * block device image metadata (in-memory version)
134 */
135 struct rbd_image_header {
136 /* These six fields never change for a given rbd image */
137 char *object_prefix;
138 __u8 obj_order;
139 __u8 crypt_type;
140 __u8 comp_type;
141 u64 stripe_unit;
142 u64 stripe_count;
143 u64 features; /* Might be changeable someday? */
144
145 /* The remaining fields need to be updated occasionally */
146 u64 image_size;
147 struct ceph_snap_context *snapc;
148 char *snap_names; /* format 1 only */
149 u64 *snap_sizes; /* format 1 only */
150 };
151
152 /*
153 * An rbd image specification.
154 *
155 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
156 * identify an image. Each rbd_dev structure includes a pointer to
157 * an rbd_spec structure that encapsulates this identity.
158 *
159 * Each of the id's in an rbd_spec has an associated name. For a
160 * user-mapped image, the names are supplied and the id's associated
161 * with them are looked up. For a layered image, a parent image is
162 * defined by the tuple, and the names are looked up.
163 *
164 * An rbd_dev structure contains a parent_spec pointer which is
165 * non-null if the image it represents is a child in a layered
166 * image. This pointer will refer to the rbd_spec structure used
167 * by the parent rbd_dev for its own identity (i.e., the structure
168 * is shared between the parent and child).
169 *
170 * Since these structures are populated once, during the discovery
171 * phase of image construction, they are effectively immutable so
172 * we make no effort to synchronize access to them.
173 *
174 * Note that code herein does not assume the image name is known (it
175 * could be a null pointer).
176 */
177 struct rbd_spec {
178 u64 pool_id;
179 const char *pool_name;
180
181 const char *image_id;
182 const char *image_name;
183
184 u64 snap_id;
185 const char *snap_name;
186
187 struct kref kref;
188 };
189
190 /*
191 * an instance of the client. multiple devices may share an rbd client.
192 */
193 struct rbd_client {
194 struct ceph_client *client;
195 struct kref kref;
196 struct list_head node;
197 };
198
199 struct rbd_img_request;
200 typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
201
202 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
203
204 struct rbd_obj_request;
205 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
206
207 enum obj_request_type {
208 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
209 };
210
211 enum obj_req_flags {
212 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
213 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
214 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
215 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
216 };
217
218 struct rbd_obj_request {
219 const char *object_name;
220 u64 offset; /* object start byte */
221 u64 length; /* bytes from offset */
222 unsigned long flags;
223
224 /*
225 * An object request associated with an image will have its
226 * img_data flag set; a standalone object request will not.
227 *
228 * A standalone object request will have which == BAD_WHICH
229 * and a null obj_request pointer.
230 *
231 * An object request initiated in support of a layered image
232 * object (to check for its existence before a write) will
233 * have which == BAD_WHICH and a non-null obj_request pointer.
234 *
235 * Finally, an object request for rbd image data will have
236 * which != BAD_WHICH, and will have a non-null img_request
237 * pointer. The value of which will be in the range
238 * 0..(img_request->obj_request_count-1).
239 */
240 union {
241 struct rbd_obj_request *obj_request; /* STAT op */
242 struct {
243 struct rbd_img_request *img_request;
244 u64 img_offset;
245 /* links for img_request->obj_requests list */
246 struct list_head links;
247 };
248 };
249 u32 which; /* position in image request list */
250
251 enum obj_request_type type;
252 union {
253 struct bio *bio_list;
254 struct {
255 struct page **pages;
256 u32 page_count;
257 };
258 };
259 struct page **copyup_pages;
260 u32 copyup_page_count;
261
262 struct ceph_osd_request *osd_req;
263
264 u64 xferred; /* bytes transferred */
265 int result;
266
267 rbd_obj_callback_t callback;
268 struct completion completion;
269
270 struct kref kref;
271 };
272
273 enum img_req_flags {
274 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
275 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
276 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
277 };
278
279 struct rbd_img_request {
280 struct rbd_device *rbd_dev;
281 u64 offset; /* starting image byte offset */
282 u64 length; /* byte count from offset */
283 unsigned long flags;
284 union {
285 u64 snap_id; /* for reads */
286 struct ceph_snap_context *snapc; /* for writes */
287 };
288 union {
289 struct request *rq; /* block request */
290 struct rbd_obj_request *obj_request; /* obj req initiator */
291 };
292 struct page **copyup_pages;
293 u32 copyup_page_count;
294 spinlock_t completion_lock; /* protects next_completion */
295 u32 next_completion;
296 rbd_img_callback_t callback;
297 u64 xferred; /* aggregate bytes transferred */
298 int result; /* first nonzero obj_request result */
299
300 u32 obj_request_count;
301 struct list_head obj_requests; /* rbd_obj_request structs */
302
303 struct kref kref;
304 };
305
306 #define for_each_obj_request(ireq, oreq) \
307 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
308 #define for_each_obj_request_from(ireq, oreq) \
309 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
310 #define for_each_obj_request_safe(ireq, oreq, n) \
311 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
312
313 struct rbd_mapping {
314 u64 size;
315 u64 features;
316 bool read_only;
317 };
318
319 /*
320 * a single device
321 */
322 struct rbd_device {
323 int dev_id; /* blkdev unique id */
324
325 int major; /* blkdev assigned major */
326 struct gendisk *disk; /* blkdev's gendisk and rq */
327
328 u32 image_format; /* Either 1 or 2 */
329 struct rbd_client *rbd_client;
330
331 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
332
333 spinlock_t lock; /* queue, flags, open_count */
334
335 struct rbd_image_header header;
336 unsigned long flags; /* possibly lock protected */
337 struct rbd_spec *spec;
338
339 char *header_name;
340
341 struct ceph_file_layout layout;
342
343 struct ceph_osd_event *watch_event;
344 struct rbd_obj_request *watch_request;
345
346 struct rbd_spec *parent_spec;
347 u64 parent_overlap;
348 atomic_t parent_ref;
349 struct rbd_device *parent;
350
351 /* protects updating the header */
352 struct rw_semaphore header_rwsem;
353
354 struct rbd_mapping mapping;
355
356 struct list_head node;
357
358 /* sysfs related */
359 struct device dev;
360 unsigned long open_count; /* protected by lock */
361 };
362
363 /*
364 * Flag bits for rbd_dev->flags. If atomicity is required,
365 * rbd_dev->lock is used to protect access.
366 *
367 * Currently, only the "removing" flag (which is coupled with the
368 * "open_count" field) requires atomic access.
369 */
370 enum rbd_dev_flags {
371 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
372 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
373 };
374
375 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
376
377 static LIST_HEAD(rbd_dev_list); /* devices */
378 static DEFINE_SPINLOCK(rbd_dev_list_lock);
379
380 static LIST_HEAD(rbd_client_list); /* clients */
381 static DEFINE_SPINLOCK(rbd_client_list_lock);
382
383 /* Slab caches for frequently-allocated structures */
384
385 static struct kmem_cache *rbd_img_request_cache;
386 static struct kmem_cache *rbd_obj_request_cache;
387 static struct kmem_cache *rbd_segment_name_cache;
388
389 static int rbd_img_request_submit(struct rbd_img_request *img_request);
390
391 static void rbd_dev_device_release(struct device *dev);
392
393 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
394 size_t count);
395 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
396 size_t count);
397 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
398 static void rbd_spec_put(struct rbd_spec *spec);
399
400 static struct bus_attribute rbd_bus_attrs[] = {
401 __ATTR(add, S_IWUSR, NULL, rbd_add),
402 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
403 __ATTR_NULL
404 };
405
406 static struct bus_type rbd_bus_type = {
407 .name = "rbd",
408 .bus_attrs = rbd_bus_attrs,
409 };
410
411 static void rbd_root_dev_release(struct device *dev)
412 {
413 }
414
415 static struct device rbd_root_dev = {
416 .init_name = "rbd",
417 .release = rbd_root_dev_release,
418 };
419
420 static __printf(2, 3)
421 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
422 {
423 struct va_format vaf;
424 va_list args;
425
426 va_start(args, fmt);
427 vaf.fmt = fmt;
428 vaf.va = &args;
429
430 if (!rbd_dev)
431 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
432 else if (rbd_dev->disk)
433 printk(KERN_WARNING "%s: %s: %pV\n",
434 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
435 else if (rbd_dev->spec && rbd_dev->spec->image_name)
436 printk(KERN_WARNING "%s: image %s: %pV\n",
437 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
438 else if (rbd_dev->spec && rbd_dev->spec->image_id)
439 printk(KERN_WARNING "%s: id %s: %pV\n",
440 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
441 else /* punt */
442 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
443 RBD_DRV_NAME, rbd_dev, &vaf);
444 va_end(args);
445 }
446
447 #ifdef RBD_DEBUG
448 #define rbd_assert(expr) \
449 if (unlikely(!(expr))) { \
450 printk(KERN_ERR "\nAssertion failure in %s() " \
451 "at line %d:\n\n" \
452 "\trbd_assert(%s);\n\n", \
453 __func__, __LINE__, #expr); \
454 BUG(); \
455 }
456 #else /* !RBD_DEBUG */
457 # define rbd_assert(expr) ((void) 0)
458 #endif /* !RBD_DEBUG */
459
460 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
461 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
462 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
463
464 static int rbd_dev_refresh(struct rbd_device *rbd_dev);
465 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
466 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
467 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
468 u64 snap_id);
469 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
470 u8 *order, u64 *snap_size);
471 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
472 u64 *snap_features);
473 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
474
475 static int rbd_open(struct block_device *bdev, fmode_t mode)
476 {
477 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
478 bool removing = false;
479
480 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
481 return -EROFS;
482
483 spin_lock_irq(&rbd_dev->lock);
484 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
485 removing = true;
486 else
487 rbd_dev->open_count++;
488 spin_unlock_irq(&rbd_dev->lock);
489 if (removing)
490 return -ENOENT;
491
492 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
493 (void) get_device(&rbd_dev->dev);
494 set_device_ro(bdev, rbd_dev->mapping.read_only);
495 mutex_unlock(&ctl_mutex);
496
497 return 0;
498 }
499
500 static void rbd_release(struct gendisk *disk, fmode_t mode)
501 {
502 struct rbd_device *rbd_dev = disk->private_data;
503 unsigned long open_count_before;
504
505 spin_lock_irq(&rbd_dev->lock);
506 open_count_before = rbd_dev->open_count--;
507 spin_unlock_irq(&rbd_dev->lock);
508 rbd_assert(open_count_before > 0);
509
510 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
511 put_device(&rbd_dev->dev);
512 mutex_unlock(&ctl_mutex);
513 }
514
515 static const struct block_device_operations rbd_bd_ops = {
516 .owner = THIS_MODULE,
517 .open = rbd_open,
518 .release = rbd_release,
519 };
520
521 /*
522 * Initialize an rbd client instance. Success or not, this function
523 * consumes ceph_opts.
524 */
525 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
526 {
527 struct rbd_client *rbdc;
528 int ret = -ENOMEM;
529
530 dout("%s:\n", __func__);
531 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
532 if (!rbdc)
533 goto out_opt;
534
535 kref_init(&rbdc->kref);
536 INIT_LIST_HEAD(&rbdc->node);
537
538 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
539
540 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
541 if (IS_ERR(rbdc->client))
542 goto out_mutex;
543 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
544
545 ret = ceph_open_session(rbdc->client);
546 if (ret < 0)
547 goto out_err;
548
549 spin_lock(&rbd_client_list_lock);
550 list_add_tail(&rbdc->node, &rbd_client_list);
551 spin_unlock(&rbd_client_list_lock);
552
553 mutex_unlock(&ctl_mutex);
554 dout("%s: rbdc %p\n", __func__, rbdc);
555
556 return rbdc;
557
558 out_err:
559 ceph_destroy_client(rbdc->client);
560 out_mutex:
561 mutex_unlock(&ctl_mutex);
562 kfree(rbdc);
563 out_opt:
564 if (ceph_opts)
565 ceph_destroy_options(ceph_opts);
566 dout("%s: error %d\n", __func__, ret);
567
568 return ERR_PTR(ret);
569 }
570
571 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
572 {
573 kref_get(&rbdc->kref);
574
575 return rbdc;
576 }
577
578 /*
579 * Find a ceph client with specific addr and configuration. If
580 * found, bump its reference count.
581 */
582 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
583 {
584 struct rbd_client *client_node;
585 bool found = false;
586
587 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
588 return NULL;
589
590 spin_lock(&rbd_client_list_lock);
591 list_for_each_entry(client_node, &rbd_client_list, node) {
592 if (!ceph_compare_options(ceph_opts, client_node->client)) {
593 __rbd_get_client(client_node);
594
595 found = true;
596 break;
597 }
598 }
599 spin_unlock(&rbd_client_list_lock);
600
601 return found ? client_node : NULL;
602 }
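/*
 * Note on the test above: ceph_compare_options() follows the usual
 * comparator convention and returns 0 when the supplied options match
 * an existing client, which is why its result is negated.
 */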
603
604 /*
605 * mount options
606 */
607 enum {
608 Opt_last_int,
609 /* int args above */
610 Opt_last_string,
611 /* string args above */
612 Opt_read_only,
613 Opt_read_write,
614 /* Boolean args above */
615 Opt_last_bool,
616 };
617
618 static match_table_t rbd_opts_tokens = {
619 /* int args above */
620 /* string args above */
621 {Opt_read_only, "read_only"},
622 {Opt_read_only, "ro"}, /* Alternate spelling */
623 {Opt_read_write, "read_write"},
624 {Opt_read_write, "rw"}, /* Alternate spelling */
625 /* Boolean args above */
626 {-1, NULL}
627 };
628
629 struct rbd_options {
630 bool read_only;
631 };
632
633 #define RBD_READ_ONLY_DEFAULT false
634
635 static int parse_rbd_opts_token(char *c, void *private)
636 {
637 struct rbd_options *rbd_opts = private;
638 substring_t argstr[MAX_OPT_ARGS];
639 int token, intval, ret;
640
641 token = match_token(c, rbd_opts_tokens, argstr);
642 if (token < 0)
643 return -EINVAL;
644
645 if (token < Opt_last_int) {
646 ret = match_int(&argstr[0], &intval);
647 if (ret < 0) {
648 pr_err("bad mount option arg (not int) "
649 "at '%s'\n", c);
650 return ret;
651 }
652 dout("got int token %d val %d\n", token, intval);
653 } else if (token > Opt_last_int && token < Opt_last_string) {
654 dout("got string token %d val %s\n", token,
655 argstr[0].from);
656 } else if (token > Opt_last_string && token < Opt_last_bool) {
657 dout("got Boolean token %d\n", token);
658 } else {
659 dout("got token %d\n", token);
660 }
661
662 switch (token) {
663 case Opt_read_only:
664 rbd_opts->read_only = true;
665 break;
666 case Opt_read_write:
667 rbd_opts->read_only = false;
668 break;
669 default:
670 rbd_assert(false);
671 break;
672 }
673 return 0;
674 }
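/*
 * For example, a mapping option string of "read_only" (or the "ro"
 * shorthand) sets rbd_opts->read_only to true here, "read_write"/"rw"
 * clears it, and any unrecognized option fails the match_token()
 * lookup above and is rejected with -EINVAL.
 */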
675
676 /*
677 * Get a ceph client with specific addr and configuration, if one does
678 * not exist create it. Either way, ceph_opts is consumed by this
679 * function.
680 */
681 static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
682 {
683 struct rbd_client *rbdc;
684
685 rbdc = rbd_client_find(ceph_opts);
686 if (rbdc) /* using an existing client */
687 ceph_destroy_options(ceph_opts);
688 else
689 rbdc = rbd_client_create(ceph_opts);
690
691 return rbdc;
692 }
693
694 /*
695 * Destroy ceph client
696 *
697 * Caller must hold rbd_client_list_lock.
698 */
699 static void rbd_client_release(struct kref *kref)
700 {
701 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
702
703 dout("%s: rbdc %p\n", __func__, rbdc);
704 spin_lock(&rbd_client_list_lock);
705 list_del(&rbdc->node);
706 spin_unlock(&rbd_client_list_lock);
707
708 ceph_destroy_client(rbdc->client);
709 kfree(rbdc);
710 }
711
712 /*
713 * Drop reference to ceph client node. If it's not referenced anymore, release
714 * it.
715 */
716 static void rbd_put_client(struct rbd_client *rbdc)
717 {
718 if (rbdc)
719 kref_put(&rbdc->kref, rbd_client_release);
720 }
721
722 static bool rbd_image_format_valid(u32 image_format)
723 {
724 return image_format == 1 || image_format == 2;
725 }
726
727 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
728 {
729 size_t size;
730 u32 snap_count;
731
732 /* The header has to start with the magic rbd header text */
733 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
734 return false;
735
736 /* The bio layer requires at least sector-sized I/O */
737
738 if (ondisk->options.order < SECTOR_SHIFT)
739 return false;
740
741 /* If we use u64 in a few spots we may be able to loosen this */
742
743 if (ondisk->options.order > 8 * sizeof (int) - 1)
744 return false;
745
746 /*
747 * The size of a snapshot header has to fit in a size_t, and
748 * that limits the number of snapshots.
749 */
750 snap_count = le32_to_cpu(ondisk->snap_count);
751 size = SIZE_MAX - sizeof (struct ceph_snap_context);
752 if (snap_count > size / sizeof (__le64))
753 return false;
754
755 /*
756 Not only that, but the size of the entire snapshot
757 * header must also be representable in a size_t.
758 */
759 size -= snap_count * sizeof (__le64);
760 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
761 return false;
762
763 return true;
764 }
765
766 /*
767 * Fill an rbd image header with information from the given format 1
768 * on-disk header.
769 */
770 static int rbd_header_from_disk(struct rbd_device *rbd_dev,
771 struct rbd_image_header_ondisk *ondisk)
772 {
773 struct rbd_image_header *header = &rbd_dev->header;
774 bool first_time = header->object_prefix == NULL;
775 struct ceph_snap_context *snapc;
776 char *object_prefix = NULL;
777 char *snap_names = NULL;
778 u64 *snap_sizes = NULL;
779 u32 snap_count;
780 size_t size;
781 int ret = -ENOMEM;
782 u32 i;
783
784 /* Allocate this now to avoid having to handle failure below */
785
786 if (first_time) {
787 size_t len;
788
789 len = strnlen(ondisk->object_prefix,
790 sizeof (ondisk->object_prefix));
791 object_prefix = kmalloc(len + 1, GFP_KERNEL);
792 if (!object_prefix)
793 return -ENOMEM;
794 memcpy(object_prefix, ondisk->object_prefix, len);
795 object_prefix[len] = '\0';
796 }
797
798 /* Allocate the snapshot context and fill it in */
799
800 snap_count = le32_to_cpu(ondisk->snap_count);
801 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
802 if (!snapc)
803 goto out_err;
804 snapc->seq = le64_to_cpu(ondisk->snap_seq);
805 if (snap_count) {
806 struct rbd_image_snap_ondisk *snaps;
807 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
808
809 /* We'll keep a copy of the snapshot names... */
810
811 if (snap_names_len > (u64)SIZE_MAX)
812 goto out_2big;
813 snap_names = kmalloc(snap_names_len, GFP_KERNEL);
814 if (!snap_names)
815 goto out_err;
816
817 /* ...as well as the array of their sizes. */
818
819 size = snap_count * sizeof (*header->snap_sizes);
820 snap_sizes = kmalloc(size, GFP_KERNEL);
821 if (!snap_sizes)
822 goto out_err;
823
824 /*
825 * Copy the names, and fill in each snapshot's id
826 * and size.
827 *
828 * Note that rbd_dev_v1_header_info() guarantees the
829 * ondisk buffer we're working with has
830 * snap_names_len bytes beyond the end of the
831 snapshot id array, so this memcpy() is safe.
832 */
833 memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
834 snaps = ondisk->snaps;
835 for (i = 0; i < snap_count; i++) {
836 snapc->snaps[i] = le64_to_cpu(snaps[i].id);
837 snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
838 }
839 }
840
841 /* We won't fail any more; fill in the header */
842
843 down_write(&rbd_dev->header_rwsem);
844 if (first_time) {
845 header->object_prefix = object_prefix;
846 header->obj_order = ondisk->options.order;
847 header->crypt_type = ondisk->options.crypt_type;
848 header->comp_type = ondisk->options.comp_type;
849 /* The rest aren't used for format 1 images */
850 header->stripe_unit = 0;
851 header->stripe_count = 0;
852 header->features = 0;
853 } else {
854 ceph_put_snap_context(header->snapc);
855 kfree(header->snap_names);
856 kfree(header->snap_sizes);
857 }
858
859 /* The remaining fields always get updated (when we refresh) */
860
861 header->image_size = le64_to_cpu(ondisk->image_size);
862 header->snapc = snapc;
863 header->snap_names = snap_names;
864 header->snap_sizes = snap_sizes;
865
866 /* Make sure mapping size is consistent with header info */
867
868 if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
869 if (rbd_dev->mapping.size != header->image_size)
870 rbd_dev->mapping.size = header->image_size;
871
872 up_write(&rbd_dev->header_rwsem);
873
874 return 0;
875 out_2big:
876 ret = -EIO;
877 out_err:
878 kfree(snap_sizes);
879 kfree(snap_names);
880 ceph_put_snap_context(snapc);
881 kfree(object_prefix);
882
883 return ret;
884 }
885
886 static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
887 {
888 const char *snap_name;
889
890 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
891
892 /* Skip over names until we find the one we are looking for */
893
894 snap_name = rbd_dev->header.snap_names;
895 while (which--)
896 snap_name += strlen(snap_name) + 1;
897
898 return kstrdup(snap_name, GFP_KERNEL);
899 }
900
901 /*
902 * Snapshot id comparison function for use with qsort()/bsearch().
903 * Note that result is for snapshots in *descending* order.
904 */
905 static int snapid_compare_reverse(const void *s1, const void *s2)
906 {
907 u64 snap_id1 = *(u64 *)s1;
908 u64 snap_id2 = *(u64 *)s2;
909
910 if (snap_id1 < snap_id2)
911 return 1;
912 return snap_id1 == snap_id2 ? 0 : -1;
913 }
914
915 /*
916 * Search a snapshot context to see if the given snapshot id is
917 * present.
918 *
919 * Returns the position of the snapshot id in the array if it's found,
920 * or BAD_SNAP_INDEX otherwise.
921 *
922 Note: The snapshot array is kept sorted (by the osd) in
923 * reverse order, highest snapshot id first.
924 */
925 static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
926 {
927 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
928 u64 *found;
929
930 found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
931 sizeof (snap_id), snapid_compare_reverse);
932
933 return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
934 }
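/*
 * For example, if snapc->snaps holds the ids { 12, 7, 3 } (newest
 * first, as the osd keeps them), looking up snap_id 7 returns index 1,
 * while an id that is not present, say 5, returns BAD_SNAP_INDEX.
 */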
935
936 static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
937 u64 snap_id)
938 {
939 u32 which;
940 const char *snap_name;
941
942 which = rbd_dev_snap_index(rbd_dev, snap_id);
943 if (which == BAD_SNAP_INDEX)
944 return ERR_PTR(-ENOENT);
945
946 snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
947 return snap_name ? snap_name : ERR_PTR(-ENOMEM);
948 }
949
950 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
951 {
952 if (snap_id == CEPH_NOSNAP)
953 return RBD_SNAP_HEAD_NAME;
954
955 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
956 if (rbd_dev->image_format == 1)
957 return rbd_dev_v1_snap_name(rbd_dev, snap_id);
958
959 return rbd_dev_v2_snap_name(rbd_dev, snap_id);
960 }
961
962 static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
963 u64 *snap_size)
964 {
965 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
966 if (snap_id == CEPH_NOSNAP) {
967 *snap_size = rbd_dev->header.image_size;
968 } else if (rbd_dev->image_format == 1) {
969 u32 which;
970
971 which = rbd_dev_snap_index(rbd_dev, snap_id);
972 if (which == BAD_SNAP_INDEX)
973 return -ENOENT;
974
975 *snap_size = rbd_dev->header.snap_sizes[which];
976 } else {
977 u64 size = 0;
978 int ret;
979
980 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
981 if (ret)
982 return ret;
983
984 *snap_size = size;
985 }
986 return 0;
987 }
988
989 static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
990 u64 *snap_features)
991 {
992 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
993 if (snap_id == CEPH_NOSNAP) {
994 *snap_features = rbd_dev->header.features;
995 } else if (rbd_dev->image_format == 1) {
996 *snap_features = 0; /* No features for format 1 */
997 } else {
998 u64 features = 0;
999 int ret;
1000
1001 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1002 if (ret)
1003 return ret;
1004
1005 *snap_features = features;
1006 }
1007 return 0;
1008 }
1009
1010 static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1011 {
1012 u64 snap_id = rbd_dev->spec->snap_id;
1013 u64 size = 0;
1014 u64 features = 0;
1015 int ret;
1016
1017 ret = rbd_snap_size(rbd_dev, snap_id, &size);
1018 if (ret)
1019 return ret;
1020 ret = rbd_snap_features(rbd_dev, snap_id, &features);
1021 if (ret)
1022 return ret;
1023
1024 rbd_dev->mapping.size = size;
1025 rbd_dev->mapping.features = features;
1026
1027 return 0;
1028 }
1029
1030 static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1031 {
1032 rbd_dev->mapping.size = 0;
1033 rbd_dev->mapping.features = 0;
1034 }
1035
1036 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1037 {
1038 char *name;
1039 u64 segment;
1040 int ret;
1041 char *name_format;
1042
1043 name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1044 if (!name)
1045 return NULL;
1046 segment = offset >> rbd_dev->header.obj_order;
1047 name_format = "%s.%012llx";
1048 if (rbd_dev->image_format == 2)
1049 name_format = "%s.%016llx";
1050 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
1051 rbd_dev->header.object_prefix, segment);
1052 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
1053 pr_err("error formatting segment name for #%llu (%d)\n",
1054 segment, ret);
1055 kfree(name);
1056 name = NULL;
1057 }
1058
1059 return name;
1060 }
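/*
 * As an illustration, a format 1 image with a (hypothetical) object
 * prefix of "rb.0.1234.6b8b4567" would get the name
 * "rb.0.1234.6b8b4567.000000000005" for segment 5; a format 2 image
 * uses sixteen hex digits rather than twelve.
 */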
1061
1062 static void rbd_segment_name_free(const char *name)
1063 {
1064 /* The explicit cast here is needed to drop the const qualifier */
1065
1066 kmem_cache_free(rbd_segment_name_cache, (void *)name);
1067 }
1068
1069 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1070 {
1071 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1072
1073 return offset & (segment_size - 1);
1074 }
1075
1076 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1077 u64 offset, u64 length)
1078 {
1079 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1080
1081 offset &= segment_size - 1;
1082
1083 rbd_assert(length <= U64_MAX - offset);
1084 if (offset + length > segment_size)
1085 length = segment_size - offset;
1086
1087 return length;
1088 }
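/*
 * Worked example, assuming the common 4 MiB object size (obj_order of
 * 22): an image I/O starting at byte offset 5 MiB falls in segment 1
 * at an in-segment offset of 1 MiB, and a 6 MiB request starting there
 * is clipped to 3 MiB so that it ends on the segment boundary; the
 * remainder is issued against the following segment(s).
 */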
1089
1090 /*
1091 * returns the size of an object in the image
1092 */
1093 static u64 rbd_obj_bytes(struct rbd_image_header *header)
1094 {
1095 return 1 << header->obj_order;
1096 }
1097
1098 /*
1099 * bio helpers
1100 */
1101
1102 static void bio_chain_put(struct bio *chain)
1103 {
1104 struct bio *tmp;
1105
1106 while (chain) {
1107 tmp = chain;
1108 chain = chain->bi_next;
1109 bio_put(tmp);
1110 }
1111 }
1112
1113 /*
1114 * zeros a bio chain, starting at specific offset
1115 */
1116 static void zero_bio_chain(struct bio *chain, int start_ofs)
1117 {
1118 struct bio_vec *bv;
1119 unsigned long flags;
1120 void *buf;
1121 int i;
1122 int pos = 0;
1123
1124 while (chain) {
1125 bio_for_each_segment(bv, chain, i) {
1126 if (pos + bv->bv_len > start_ofs) {
1127 int remainder = max(start_ofs - pos, 0);
1128 buf = bvec_kmap_irq(bv, &flags);
1129 memset(buf + remainder, 0,
1130 bv->bv_len - remainder);
1131 flush_dcache_page(bv->bv_page);
1132 bvec_kunmap_irq(buf, &flags);
1133 }
1134 pos += bv->bv_len;
1135 }
1136
1137 chain = chain->bi_next;
1138 }
1139 }
1140
1141 /*
1142 * similar to zero_bio_chain(), zeros data defined by a page array,
1143 * starting at the given byte offset from the start of the array and
1144 * continuing up to the given end offset. The pages array is
1145 * assumed to be big enough to hold all bytes up to the end.
1146 */
1147 static void zero_pages(struct page **pages, u64 offset, u64 end)
1148 {
1149 struct page **page = &pages[offset >> PAGE_SHIFT];
1150
1151 rbd_assert(end > offset);
1152 rbd_assert(end - offset <= (u64)SIZE_MAX);
1153 while (offset < end) {
1154 size_t page_offset;
1155 size_t length;
1156 unsigned long flags;
1157 void *kaddr;
1158
1159 page_offset = (size_t)(offset & ~PAGE_MASK);
1160 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
1161 local_irq_save(flags);
1162 kaddr = kmap_atomic(*page);
1163 memset(kaddr + page_offset, 0, length);
1164 flush_dcache_page(*page);
1165 kunmap_atomic(kaddr);
1166 local_irq_restore(flags);
1167
1168 offset += length;
1169 page++;
1170 }
1171 }
1172
1173 /*
1174 * Clone a portion of a bio, starting at the given byte offset
1175 * and continuing for the number of bytes indicated.
1176 */
1177 static struct bio *bio_clone_range(struct bio *bio_src,
1178 unsigned int offset,
1179 unsigned int len,
1180 gfp_t gfpmask)
1181 {
1182 struct bio_vec *bv;
1183 unsigned int resid;
1184 unsigned short idx;
1185 unsigned int voff;
1186 unsigned short end_idx;
1187 unsigned short vcnt;
1188 struct bio *bio;
1189
1190 /* Handle the easy case for the caller */
1191
1192 if (!offset && len == bio_src->bi_size)
1193 return bio_clone(bio_src, gfpmask);
1194
1195 if (WARN_ON_ONCE(!len))
1196 return NULL;
1197 if (WARN_ON_ONCE(len > bio_src->bi_size))
1198 return NULL;
1199 if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1200 return NULL;
1201
1202 /* Find first affected segment... */
1203
1204 resid = offset;
1205 bio_for_each_segment(bv, bio_src, idx) {
1206 if (resid < bv->bv_len)
1207 break;
1208 resid -= bv->bv_len;
1209 }
1210 voff = resid;
1211
1212 /* ...and the last affected segment */
1213
1214 resid += len;
1215 __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1216 if (resid <= bv->bv_len)
1217 break;
1218 resid -= bv->bv_len;
1219 }
1220 vcnt = end_idx - idx + 1;
1221
1222 /* Build the clone */
1223
1224 bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1225 if (!bio)
1226 return NULL; /* ENOMEM */
1227
1228 bio->bi_bdev = bio_src->bi_bdev;
1229 bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1230 bio->bi_rw = bio_src->bi_rw;
1231 bio->bi_flags |= 1 << BIO_CLONED;
1232
1233 /*
1234 * Copy over our part of the bio_vec, then update the first
1235 * and last (or only) entries.
1236 */
1237 memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1238 vcnt * sizeof (struct bio_vec));
1239 bio->bi_io_vec[0].bv_offset += voff;
1240 if (vcnt > 1) {
1241 bio->bi_io_vec[0].bv_len -= voff;
1242 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1243 } else {
1244 bio->bi_io_vec[0].bv_len = len;
1245 }
1246
1247 bio->bi_vcnt = vcnt;
1248 bio->bi_size = len;
1249 bio->bi_idx = 0;
1250
1251 return bio;
1252 }
1253
1254 /*
1255 * Clone a portion of a bio chain, starting at the given byte offset
1256 * into the first bio in the source chain and continuing for the
1257 * number of bytes indicated. The result is another bio chain of
1258 * exactly the given length, or a null pointer on error.
1259 *
1260 * The bio_src and offset parameters are both in-out. On entry they
1261 * refer to the first source bio and the offset into that bio where
1262 * the start of data to be cloned is located.
1263 *
1264 * On return, bio_src is updated to refer to the bio in the source
1265 chain that contains the first un-cloned byte, and *offset will
1266 * contain the offset of that byte within that bio.
1267 */
1268 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1269 unsigned int *offset,
1270 unsigned int len,
1271 gfp_t gfpmask)
1272 {
1273 struct bio *bi = *bio_src;
1274 unsigned int off = *offset;
1275 struct bio *chain = NULL;
1276 struct bio **end;
1277
1278 /* Build up a chain of clone bios up to the limit */
1279
1280 if (!bi || off >= bi->bi_size || !len)
1281 return NULL; /* Nothing to clone */
1282
1283 end = &chain;
1284 while (len) {
1285 unsigned int bi_size;
1286 struct bio *bio;
1287
1288 if (!bi) {
1289 rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1290 goto out_err; /* EINVAL; ran out of bio's */
1291 }
1292 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1293 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1294 if (!bio)
1295 goto out_err; /* ENOMEM */
1296
1297 *end = bio;
1298 end = &bio->bi_next;
1299
1300 off += bi_size;
1301 if (off == bi->bi_size) {
1302 bi = bi->bi_next;
1303 off = 0;
1304 }
1305 len -= bi_size;
1306 }
1307 *bio_src = bi;
1308 *offset = off;
1309
1310 return chain;
1311 out_err:
1312 bio_chain_put(chain);
1313
1314 return NULL;
1315 }
1316
1317 /*
1318 * The default/initial value for all object request flags is 0. For
1319 * each flag, once its value is set to 1 it is never reset to 0
1320 * again.
1321 */
1322 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1323 {
1324 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1325 struct rbd_device *rbd_dev;
1326
1327 rbd_dev = obj_request->img_request->rbd_dev;
1328 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1329 obj_request);
1330 }
1331 }
1332
1333 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1334 {
1335 smp_mb();
1336 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1337 }
1338
1339 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1340 {
1341 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1342 struct rbd_device *rbd_dev = NULL;
1343
1344 if (obj_request_img_data_test(obj_request))
1345 rbd_dev = obj_request->img_request->rbd_dev;
1346 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1347 obj_request);
1348 }
1349 }
1350
1351 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1352 {
1353 smp_mb();
1354 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1355 }
1356
1357 /*
1358 * This sets the KNOWN flag after (possibly) setting the EXISTS
1359 * flag. The latter is set based on the "exists" value provided.
1360 *
1361 * Note that for our purposes once an object exists it never goes
1362 away again. It's possible that the responses from two existence
1363 * checks are separated by the creation of the target object, and
1364 * the first ("doesn't exist") response arrives *after* the second
1365 * ("does exist"). In that case we ignore the second one.
1366 */
1367 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1368 bool exists)
1369 {
1370 if (exists)
1371 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1372 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1373 smp_mb();
1374 }
1375
1376 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1377 {
1378 smp_mb();
1379 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1380 }
1381
1382 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1383 {
1384 smp_mb();
1385 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1386 }
1387
1388 static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1389 {
1390 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1391
1392 return obj_request->img_offset <
1393 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1394 }
1395
1396 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1397 {
1398 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1399 atomic_read(&obj_request->kref.refcount));
1400 kref_get(&obj_request->kref);
1401 }
1402
1403 static void rbd_obj_request_destroy(struct kref *kref);
1404 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1405 {
1406 rbd_assert(obj_request != NULL);
1407 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1408 atomic_read(&obj_request->kref.refcount));
1409 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1410 }
1411
1412 static void rbd_img_request_get(struct rbd_img_request *img_request)
1413 {
1414 dout("%s: img %p (was %d)\n", __func__, img_request,
1415 atomic_read(&img_request->kref.refcount));
1416 kref_get(&img_request->kref);
1417 }
1418
1419 static bool img_request_child_test(struct rbd_img_request *img_request);
1420 static void rbd_parent_request_destroy(struct kref *kref);
1421 static void rbd_img_request_destroy(struct kref *kref);
1422 static void rbd_img_request_put(struct rbd_img_request *img_request)
1423 {
1424 rbd_assert(img_request != NULL);
1425 dout("%s: img %p (was %d)\n", __func__, img_request,
1426 atomic_read(&img_request->kref.refcount));
1427 if (img_request_child_test(img_request))
1428 kref_put(&img_request->kref, rbd_parent_request_destroy);
1429 else
1430 kref_put(&img_request->kref, rbd_img_request_destroy);
1431 }
1432
1433 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1434 struct rbd_obj_request *obj_request)
1435 {
1436 rbd_assert(obj_request->img_request == NULL);
1437
1438 /* Image request now owns object's original reference */
1439 obj_request->img_request = img_request;
1440 obj_request->which = img_request->obj_request_count;
1441 rbd_assert(!obj_request_img_data_test(obj_request));
1442 obj_request_img_data_set(obj_request);
1443 rbd_assert(obj_request->which != BAD_WHICH);
1444 img_request->obj_request_count++;
1445 list_add_tail(&obj_request->links, &img_request->obj_requests);
1446 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1447 obj_request->which);
1448 }
1449
1450 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1451 struct rbd_obj_request *obj_request)
1452 {
1453 rbd_assert(obj_request->which != BAD_WHICH);
1454
1455 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1456 obj_request->which);
1457 list_del(&obj_request->links);
1458 rbd_assert(img_request->obj_request_count > 0);
1459 img_request->obj_request_count--;
1460 rbd_assert(obj_request->which == img_request->obj_request_count);
1461 obj_request->which = BAD_WHICH;
1462 rbd_assert(obj_request_img_data_test(obj_request));
1463 rbd_assert(obj_request->img_request == img_request);
1464 obj_request->img_request = NULL;
1465 obj_request->callback = NULL;
1466 rbd_obj_request_put(obj_request);
1467 }
1468
1469 static bool obj_request_type_valid(enum obj_request_type type)
1470 {
1471 switch (type) {
1472 case OBJ_REQUEST_NODATA:
1473 case OBJ_REQUEST_BIO:
1474 case OBJ_REQUEST_PAGES:
1475 return true;
1476 default:
1477 return false;
1478 }
1479 }
1480
1481 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1482 struct rbd_obj_request *obj_request)
1483 {
1484 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1485
1486 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1487 }
1488
1489 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1490 {
1491
1492 dout("%s: img %p\n", __func__, img_request);
1493
1494 /*
1495 * If no error occurred, compute the aggregate transfer
1496 * count for the image request. We could instead use
1497 * atomic64_cmpxchg() to update it as each object request
1498 completes; it's not clear offhand which way is better.
1499 */
1500 if (!img_request->result) {
1501 struct rbd_obj_request *obj_request;
1502 u64 xferred = 0;
1503
1504 for_each_obj_request(img_request, obj_request)
1505 xferred += obj_request->xferred;
1506 img_request->xferred = xferred;
1507 }
1508
1509 if (img_request->callback)
1510 img_request->callback(img_request);
1511 else
1512 rbd_img_request_put(img_request);
1513 }
1514
1515 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1516
1517 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1518 {
1519 dout("%s: obj %p\n", __func__, obj_request);
1520
1521 return wait_for_completion_interruptible(&obj_request->completion);
1522 }
1523
1524 /*
1525 * The default/initial value for all image request flags is 0. Each
1526 * is conditionally set to 1 at image request initialization time
1527 and currently never changes thereafter.
1528 */
1529 static void img_request_write_set(struct rbd_img_request *img_request)
1530 {
1531 set_bit(IMG_REQ_WRITE, &img_request->flags);
1532 smp_mb();
1533 }
1534
1535 static bool img_request_write_test(struct rbd_img_request *img_request)
1536 {
1537 smp_mb();
1538 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1539 }
1540
1541 static void img_request_child_set(struct rbd_img_request *img_request)
1542 {
1543 set_bit(IMG_REQ_CHILD, &img_request->flags);
1544 smp_mb();
1545 }
1546
1547 static void img_request_child_clear(struct rbd_img_request *img_request)
1548 {
1549 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1550 smp_mb();
1551 }
1552
1553 static bool img_request_child_test(struct rbd_img_request *img_request)
1554 {
1555 smp_mb();
1556 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1557 }
1558
1559 static void img_request_layered_set(struct rbd_img_request *img_request)
1560 {
1561 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1562 smp_mb();
1563 }
1564
1565 static void img_request_layered_clear(struct rbd_img_request *img_request)
1566 {
1567 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1568 smp_mb();
1569 }
1570
1571 static bool img_request_layered_test(struct rbd_img_request *img_request)
1572 {
1573 smp_mb();
1574 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1575 }
1576
1577 static void
1578 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1579 {
1580 u64 xferred = obj_request->xferred;
1581 u64 length = obj_request->length;
1582
1583 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1584 obj_request, obj_request->img_request, obj_request->result,
1585 xferred, length);
1586 /*
1587 * ENOENT means a hole in the image. We zero-fill the entire
1588 * length of the request. A short read also implies zero-fill
1589 * to the end of the request. An error requires the whole
1590 * length of the request to be reported finished with an error
1591 * to the block layer. In each case we update the xferred
1592 * count to indicate the whole request was satisfied.
1593 */
1594 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1595 if (obj_request->result == -ENOENT) {
1596 if (obj_request->type == OBJ_REQUEST_BIO)
1597 zero_bio_chain(obj_request->bio_list, 0);
1598 else
1599 zero_pages(obj_request->pages, 0, length);
1600 obj_request->result = 0;
1601 } else if (xferred < length && !obj_request->result) {
1602 if (obj_request->type == OBJ_REQUEST_BIO)
1603 zero_bio_chain(obj_request->bio_list, xferred);
1604 else
1605 zero_pages(obj_request->pages, xferred, length);
1606 }
1607 obj_request->xferred = length;
1608 obj_request_done_set(obj_request);
1609 }
1610
1611 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1612 {
1613 dout("%s: obj %p cb %p\n", __func__, obj_request,
1614 obj_request->callback);
1615 if (obj_request->callback)
1616 obj_request->callback(obj_request);
1617 else
1618 complete_all(&obj_request->completion);
1619 }
1620
1621 static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1622 {
1623 dout("%s: obj %p\n", __func__, obj_request);
1624 obj_request_done_set(obj_request);
1625 }
1626
1627 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1628 {
1629 struct rbd_img_request *img_request = NULL;
1630 struct rbd_device *rbd_dev = NULL;
1631 bool layered = false;
1632
1633 if (obj_request_img_data_test(obj_request)) {
1634 img_request = obj_request->img_request;
1635 layered = img_request && img_request_layered_test(img_request);
1636 rbd_dev = img_request->rbd_dev;
1637 }
1638
1639 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1640 obj_request, img_request, obj_request->result,
1641 obj_request->xferred, obj_request->length);
1642 if (layered && obj_request->result == -ENOENT &&
1643 obj_request->img_offset < rbd_dev->parent_overlap)
1644 rbd_img_parent_read(obj_request);
1645 else if (img_request)
1646 rbd_img_obj_request_read_callback(obj_request);
1647 else
1648 obj_request_done_set(obj_request);
1649 }
1650
1651 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1652 {
1653 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1654 obj_request->result, obj_request->length);
1655 /*
1656 * There is no such thing as a successful short write. Set
1657 * it to our originally-requested length.
1658 */
1659 obj_request->xferred = obj_request->length;
1660 obj_request_done_set(obj_request);
1661 }
1662
1663 /*
1664 * For a simple stat call there's nothing to do. We'll do more if
1665 * this is part of a write sequence for a layered image.
1666 */
1667 static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1668 {
1669 dout("%s: obj %p\n", __func__, obj_request);
1670 obj_request_done_set(obj_request);
1671 }
1672
1673 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1674 struct ceph_msg *msg)
1675 {
1676 struct rbd_obj_request *obj_request = osd_req->r_priv;
1677 u16 opcode;
1678
1679 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1680 rbd_assert(osd_req == obj_request->osd_req);
1681 if (obj_request_img_data_test(obj_request)) {
1682 rbd_assert(obj_request->img_request);
1683 rbd_assert(obj_request->which != BAD_WHICH);
1684 } else {
1685 rbd_assert(obj_request->which == BAD_WHICH);
1686 }
1687
1688 if (osd_req->r_result < 0)
1689 obj_request->result = osd_req->r_result;
1690
1691 BUG_ON(osd_req->r_num_ops > 2);
1692
1693 /*
1694 * We support a 64-bit length, but ultimately it has to be
1695 * passed to blk_end_request(), which takes an unsigned int.
1696 */
1697 obj_request->xferred = osd_req->r_reply_op_len[0];
1698 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1699 opcode = osd_req->r_ops[0].op;
1700 switch (opcode) {
1701 case CEPH_OSD_OP_READ:
1702 rbd_osd_read_callback(obj_request);
1703 break;
1704 case CEPH_OSD_OP_WRITE:
1705 rbd_osd_write_callback(obj_request);
1706 break;
1707 case CEPH_OSD_OP_STAT:
1708 rbd_osd_stat_callback(obj_request);
1709 break;
1710 case CEPH_OSD_OP_CALL:
1711 case CEPH_OSD_OP_NOTIFY_ACK:
1712 case CEPH_OSD_OP_WATCH:
1713 rbd_osd_trivial_callback(obj_request);
1714 break;
1715 default:
1716 rbd_warn(NULL, "%s: unsupported op %hu\n",
1717 obj_request->object_name, (unsigned short) opcode);
1718 break;
1719 }
1720
1721 if (obj_request_done_test(obj_request))
1722 rbd_obj_request_complete(obj_request);
1723 }
1724
1725 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1726 {
1727 struct rbd_img_request *img_request = obj_request->img_request;
1728 struct ceph_osd_request *osd_req = obj_request->osd_req;
1729 u64 snap_id;
1730
1731 rbd_assert(osd_req != NULL);
1732
1733 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1734 ceph_osdc_build_request(osd_req, obj_request->offset,
1735 NULL, snap_id, NULL);
1736 }
1737
1738 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1739 {
1740 struct rbd_img_request *img_request = obj_request->img_request;
1741 struct ceph_osd_request *osd_req = obj_request->osd_req;
1742 struct ceph_snap_context *snapc;
1743 struct timespec mtime = CURRENT_TIME;
1744
1745 rbd_assert(osd_req != NULL);
1746
1747 snapc = img_request ? img_request->snapc : NULL;
1748 ceph_osdc_build_request(osd_req, obj_request->offset,
1749 snapc, CEPH_NOSNAP, &mtime);
1750 }
1751
1752 static struct ceph_osd_request *rbd_osd_req_create(
1753 struct rbd_device *rbd_dev,
1754 bool write_request,
1755 struct rbd_obj_request *obj_request)
1756 {
1757 struct ceph_snap_context *snapc = NULL;
1758 struct ceph_osd_client *osdc;
1759 struct ceph_osd_request *osd_req;
1760
1761 if (obj_request_img_data_test(obj_request)) {
1762 struct rbd_img_request *img_request = obj_request->img_request;
1763
1764 rbd_assert(write_request ==
1765 img_request_write_test(img_request));
1766 if (write_request)
1767 snapc = img_request->snapc;
1768 }
1769
1770 /* Allocate and initialize the request, for the single op */
1771
1772 osdc = &rbd_dev->rbd_client->client->osdc;
1773 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1774 if (!osd_req)
1775 return NULL; /* ENOMEM */
1776
1777 if (write_request)
1778 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1779 else
1780 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1781
1782 osd_req->r_callback = rbd_osd_req_callback;
1783 osd_req->r_priv = obj_request;
1784
1785 osd_req->r_oid_len = strlen(obj_request->object_name);
1786 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1787 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1788
1789 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1790
1791 return osd_req;
1792 }
1793
1794 /*
1795 * Create a copyup osd request based on the information in the
1796 object request supplied. A copyup request has two osd ops:
1797 a copyup method call and a "normal" write request.
1798 */
1799 static struct ceph_osd_request *
1800 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1801 {
1802 struct rbd_img_request *img_request;
1803 struct ceph_snap_context *snapc;
1804 struct rbd_device *rbd_dev;
1805 struct ceph_osd_client *osdc;
1806 struct ceph_osd_request *osd_req;
1807
1808 rbd_assert(obj_request_img_data_test(obj_request));
1809 img_request = obj_request->img_request;
1810 rbd_assert(img_request);
1811 rbd_assert(img_request_write_test(img_request));
1812
1813 /* Allocate and initialize the request, for the two ops */
1814
1815 snapc = img_request->snapc;
1816 rbd_dev = img_request->rbd_dev;
1817 osdc = &rbd_dev->rbd_client->client->osdc;
1818 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1819 if (!osd_req)
1820 return NULL; /* ENOMEM */
1821
1822 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1823 osd_req->r_callback = rbd_osd_req_callback;
1824 osd_req->r_priv = obj_request;
1825
1826 osd_req->r_oid_len = strlen(obj_request->object_name);
1827 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1828 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1829
1830 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1831
1832 return osd_req;
1833 }
1834
1835
1836 static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1837 {
1838 ceph_osdc_put_request(osd_req);
1839 }
1840
1841 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1842
1843 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1844 u64 offset, u64 length,
1845 enum obj_request_type type)
1846 {
1847 struct rbd_obj_request *obj_request;
1848 size_t size;
1849 char *name;
1850
1851 rbd_assert(obj_request_type_valid(type));
1852
1853 size = strlen(object_name) + 1;
1854 name = kmalloc(size, GFP_KERNEL);
1855 if (!name)
1856 return NULL;
1857
1858 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1859 if (!obj_request) {
1860 kfree(name);
1861 return NULL;
1862 }
1863
1864 obj_request->object_name = memcpy(name, object_name, size);
1865 obj_request->offset = offset;
1866 obj_request->length = length;
1867 obj_request->flags = 0;
1868 obj_request->which = BAD_WHICH;
1869 obj_request->type = type;
1870 INIT_LIST_HEAD(&obj_request->links);
1871 init_completion(&obj_request->completion);
1872 kref_init(&obj_request->kref);
1873
1874 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1875 offset, length, (int)type, obj_request);
1876
1877 return obj_request;
1878 }
1879
1880 static void rbd_obj_request_destroy(struct kref *kref)
1881 {
1882 struct rbd_obj_request *obj_request;
1883
1884 obj_request = container_of(kref, struct rbd_obj_request, kref);
1885
1886 dout("%s: obj %p\n", __func__, obj_request);
1887
1888 rbd_assert(obj_request->img_request == NULL);
1889 rbd_assert(obj_request->which == BAD_WHICH);
1890
1891 if (obj_request->osd_req)
1892 rbd_osd_req_destroy(obj_request->osd_req);
1893
1894 rbd_assert(obj_request_type_valid(obj_request->type));
1895 switch (obj_request->type) {
1896 case OBJ_REQUEST_NODATA:
1897 break; /* Nothing to do */
1898 case OBJ_REQUEST_BIO:
1899 if (obj_request->bio_list)
1900 bio_chain_put(obj_request->bio_list);
1901 break;
1902 case OBJ_REQUEST_PAGES:
1903 if (obj_request->pages)
1904 ceph_release_page_vector(obj_request->pages,
1905 obj_request->page_count);
1906 break;
1907 }
1908
1909 kfree(obj_request->object_name);
1910 obj_request->object_name = NULL;
1911 kmem_cache_free(rbd_obj_request_cache, obj_request);
1912 }
1913
1914 /* It's OK to call this for a device with no parent */
1915
1916 static void rbd_spec_put(struct rbd_spec *spec);
1917 static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1918 {
1919 rbd_dev_remove_parent(rbd_dev);
1920 rbd_spec_put(rbd_dev->parent_spec);
1921 rbd_dev->parent_spec = NULL;
1922 rbd_dev->parent_overlap = 0;
1923 }
1924
1925 /*
1926 * Parent image reference counting is used to determine when an
1927 * image's parent fields can be safely torn down--after there are no
1928 * more in-flight requests to the parent image. When the last
1929 * reference is dropped, cleaning them up is safe.
1930 */
1931 static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1932 {
1933 int counter;
1934
1935 if (!rbd_dev->parent_spec)
1936 return;
1937
1938 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1939 if (counter > 0)
1940 return;
1941
1942 /* Last reference; clean up parent data structures */
1943
1944 if (!counter)
1945 rbd_dev_unparent(rbd_dev);
1946 else
1947 rbd_warn(rbd_dev, "parent reference underflow\n");
1948 }
1949
1950 /*
1951 * If an image has a non-zero parent overlap, get a reference to its
1952 * parent.
1953 *
1954 * We must get the reference before checking for the overlap to
1955 * coordinate properly with zeroing the parent overlap in
1956 * rbd_dev_v2_parent_info() when an image gets flattened. We
1957 * drop it again if there is no overlap.
1958 *
1959 * Returns true if the rbd device has a parent with a non-zero
1960 * overlap and a reference for it was successfully taken, or
1961 * false otherwise.
1962 */
1963 static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1964 {
1965 int counter;
1966
1967 if (!rbd_dev->parent_spec)
1968 return false;
1969
1970 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1971 if (counter > 0 && rbd_dev->parent_overlap)
1972 return true;
1973
1974 /* Image was flattened, but parent is not yet torn down */
1975
1976 if (counter < 0)
1977 rbd_warn(rbd_dev, "parent reference overflow\n");
1978
1979 return false;
1980 }
1981
1982 /*
1983 * Caller is responsible for filling in the list of object requests
1984 * that comprises the image request, and the Linux request pointer
1985 * (if there is one).
1986 */
1987 static struct rbd_img_request *rbd_img_request_create(
1988 struct rbd_device *rbd_dev,
1989 u64 offset, u64 length,
1990 bool write_request)
1991 {
1992 struct rbd_img_request *img_request;
1993
1994 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1995 if (!img_request)
1996 return NULL;
1997
1998 if (write_request) {
1999 down_read(&rbd_dev->header_rwsem);
2000 ceph_get_snap_context(rbd_dev->header.snapc);
2001 up_read(&rbd_dev->header_rwsem);
2002 }
2003
2004 img_request->rq = NULL;
2005 img_request->rbd_dev = rbd_dev;
2006 img_request->offset = offset;
2007 img_request->length = length;
2008 img_request->flags = 0;
2009 if (write_request) {
2010 img_request_write_set(img_request);
2011 img_request->snapc = rbd_dev->header.snapc;
2012 } else {
2013 img_request->snap_id = rbd_dev->spec->snap_id;
2014 }
2015 if (rbd_dev_parent_get(rbd_dev))
2016 img_request_layered_set(img_request);
2017 spin_lock_init(&img_request->completion_lock);
2018 img_request->next_completion = 0;
2019 img_request->callback = NULL;
2020 img_request->result = 0;
2021 img_request->obj_request_count = 0;
2022 INIT_LIST_HEAD(&img_request->obj_requests);
2023 kref_init(&img_request->kref);
2024
2025 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2026 write_request ? "write" : "read", offset, length,
2027 img_request);
2028
2029 return img_request;
2030 }
2031
2032 static void rbd_img_request_destroy(struct kref *kref)
2033 {
2034 struct rbd_img_request *img_request;
2035 struct rbd_obj_request *obj_request;
2036 struct rbd_obj_request *next_obj_request;
2037
2038 img_request = container_of(kref, struct rbd_img_request, kref);
2039
2040 dout("%s: img %p\n", __func__, img_request);
2041
2042 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2043 rbd_img_obj_request_del(img_request, obj_request);
2044 rbd_assert(img_request->obj_request_count == 0);
2045
2046 if (img_request_layered_test(img_request)) {
2047 img_request_layered_clear(img_request);
2048 rbd_dev_parent_put(img_request->rbd_dev);
2049 }
2050
2051 if (img_request_write_test(img_request))
2052 ceph_put_snap_context(img_request->snapc);
2053
2054 kmem_cache_free(rbd_img_request_cache, img_request);
2055 }
2056
2057 static struct rbd_img_request *rbd_parent_request_create(
2058 struct rbd_obj_request *obj_request,
2059 u64 img_offset, u64 length)
2060 {
2061 struct rbd_img_request *parent_request;
2062 struct rbd_device *rbd_dev;
2063
2064 rbd_assert(obj_request->img_request);
2065 rbd_dev = obj_request->img_request->rbd_dev;
2066
2067 parent_request = rbd_img_request_create(rbd_dev->parent,
2068 img_offset, length, false);
2069 if (!parent_request)
2070 return NULL;
2071
2072 img_request_child_set(parent_request);
2073 rbd_obj_request_get(obj_request);
2074 parent_request->obj_request = obj_request;
2075
2076 return parent_request;
2077 }
2078
2079 static void rbd_parent_request_destroy(struct kref *kref)
2080 {
2081 struct rbd_img_request *parent_request;
2082 struct rbd_obj_request *orig_request;
2083
2084 parent_request = container_of(kref, struct rbd_img_request, kref);
2085 orig_request = parent_request->obj_request;
2086
2087 parent_request->obj_request = NULL;
2088 rbd_obj_request_put(orig_request);
2089 img_request_child_clear(parent_request);
2090
2091 rbd_img_request_destroy(kref);
2092 }
2093
2094 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2095 {
2096 struct rbd_img_request *img_request;
2097 unsigned int xferred;
2098 int result;
2099 bool more;
2100
2101 rbd_assert(obj_request_img_data_test(obj_request));
2102 img_request = obj_request->img_request;
2103
2104 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2105 xferred = (unsigned int)obj_request->xferred;
2106 result = obj_request->result;
2107 if (result) {
2108 struct rbd_device *rbd_dev = img_request->rbd_dev;
2109
2110 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2111 img_request_write_test(img_request) ? "write" : "read",
2112 obj_request->length, obj_request->img_offset,
2113 obj_request->offset);
2114 rbd_warn(rbd_dev, " result %d xferred %x\n",
2115 result, xferred);
2116 if (!img_request->result)
2117 img_request->result = result;
2118 }
2119
2120 /* Image object requests don't own their page array */
2121
2122 if (obj_request->type == OBJ_REQUEST_PAGES) {
2123 obj_request->pages = NULL;
2124 obj_request->page_count = 0;
2125 }
2126
2127 if (img_request_child_test(img_request)) {
2128 rbd_assert(img_request->obj_request != NULL);
2129 more = obj_request->which < img_request->obj_request_count - 1;
2130 } else {
2131 rbd_assert(img_request->rq != NULL);
2132 more = blk_end_request(img_request->rq, result, xferred);
2133 }
2134
2135 return more;
2136 }
2137
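/*
 * Completion callback for an object request that is part of an image
 * request.  Object requests complete in submission order: if this one
 * is not yet the next expected (img_request->next_completion), nothing
 * more is done here; when the earlier request completes, its callback
 * walks forward through every request already marked done.
 */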
2138 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2139 {
2140 struct rbd_img_request *img_request;
2141 u32 which = obj_request->which;
2142 bool more = true;
2143
2144 rbd_assert(obj_request_img_data_test(obj_request));
2145 img_request = obj_request->img_request;
2146
2147 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2148 rbd_assert(img_request != NULL);
2149 rbd_assert(img_request->obj_request_count > 0);
2150 rbd_assert(which != BAD_WHICH);
2151 rbd_assert(which < img_request->obj_request_count);
2152 rbd_assert(which >= img_request->next_completion);
2153
2154 spin_lock_irq(&img_request->completion_lock);
2155 if (which != img_request->next_completion)
2156 goto out;
2157
2158 for_each_obj_request_from(img_request, obj_request) {
2159 rbd_assert(more);
2160 rbd_assert(which < img_request->obj_request_count);
2161
2162 if (!obj_request_done_test(obj_request))
2163 break;
2164 more = rbd_img_obj_end_request(obj_request);
2165 which++;
2166 }
2167
2168 rbd_assert(more ^ (which == img_request->obj_request_count));
2169 img_request->next_completion = which;
2170 out:
2171 spin_unlock_irq(&img_request->completion_lock);
2172 rbd_img_request_put(img_request);
2173
2174 if (!more)
2175 rbd_img_request_complete(img_request);
2176 }
2177
2178 /*
2179 * Split up an image request into one or more object requests, each
2180 * to a different object. The "type" parameter indicates whether
2181 * "data_desc" is the pointer to the head of a list of bio
2182 * structures, or the base of a page array. In either case this
2183 * function assumes data_desc describes memory sufficient to hold
2184 * all data described by the image request.
2185 */
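/*
 * For example (assuming the common 4 MiB object size, obj_order 22),
 * a 6 MiB write starting at image offset 3 MiB is split into three
 * object requests: 1 MiB at offset 3 MiB within the first object,
 * then a full 4 MiB object, then 1 MiB at offset 0 of the next one.
 */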
2186 static int rbd_img_request_fill(struct rbd_img_request *img_request,
2187 enum obj_request_type type,
2188 void *data_desc)
2189 {
2190 struct rbd_device *rbd_dev = img_request->rbd_dev;
2191 struct rbd_obj_request *obj_request = NULL;
2192 struct rbd_obj_request *next_obj_request;
2193 bool write_request = img_request_write_test(img_request);
2194 	struct bio *bio_list = NULL;
2195 	unsigned int bio_offset = 0;
2196 	struct page **pages = NULL;
2197 u64 img_offset;
2198 u64 resid;
2199 u16 opcode;
2200
2201 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2202 (int)type, data_desc);
2203
2204 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2205 img_offset = img_request->offset;
2206 resid = img_request->length;
2207 rbd_assert(resid > 0);
2208
2209 if (type == OBJ_REQUEST_BIO) {
2210 bio_list = data_desc;
2211 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2212 } else {
2213 rbd_assert(type == OBJ_REQUEST_PAGES);
2214 pages = data_desc;
2215 }
2216
2217 while (resid) {
2218 struct ceph_osd_request *osd_req;
2219 const char *object_name;
2220 u64 offset;
2221 u64 length;
2222
2223 object_name = rbd_segment_name(rbd_dev, img_offset);
2224 if (!object_name)
2225 goto out_unwind;
2226 offset = rbd_segment_offset(rbd_dev, img_offset);
2227 length = rbd_segment_length(rbd_dev, img_offset, resid);
2228 obj_request = rbd_obj_request_create(object_name,
2229 offset, length, type);
2230 /* object request has its own copy of the object name */
2231 rbd_segment_name_free(object_name);
2232 if (!obj_request)
2233 goto out_unwind;
2234 /*
2235 * set obj_request->img_request before creating the
2236 * osd_request so that it gets the right snapc
2237 */
2238 rbd_img_obj_request_add(img_request, obj_request);
2239
2240 if (type == OBJ_REQUEST_BIO) {
2241 unsigned int clone_size;
2242
2243 rbd_assert(length <= (u64)UINT_MAX);
2244 clone_size = (unsigned int)length;
2245 obj_request->bio_list =
2246 bio_chain_clone_range(&bio_list,
2247 &bio_offset,
2248 clone_size,
2249 GFP_ATOMIC);
2250 if (!obj_request->bio_list)
2251 goto out_partial;
2252 } else {
2253 unsigned int page_count;
2254
2255 obj_request->pages = pages;
2256 page_count = (u32)calc_pages_for(offset, length);
2257 obj_request->page_count = page_count;
2258 if ((offset + length) & ~PAGE_MASK)
2259 page_count--; /* more on last page */
2260 pages += page_count;
2261 }
2262
2263 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2264 obj_request);
2265 if (!osd_req)
2266 goto out_partial;
2267 obj_request->osd_req = osd_req;
2268 obj_request->callback = rbd_img_obj_callback;
2269 rbd_img_request_get(img_request);
2270
2271 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2272 0, 0);
2273 if (type == OBJ_REQUEST_BIO)
2274 osd_req_op_extent_osd_data_bio(osd_req, 0,
2275 obj_request->bio_list, length);
2276 else
2277 osd_req_op_extent_osd_data_pages(osd_req, 0,
2278 obj_request->pages, length,
2279 offset & ~PAGE_MASK, false, false);
2280
2281 if (write_request)
2282 rbd_osd_req_format_write(obj_request);
2283 else
2284 rbd_osd_req_format_read(obj_request);
2285
2286 obj_request->img_offset = img_offset;
2287
2288 img_offset += length;
2289 resid -= length;
2290 }
2291
2292 return 0;
2293
2294 out_partial:
2295 rbd_obj_request_put(obj_request);
2296 out_unwind:
2297 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2298 rbd_img_obj_request_del(img_request, obj_request);
2299
2300 return -ENOMEM;
2301 }
2302
2303 static void
2304 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2305 {
2306 struct rbd_img_request *img_request;
2307 struct rbd_device *rbd_dev;
2308 struct page **pages;
2309 u32 page_count;
2310
2311 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2312 rbd_assert(obj_request_img_data_test(obj_request));
2313 img_request = obj_request->img_request;
2314 rbd_assert(img_request);
2315
2316 rbd_dev = img_request->rbd_dev;
2317 rbd_assert(rbd_dev);
2318
2319 pages = obj_request->copyup_pages;
2320 rbd_assert(pages != NULL);
2321 obj_request->copyup_pages = NULL;
2322 page_count = obj_request->copyup_page_count;
2323 rbd_assert(page_count);
2324 obj_request->copyup_page_count = 0;
2325 ceph_release_page_vector(pages, page_count);
2326
2327 /*
2328 * We want the transfer count to reflect the size of the
2329 * original write request. There is no such thing as a
2330 * successful short write, so if the request was successful
2331 * we can just set it to the originally-requested length.
2332 */
2333 if (!obj_request->result)
2334 obj_request->xferred = obj_request->length;
2335
2336 /* Finish up with the normal image object callback */
2337
2338 rbd_img_obj_callback(obj_request);
2339 }
2340
2341 static void
2342 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2343 {
2344 struct rbd_obj_request *orig_request;
2345 struct ceph_osd_request *osd_req;
2346 struct ceph_osd_client *osdc;
2347 struct rbd_device *rbd_dev;
2348 struct page **pages;
2349 u32 page_count;
2350 int img_result;
2351 u64 parent_length;
2352 u64 offset;
2353 u64 length;
2354
2355 rbd_assert(img_request_child_test(img_request));
2356
2357 /* First get what we need from the image request */
2358
2359 pages = img_request->copyup_pages;
2360 rbd_assert(pages != NULL);
2361 img_request->copyup_pages = NULL;
2362 page_count = img_request->copyup_page_count;
2363 rbd_assert(page_count);
2364 img_request->copyup_page_count = 0;
2365
2366 orig_request = img_request->obj_request;
2367 rbd_assert(orig_request != NULL);
2368 rbd_assert(obj_request_type_valid(orig_request->type));
2369 img_result = img_request->result;
2370 parent_length = img_request->length;
2371 rbd_assert(parent_length == img_request->xferred);
2372 rbd_img_request_put(img_request);
2373
2374 rbd_assert(orig_request->img_request);
2375 rbd_dev = orig_request->img_request->rbd_dev;
2376 rbd_assert(rbd_dev);
2377
2378 /*
2379 * If the overlap has become 0 (most likely because the
2380 * image has been flattened) we need to free the pages
2381 * and re-submit the original write request.
2382 */
2383 if (!rbd_dev->parent_overlap) {
2384 struct ceph_osd_client *osdc;
2385
2386 ceph_release_page_vector(pages, page_count);
2387 osdc = &rbd_dev->rbd_client->client->osdc;
2388 img_result = rbd_obj_request_submit(osdc, orig_request);
2389 if (!img_result)
2390 return;
2391 }
2392
2393 if (img_result)
2394 goto out_err;
2395
2396 /*
2397 	 * The original osd request is of no use to us any more.
2398 * We need a new one that can hold the two ops in a copyup
2399 * request. Allocate the new copyup osd request for the
2400 * original request, and release the old one.
2401 */
2402 img_result = -ENOMEM;
2403 osd_req = rbd_osd_req_create_copyup(orig_request);
2404 if (!osd_req)
2405 goto out_err;
2406 rbd_osd_req_destroy(orig_request->osd_req);
2407 orig_request->osd_req = osd_req;
2408 orig_request->copyup_pages = pages;
2409 orig_request->copyup_page_count = page_count;
2410
2411 /* Initialize the copyup op */
2412
2413 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2414 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2415 false, false);
2416
2417 /* Then the original write request op */
2418
2419 offset = orig_request->offset;
2420 length = orig_request->length;
2421 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2422 offset, length, 0, 0);
2423 if (orig_request->type == OBJ_REQUEST_BIO)
2424 osd_req_op_extent_osd_data_bio(osd_req, 1,
2425 orig_request->bio_list, length);
2426 else
2427 osd_req_op_extent_osd_data_pages(osd_req, 1,
2428 orig_request->pages, length,
2429 offset & ~PAGE_MASK, false, false);
2430
2431 rbd_osd_req_format_write(orig_request);
2432
2433 /* All set, send it off. */
2434
2435 orig_request->callback = rbd_img_obj_copyup_callback;
2436 osdc = &rbd_dev->rbd_client->client->osdc;
2437 img_result = rbd_obj_request_submit(osdc, orig_request);
2438 if (!img_result)
2439 return;
2440 out_err:
2441 /* Record the error code and complete the request */
2442
2443 orig_request->result = img_result;
2444 orig_request->xferred = 0;
2445 obj_request_done_set(orig_request);
2446 rbd_obj_request_complete(orig_request);
2447 }
2448
2449 /*
2450 * Read from the parent image the range of data that covers the
2451 * entire target of the given object request. This is used for
2452 * satisfying a layered image write request when the target of an
2453 * object request from the image request does not exist.
2454 *
2455 * A page array big enough to hold the returned data is allocated
2456 * and supplied to rbd_img_request_fill() as the "data descriptor."
2457 * When the read completes, this page array will be transferred to
2458 * the original object request for the copyup operation.
2459 *
2460 * If an error occurs, record it as the result of the original
2461 * object request and mark it done so it gets completed.
2462 */
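/*
 * In outline, a layered write to an object whose existence is unknown
 * proceeds as follows: a STAT determines whether the target object
 * exists (rbd_img_obj_exists_submit()); if it does not, the range it
 * covers is read from the parent (this function); and when that read
 * completes, the parent data and the original write are sent together
 * as a single copyup request (rbd_img_obj_parent_read_full_callback()).
 */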
2463 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2464 {
2465 struct rbd_img_request *img_request = NULL;
2466 struct rbd_img_request *parent_request = NULL;
2467 struct rbd_device *rbd_dev;
2468 u64 img_offset;
2469 u64 length;
2470 struct page **pages = NULL;
2471 u32 page_count;
2472 int result;
2473
2474 rbd_assert(obj_request_img_data_test(obj_request));
2475 rbd_assert(obj_request_type_valid(obj_request->type));
2476
2477 img_request = obj_request->img_request;
2478 rbd_assert(img_request != NULL);
2479 rbd_dev = img_request->rbd_dev;
2480 rbd_assert(rbd_dev->parent != NULL);
2481
2482 /*
2483 * Determine the byte range covered by the object in the
2484 * child image to which the original request was to be sent.
2485 */
2486 img_offset = obj_request->img_offset - obj_request->offset;
2487 length = (u64)1 << rbd_dev->header.obj_order;
2488
2489 /*
2490 * There is no defined parent data beyond the parent
2491 * overlap, so limit what we read at that boundary if
2492 * necessary.
2493 */
2494 if (img_offset + length > rbd_dev->parent_overlap) {
2495 rbd_assert(img_offset < rbd_dev->parent_overlap);
2496 length = rbd_dev->parent_overlap - img_offset;
2497 }
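	/*
	 * For example, with 4 MiB objects and a parent overlap of
	 * 10 MiB, an object covering image range [8 MiB, 12 MiB)
	 * reads only the first 2 MiB of that range from the parent.
	 */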
2498
2499 /*
2500 * Allocate a page array big enough to receive the data read
2501 * from the parent.
2502 */
2503 page_count = (u32)calc_pages_for(0, length);
2504 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2505 if (IS_ERR(pages)) {
2506 result = PTR_ERR(pages);
2507 pages = NULL;
2508 goto out_err;
2509 }
2510
2511 result = -ENOMEM;
2512 parent_request = rbd_parent_request_create(obj_request,
2513 img_offset, length);
2514 if (!parent_request)
2515 goto out_err;
2516
2517 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2518 if (result)
2519 goto out_err;
2520 parent_request->copyup_pages = pages;
2521 parent_request->copyup_page_count = page_count;
2522
2523 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2524 result = rbd_img_request_submit(parent_request);
2525 if (!result)
2526 return 0;
2527
2528 parent_request->copyup_pages = NULL;
2529 parent_request->copyup_page_count = 0;
2530 parent_request->obj_request = NULL;
2531 rbd_obj_request_put(obj_request);
2532 out_err:
2533 if (pages)
2534 ceph_release_page_vector(pages, page_count);
2535 if (parent_request)
2536 rbd_img_request_put(parent_request);
2537 obj_request->result = result;
2538 obj_request->xferred = 0;
2539 obj_request_done_set(obj_request);
2540
2541 return result;
2542 }
2543
2544 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2545 {
2546 struct rbd_obj_request *orig_request;
2547 struct rbd_device *rbd_dev;
2548 int result;
2549
2550 rbd_assert(!obj_request_img_data_test(obj_request));
2551
2552 /*
2553 * All we need from the object request is the original
2554 * request and the result of the STAT op. Grab those, then
2555 * we're done with the request.
2556 */
2557 orig_request = obj_request->obj_request;
2558 obj_request->obj_request = NULL;
2559 rbd_assert(orig_request);
2560 rbd_assert(orig_request->img_request);
2561
2562 result = obj_request->result;
2563 obj_request->result = 0;
2564
2565 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2566 obj_request, orig_request, result,
2567 obj_request->xferred, obj_request->length);
2568 rbd_obj_request_put(obj_request);
2569
2570 /*
2571 * If the overlap has become 0 (most likely because the
2572 	 * image has been flattened) we need to re-submit the
2573 	 * original write request.
2574 */
2575 rbd_dev = orig_request->img_request->rbd_dev;
2576 if (!rbd_dev->parent_overlap) {
2577 struct ceph_osd_client *osdc;
2578
2579 rbd_obj_request_put(orig_request);
2580 osdc = &rbd_dev->rbd_client->client->osdc;
2581 result = rbd_obj_request_submit(osdc, orig_request);
2582 if (!result)
2583 return;
2584 }
2585
2586 /*
2587 * Our only purpose here is to determine whether the object
2588 * exists, and we don't want to treat the non-existence as
2589 * an error. If something else comes back, transfer the
2590 * error to the original request and complete it now.
2591 */
2592 if (!result) {
2593 obj_request_existence_set(orig_request, true);
2594 } else if (result == -ENOENT) {
2595 obj_request_existence_set(orig_request, false);
2596 } else if (result) {
2597 orig_request->result = result;
2598 goto out;
2599 }
2600
2601 /*
2602 * Resubmit the original request now that we have recorded
2603 * whether the target object exists.
2604 */
2605 orig_request->result = rbd_img_obj_request_submit(orig_request);
2606 out:
2607 if (orig_request->result)
2608 rbd_obj_request_complete(orig_request);
2609 rbd_obj_request_put(orig_request);
2610 }
2611
2612 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2613 {
2614 struct rbd_obj_request *stat_request;
2615 struct rbd_device *rbd_dev;
2616 struct ceph_osd_client *osdc;
2617 struct page **pages = NULL;
2618 u32 page_count;
2619 size_t size;
2620 int ret;
2621
2622 /*
2623 * The response data for a STAT call consists of:
2624 * le64 length;
2625 * struct {
2626 * le32 tv_sec;
2627 * le32 tv_nsec;
2628 * } mtime;
2629 */
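	/*
	 * For illustration only (the code below just sizes the reply
	 * buffer), that layout corresponds roughly to:
	 *
	 *	struct {
	 *		__le64 length;
	 *		struct {
	 *			__le32 tv_sec;
	 *			__le32 tv_nsec;
	 *		} mtime;
	 *	} __attribute__ ((packed));
	 */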
2630 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2631 page_count = (u32)calc_pages_for(0, size);
2632 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2633 if (IS_ERR(pages))
2634 return PTR_ERR(pages);
2635
2636 ret = -ENOMEM;
2637 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2638 OBJ_REQUEST_PAGES);
2639 if (!stat_request)
2640 goto out;
2641
2642 rbd_obj_request_get(obj_request);
2643 stat_request->obj_request = obj_request;
2644 stat_request->pages = pages;
2645 stat_request->page_count = page_count;
2646
2647 rbd_assert(obj_request->img_request);
2648 rbd_dev = obj_request->img_request->rbd_dev;
2649 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2650 stat_request);
2651 if (!stat_request->osd_req)
2652 goto out;
2653 stat_request->callback = rbd_img_obj_exists_callback;
2654
2655 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2656 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2657 false, false);
2658 rbd_osd_req_format_read(stat_request);
2659
2660 osdc = &rbd_dev->rbd_client->client->osdc;
2661 ret = rbd_obj_request_submit(osdc, stat_request);
2662 out:
2663 if (ret)
2664 rbd_obj_request_put(obj_request);
2665
2666 return ret;
2667 }
2668
2669 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2670 {
2671 struct rbd_img_request *img_request;
2672 struct rbd_device *rbd_dev;
2673 bool known;
2674
2675 rbd_assert(obj_request_img_data_test(obj_request));
2676
2677 img_request = obj_request->img_request;
2678 rbd_assert(img_request);
2679 rbd_dev = img_request->rbd_dev;
2680
2681 /*
2682 * Only writes to layered images need special handling.
2683 * Reads and non-layered writes are simple object requests.
2684 * Layered writes that start beyond the end of the overlap
2685 * with the parent have no parent data, so they too are
2686 * simple object requests. Finally, if the target object is
2687 * known to already exist, its parent data has already been
2688 * copied, so a write to the object can also be handled as a
2689 * simple object request.
2690 */
2691 if (!img_request_write_test(img_request) ||
2692 !img_request_layered_test(img_request) ||
2693 !obj_request_overlaps_parent(obj_request) ||
2694 ((known = obj_request_known_test(obj_request)) &&
2695 obj_request_exists_test(obj_request))) {
2696
2697 struct rbd_device *rbd_dev;
2698 struct ceph_osd_client *osdc;
2699
2700 rbd_dev = obj_request->img_request->rbd_dev;
2701 osdc = &rbd_dev->rbd_client->client->osdc;
2702
2703 return rbd_obj_request_submit(osdc, obj_request);
2704 }
2705
2706 /*
2707 * It's a layered write. The target object might exist but
2708 * we may not know that yet. If we know it doesn't exist,
2709 * start by reading the data for the full target object from
2710 * the parent so we can use it for a copyup to the target.
2711 */
2712 if (known)
2713 return rbd_img_obj_parent_read_full(obj_request);
2714
2715 /* We don't know whether the target exists. Go find out. */
2716
2717 return rbd_img_obj_exists_submit(obj_request);
2718 }
2719
2720 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2721 {
2722 struct rbd_obj_request *obj_request;
2723 struct rbd_obj_request *next_obj_request;
2724
2725 dout("%s: img %p\n", __func__, img_request);
2726 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2727 int ret;
2728
2729 ret = rbd_img_obj_request_submit(obj_request);
2730 if (ret)
2731 return ret;
2732 }
2733
2734 return 0;
2735 }
2736
2737 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2738 {
2739 struct rbd_obj_request *obj_request;
2740 struct rbd_device *rbd_dev;
2741 u64 obj_end;
2742 u64 img_xferred;
2743 int img_result;
2744
2745 rbd_assert(img_request_child_test(img_request));
2746
2747 /* First get what we need from the image request and release it */
2748
2749 obj_request = img_request->obj_request;
2750 img_xferred = img_request->xferred;
2751 img_result = img_request->result;
2752 rbd_img_request_put(img_request);
2753
2754 /*
2755 * If the overlap has become 0 (most likely because the
2756 * image has been flattened) we need to re-submit the
2757 * original request.
2758 */
2759 rbd_assert(obj_request);
2760 rbd_assert(obj_request->img_request);
2761 rbd_dev = obj_request->img_request->rbd_dev;
2762 if (!rbd_dev->parent_overlap) {
2763 struct ceph_osd_client *osdc;
2764
2765 osdc = &rbd_dev->rbd_client->client->osdc;
2766 img_result = rbd_obj_request_submit(osdc, obj_request);
2767 if (!img_result)
2768 return;
2769 }
2770
2771 obj_request->result = img_result;
2772 if (obj_request->result)
2773 goto out;
2774
2775 /*
2776 * We need to zero anything beyond the parent overlap
2777 * boundary. Since rbd_img_obj_request_read_callback()
2778 * will zero anything beyond the end of a short read, an
2779 * easy way to do this is to pretend the data from the
2780 * parent came up short--ending at the overlap boundary.
2781 */
2782 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2783 obj_end = obj_request->img_offset + obj_request->length;
2784 if (obj_end > rbd_dev->parent_overlap) {
2785 u64 xferred = 0;
2786
2787 if (obj_request->img_offset < rbd_dev->parent_overlap)
2788 xferred = rbd_dev->parent_overlap -
2789 obj_request->img_offset;
2790
2791 obj_request->xferred = min(img_xferred, xferred);
2792 } else {
2793 obj_request->xferred = img_xferred;
2794 }
2795 out:
2796 rbd_img_obj_request_read_callback(obj_request);
2797 rbd_obj_request_complete(obj_request);
2798 }
2799
2800 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2801 {
2802 struct rbd_img_request *img_request;
2803 int result;
2804
2805 rbd_assert(obj_request_img_data_test(obj_request));
2806 rbd_assert(obj_request->img_request != NULL);
2807 rbd_assert(obj_request->result == (s32) -ENOENT);
2808 rbd_assert(obj_request_type_valid(obj_request->type));
2809
2810 /* rbd_read_finish(obj_request, obj_request->length); */
2811 img_request = rbd_parent_request_create(obj_request,
2812 obj_request->img_offset,
2813 obj_request->length);
2814 result = -ENOMEM;
2815 if (!img_request)
2816 goto out_err;
2817
2818 if (obj_request->type == OBJ_REQUEST_BIO)
2819 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2820 obj_request->bio_list);
2821 else
2822 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2823 obj_request->pages);
2824 if (result)
2825 goto out_err;
2826
2827 img_request->callback = rbd_img_parent_read_callback;
2828 result = rbd_img_request_submit(img_request);
2829 if (result)
2830 goto out_err;
2831
2832 return;
2833 out_err:
2834 if (img_request)
2835 rbd_img_request_put(img_request);
2836 obj_request->result = result;
2837 obj_request->xferred = 0;
2838 obj_request_done_set(obj_request);
2839 }
2840
2841 static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2842 {
2843 struct rbd_obj_request *obj_request;
2844 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2845 int ret;
2846
2847 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2848 OBJ_REQUEST_NODATA);
2849 if (!obj_request)
2850 return -ENOMEM;
2851
2852 ret = -ENOMEM;
2853 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2854 if (!obj_request->osd_req)
2855 goto out;
2856
2857 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2858 notify_id, 0, 0);
2859 rbd_osd_req_format_read(obj_request);
2860
2861 ret = rbd_obj_request_submit(osdc, obj_request);
2862 if (ret)
2863 goto out;
2864 ret = rbd_obj_request_wait(obj_request);
2865 out:
2866 rbd_obj_request_put(obj_request);
2867
2868 return ret;
2869 }
2870
2871 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2872 {
2873 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2874 int ret;
2875
2876 if (!rbd_dev)
2877 return;
2878
2879 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2880 rbd_dev->header_name, (unsigned long long)notify_id,
2881 (unsigned int)opcode);
2882 ret = rbd_dev_refresh(rbd_dev);
2883 if (ret)
2884 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
2885
2886 rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2887 }
2888
2889 /*
2890 * Request sync osd watch/unwatch. The value of "start" determines
2891 * whether a watch request is being initiated or torn down.
2892 */
2893 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2894 {
2895 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2896 struct rbd_obj_request *obj_request;
2897 int ret;
2898
2899 rbd_assert(start ^ !!rbd_dev->watch_event);
2900 rbd_assert(start ^ !!rbd_dev->watch_request);
2901
2902 if (start) {
2903 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2904 &rbd_dev->watch_event);
2905 if (ret < 0)
2906 return ret;
2907 rbd_assert(rbd_dev->watch_event != NULL);
2908 }
2909
2910 ret = -ENOMEM;
2911 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2912 OBJ_REQUEST_NODATA);
2913 if (!obj_request)
2914 goto out_cancel;
2915
2916 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2917 if (!obj_request->osd_req)
2918 goto out_cancel;
2919
2920 if (start)
2921 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2922 else
2923 ceph_osdc_unregister_linger_request(osdc,
2924 rbd_dev->watch_request->osd_req);
2925
2926 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2927 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
2928 rbd_osd_req_format_write(obj_request);
2929
2930 ret = rbd_obj_request_submit(osdc, obj_request);
2931 if (ret)
2932 goto out_cancel;
2933 ret = rbd_obj_request_wait(obj_request);
2934 if (ret)
2935 goto out_cancel;
2936 ret = obj_request->result;
2937 if (ret)
2938 goto out_cancel;
2939
2940 /*
2941 * A watch request is set to linger, so the underlying osd
2942 * request won't go away until we unregister it. We retain
2943 * a pointer to the object request during that time (in
2944 * rbd_dev->watch_request), so we'll keep a reference to
2945 * it. We'll drop that reference (below) after we've
2946 * unregistered it.
2947 */
2948 if (start) {
2949 rbd_dev->watch_request = obj_request;
2950
2951 return 0;
2952 }
2953
2954 /* We have successfully torn down the watch request */
2955
2956 rbd_obj_request_put(rbd_dev->watch_request);
2957 rbd_dev->watch_request = NULL;
2958 out_cancel:
2959 /* Cancel the event if we're tearing down, or on error */
2960 ceph_osdc_cancel_event(rbd_dev->watch_event);
2961 rbd_dev->watch_event = NULL;
2962 if (obj_request)
2963 rbd_obj_request_put(obj_request);
2964
2965 return ret;
2966 }
2967
2968 /*
2969 * Synchronous osd object method call. Returns the number of bytes
2970  * returned in the inbound (response) buffer, or a negative error code.
2971 */
2972 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2973 const char *object_name,
2974 const char *class_name,
2975 const char *method_name,
2976 const void *outbound,
2977 size_t outbound_size,
2978 void *inbound,
2979 size_t inbound_size)
2980 {
2981 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2982 struct rbd_obj_request *obj_request;
2983 struct page **pages;
2984 u32 page_count;
2985 int ret;
2986
2987 /*
2988 * Method calls are ultimately read operations. The result
2989 	 * should be placed into the inbound buffer provided. They
2990 * also supply outbound data--parameters for the object
2991 * method. Currently if this is present it will be a
2992 * snapshot id.
2993 */
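	/*
	 * For example, _rbd_dev_v2_snap_size() below calls the "rbd"
	 * class "get_size" method with a __le64 snapshot id as its
	 * outbound data and receives a packed { u8 order; __le64 size; }
	 * reply in its inbound buffer.
	 */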
2994 page_count = (u32)calc_pages_for(0, inbound_size);
2995 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2996 if (IS_ERR(pages))
2997 return PTR_ERR(pages);
2998
2999 ret = -ENOMEM;
3000 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3001 OBJ_REQUEST_PAGES);
3002 if (!obj_request)
3003 goto out;
3004
3005 obj_request->pages = pages;
3006 obj_request->page_count = page_count;
3007
3008 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3009 if (!obj_request->osd_req)
3010 goto out;
3011
3012 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3013 class_name, method_name);
3014 if (outbound_size) {
3015 struct ceph_pagelist *pagelist;
3016
3017 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3018 if (!pagelist)
3019 goto out;
3020
3021 ceph_pagelist_init(pagelist);
3022 ceph_pagelist_append(pagelist, outbound, outbound_size);
3023 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3024 pagelist);
3025 }
3026 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3027 obj_request->pages, inbound_size,
3028 0, false, false);
3029 rbd_osd_req_format_read(obj_request);
3030
3031 ret = rbd_obj_request_submit(osdc, obj_request);
3032 if (ret)
3033 goto out;
3034 ret = rbd_obj_request_wait(obj_request);
3035 if (ret)
3036 goto out;
3037
3038 ret = obj_request->result;
3039 if (ret < 0)
3040 goto out;
3041
3042 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3043 ret = (int)obj_request->xferred;
3044 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3045 out:
3046 if (obj_request)
3047 rbd_obj_request_put(obj_request);
3048 else
3049 ceph_release_page_vector(pages, page_count);
3050
3051 return ret;
3052 }
3053
3054 static void rbd_request_fn(struct request_queue *q)
3055 __releases(q->queue_lock) __acquires(q->queue_lock)
3056 {
3057 struct rbd_device *rbd_dev = q->queuedata;
3058 bool read_only = rbd_dev->mapping.read_only;
3059 struct request *rq;
3060 int result;
3061
3062 while ((rq = blk_fetch_request(q))) {
3063 bool write_request = rq_data_dir(rq) == WRITE;
3064 struct rbd_img_request *img_request;
3065 u64 offset;
3066 u64 length;
3067
3068 /* Ignore any non-FS requests that filter through. */
3069
3070 if (rq->cmd_type != REQ_TYPE_FS) {
3071 dout("%s: non-fs request type %d\n", __func__,
3072 (int) rq->cmd_type);
3073 __blk_end_request_all(rq, 0);
3074 continue;
3075 }
3076
3077 /* Ignore/skip any zero-length requests */
3078
3079 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3080 length = (u64) blk_rq_bytes(rq);
3081
3082 if (!length) {
3083 dout("%s: zero-length request\n", __func__);
3084 __blk_end_request_all(rq, 0);
3085 continue;
3086 }
3087
3088 spin_unlock_irq(q->queue_lock);
3089
3090 /* Disallow writes to a read-only device */
3091
3092 if (write_request) {
3093 result = -EROFS;
3094 if (read_only)
3095 goto end_request;
3096 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3097 }
3098
3099 /*
3100 * Quit early if the mapped snapshot no longer
3101 * exists. It's still possible the snapshot will
3102 * have disappeared by the time our request arrives
3103 * at the osd, but there's no sense in sending it if
3104 * we already know.
3105 */
3106 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3107 			dout("request for non-existent snapshot\n");
3108 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3109 result = -ENXIO;
3110 goto end_request;
3111 }
3112
3113 result = -EINVAL;
3114 if (offset && length > U64_MAX - offset + 1) {
3115 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3116 offset, length);
3117 goto end_request; /* Shouldn't happen */
3118 }
3119
3120 result = -EIO;
3121 if (offset + length > rbd_dev->mapping.size) {
3122 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3123 offset, length, rbd_dev->mapping.size);
3124 goto end_request;
3125 }
3126
3127 result = -ENOMEM;
3128 img_request = rbd_img_request_create(rbd_dev, offset, length,
3129 write_request);
3130 if (!img_request)
3131 goto end_request;
3132
3133 img_request->rq = rq;
3134
3135 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3136 rq->bio);
3137 if (!result)
3138 result = rbd_img_request_submit(img_request);
3139 if (result)
3140 rbd_img_request_put(img_request);
3141 end_request:
3142 spin_lock_irq(q->queue_lock);
3143 if (result < 0) {
3144 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3145 write_request ? "write" : "read",
3146 length, offset, result);
3147
3148 __blk_end_request_all(rq, result);
3149 }
3150 }
3151 }
3152
3153 /*
3154 * a queue callback. Makes sure that we don't create a bio that spans across
3155  * multiple osd objects. One exception would be a single-page bio,
3156  * which we handle later in bio_chain_clone_range().
3157 */
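/*
 * For example (assuming 4 MiB objects, obj_order 22): sectors_per_obj
 * below is 1 << (22 - 9) = 8192, so a bio starting at device-relative
 * sector 8000 lies 192 sectors (98304 bytes) short of its object
 * boundary, and at most that much more data may be merged into it.
 */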
3158 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3159 struct bio_vec *bvec)
3160 {
3161 struct rbd_device *rbd_dev = q->queuedata;
3162 sector_t sector_offset;
3163 sector_t sectors_per_obj;
3164 sector_t obj_sector_offset;
3165 int ret;
3166
3167 /*
3168 	 * Find how far into its rbd object the bio's starting sector
3169 	 * falls, after converting the partition-relative start sector
3170 	 * to one relative to the enclosing device.
3171 */
3172 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3173 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3174 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3175
3176 /*
3177 * Compute the number of bytes from that offset to the end
3178 * of the object. Account for what's already used by the bio.
3179 */
3180 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3181 if (ret > bmd->bi_size)
3182 ret -= bmd->bi_size;
3183 else
3184 ret = 0;
3185
3186 /*
3187 * Don't send back more than was asked for. And if the bio
3188 * was empty, let the whole thing through because: "Note
3189 * that a block device *must* allow a single page to be
3190 * added to an empty bio."
3191 */
3192 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3193 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3194 ret = (int) bvec->bv_len;
3195
3196 return ret;
3197 }
3198
3199 static void rbd_free_disk(struct rbd_device *rbd_dev)
3200 {
3201 struct gendisk *disk = rbd_dev->disk;
3202
3203 if (!disk)
3204 return;
3205
3206 rbd_dev->disk = NULL;
3207 if (disk->flags & GENHD_FL_UP) {
3208 del_gendisk(disk);
3209 if (disk->queue)
3210 blk_cleanup_queue(disk->queue);
3211 }
3212 put_disk(disk);
3213 }
3214
3215 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3216 const char *object_name,
3217 u64 offset, u64 length, void *buf)
3218
3219 {
3220 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3221 struct rbd_obj_request *obj_request;
3222 struct page **pages = NULL;
3223 u32 page_count;
3224 size_t size;
3225 int ret;
3226
3227 page_count = (u32) calc_pages_for(offset, length);
3228 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3229 if (IS_ERR(pages))
3230 return PTR_ERR(pages);
3231
3232 ret = -ENOMEM;
3233 obj_request = rbd_obj_request_create(object_name, offset, length,
3234 OBJ_REQUEST_PAGES);
3235 if (!obj_request)
3236 goto out;
3237
3238 obj_request->pages = pages;
3239 obj_request->page_count = page_count;
3240
3241 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3242 if (!obj_request->osd_req)
3243 goto out;
3244
3245 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3246 offset, length, 0, 0);
3247 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3248 obj_request->pages,
3249 obj_request->length,
3250 obj_request->offset & ~PAGE_MASK,
3251 false, false);
3252 rbd_osd_req_format_read(obj_request);
3253
3254 ret = rbd_obj_request_submit(osdc, obj_request);
3255 if (ret)
3256 goto out;
3257 ret = rbd_obj_request_wait(obj_request);
3258 if (ret)
3259 goto out;
3260
3261 ret = obj_request->result;
3262 if (ret < 0)
3263 goto out;
3264
3265 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3266 size = (size_t) obj_request->xferred;
3267 ceph_copy_from_page_vector(pages, buf, 0, size);
3268 rbd_assert(size <= (size_t)INT_MAX);
3269 ret = (int)size;
3270 out:
3271 if (obj_request)
3272 rbd_obj_request_put(obj_request);
3273 else
3274 ceph_release_page_vector(pages, page_count);
3275
3276 return ret;
3277 }
3278
3279 /*
3280 * Read the complete header for the given rbd device. On successful
3281 * return, the rbd_dev->header field will contain up-to-date
3282 * information about the image.
3283 */
3284 static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3285 {
3286 struct rbd_image_header_ondisk *ondisk = NULL;
3287 u32 snap_count = 0;
3288 u64 names_size = 0;
3289 u32 want_count;
3290 int ret;
3291
3292 /*
3293 * The complete header will include an array of its 64-bit
3294 * snapshot ids, followed by the names of those snapshots as
3295 * a contiguous block of NUL-terminated strings. Note that
3296 * the number of snapshots could change by the time we read
3297 * it in, in which case we re-read it.
3298 */
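	/*
	 * The first pass therefore reads only sizeof (*ondisk) bytes
	 * (snap_count and names_size both start out 0); once the real
	 * counts are known the full header is read, and the loop ends
	 * when a read returns the snapshot count it was sized for.
	 */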
3299 do {
3300 size_t size;
3301
3302 kfree(ondisk);
3303
3304 size = sizeof (*ondisk);
3305 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3306 size += names_size;
3307 ondisk = kmalloc(size, GFP_KERNEL);
3308 if (!ondisk)
3309 return -ENOMEM;
3310
3311 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3312 0, size, ondisk);
3313 if (ret < 0)
3314 goto out;
3315 if ((size_t)ret < size) {
3316 ret = -ENXIO;
3317 			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
3318 size, ret);
3319 goto out;
3320 }
3321 if (!rbd_dev_ondisk_valid(ondisk)) {
3322 ret = -ENXIO;
3323 rbd_warn(rbd_dev, "invalid header");
3324 goto out;
3325 }
3326
3327 names_size = le64_to_cpu(ondisk->snap_names_len);
3328 want_count = snap_count;
3329 snap_count = le32_to_cpu(ondisk->snap_count);
3330 } while (snap_count != want_count);
3331
3332 ret = rbd_header_from_disk(rbd_dev, ondisk);
3333 out:
3334 kfree(ondisk);
3335
3336 return ret;
3337 }
3338
3339 /*
3340 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3341 * has disappeared from the (just updated) snapshot context.
3342 */
3343 static void rbd_exists_validate(struct rbd_device *rbd_dev)
3344 {
3345 u64 snap_id;
3346
3347 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3348 return;
3349
3350 snap_id = rbd_dev->spec->snap_id;
3351 if (snap_id == CEPH_NOSNAP)
3352 return;
3353
3354 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3355 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3356 }
3357
3358 static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3359 {
3360 sector_t size;
3361 bool removing;
3362
3363 /*
3364 * Don't hold the lock while doing disk operations,
3365 * or lock ordering will conflict with the bdev mutex via:
3366 * rbd_add() -> blkdev_get() -> rbd_open()
3367 */
3368 spin_lock_irq(&rbd_dev->lock);
3369 removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3370 spin_unlock_irq(&rbd_dev->lock);
3371 /*
3372 * If the device is being removed, rbd_dev->disk has
3373 * been destroyed, so don't try to update its size
3374 */
3375 if (!removing) {
3376 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3377 dout("setting size to %llu sectors", (unsigned long long)size);
3378 set_capacity(rbd_dev->disk, size);
3379 revalidate_disk(rbd_dev->disk);
3380 }
3381 }
3382
3383 static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3384 {
3385 u64 mapping_size;
3386 int ret;
3387
3388 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3389 mapping_size = rbd_dev->mapping.size;
3390 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3391 if (rbd_dev->image_format == 1)
3392 ret = rbd_dev_v1_header_info(rbd_dev);
3393 else
3394 ret = rbd_dev_v2_header_info(rbd_dev);
3395
3396 /* If it's a mapped snapshot, validate its EXISTS flag */
3397
3398 rbd_exists_validate(rbd_dev);
3399 mutex_unlock(&ctl_mutex);
3400 if (mapping_size != rbd_dev->mapping.size) {
3401 rbd_dev_update_size(rbd_dev);
3402 }
3403
3404 return ret;
3405 }
3406
3407 static int rbd_init_disk(struct rbd_device *rbd_dev)
3408 {
3409 struct gendisk *disk;
3410 struct request_queue *q;
3411 u64 segment_size;
3412
3413 /* create gendisk info */
3414 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3415 if (!disk)
3416 return -ENOMEM;
3417
3418 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3419 rbd_dev->dev_id);
3420 disk->major = rbd_dev->major;
3421 disk->first_minor = 0;
3422 disk->fops = &rbd_bd_ops;
3423 disk->private_data = rbd_dev;
3424
3425 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3426 if (!q)
3427 goto out_disk;
3428
3429 /* We use the default size, but let's be explicit about it. */
3430 blk_queue_physical_block_size(q, SECTOR_SIZE);
3431
3432 /* set io sizes to object size */
3433 segment_size = rbd_obj_bytes(&rbd_dev->header);
3434 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3435 blk_queue_max_segment_size(q, segment_size);
3436 blk_queue_io_min(q, segment_size);
3437 blk_queue_io_opt(q, segment_size);
3438
3439 blk_queue_merge_bvec(q, rbd_merge_bvec);
3440 disk->queue = q;
3441
3442 q->queuedata = rbd_dev;
3443
3444 rbd_dev->disk = disk;
3445
3446 return 0;
3447 out_disk:
3448 put_disk(disk);
3449
3450 return -ENOMEM;
3451 }
3452
3453 /*
3454 sysfs
3455 */
3456
3457 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3458 {
3459 return container_of(dev, struct rbd_device, dev);
3460 }
3461
3462 static ssize_t rbd_size_show(struct device *dev,
3463 struct device_attribute *attr, char *buf)
3464 {
3465 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3466
3467 return sprintf(buf, "%llu\n",
3468 (unsigned long long)rbd_dev->mapping.size);
3469 }
3470
3471 /*
3472 * Note this shows the features for whatever's mapped, which is not
3473 * necessarily the base image.
3474 */
3475 static ssize_t rbd_features_show(struct device *dev,
3476 struct device_attribute *attr, char *buf)
3477 {
3478 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3479
3480 return sprintf(buf, "0x%016llx\n",
3481 (unsigned long long)rbd_dev->mapping.features);
3482 }
3483
3484 static ssize_t rbd_major_show(struct device *dev,
3485 struct device_attribute *attr, char *buf)
3486 {
3487 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3488
3489 if (rbd_dev->major)
3490 return sprintf(buf, "%d\n", rbd_dev->major);
3491
3492 return sprintf(buf, "(none)\n");
3493
3494 }
3495
3496 static ssize_t rbd_client_id_show(struct device *dev,
3497 struct device_attribute *attr, char *buf)
3498 {
3499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3500
3501 return sprintf(buf, "client%lld\n",
3502 ceph_client_id(rbd_dev->rbd_client->client));
3503 }
3504
3505 static ssize_t rbd_pool_show(struct device *dev,
3506 struct device_attribute *attr, char *buf)
3507 {
3508 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3509
3510 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3511 }
3512
3513 static ssize_t rbd_pool_id_show(struct device *dev,
3514 struct device_attribute *attr, char *buf)
3515 {
3516 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3517
3518 return sprintf(buf, "%llu\n",
3519 (unsigned long long) rbd_dev->spec->pool_id);
3520 }
3521
3522 static ssize_t rbd_name_show(struct device *dev,
3523 struct device_attribute *attr, char *buf)
3524 {
3525 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3526
3527 if (rbd_dev->spec->image_name)
3528 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3529
3530 return sprintf(buf, "(unknown)\n");
3531 }
3532
3533 static ssize_t rbd_image_id_show(struct device *dev,
3534 struct device_attribute *attr, char *buf)
3535 {
3536 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3537
3538 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3539 }
3540
3541 /*
3542 * Shows the name of the currently-mapped snapshot (or
3543 * RBD_SNAP_HEAD_NAME for the base image).
3544 */
3545 static ssize_t rbd_snap_show(struct device *dev,
3546 struct device_attribute *attr,
3547 char *buf)
3548 {
3549 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3550
3551 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3552 }
3553
3554 /*
3555  * For an rbd v2 image, shows the pool, image, and snapshot ids and
3556  * names (and the overlap) for the parent image. If there is no
3557  * parent, simply shows "(no parent image)".
3558 */
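/*
 * Sample output (all values below are illustrative only):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1018e56a0173
 *	image_name parent-image
 *	snap_id 4
 *	snap_name base
 *	overlap 10737418240
 */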
3559 static ssize_t rbd_parent_show(struct device *dev,
3560 struct device_attribute *attr,
3561 char *buf)
3562 {
3563 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3564 struct rbd_spec *spec = rbd_dev->parent_spec;
3565 int count;
3566 char *bufp = buf;
3567
3568 if (!spec)
3569 return sprintf(buf, "(no parent image)\n");
3570
3571 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3572 (unsigned long long) spec->pool_id, spec->pool_name);
3573 if (count < 0)
3574 return count;
3575 bufp += count;
3576
3577 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3578 spec->image_name ? spec->image_name : "(unknown)");
3579 if (count < 0)
3580 return count;
3581 bufp += count;
3582
3583 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3584 (unsigned long long) spec->snap_id, spec->snap_name);
3585 if (count < 0)
3586 return count;
3587 bufp += count;
3588
3589 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3590 if (count < 0)
3591 return count;
3592 bufp += count;
3593
3594 return (ssize_t) (bufp - buf);
3595 }
3596
3597 static ssize_t rbd_image_refresh(struct device *dev,
3598 struct device_attribute *attr,
3599 const char *buf,
3600 size_t size)
3601 {
3602 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3603 int ret;
3604
3605 ret = rbd_dev_refresh(rbd_dev);
3606 if (ret)
3607 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3608
3609 return ret < 0 ? ret : size;
3610 }
3611
3612 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3613 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3614 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3615 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3616 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3617 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3618 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3619 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3620 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3621 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3622 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3623
3624 static struct attribute *rbd_attrs[] = {
3625 &dev_attr_size.attr,
3626 &dev_attr_features.attr,
3627 &dev_attr_major.attr,
3628 &dev_attr_client_id.attr,
3629 &dev_attr_pool.attr,
3630 &dev_attr_pool_id.attr,
3631 &dev_attr_name.attr,
3632 &dev_attr_image_id.attr,
3633 &dev_attr_current_snap.attr,
3634 &dev_attr_parent.attr,
3635 &dev_attr_refresh.attr,
3636 NULL
3637 };
3638
3639 static struct attribute_group rbd_attr_group = {
3640 .attrs = rbd_attrs,
3641 };
3642
3643 static const struct attribute_group *rbd_attr_groups[] = {
3644 &rbd_attr_group,
3645 NULL
3646 };
3647
3648 static void rbd_sysfs_dev_release(struct device *dev)
3649 {
3650 }
3651
3652 static struct device_type rbd_device_type = {
3653 .name = "rbd",
3654 .groups = rbd_attr_groups,
3655 .release = rbd_sysfs_dev_release,
3656 };
3657
3658 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3659 {
3660 kref_get(&spec->kref);
3661
3662 return spec;
3663 }
3664
3665 static void rbd_spec_free(struct kref *kref);
3666 static void rbd_spec_put(struct rbd_spec *spec)
3667 {
3668 if (spec)
3669 kref_put(&spec->kref, rbd_spec_free);
3670 }
3671
3672 static struct rbd_spec *rbd_spec_alloc(void)
3673 {
3674 struct rbd_spec *spec;
3675
3676 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3677 if (!spec)
3678 return NULL;
3679 kref_init(&spec->kref);
3680
3681 return spec;
3682 }
3683
3684 static void rbd_spec_free(struct kref *kref)
3685 {
3686 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3687
3688 kfree(spec->pool_name);
3689 kfree(spec->image_id);
3690 kfree(spec->image_name);
3691 kfree(spec->snap_name);
3692 kfree(spec);
3693 }
3694
3695 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3696 struct rbd_spec *spec)
3697 {
3698 struct rbd_device *rbd_dev;
3699
3700 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3701 if (!rbd_dev)
3702 return NULL;
3703
3704 spin_lock_init(&rbd_dev->lock);
3705 rbd_dev->flags = 0;
3706 atomic_set(&rbd_dev->parent_ref, 0);
3707 INIT_LIST_HEAD(&rbd_dev->node);
3708 init_rwsem(&rbd_dev->header_rwsem);
3709
3710 rbd_dev->spec = spec;
3711 rbd_dev->rbd_client = rbdc;
3712
3713 /* Initialize the layout used for all rbd requests */
3714
3715 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3716 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3717 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3718 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3719
3720 return rbd_dev;
3721 }
3722
3723 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3724 {
3725 rbd_put_client(rbd_dev->rbd_client);
3726 rbd_spec_put(rbd_dev->spec);
3727 kfree(rbd_dev);
3728 }
3729
3730 /*
3731  * Get the size and object order for an image snapshot, or, if
3732  * snap_id is CEPH_NOSNAP, get this information for the base
3733  * image.
3734 */
3735 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3736 u8 *order, u64 *snap_size)
3737 {
3738 __le64 snapid = cpu_to_le64(snap_id);
3739 int ret;
3740 struct {
3741 u8 order;
3742 __le64 size;
3743 } __attribute__ ((packed)) size_buf = { 0 };
3744
3745 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3746 "rbd", "get_size",
3747 &snapid, sizeof (snapid),
3748 &size_buf, sizeof (size_buf));
3749 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3750 if (ret < 0)
3751 return ret;
3752 if (ret < sizeof (size_buf))
3753 return -ERANGE;
3754
3755 if (order) {
3756 *order = size_buf.order;
3757 dout(" order %u", (unsigned int)*order);
3758 }
3759 *snap_size = le64_to_cpu(size_buf.size);
3760
3761 dout(" snap_id 0x%016llx snap_size = %llu\n",
3762 (unsigned long long)snap_id,
3763 (unsigned long long)*snap_size);
3764
3765 return 0;
3766 }
3767
3768 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3769 {
3770 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3771 &rbd_dev->header.obj_order,
3772 &rbd_dev->header.image_size);
3773 }
3774
3775 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3776 {
3777 void *reply_buf;
3778 int ret;
3779 void *p;
3780
3781 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3782 if (!reply_buf)
3783 return -ENOMEM;
3784
3785 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3786 "rbd", "get_object_prefix", NULL, 0,
3787 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3788 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3789 if (ret < 0)
3790 goto out;
3791
3792 p = reply_buf;
3793 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3794 p + ret, NULL, GFP_NOIO);
3795 ret = 0;
3796
3797 if (IS_ERR(rbd_dev->header.object_prefix)) {
3798 ret = PTR_ERR(rbd_dev->header.object_prefix);
3799 rbd_dev->header.object_prefix = NULL;
3800 } else {
3801 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3802 }
3803 out:
3804 kfree(reply_buf);
3805
3806 return ret;
3807 }
3808
3809 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3810 u64 *snap_features)
3811 {
3812 __le64 snapid = cpu_to_le64(snap_id);
3813 struct {
3814 __le64 features;
3815 __le64 incompat;
3816 } __attribute__ ((packed)) features_buf = { 0 };
3817 u64 incompat;
3818 int ret;
3819
3820 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3821 "rbd", "get_features",
3822 &snapid, sizeof (snapid),
3823 &features_buf, sizeof (features_buf));
3824 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3825 if (ret < 0)
3826 return ret;
3827 if (ret < sizeof (features_buf))
3828 return -ERANGE;
3829
3830 incompat = le64_to_cpu(features_buf.incompat);
3831 if (incompat & ~RBD_FEATURES_SUPPORTED)
3832 return -ENXIO;
3833
3834 *snap_features = le64_to_cpu(features_buf.features);
3835
3836 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3837 (unsigned long long)snap_id,
3838 (unsigned long long)*snap_features,
3839 (unsigned long long)le64_to_cpu(features_buf.incompat));
3840
3841 return 0;
3842 }
3843
3844 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3845 {
3846 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3847 &rbd_dev->header.features);
3848 }
3849
3850 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3851 {
3852 struct rbd_spec *parent_spec;
3853 size_t size;
3854 void *reply_buf = NULL;
3855 __le64 snapid;
3856 void *p;
3857 void *end;
3858 u64 pool_id;
3859 char *image_id;
3860 u64 overlap;
3861 int ret;
3862
3863 parent_spec = rbd_spec_alloc();
3864 if (!parent_spec)
3865 return -ENOMEM;
3866
3867 size = sizeof (__le64) + /* pool_id */
3868 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3869 sizeof (__le64) + /* snap_id */
3870 sizeof (__le64); /* overlap */
3871 reply_buf = kmalloc(size, GFP_KERNEL);
3872 if (!reply_buf) {
3873 ret = -ENOMEM;
3874 goto out_err;
3875 }
3876
3877 snapid = cpu_to_le64(CEPH_NOSNAP);
3878 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3879 "rbd", "get_parent",
3880 &snapid, sizeof (snapid),
3881 reply_buf, size);
3882 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3883 if (ret < 0)
3884 goto out_err;
3885
3886 p = reply_buf;
3887 end = reply_buf + ret;
3888 ret = -ERANGE;
3889 ceph_decode_64_safe(&p, end, pool_id, out_err);
3890 if (pool_id == CEPH_NOPOOL) {
3891 /*
3892 * Either the parent never existed, or we have a
3893 * record of it but the image got flattened so it no
3894 * longer has a parent. When the parent of a
3895 * layered image disappears we immediately set the
3896 * overlap to 0. The effect of this is that all new
3897 * requests will be treated as if the image had no
3898 * parent.
3899 */
3900 if (rbd_dev->parent_overlap) {
3901 rbd_dev->parent_overlap = 0;
3902 smp_mb();
3903 rbd_dev_parent_put(rbd_dev);
3904 pr_info("%s: clone image has been flattened\n",
3905 rbd_dev->disk->disk_name);
3906 }
3907
3908 goto out; /* No parent? No problem. */
3909 }
3910
3911 /* The ceph file layout needs to fit pool id in 32 bits */
3912
3913 ret = -EIO;
3914 if (pool_id > (u64)U32_MAX) {
3915 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3916 (unsigned long long)pool_id, U32_MAX);
3917 goto out_err;
3918 }
3919 parent_spec->pool_id = pool_id;
3920
3921 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3922 if (IS_ERR(image_id)) {
3923 ret = PTR_ERR(image_id);
3924 goto out_err;
3925 }
3926 parent_spec->image_id = image_id;
3927 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3928 ceph_decode_64_safe(&p, end, overlap, out_err);
3929
3930 if (overlap) {
3931 rbd_spec_put(rbd_dev->parent_spec);
3932 rbd_dev->parent_spec = parent_spec;
3933 parent_spec = NULL; /* rbd_dev now owns this */
3934 rbd_dev->parent_overlap = overlap;
3935 } else {
3936 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3937 }
3938 out:
3939 ret = 0;
3940 out_err:
3941 kfree(reply_buf);
3942 rbd_spec_put(parent_spec);
3943
3944 return ret;
3945 }
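
/*
 * Sketch of the "get_parent" reply consumed above (shown for clarity,
 * not a canonical wire-format definition):
 *
 *	__le64	pool_id;	(CEPH_NOPOOL when no parent exists)
 *	string	image_id;	(length-prefixed, see ceph_extract_encoded_string())
 *	__le64	snap_id;
 *	__le64	overlap;	(bytes shared with the parent; 0 means ignore it)
 */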
3946
3947 static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3948 {
3949 struct {
3950 __le64 stripe_unit;
3951 __le64 stripe_count;
3952 } __attribute__ ((packed)) striping_info_buf = { 0 };
3953 size_t size = sizeof (striping_info_buf);
3954 void *p;
3955 u64 obj_size;
3956 u64 stripe_unit;
3957 u64 stripe_count;
3958 int ret;
3959
3960 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3961 "rbd", "get_stripe_unit_count", NULL, 0,
3962 (char *)&striping_info_buf, size);
3963 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3964 if (ret < 0)
3965 return ret;
3966 if (ret < size)
3967 return -ERANGE;
3968
3969 /*
3970 * We don't actually support the "fancy striping" feature
3971 * (STRIPINGV2) yet, but if the striping sizes are the
3972 * defaults the behavior is the same as before. So find
3973 * out, and only fail if the image has non-default values.
3974 */
3975 ret = -EINVAL;
3976 obj_size = (u64)1 << rbd_dev->header.obj_order;
3977 p = &striping_info_buf;
3978 stripe_unit = ceph_decode_64(&p);
3979 if (stripe_unit != obj_size) {
3980 rbd_warn(rbd_dev, "unsupported stripe unit "
3981 "(got %llu want %llu)",
3982 stripe_unit, obj_size);
3983 return -EINVAL;
3984 }
3985 stripe_count = ceph_decode_64(&p);
3986 if (stripe_count != 1) {
3987 rbd_warn(rbd_dev, "unsupported stripe count "
3988 "(got %llu want 1)", stripe_count);
3989 return -EINVAL;
3990 }
3991 rbd_dev->header.stripe_unit = stripe_unit;
3992 rbd_dev->header.stripe_count = stripe_count;
3993
3994 return 0;
3995 }
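
/*
 * Example of the only striping layout accepted above (values assume an
 * object order of 22, purely for illustration):
 *
 *	stripe_unit  == 1 << obj_order == 4194304
 *	stripe_count == 1
 *
 * Any other combination is treated as an unsupported STRIPINGV2 layout
 * and the probe fails with -EINVAL.
 */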
3996
3997 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3998 {
3999 size_t image_id_size;
4000 char *image_id;
4001 void *p;
4002 void *end;
4003 size_t size;
4004 void *reply_buf = NULL;
4005 size_t len = 0;
4006 char *image_name = NULL;
4007 int ret;
4008
4009 rbd_assert(!rbd_dev->spec->image_name);
4010
4011 len = strlen(rbd_dev->spec->image_id);
4012 image_id_size = sizeof (__le32) + len;
4013 image_id = kmalloc(image_id_size, GFP_KERNEL);
4014 if (!image_id)
4015 return NULL;
4016
4017 p = image_id;
4018 end = image_id + image_id_size;
4019 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4020
4021 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4022 reply_buf = kmalloc(size, GFP_KERNEL);
4023 if (!reply_buf)
4024 goto out;
4025
4026 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4027 "rbd", "dir_get_name",
4028 image_id, image_id_size,
4029 reply_buf, size);
4030 if (ret < 0)
4031 goto out;
4032 p = reply_buf;
4033 end = reply_buf + ret;
4034
4035 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4036 if (IS_ERR(image_name))
4037 image_name = NULL;
4038 else
4039 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4040 out:
4041 kfree(reply_buf);
4042 kfree(image_id);
4043
4044 return image_name;
4045 }
4046
4047 static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4048 {
4049 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4050 const char *snap_name;
4051 u32 which = 0;
4052
4053 /* Skip over names until we find the one we are looking for */
4054
4055 snap_name = rbd_dev->header.snap_names;
4056 while (which < snapc->num_snaps) {
4057 if (!strcmp(name, snap_name))
4058 return snapc->snaps[which];
4059 snap_name += strlen(snap_name) + 1;
4060 which++;
4061 }
4062 return CEPH_NOSNAP;
4063 }
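
/*
 * The walk above relies on header.snap_names being one buffer of
 * consecutive NUL-terminated names, indexed in step with
 * snapc->snaps[].  For two snapshots "a" and "backup" (made-up names)
 * it would look like:
 *
 *	snap_names: "a\0backup\0"
 *	snaps[0] = id of "a", snaps[1] = id of "backup"
 */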
4064
4065 static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4066 {
4067 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4068 u32 which;
4069 bool found = false;
4070 u64 snap_id;
4071
4072 for (which = 0; !found && which < snapc->num_snaps; which++) {
4073 const char *snap_name;
4074
4075 snap_id = snapc->snaps[which];
4076 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4077 if (IS_ERR(snap_name)) {
4078 /* ignore no-longer existing snapshots */
4079 if (PTR_ERR(snap_name) == -ENOENT)
4080 continue;
4081 else
4082 break;
4083 }
4084 found = !strcmp(name, snap_name);
4085 kfree(snap_name);
4086 }
4087 return found ? snap_id : CEPH_NOSNAP;
4088 }
4089
4090 /*
4091 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4092 * no snapshot by that name is found, or if an error occurs.
4093 */
4094 static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4095 {
4096 if (rbd_dev->image_format == 1)
4097 return rbd_v1_snap_id_by_name(rbd_dev, name);
4098
4099 return rbd_v2_snap_id_by_name(rbd_dev, name);
4100 }
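
/*
 * Typical use (a sketch of what rbd_dev_spec_update() does below when a
 * snapshot was named in the add request):
 *
 *	u64 snap_id = rbd_snap_id_by_name(rbd_dev, "mysnap");
 *	if (snap_id == CEPH_NOSNAP)
 *		return -ENOENT;
 */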
4101
4102 /*
4103 * When an rbd image has a parent image, it is identified by the
4104 * pool, image, and snapshot ids (not names). This function fills
4105 * in the names for those ids. (It's OK if we can't figure out the
4106 * name for an image id, but the pool and snapshot ids should always
4107 * exist and have names.) All names in an rbd spec are dynamically
4108 * allocated.
4109 *
4110 * When an image being mapped (not a parent) is probed, we have the
4111 * pool name and pool id, image name and image id, and the snapshot
4112 * name. The only thing we're missing is the snapshot id.
4113 */
4114 static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
4115 {
4116 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4117 struct rbd_spec *spec = rbd_dev->spec;
4118 const char *pool_name;
4119 const char *image_name;
4120 const char *snap_name;
4121 int ret;
4122
4123 /*
4124 * An image being mapped will have the pool name (etc.), but
4125 * we need to look up the snapshot id.
4126 */
4127 if (spec->pool_name) {
4128 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4129 u64 snap_id;
4130
4131 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4132 if (snap_id == CEPH_NOSNAP)
4133 return -ENOENT;
4134 spec->snap_id = snap_id;
4135 } else {
4136 spec->snap_id = CEPH_NOSNAP;
4137 }
4138
4139 return 0;
4140 }
4141
4142 /* Get the pool name; we have to make our own copy of this */
4143
4144 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4145 if (!pool_name) {
4146 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4147 return -EIO;
4148 }
4149 pool_name = kstrdup(pool_name, GFP_KERNEL);
4150 if (!pool_name)
4151 return -ENOMEM;
4152
4153 /* Fetch the image name; tolerate failure here */
4154
4155 image_name = rbd_dev_image_name(rbd_dev);
4156 if (!image_name)
4157 rbd_warn(rbd_dev, "unable to get image name");
4158
4159 /* Look up the snapshot name, and make a copy */
4160
4161 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4162 if (IS_ERR(snap_name)) {
4163 ret = PTR_ERR(snap_name);
4164 goto out_err;
4165 }
4166
4167 spec->pool_name = pool_name;
4168 spec->image_name = image_name;
4169 spec->snap_name = snap_name;
4170
4171 return 0;
4172 out_err:
4173 kfree(image_name);
4174 kfree(pool_name);
4175
4176 return ret;
4177 }
4178
4179 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4180 {
4181 size_t size;
4182 int ret;
4183 void *reply_buf;
4184 void *p;
4185 void *end;
4186 u64 seq;
4187 u32 snap_count;
4188 struct ceph_snap_context *snapc;
4189 u32 i;
4190
4191 /*
4192 * We'll need room for the seq value (maximum snapshot id),
4193 * snapshot count, and array of that many snapshot ids.
4194 * For now we have a fixed upper limit on the number we're
4195 * prepared to receive.
4196 */
4197 size = sizeof (__le64) + sizeof (__le32) +
4198 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4199 reply_buf = kzalloc(size, GFP_KERNEL);
4200 if (!reply_buf)
4201 return -ENOMEM;
4202
4203 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4204 "rbd", "get_snapcontext", NULL, 0,
4205 reply_buf, size);
4206 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4207 if (ret < 0)
4208 goto out;
4209
4210 p = reply_buf;
4211 end = reply_buf + ret;
4212 ret = -ERANGE;
4213 ceph_decode_64_safe(&p, end, seq, out);
4214 ceph_decode_32_safe(&p, end, snap_count, out);
4215
4216 /*
4217 * Make sure the reported number of snapshot ids wouldn't go
4218 * beyond the end of our buffer. But before checking that,
4219 * make sure the computed size of the snapshot context we
4220 * allocate is representable in a size_t.
4221 */
4222 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4223 / sizeof (u64)) {
4224 ret = -EINVAL;
4225 goto out;
4226 }
4227 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4228 goto out;
4229 ret = 0;
4230
4231 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4232 if (!snapc) {
4233 ret = -ENOMEM;
4234 goto out;
4235 }
4236 snapc->seq = seq;
4237 for (i = 0; i < snap_count; i++)
4238 snapc->snaps[i] = ceph_decode_64(&p);
4239
4240 ceph_put_snap_context(rbd_dev->header.snapc);
4241 rbd_dev->header.snapc = snapc;
4242
4243 dout(" snap context seq = %llu, snap_count = %u\n",
4244 (unsigned long long)seq, (unsigned int)snap_count);
4245 out:
4246 kfree(reply_buf);
4247
4248 return ret;
4249 }
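
/*
 * Shape of the "get_snapcontext" reply parsed above (a sketch, not a
 * protocol specification):
 *
 *	__le64	seq;			(maximum snapshot id)
 *	__le32	snap_count;
 *	__le64	snaps[snap_count];	(one id per snapshot)
 *
 * The reply buffer is sized for at most RBD_MAX_SNAP_COUNT ids, so a
 * larger snapshot context cannot be received here.
 */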
4250
4251 static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4252 u64 snap_id)
4253 {
4254 size_t size;
4255 void *reply_buf;
4256 __le64 snapid;
4257 int ret;
4258 void *p;
4259 void *end;
4260 char *snap_name;
4261
4262 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4263 reply_buf = kmalloc(size, GFP_KERNEL);
4264 if (!reply_buf)
4265 return ERR_PTR(-ENOMEM);
4266
4267 snapid = cpu_to_le64(snap_id);
4268 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4269 "rbd", "get_snapshot_name",
4270 &snapid, sizeof (snapid),
4271 reply_buf, size);
4272 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4273 if (ret < 0) {
4274 snap_name = ERR_PTR(ret);
4275 goto out;
4276 }
4277
4278 p = reply_buf;
4279 end = reply_buf + ret;
4280 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4281 if (IS_ERR(snap_name))
4282 goto out;
4283
4284 dout(" snap_id 0x%016llx snap_name = %s\n",
4285 (unsigned long long)snap_id, snap_name);
4286 out:
4287 kfree(reply_buf);
4288
4289 return snap_name;
4290 }
4291
4292 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4293 {
4294 bool first_time = rbd_dev->header.object_prefix == NULL;
4295 int ret;
4296
4297 down_write(&rbd_dev->header_rwsem);
4298
4299 ret = rbd_dev_v2_image_size(rbd_dev);
4300 if (ret)
4301 goto out;
4302
4303 if (first_time) {
4304 ret = rbd_dev_v2_header_onetime(rbd_dev);
4305 if (ret)
4306 goto out;
4307 }
4308
4309 /*
4310 * If the image supports layering, get the parent info. We
4311 * need to probe the first time regardless. Thereafter we
4312 * only need to do so if there's a parent, to see if it has
4313 * disappeared due to the mapped image getting flattened.
4314 */
4315 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4316 (first_time || rbd_dev->parent_spec)) {
4317 bool warn;
4318
4319 ret = rbd_dev_v2_parent_info(rbd_dev);
4320 if (ret)
4321 goto out;
4322
4323 /*
4324 * Print a warning if this is the initial probe and
4325 * the image has a parent. Don't print it if the
4326 * image now being probed is itself a parent. We
4327 * can tell at this point because we won't know its
4328 * pool name yet (just its pool id).
4329 */
4330 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4331 if (first_time && warn)
4332 rbd_warn(rbd_dev, "WARNING: kernel layering "
4333 "is EXPERIMENTAL!");
4334 }
4335
4336 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4337 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4338 rbd_dev->mapping.size = rbd_dev->header.image_size;
4339
4340 ret = rbd_dev_v2_snap_context(rbd_dev);
4341 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4342 out:
4343 up_write(&rbd_dev->header_rwsem);
4344
4345 return ret;
4346 }
4347
4348 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4349 {
4350 struct device *dev;
4351 int ret;
4352
4353 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4354
4355 dev = &rbd_dev->dev;
4356 dev->bus = &rbd_bus_type;
4357 dev->type = &rbd_device_type;
4358 dev->parent = &rbd_root_dev;
4359 dev->release = rbd_dev_device_release;
4360 dev_set_name(dev, "%d", rbd_dev->dev_id);
4361 ret = device_register(dev);
4362
4363 mutex_unlock(&ctl_mutex);
4364
4365 return ret;
4366 }
4367
4368 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4369 {
4370 device_unregister(&rbd_dev->dev);
4371 }
4372
4373 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4374
4375 /*
4376 * Get a unique rbd identifier for the given new rbd_dev, and add
4377 * the rbd_dev to the global list. The minimum rbd id is 1.
4378 */
4379 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4380 {
4381 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4382
4383 spin_lock(&rbd_dev_list_lock);
4384 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4385 spin_unlock(&rbd_dev_list_lock);
4386 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4387 (unsigned long long) rbd_dev->dev_id);
4388 }
4389
4390 /*
4391 * Remove an rbd_dev from the global list, and record that its
4392 * identifier is no longer in use.
4393 */
4394 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4395 {
4396 struct list_head *tmp;
4397 int rbd_id = rbd_dev->dev_id;
4398 int max_id;
4399
4400 rbd_assert(rbd_id > 0);
4401
4402 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4403 (unsigned long long) rbd_dev->dev_id);
4404 spin_lock(&rbd_dev_list_lock);
4405 list_del_init(&rbd_dev->node);
4406
4407 /*
4408 * If the id being "put" is not the current maximum, there
4409 * is nothing special we need to do.
4410 */
4411 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4412 spin_unlock(&rbd_dev_list_lock);
4413 return;
4414 }
4415
4416 /*
4417 * We need to update the current maximum id. Search the
4418 * list to find out what it is. We're more likely to find
4419 * the maximum at the end, so search the list backward.
4420 */
4421 max_id = 0;
4422 list_for_each_prev(tmp, &rbd_dev_list) {
4423 struct rbd_device *rbd_dev;
4424
4425 rbd_dev = list_entry(tmp, struct rbd_device, node);
4426 if (rbd_dev->dev_id > max_id)
4427 max_id = rbd_dev->dev_id;
4428 }
4429 spin_unlock(&rbd_dev_list_lock);
4430
4431 /*
4432 * The max id could have been updated by rbd_dev_id_get(), in
4433 * which case it now accurately reflects the new maximum.
4434 * Be careful not to overwrite the maximum value in that
4435 * case.
4436 */
4437 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4438 dout(" max dev id has been reset\n");
4439 }
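
/*
 * Worked example of the id bookkeeping above: with devices 1, 2 and 3
 * mapped, putting id 2 changes nothing (it is not the maximum), while
 * putting id 3 rescans the list and resets rbd_dev_id_max to 2 via the
 * cmpxchg, unless a concurrent rbd_dev_id_get() already raised it.
 */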
4440
4441 /*
4442 * Skips over white space at *buf, and updates *buf to point to the
4443 * first found non-space character (if any). Returns the length of
4444 * the token (string of non-white space characters) found. Note
4445 * that *buf must be terminated with '\0'.
4446 */
4447 static inline size_t next_token(const char **buf)
4448 {
4449 /*
4450 * These are the characters that produce nonzero for
4451 * isspace() in the "C" and "POSIX" locales.
4452 */
4453 const char *spaces = " \f\n\r\t\v";
4454
4455 *buf += strspn(*buf, spaces); /* Find start of token */
4456
4457 return strcspn(*buf, spaces); /* Return token length */
4458 }
4459
4460 /*
4461 * Finds the next token in *buf, and if the provided token buffer is
4462 * big enough, copies the found token into it. The result, if
4463 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4464 * must be terminated with '\0' on entry.
4465 *
4466 * Returns the length of the token found (not including the '\0').
4467 * Return value will be 0 if no token is found, and it will be >=
4468 * token_size if the token would not fit.
4469 *
4470 * The *buf pointer will be updated to point beyond the end of the
4471 * found token. Note that this occurs even if the token buffer is
4472 * too small to hold it.
4473 */
4474 static inline size_t copy_token(const char **buf,
4475 char *token,
4476 size_t token_size)
4477 {
4478 size_t len;
4479
4480 len = next_token(buf);
4481 if (len < token_size) {
4482 memcpy(token, *buf, len);
4483 *(token + len) = '\0';
4484 }
4485 *buf += len;
4486
4487 return len;
4488 }
4489
4490 /*
4491 * Finds the next token in *buf, dynamically allocates a buffer big
4492 * enough to hold a copy of it, and copies the token into the new
4493 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4494 * that a duplicate buffer is created even for a zero-length token.
4495 *
4496 * Returns a pointer to the newly-allocated duplicate, or a null
4497 * pointer if memory for the duplicate was not available. If
4498 * the lenp argument is a non-null pointer, the length of the token
4499 * (not including the '\0') is returned in *lenp.
4500 *
4501 * If successful, the *buf pointer will be updated to point beyond
4502 * the end of the found token.
4503 *
4504 * Note: uses GFP_KERNEL for allocation.
4505 */
4506 static inline char *dup_token(const char **buf, size_t *lenp)
4507 {
4508 char *dup;
4509 size_t len;
4510
4511 len = next_token(buf);
4512 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4513 if (!dup)
4514 return NULL;
4515 *(dup + len) = '\0';
4516 *buf += len;
4517
4518 if (lenp)
4519 *lenp = len;
4520
4521 return dup;
4522 }
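
/*
 * Usage sketch for the token helpers above (example input only):
 *
 *	const char *p = "  rbd myimage";
 *	size_t len;
 *	char *pool = dup_token(&p, &len);	(pool = "rbd", len == 3)
 *	char *image = dup_token(&p, NULL);	(image = "myimage")
 *	kfree(image);
 *	kfree(pool);
 *
 * next_token() reports the same length without copying, and
 * copy_token() copies into a caller-supplied buffer instead of
 * allocating one.
 */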
4523
4524 /*
4525 * Parse the options provided for an "rbd add" (i.e., rbd image
4526 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4527 * and the data written is passed here via a NUL-terminated buffer.
4528 * Returns 0 if successful or an error code otherwise.
4529 *
4530 * The information extracted from these options is recorded in
4531 * the other parameters which return dynamically-allocated
4532 * structures:
4533 * ceph_opts
4534 * The address of a pointer that will refer to a ceph options
4535 * structure. Caller must release the returned pointer using
4536 * ceph_destroy_options() when it is no longer needed.
4537 * rbd_opts
4538 * Address of an rbd options pointer. Fully initialized by
4539 * this function; caller must release with kfree().
4540 * spec
4541 * Address of an rbd image specification pointer. Fully
4542 * initialized by this function based on parsed options.
4543 * Caller must release with rbd_spec_put().
4544 *
4545 * The options passed take this form:
4546 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4547 * where:
4548 * <mon_addrs>
4549 * A comma-separated list of one or more monitor addresses.
4550 * A monitor address is an ip address, optionally followed
4551 * by a port number (separated by a colon).
4552 * I.e.: ip1[:port1][,ip2[:port2]...]
4553 * <options>
4554 * A comma-separated list of ceph and/or rbd options.
4555 * <pool_name>
4556 * The name of the rados pool containing the rbd image.
4557 * <image_name>
4558 * The name of the image in that pool to map.
4559 * <snap_name>
4560 * An optional snapshot name. If provided, the mapping will
4561 * present data from the image at the time that snapshot was
4562 * created. The image head is used if no snapshot name is
4563 * provided. Snapshot mappings are always read-only.
4564 */
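
/*
 * Hypothetical example of such a request (monitor address and names are
 * made up):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage mysnap" > /sys/bus/rbd/add
 *
 * parses into mon_addrs "1.2.3.4:6789", options "name=admin", pool
 * "rbd", image "myimage" and snapshot "mysnap".
 */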
4565 static int rbd_add_parse_args(const char *buf,
4566 struct ceph_options **ceph_opts,
4567 struct rbd_options **opts,
4568 struct rbd_spec **rbd_spec)
4569 {
4570 size_t len;
4571 char *options;
4572 const char *mon_addrs;
4573 char *snap_name;
4574 size_t mon_addrs_size;
4575 struct rbd_spec *spec = NULL;
4576 struct rbd_options *rbd_opts = NULL;
4577 struct ceph_options *copts;
4578 int ret;
4579
4580 /* The first four tokens are required */
4581
4582 len = next_token(&buf);
4583 if (!len) {
4584 rbd_warn(NULL, "no monitor address(es) provided");
4585 return -EINVAL;
4586 }
4587 mon_addrs = buf;
4588 mon_addrs_size = len + 1;
4589 buf += len;
4590
4591 ret = -EINVAL;
4592 options = dup_token(&buf, NULL);
4593 if (!options)
4594 return -ENOMEM;
4595 if (!*options) {
4596 rbd_warn(NULL, "no options provided");
4597 goto out_err;
4598 }
4599
4600 spec = rbd_spec_alloc();
4601 if (!spec)
4602 goto out_mem;
4603
4604 spec->pool_name = dup_token(&buf, NULL);
4605 if (!spec->pool_name)
4606 goto out_mem;
4607 if (!*spec->pool_name) {
4608 rbd_warn(NULL, "no pool name provided");
4609 goto out_err;
4610 }
4611
4612 spec->image_name = dup_token(&buf, NULL);
4613 if (!spec->image_name)
4614 goto out_mem;
4615 if (!*spec->image_name) {
4616 rbd_warn(NULL, "no image name provided");
4617 goto out_err;
4618 }
4619
4620 /*
4621 * Snapshot name is optional; default is to use "-"
4622 * (indicating the head/no snapshot).
4623 */
4624 len = next_token(&buf);
4625 if (!len) {
4626 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4627 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4628 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4629 ret = -ENAMETOOLONG;
4630 goto out_err;
4631 }
4632 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4633 if (!snap_name)
4634 goto out_mem;
4635 *(snap_name + len) = '\0';
4636 spec->snap_name = snap_name;
4637
4638 /* Initialize all rbd options to the defaults */
4639
4640 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4641 if (!rbd_opts)
4642 goto out_mem;
4643
4644 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4645
4646 copts = ceph_parse_options(options, mon_addrs,
4647 mon_addrs + mon_addrs_size - 1,
4648 parse_rbd_opts_token, rbd_opts);
4649 if (IS_ERR(copts)) {
4650 ret = PTR_ERR(copts);
4651 goto out_err;
4652 }
4653 kfree(options);
4654
4655 *ceph_opts = copts;
4656 *opts = rbd_opts;
4657 *rbd_spec = spec;
4658
4659 return 0;
4660 out_mem:
4661 ret = -ENOMEM;
4662 out_err:
4663 kfree(rbd_opts);
4664 rbd_spec_put(spec);
4665 kfree(options);
4666
4667 return ret;
4668 }
4669
4670 /*
4671 * An rbd format 2 image has a unique identifier, distinct from the
4672 * name given to it by the user. Internally, that identifier is
4673 * what's used to specify the names of objects related to the image.
4674 *
4675 * A special "rbd id" object is used to map an rbd image name to its
4676 * id. If that object doesn't exist, then there is no v2 rbd image
4677 * with the supplied name.
4678 *
4679 * This function will record the given rbd_dev's image_id field if
4680 * it can be determined, and in that case will return 0. If any
4681 * errors occur a negative errno will be returned and the rbd_dev's
4682 * image_id field will be unchanged (and should be NULL).
4683 */
4684 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4685 {
4686 int ret;
4687 size_t size;
4688 char *object_name;
4689 void *response;
4690 char *image_id;
4691
4692 /*
4693 * When probing a parent image, the image id is already
4694 * known (and the image name likely is not). There's no
4695 * need to fetch the image id again in this case. We
4696 * do still need to set the image format though.
4697 */
4698 if (rbd_dev->spec->image_id) {
4699 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4700
4701 return 0;
4702 }
4703
4704 /*
4705 * First, see if the format 2 image id file exists, and if
4706 * so, get the image's persistent id from it.
4707 */
4708 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4709 object_name = kmalloc(size, GFP_NOIO);
4710 if (!object_name)
4711 return -ENOMEM;
4712 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4713 dout("rbd id object name is %s\n", object_name);
4714
4715 /* Response will be an encoded string, which includes a length */
4716
4717 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4718 response = kzalloc(size, GFP_NOIO);
4719 if (!response) {
4720 ret = -ENOMEM;
4721 goto out;
4722 }
4723
4724 /* If it doesn't exist we'll assume it's a format 1 image */
4725
4726 ret = rbd_obj_method_sync(rbd_dev, object_name,
4727 "rbd", "get_id", NULL, 0,
4728 response, RBD_IMAGE_ID_LEN_MAX);
4729 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4730 if (ret == -ENOENT) {
4731 image_id = kstrdup("", GFP_KERNEL);
4732 ret = image_id ? 0 : -ENOMEM;
4733 if (!ret)
4734 rbd_dev->image_format = 1;
4735 } else if (ret > sizeof (__le32)) {
4736 void *p = response;
4737
4738 image_id = ceph_extract_encoded_string(&p, p + ret,
4739 NULL, GFP_NOIO);
4740 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4741 if (!ret)
4742 rbd_dev->image_format = 2;
4743 } else {
4744 ret = -EINVAL;
4745 }
4746
4747 if (!ret) {
4748 rbd_dev->spec->image_id = image_id;
4749 dout("image_id is %s\n", image_id);
4750 }
4751 out:
4752 kfree(response);
4753 kfree(object_name);
4754
4755 return ret;
4756 }
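
/*
 * Example outcome of the probe above (with a made-up image name): for
 * image "myimage" the id object is "<RBD_ID_PREFIX>myimage"; if its
 * "get_id" method returns -ENOENT the image is format 1 and image_id is
 * the empty string, otherwise the returned encoded string (e.g.
 * "abc123") is recorded as the format 2 image id.
 */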
4757
4758 /*
4759 * Undo whatever state changes are made by a v1 or v2 header
4760 * info call.
4761 */
4762 static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4763 {
4764 struct rbd_image_header *header;
4765
4766 /* Drop parent reference unless it's already been done (or none) */
4767
4768 if (rbd_dev->parent_overlap)
4769 rbd_dev_parent_put(rbd_dev);
4770
4771 /* Free dynamic fields from the header, then zero it out */
4772
4773 header = &rbd_dev->header;
4774 ceph_put_snap_context(header->snapc);
4775 kfree(header->snap_sizes);
4776 kfree(header->snap_names);
4777 kfree(header->object_prefix);
4778 memset(header, 0, sizeof (*header));
4779 }
4780
4781 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4782 {
4783 int ret;
4784
4785 ret = rbd_dev_v2_object_prefix(rbd_dev);
4786 if (ret)
4787 goto out_err;
4788
4789 /*
4790 * Get and check the features for the image. Currently the
4791 * features are assumed to never change.
4792 */
4793 ret = rbd_dev_v2_features(rbd_dev);
4794 if (ret)
4795 goto out_err;
4796
4797 /* If the image supports fancy striping, get its parameters */
4798
4799 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4800 ret = rbd_dev_v2_striping_info(rbd_dev);
4801 if (ret < 0)
4802 goto out_err;
4803 }
4804 /* No support for crypto and compression type format 2 images */
4805
4806 return 0;
4807 out_err:
4808 rbd_dev->header.features = 0;
4809 kfree(rbd_dev->header.object_prefix);
4810 rbd_dev->header.object_prefix = NULL;
4811
4812 return ret;
4813 }
4814
4815 static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4816 {
4817 struct rbd_device *parent = NULL;
4818 struct rbd_spec *parent_spec;
4819 struct rbd_client *rbdc;
4820 int ret;
4821
4822 if (!rbd_dev->parent_spec)
4823 return 0;
4824 /*
4825 * We need to pass a reference to the client and the parent
4826 * spec when creating the parent rbd_dev. Images related by
4827 * parent/child relationships always share both.
4828 */
4829 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4830 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4831
4832 ret = -ENOMEM;
4833 parent = rbd_dev_create(rbdc, parent_spec);
4834 if (!parent)
4835 goto out_err;
4836
4837 ret = rbd_dev_image_probe(parent, false);
4838 if (ret < 0)
4839 goto out_err;
4840 rbd_dev->parent = parent;
4841 atomic_set(&rbd_dev->parent_ref, 1);
4842
4843 return 0;
4844 out_err:
4845 if (parent) {
4846 rbd_dev_unparent(rbd_dev);
4847 kfree(rbd_dev->header_name);
4848 rbd_dev_destroy(parent);
4849 } else {
4850 rbd_put_client(rbdc);
4851 rbd_spec_put(parent_spec);
4852 }
4853
4854 return ret;
4855 }
4856
4857 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
4858 {
4859 int ret;
4860
4861 /* generate unique id: find highest unique id, add one */
4862 rbd_dev_id_get(rbd_dev);
4863
4864 /* Fill in the device name, now that we have its id. */
4865 BUILD_BUG_ON(DEV_NAME_LEN
4866 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4867 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4868
4869 /* Get our block major device number. */
4870
4871 ret = register_blkdev(0, rbd_dev->name);
4872 if (ret < 0)
4873 goto err_out_id;
4874 rbd_dev->major = ret;
4875
4876 /* Set up the blkdev mapping. */
4877
4878 ret = rbd_init_disk(rbd_dev);
4879 if (ret)
4880 goto err_out_blkdev;
4881
4882 ret = rbd_dev_mapping_set(rbd_dev);
4883 if (ret)
4884 goto err_out_disk;
4885 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4886
4887 ret = rbd_bus_add_dev(rbd_dev);
4888 if (ret)
4889 goto err_out_mapping;
4890
4891 /* Everything's ready. Announce the disk to the world. */
4892
4893 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4894 add_disk(rbd_dev->disk);
4895
4896 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4897 (unsigned long long) rbd_dev->mapping.size);
4898
4899 return ret;
4900
4901 err_out_mapping:
4902 rbd_dev_mapping_clear(rbd_dev);
4903 err_out_disk:
4904 rbd_free_disk(rbd_dev);
4905 err_out_blkdev:
4906 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4907 err_out_id:
4908 rbd_dev_id_put(rbd_dev);
4909 rbd_dev_mapping_clear(rbd_dev);
4910
4911 return ret;
4912 }
4913
4914 static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4915 {
4916 struct rbd_spec *spec = rbd_dev->spec;
4917 size_t size;
4918
4919 /* Record the header object name for this rbd image. */
4920
4921 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4922
4923 if (rbd_dev->image_format == 1)
4924 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4925 else
4926 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4927
4928 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4929 if (!rbd_dev->header_name)
4930 return -ENOMEM;
4931
4932 if (rbd_dev->image_format == 1)
4933 sprintf(rbd_dev->header_name, "%s%s",
4934 spec->image_name, RBD_SUFFIX);
4935 else
4936 sprintf(rbd_dev->header_name, "%s%s",
4937 RBD_HEADER_PREFIX, spec->image_id);
4938 return 0;
4939 }
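
/*
 * Example results (image name and id made up): a format 1 image named
 * "myimage" gets header object "myimage" RBD_SUFFIX, while a format 2
 * image with id "abc123" gets RBD_HEADER_PREFIX "abc123", matching the
 * sprintf() calls above.
 */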
4940
4941 static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4942 {
4943 rbd_dev_unprobe(rbd_dev);
4944 kfree(rbd_dev->header_name);
4945 rbd_dev->header_name = NULL;
4946 rbd_dev->image_format = 0;
4947 kfree(rbd_dev->spec->image_id);
4948 rbd_dev->spec->image_id = NULL;
4949
4950 rbd_dev_destroy(rbd_dev);
4951 }
4952
4953 /*
4954 * Probe for the existence of the header object for the given rbd
4955 * device. If this image is the one being mapped (i.e., not a
4956 * parent), initiate a watch on its header object before using that
4957 * object to get detailed information about the rbd image.
4958 */
4959 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
4960 {
4961 int ret;
4962 int tmp;
4963
4964 /*
4965 * Get the id from the image id object. Unless there's an
4966 * error, rbd_dev->spec->image_id will be filled in with
4967 * a dynamically-allocated string, and rbd_dev->image_format
4968 * will be set to either 1 or 2.
4969 */
4970 ret = rbd_dev_image_id(rbd_dev);
4971 if (ret)
4972 return ret;
4973 rbd_assert(rbd_dev->spec->image_id);
4974 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4975
4976 ret = rbd_dev_header_name(rbd_dev);
4977 if (ret)
4978 goto err_out_format;
4979
4980 if (mapping) {
4981 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4982 if (ret)
4983 goto out_header_name;
4984 }
4985
4986 if (rbd_dev->image_format == 1)
4987 ret = rbd_dev_v1_header_info(rbd_dev);
4988 else
4989 ret = rbd_dev_v2_header_info(rbd_dev);
4990 if (ret)
4991 goto err_out_watch;
4992
4993 ret = rbd_dev_spec_update(rbd_dev);
4994 if (ret)
4995 goto err_out_probe;
4996
4997 ret = rbd_dev_probe_parent(rbd_dev);
4998 if (ret)
4999 goto err_out_probe;
5000
5001 dout("discovered format %u image, header name is %s\n",
5002 rbd_dev->image_format, rbd_dev->header_name);
5003
5004 return 0;
5005 err_out_probe:
5006 rbd_dev_unprobe(rbd_dev);
5007 err_out_watch:
5008 if (mapping) {
5009 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
5010 if (tmp)
5011 rbd_warn(rbd_dev, "unable to tear down "
5012 "watch request (%d)\n", tmp);
5013 }
5014 out_header_name:
5015 kfree(rbd_dev->header_name);
5016 rbd_dev->header_name = NULL;
5017 err_out_format:
5018 rbd_dev->image_format = 0;
5019 kfree(rbd_dev->spec->image_id);
5020 rbd_dev->spec->image_id = NULL;
5021
5022 dout("probe failed, returning %d\n", ret);
5023
5024 return ret;
5025 }
5026
5027 static ssize_t rbd_add(struct bus_type *bus,
5028 const char *buf,
5029 size_t count)
5030 {
5031 struct rbd_device *rbd_dev = NULL;
5032 struct ceph_options *ceph_opts = NULL;
5033 struct rbd_options *rbd_opts = NULL;
5034 struct rbd_spec *spec = NULL;
5035 struct rbd_client *rbdc;
5036 struct ceph_osd_client *osdc;
5037 bool read_only;
5038 int rc = -ENOMEM;
5039
5040 if (!try_module_get(THIS_MODULE))
5041 return -ENODEV;
5042
5043 /* parse add command */
5044 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5045 if (rc < 0)
5046 goto err_out_module;
5047 read_only = rbd_opts->read_only;
5048 kfree(rbd_opts);
5049 rbd_opts = NULL; /* done with this */
5050
5051 rbdc = rbd_get_client(ceph_opts);
5052 if (IS_ERR(rbdc)) {
5053 rc = PTR_ERR(rbdc);
5054 goto err_out_args;
5055 }
5056
5057 /* pick the pool */
5058 osdc = &rbdc->client->osdc;
5059 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
5060 if (rc < 0)
5061 goto err_out_client;
5062 spec->pool_id = (u64)rc;
5063
5064 /* The ceph file layout needs to fit pool id in 32 bits */
5065
5066 if (spec->pool_id > (u64)U32_MAX) {
5067 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5068 (unsigned long long)spec->pool_id, U32_MAX);
5069 rc = -EIO;
5070 goto err_out_client;
5071 }
5072
5073 rbd_dev = rbd_dev_create(rbdc, spec);
5074 if (!rbd_dev)
5075 goto err_out_client;
5076 rbdc = NULL; /* rbd_dev now owns this */
5077 spec = NULL; /* rbd_dev now owns this */
5078
5079 rc = rbd_dev_image_probe(rbd_dev, true);
5080 if (rc < 0)
5081 goto err_out_rbd_dev;
5082
5083 /* If we are mapping a snapshot it must be marked read-only */
5084
5085 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5086 read_only = true;
5087 rbd_dev->mapping.read_only = read_only;
5088
5089 rc = rbd_dev_device_setup(rbd_dev);
5090 if (rc) {
5091 rbd_dev_image_release(rbd_dev);
5092 goto err_out_module;
5093 }
5094
5095 return count;
5096
5097 err_out_rbd_dev:
5098 rbd_dev_destroy(rbd_dev);
5099 err_out_client:
5100 rbd_put_client(rbdc);
5101 err_out_args:
5102 rbd_spec_put(spec);
5103 err_out_module:
5104 module_put(THIS_MODULE);
5105
5106 dout("Error adding device %s\n", buf);
5107
5108 return (ssize_t)rc;
5109 }
5110
5111 static void rbd_dev_device_release(struct device *dev)
5112 {
5113 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5114
5115 rbd_free_disk(rbd_dev);
5116 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5117 rbd_dev_mapping_clear(rbd_dev);
5118 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5119 rbd_dev->major = 0;
5120 rbd_dev_id_put(rbd_dev);
5121 rbd_dev_mapping_clear(rbd_dev);
5122 }
5123
5124 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5125 {
5126 while (rbd_dev->parent) {
5127 struct rbd_device *first = rbd_dev;
5128 struct rbd_device *second = first->parent;
5129 struct rbd_device *third;
5130
5131 /*
5132 * Descend to the parent that has no grandparent and
5133 * remove it.
5134 */
5135 while (second && (third = second->parent)) {
5136 first = second;
5137 second = third;
5138 }
5139 rbd_assert(second);
5140 rbd_dev_image_release(second);
5141 first->parent = NULL;
5142 first->parent_overlap = 0;
5143
5144 rbd_assert(first->parent_spec);
5145 rbd_spec_put(first->parent_spec);
5146 first->parent_spec = NULL;
5147 }
5148 }
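
/*
 * Worked example of the teardown above: for a chain image -> parent ->
 * grandparent, the first pass releases the grandparent (the ancestor
 * with no parent of its own), the second pass releases the parent, and
 * the loop ends once rbd_dev->parent is NULL.
 */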
5149
5150 static ssize_t rbd_remove(struct bus_type *bus,
5151 const char *buf,
5152 size_t count)
5153 {
5154 struct rbd_device *rbd_dev = NULL;
5155 struct list_head *tmp;
5156 int dev_id;
5157 unsigned long ul;
5158 bool already = false;
5159 int ret;
5160
5161 ret = strict_strtoul(buf, 10, &ul);
5162 if (ret)
5163 return ret;
5164
5165 /* convert to int; abort if we lost anything in the conversion */
5166 dev_id = (int)ul;
5167 if (dev_id != ul)
5168 return -EINVAL;
5169
5170 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5171
5172 ret = -ENOENT;
5173 spin_lock(&rbd_dev_list_lock);
5174 list_for_each(tmp, &rbd_dev_list) {
5175 rbd_dev = list_entry(tmp, struct rbd_device, node);
5176 if (rbd_dev->dev_id == dev_id) {
5177 ret = 0;
5178 break;
5179 }
5180 }
5181 if (!ret) {
5182 spin_lock_irq(&rbd_dev->lock);
5183 if (rbd_dev->open_count)
5184 ret = -EBUSY;
5185 else
5186 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5187 &rbd_dev->flags);
5188 spin_unlock_irq(&rbd_dev->lock);
5189 }
5190 spin_unlock(&rbd_dev_list_lock);
5191 if (ret < 0 || already)
5192 goto done;
5193
5194 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5195 if (ret)
5196 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
5197
5198 /*
5199 * flush remaining watch callbacks - these must be complete
5200 * before the osd_client is shut down
5201 */
5202 dout("%s: flushing notifies", __func__);
5203 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5204 /*
5205 * Don't free anything from rbd_dev->disk until after all
5206 * notifies are completely processed. Otherwise
5207 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5208 * in a potential use after free of rbd_dev->disk or rbd_dev.
5209 */
5210 rbd_bus_del_dev(rbd_dev);
5211 rbd_dev_image_release(rbd_dev);
5212 module_put(THIS_MODULE);
5213 ret = count;
5214 done:
5215 mutex_unlock(&ctl_mutex);
5216
5217 return ret;
5218 }
5219
5220 /*
5221 * create control files in sysfs
5222 * /sys/bus/rbd/...
5223 */
5224 static int rbd_sysfs_init(void)
5225 {
5226 int ret;
5227
5228 ret = device_register(&rbd_root_dev);
5229 if (ret < 0)
5230 return ret;
5231
5232 ret = bus_register(&rbd_bus_type);
5233 if (ret < 0)
5234 device_unregister(&rbd_root_dev);
5235
5236 return ret;
5237 }
5238
5239 static void rbd_sysfs_cleanup(void)
5240 {
5241 bus_unregister(&rbd_bus_type);
5242 device_unregister(&rbd_root_dev);
5243 }
5244
5245 static int rbd_slab_init(void)
5246 {
5247 rbd_assert(!rbd_img_request_cache);
5248 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5249 sizeof (struct rbd_img_request),
5250 __alignof__(struct rbd_img_request),
5251 0, NULL);
5252 if (!rbd_img_request_cache)
5253 return -ENOMEM;
5254
5255 rbd_assert(!rbd_obj_request_cache);
5256 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5257 sizeof (struct rbd_obj_request),
5258 __alignof__(struct rbd_obj_request),
5259 0, NULL);
5260 if (!rbd_obj_request_cache)
5261 goto out_err;
5262
5263 rbd_assert(!rbd_segment_name_cache);
5264 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5265 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5266 if (rbd_segment_name_cache)
5267 return 0;
5268 out_err:
5269 if (rbd_obj_request_cache) {
5270 kmem_cache_destroy(rbd_obj_request_cache);
5271 rbd_obj_request_cache = NULL;
5272 }
5273
5274 kmem_cache_destroy(rbd_img_request_cache);
5275 rbd_img_request_cache = NULL;
5276
5277 return -ENOMEM;
5278 }
5279
5280 static void rbd_slab_exit(void)
5281 {
5282 rbd_assert(rbd_segment_name_cache);
5283 kmem_cache_destroy(rbd_segment_name_cache);
5284 rbd_segment_name_cache = NULL;
5285
5286 rbd_assert(rbd_obj_request_cache);
5287 kmem_cache_destroy(rbd_obj_request_cache);
5288 rbd_obj_request_cache = NULL;
5289
5290 rbd_assert(rbd_img_request_cache);
5291 kmem_cache_destroy(rbd_img_request_cache);
5292 rbd_img_request_cache = NULL;
5293 }
5294
5295 static int __init rbd_init(void)
5296 {
5297 int rc;
5298
5299 if (!libceph_compatible(NULL)) {
5300 rbd_warn(NULL, "libceph incompatibility (quitting)");
5301
5302 return -EINVAL;
5303 }
5304 rc = rbd_slab_init();
5305 if (rc)
5306 return rc;
5307 rc = rbd_sysfs_init();
5308 if (rc)
5309 rbd_slab_exit();
5310 else
5311 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5312
5313 return rc;
5314 }
5315
5316 static void __exit rbd_exit(void)
5317 {
5318 rbd_sysfs_cleanup();
5319 rbd_slab_exit();
5320 }
5321
5322 module_init(rbd_init);
5323 module_exit(rbd_exit);
5324
5325 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5326 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5327 MODULE_DESCRIPTION("rados block device");
5328
5329 /* following authorship retained from original osdblk.c */
5330 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5331
5332 MODULE_LICENSE("GPL");