/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
        unsigned int counter;

        counter = (unsigned int)__atomic_add_unless(v, 1, 0);
        if (counter <= (unsigned int)INT_MAX)
                return (int)counter;

        atomic_dec(v);

        return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
        int counter;

        counter = atomic_dec_return(v);
        if (counter >= 0)
                return counter;

        atomic_inc(v);

        return -EINVAL;
}
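
/*
 * Illustrative use of the pair above (a sketch, not one of this
 * file's call sites): a reference count such as parent_ref is kept
 * with these helpers so that it saturates instead of wrapping:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0)
 *		... reference obtained ...
 *
 * Once such a counter has fallen to 0 (or has hit INT_MAX), both
 * helpers return -EINVAL and leave it where it is.
 */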

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
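
/*
 * Worked example (illustrative): with NAME_MAX at its usual value of
 * 255 and the 5-byte "snap_" prefix, RBD_MAX_SNAP_NAME_LEN comes out
 * to 250 bytes.
 */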

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        /* These fields never change for a given rbd image */
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        u64 stripe_unit;
        u64 stripe_count;
        u64 features;		/* Might be changeable someday? */

        /* The remaining fields need to be updated occasionally */
        u64 image_size;
        struct ceph_snap_context *snapc;
        char *snap_names;	/* format 1 only */
        u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
        u64		pool_id;
        const char	*pool_name;

        const char	*image_id;
        const char	*image_name;

        u64		snap_id;
        const char	*snap_name;

        struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client	*client;
        struct kref		kref;
        struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
        OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
        OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
        OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
        OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
        OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
        const char		*object_name;
        u64			offset;		/* object start byte */
        u64			length;		/* bytes from offset */
        unsigned long		flags;

        /*
         * An object request associated with an image will have its
         * img_data flag set; a standalone object request will not.
         *
         * A standalone object request will have which == BAD_WHICH
         * and a null obj_request pointer.
         *
         * An object request initiated in support of a layered image
         * object (to check for its existence before a write) will
         * have which == BAD_WHICH and a non-null obj_request pointer.
         *
         * Finally, an object request for rbd image data will have
         * which != BAD_WHICH, and will have a non-null img_request
         * pointer.  The value of which will be in the range
         * 0..(img_request->obj_request_count-1).
         */
        union {
                struct rbd_obj_request	*obj_request;	/* STAT op */
                struct {
                        struct rbd_img_request	*img_request;
                        u64			img_offset;
                        /* links for img_request->obj_requests list */
                        struct list_head	links;
                };
        };
        u32			which;		/* posn image request list */

        enum obj_request_type	type;
        union {
                struct bio	*bio_list;
                struct {
                        struct page	**pages;
                        u32		page_count;
                };
        };
        struct page		**copyup_pages;
        u32			copyup_page_count;

        struct ceph_osd_request	*osd_req;

        u64			xferred;	/* bytes transferred */
        int			result;

        rbd_obj_callback_t	callback;
        struct completion	completion;

        struct kref		kref;
};

enum img_req_flags {
        IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
        IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
        IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
        struct rbd_device	*rbd_dev;
        u64			offset;	/* starting image byte offset */
        u64			length;	/* byte count from offset */
        unsigned long		flags;
        union {
                u64			snap_id;	/* for reads */
                struct ceph_snap_context *snapc;	/* for writes */
        };
        union {
                struct request		*rq;		/* block request */
                struct rbd_obj_request	*obj_request;	/* obj req initiator */
        };
        struct page		**copyup_pages;
        u32			copyup_page_count;
        spinlock_t		completion_lock;/* protects next_completion */
        u32			next_completion;
        rbd_img_callback_t	callback;
        u64			xferred;/* aggregate bytes transferred */
        int			result;	/* first nonzero obj_request result */

        u32			obj_request_count;
        struct list_head	obj_requests;	/* rbd_obj_request structs */

        struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
        u64			size;
        u64			features;
        bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
        int			dev_id;		/* blkdev unique id */

        int			major;		/* blkdev assigned major */
        struct gendisk		*disk;		/* blkdev's gendisk and rq */

        u32			image_format;	/* Either 1 or 2 */
        struct rbd_client	*rbd_client;

        char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t		lock;		/* queue, flags, open_count */

        struct rbd_image_header	header;
        unsigned long		flags;		/* possibly lock protected */
        struct rbd_spec		*spec;

        char			*header_name;

        struct ceph_file_layout	layout;

        struct ceph_osd_event	*watch_event;
        struct rbd_obj_request	*watch_request;

        struct rbd_spec		*parent_spec;
        u64			parent_overlap;
        atomic_t		parent_ref;
        struct rbd_device	*parent;

        /* protects updating the header */
        struct rw_semaphore	header_rwsem;

        struct rbd_mapping	mapping;

        struct list_head	node;

        /* sysfs related */
        struct device		dev;
        unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name		= "rbd",
        .bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
        struct va_format vaf;
        va_list args;

        va_start(args, fmt);
        vaf.fmt = fmt;
        vaf.va = &args;

        if (!rbd_dev)
                printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
        else if (rbd_dev->disk)
                printk(KERN_WARNING "%s: %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_name)
                printk(KERN_WARNING "%s: image %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
        else if (rbd_dev->spec && rbd_dev->spec->image_id)
                printk(KERN_WARNING "%s: id %s: %pV\n",
                        RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
        else	/* punt */
                printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
                        RBD_DRV_NAME, rbd_dev, &vaf);
        va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
                if (unlikely(!(expr))) {				\
                        printk(KERN_ERR "\nAssertion failure in %s() "	\
                                                "at line %d:\n\n"	\
                                        "\trbd_assert(%s);\n\n",	\
                                        __func__, __LINE__, #expr);	\
                        BUG();						\
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
        bool removing = false;

        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;

        spin_lock_irq(&rbd_dev->lock);
        if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
                removing = true;
        else
                rbd_dev->open_count++;
        spin_unlock_irq(&rbd_dev->lock);
        if (removing)
                return -ENOENT;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
        mutex_unlock(&ctl_mutex);

        return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;
        unsigned long open_count_before;

        spin_lock_irq(&rbd_dev->lock);
        open_count_before = rbd_dev->open_count--;
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);
}

static const struct block_device_operations rbd_bd_ops = {
        .owner			= THIS_MODULE,
        .open			= rbd_open,
        .release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);

        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        dout("%s: error %d\n", __func__, ret);

        return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
        kref_get(&rbdc->kref);

        return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        __rbd_get_client(client_node);

                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},		/* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},		/* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

struct rbd_options {
        bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}
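
/*
 * Example (illustrative only): rbd-specific tokens ride along in the
 * same comma-separated option string as the ceph options, e.g.
 *
 *	echo "1.2.3.4:6789 name=admin,read_only rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * Tokens the ceph option parser does not recognize are handed to
 * parse_rbd_opts_token() above.
 */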

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)	/* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);

        return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock to remove the
 * client from the client list.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
        if (rbdc)
                kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
        return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /* The bio layer requires at least sector-sized I/O */

        if (ondisk->options.order < SECTOR_SHIFT)
                return false;

        /* If we use u64 in a few spots we may be able to loosen this */

        if (ondisk->options.order > 8 * sizeof (int) - 1)
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                                 struct rbd_image_header_ondisk *ondisk)
{
        struct rbd_image_header *header = &rbd_dev->header;
        bool first_time = header->object_prefix == NULL;
        struct ceph_snap_context *snapc;
        char *object_prefix = NULL;
        char *snap_names = NULL;
        u64 *snap_sizes = NULL;
        u32 snap_count;
        size_t size;
        int ret = -ENOMEM;
        u32 i;

        /* Allocate this now to avoid having to handle failure below */

        if (first_time) {
                size_t len;

                len = strnlen(ondisk->object_prefix,
                                sizeof (ondisk->object_prefix));
                object_prefix = kmalloc(len + 1, GFP_KERNEL);
                if (!object_prefix)
                        return -ENOMEM;
                memcpy(object_prefix, ondisk->object_prefix, len);
                object_prefix[len] = '\0';
        }

        /* Allocate the snapshot context and fill it in */

        snap_count = le32_to_cpu(ondisk->snap_count);
        snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
        if (!snapc)
                goto out_err;
        snapc->seq = le64_to_cpu(ondisk->snap_seq);
        if (snap_count) {
                struct rbd_image_snap_ondisk *snaps;
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* We'll keep a copy of the snapshot names... */

                if (snap_names_len > (u64)SIZE_MAX)
                        goto out_2big;
                snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!snap_names)
                        goto out_err;

                /* ...as well as the array of their sizes. */

                size = snap_count * sizeof (*header->snap_sizes);
                snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!snap_sizes)
                        goto out_err;

                /*
                 * Copy the names, and fill in each snapshot's id
                 * and size.
                 *
                 * Note that rbd_dev_v1_header_info() guarantees the
                 * ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
                snaps = ondisk->snaps;
                for (i = 0; i < snap_count; i++) {
                        snapc->snaps[i] = le64_to_cpu(snaps[i].id);
                        snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
                }
        }

        /* We won't fail any more, fill in the header */

        down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
                header->crypt_type = ondisk->options.crypt_type;
                header->comp_type = ondisk->options.comp_type;
                /* The rest aren't used for format 1 images */
                header->stripe_unit = 0;
                header->stripe_count = 0;
                header->features = 0;
        } else {
                ceph_put_snap_context(header->snapc);
                kfree(header->snap_names);
                kfree(header->snap_sizes);
        }

        /* The remaining fields always get updated (when we refresh) */

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->snapc = snapc;
        header->snap_names = snap_names;
        header->snap_sizes = snap_sizes;

        /* Make sure mapping size is consistent with header info */

        if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;

        up_write(&rbd_dev->header_rwsem);

        return 0;
out_2big:
        ret = -EIO;
out_err:
        kfree(snap_sizes);
        kfree(snap_names);
        ceph_put_snap_context(snapc);
        kfree(object_prefix);

        return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
        const char *snap_name;

        rbd_assert(which < rbd_dev->header.snapc->num_snaps);

        /* Skip over names until we find the one we are looking for */

        snap_name = rbd_dev->header.snap_names;
        while (which--)
                snap_name += strlen(snap_name) + 1;

        return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
        u64 snap_id1 = *(u64 *)s1;
        u64 snap_id2 = *(u64 *)s2;

        if (snap_id1 < snap_id2)
                return 1;
        return snap_id1 == snap_id2 ? 0 : -1;
}
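
/*
 * Example (illustrative): with snapshot ids stored in descending
 * order, say { 12, 7, 3 }, this comparator makes bsearch() treat the
 * array as sorted and locate id 7 at index 1.
 */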

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
        struct ceph_snap_context *snapc = rbd_dev->header.snapc;
        u64 *found;

        found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
                                sizeof (snap_id), snapid_compare_reverse);

        return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
                                        u64 snap_id)
{
        u32 which;
        const char *snap_name;

        which = rbd_dev_snap_index(rbd_dev, snap_id);
        if (which == BAD_SNAP_INDEX)
                return ERR_PTR(-ENOENT);

        snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
        return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
        if (snap_id == CEPH_NOSNAP)
                return RBD_SNAP_HEAD_NAME;

        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (rbd_dev->image_format == 1)
                return rbd_dev_v1_snap_name(rbd_dev, snap_id);

        return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}
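
/*
 * Note: for format 1 images the snapshot name is copied out of the
 * name blob already cached in rbd_dev->header; for format 2 it is
 * fetched from the OSD by rbd_dev_v2_snap_name().
 */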

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                                u64 *snap_size)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_size = rbd_dev->header.image_size;
        } else if (rbd_dev->image_format == 1) {
                u32 which;

                which = rbd_dev_snap_index(rbd_dev, snap_id);
                if (which == BAD_SNAP_INDEX)
                        return -ENOENT;

                *snap_size = rbd_dev->header.snap_sizes[which];
        } else {
                u64 size = 0;
                int ret;

                ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
                if (ret)
                        return ret;

                *snap_size = size;
        }
        return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                        u64 *snap_features)
{
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        if (snap_id == CEPH_NOSNAP) {
                *snap_features = rbd_dev->header.features;
        } else if (rbd_dev->image_format == 1) {
                *snap_features = 0;	/* No features for format 1 */
        } else {
                u64 features = 0;
                int ret;

                ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
                if (ret)
                        return ret;

                *snap_features = features;
        }
        return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
        u64 snap_id = rbd_dev->spec->snap_id;
        u64 size = 0;
        u64 features = 0;
        int ret;

        ret = rbd_snap_size(rbd_dev, snap_id, &size);
        if (ret)
                return ret;
        ret = rbd_snap_features(rbd_dev, snap_id, &features);
        if (ret)
                return ret;

        rbd_dev->mapping.size = size;
        rbd_dev->mapping.features = features;

        return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
        rbd_dev->mapping.size = 0;
        rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;
        char *name_format;

        name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        name_format = "%s.%012llx";
        if (rbd_dev->image_format == 2)
                name_format = "%s.%016llx";
        ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
                        rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                        segment, ret);
                /* The buffer came from the slab cache, not kmalloc() */
                kmem_cache_free(rbd_segment_name_cache, name);
                name = NULL;
        }

        return name;
}

static void rbd_segment_name_free(const char *name)
{
        /* The explicit cast here is needed to drop the const qualifier */

        kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                                u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}
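
/*
 * Worked example (illustrative): with the default object order of 22
 * (4 MiB objects), image offset 0x1234567 lands in segment
 * 0x1234567 >> 22 = 4, at byte 0x234567 within that segment, and
 * rbd_segment_length() clips a request so it never crosses the
 * segment boundary.
 */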

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                flush_dcache_page(bv->bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
        struct page **page = &pages[offset >> PAGE_SHIFT];

        rbd_assert(end > offset);
        rbd_assert(end - offset <= (u64)SIZE_MAX);
        while (offset < end) {
                size_t page_offset;
                size_t length;
                unsigned long flags;
                void *kaddr;

                page_offset = (size_t)(offset & ~PAGE_MASK);
                length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
                flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);

                offset += length;
                page++;
        }
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
                                        unsigned int offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio_vec *bv;
        unsigned int resid;
        unsigned short idx;
        unsigned int voff;
        unsigned short end_idx;
        unsigned short vcnt;
        struct bio *bio;

        /* Handle the easy case for the caller */

        if (!offset && len == bio_src->bi_size)
                return bio_clone(bio_src, gfpmask);

        if (WARN_ON_ONCE(!len))
                return NULL;
        if (WARN_ON_ONCE(len > bio_src->bi_size))
                return NULL;
        if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
                return NULL;

        /* Find first affected segment... */

        resid = offset;
        bio_for_each_segment(bv, bio_src, idx) {
                if (resid < bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        voff = resid;

        /* ...and the last affected segment */

        resid += len;
        __bio_for_each_segment(bv, bio_src, end_idx, idx) {
                if (resid <= bv->bv_len)
                        break;
                resid -= bv->bv_len;
        }
        vcnt = end_idx - idx + 1;

        /* Build the clone */

        bio = bio_alloc(gfpmask, (unsigned int) vcnt);
        if (!bio)
                return NULL;	/* ENOMEM */

        bio->bi_bdev = bio_src->bi_bdev;
        bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
        bio->bi_rw = bio_src->bi_rw;
        bio->bi_flags |= 1 << BIO_CLONED;

        /*
         * Copy over our part of the bio_vec, then update the first
         * and last (or only) entries.
         */
        memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
                        vcnt * sizeof (struct bio_vec));
        bio->bi_io_vec[0].bv_offset += voff;
        if (vcnt > 1) {
                bio->bi_io_vec[0].bv_len -= voff;
                bio->bi_io_vec[vcnt - 1].bv_len = resid;
        } else {
                bio->bi_io_vec[0].bv_len = len;
        }

        bio->bi_vcnt = vcnt;
        bio->bi_size = len;
        bio->bi_idx = 0;

        return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
                                        unsigned int *offset,
                                        unsigned int len,
                                        gfp_t gfpmask)
{
        struct bio *bi = *bio_src;
        unsigned int off = *offset;
        struct bio *chain = NULL;
        struct bio **end;

        /* Build up a chain of clone bios up to the limit */

        if (!bi || off >= bi->bi_size || !len)
                return NULL;		/* Nothing to clone */

        end = &chain;
        while (len) {
                unsigned int bi_size;
                struct bio *bio;

                if (!bi) {
                        rbd_warn(NULL, "bio_chain exhausted with %u left", len);
                        goto out_err;	/* EINVAL; ran out of bio's */
                }
                bi_size = min_t(unsigned int, bi->bi_size - off, len);
                bio = bio_clone_range(bi, off, bi_size, gfpmask);
                if (!bio)
                        goto out_err;	/* ENOMEM */

                *end = bio;
                end = &bio->bi_next;

                off += bi_size;
                if (off == bi->bi_size) {
                        bi = bi->bi_next;
                        off = 0;
                }
                len -= bi_size;
        }
        *bio_src = bi;
        *offset = off;

        return chain;
out_err:
        bio_chain_put(chain);

        return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
                struct rbd_device *rbd_dev;

                rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
                        obj_request);
        }
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
        if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
                struct rbd_device *rbd_dev = NULL;

                if (obj_request_img_data_test(obj_request))
                        rbd_dev = obj_request->img_request->rbd_dev;
                rbd_warn(rbd_dev, "obj_request %p already marked done\n",
                        obj_request);
        }
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case the later-arriving "doesn't exist"
 * response is ignored.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
                                bool exists)
{
        if (exists)
                set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
        set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
        smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
        smp_mb();
        return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
        struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

        return obj_request->img_offset <
            round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}
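
/*
 * Worked example (illustrative): with 4 MiB objects and a parent
 * overlap of 5 MiB, the overlap rounds up to 8 MiB, so requests whose
 * image offset falls in the first two objects are treated as
 * overlapping the parent.
 */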

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request != NULL);
        dout("%s: obj %p (was %d)\n", __func__, obj_request,
                atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
        dout("%s: img %p (was %d)\n", __func__, img_request,
             atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
        rbd_assert(img_request != NULL);
        dout("%s: img %p (was %d)\n", __func__, img_request,
                atomic_read(&img_request->kref.refcount));
        if (img_request_child_test(img_request))
                kref_put(&img_request->kref, rbd_parent_request_destroy);
        else
                kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->img_request == NULL);

        /* Image request now owns object's original reference */
        obj_request->img_request = img_request;
        obj_request->which = img_request->obj_request_count;
        rbd_assert(!obj_request_img_data_test(obj_request));
        obj_request_img_data_set(obj_request);
        rbd_assert(obj_request->which != BAD_WHICH);
        img_request->obj_request_count++;
        list_add_tail(&obj_request->links, &img_request->obj_requests);
        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
{
        rbd_assert(obj_request->which != BAD_WHICH);

        dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
                obj_request->which);
        list_del(&obj_request->links);
        rbd_assert(img_request->obj_request_count > 0);
        img_request->obj_request_count--;
        rbd_assert(obj_request->which == img_request->obj_request_count);
        obj_request->which = BAD_WHICH;
        rbd_assert(obj_request_img_data_test(obj_request));
        rbd_assert(obj_request->img_request == img_request);
        obj_request->img_request = NULL;
        obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
        switch (type) {
        case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
        default:
                return false;
        }
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
{
        dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
        dout("%s: img %p\n", __func__, img_request);

        /*
         * If no error occurred, compute the aggregate transfer
         * count for the image request.  We could instead use
         * atomic64_cmpxchg() to update it as each object request
         * completes; not clear which way is better off hand.
         */
        if (!img_request->result) {
                struct rbd_obj_request *obj_request;
                u64 xferred = 0;

                for_each_obj_request(img_request, obj_request)
                        xferred += obj_request->xferred;
                img_request->xferred = xferred;
        }

        if (img_request->callback)
                img_request->callback(img_request);
        else
                rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
        dout("%s: obj %p\n", __func__, obj_request);

        return wait_for_completion_interruptible(&obj_request->completion);
}

0c425248
AE
1524/*
1525 * The default/initial value for all image request flags is 0. Each
1526 * is conditionally set to 1 at image request initialization time
1527 * and currently never change thereafter.
1528 */
1529static void img_request_write_set(struct rbd_img_request *img_request)
1530{
1531 set_bit(IMG_REQ_WRITE, &img_request->flags);
1532 smp_mb();
1533}
1534
1535static bool img_request_write_test(struct rbd_img_request *img_request)
1536{
1537 smp_mb();
1538 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1539}
1540
9849e986
AE
1541static void img_request_child_set(struct rbd_img_request *img_request)
1542{
1543 set_bit(IMG_REQ_CHILD, &img_request->flags);
1544 smp_mb();
1545}
1546
e93f3152
AE
1547static void img_request_child_clear(struct rbd_img_request *img_request)
1548{
1549 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1550 smp_mb();
1551}
1552
9849e986
AE
1553static bool img_request_child_test(struct rbd_img_request *img_request)
1554{
1555 smp_mb();
1556 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1557}
1558
d0b2e944
AE
1559static void img_request_layered_set(struct rbd_img_request *img_request)
1560{
1561 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1562 smp_mb();
1563}
1564
a2acd00e
AE
1565static void img_request_layered_clear(struct rbd_img_request *img_request)
1566{
1567 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1568 smp_mb();
1569}
1570
d0b2e944
AE
1571static bool img_request_layered_test(struct rbd_img_request *img_request)
1572{
1573 smp_mb();
1574 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1575}
1576
6e2a4505
AE
1577static void
1578rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1579{
b9434c5b
AE
1580 u64 xferred = obj_request->xferred;
1581 u64 length = obj_request->length;
1582
6e2a4505
AE
1583 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1584 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1585 xferred, length);
6e2a4505 1586 /*
be4c4b85
JD
1587 * ENOENT means a hole in the image. We zero-fill the entire
1588 * length of the request. A short read also implies zero-fill
1589 * to the end of the request. An error requires the whole
1590 * length of the request to be reported finished with an error
1591 * to the block layer. In each case we update the xferred
1592 * count to indicate the whole request was satisfied.
6e2a4505 1593 */
b9434c5b 1594 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1595 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1596 if (obj_request->type == OBJ_REQUEST_BIO)
1597 zero_bio_chain(obj_request->bio_list, 0);
1598 else
1599 zero_pages(obj_request->pages, 0, length);
6e2a4505 1600 obj_request->result = 0;
b9434c5b
AE
1601 } else if (xferred < length && !obj_request->result) {
1602 if (obj_request->type == OBJ_REQUEST_BIO)
1603 zero_bio_chain(obj_request->bio_list, xferred);
1604 else
1605 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1606 }
be4c4b85 1607 obj_request->xferred = length;
6e2a4505
AE
1608 obj_request_done_set(obj_request);
1609}
1610
bf0d5f50
AE
1611static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1612{
37206ee5
AE
1613 dout("%s: obj %p cb %p\n", __func__, obj_request,
1614 obj_request->callback);
bf0d5f50
AE
1615 if (obj_request->callback)
1616 obj_request->callback(obj_request);
788e2df3
AE
1617 else
1618 complete_all(&obj_request->completion);
bf0d5f50
AE
1619}
1620
c47f9371 1621static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
39bf2c5d
AE
1622{
1623 dout("%s: obj %p\n", __func__, obj_request);
1624 obj_request_done_set(obj_request);
1625}
1626
c47f9371 1627static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1628{
57acbaa7 1629 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1630 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1631 bool layered = false;
1632
1633 if (obj_request_img_data_test(obj_request)) {
1634 img_request = obj_request->img_request;
1635 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1636 rbd_dev = img_request->rbd_dev;
57acbaa7 1637 }
8b3e1a56
AE
1638
1639 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1640 obj_request, img_request, obj_request->result,
1641 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1642 if (layered && obj_request->result == -ENOENT &&
1643 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1644 rbd_img_parent_read(obj_request);
1645 else if (img_request)
6e2a4505
AE
1646 rbd_img_obj_request_read_callback(obj_request);
1647 else
1648 obj_request_done_set(obj_request);
bf0d5f50
AE
1649}
1650
c47f9371 1651static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1652{
1b83bef2
SW
1653 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1654 obj_request->result, obj_request->length);
1655 /*
8b3e1a56
AE
1656 * There is no such thing as a successful short write. Set
1657 * it to our originally-requested length.
1b83bef2
SW
1658 */
1659 obj_request->xferred = obj_request->length;
07741308 1660 obj_request_done_set(obj_request);
bf0d5f50
AE
1661}
1662
fbfab539
AE
1663/*
1664 * For a simple stat call there's nothing to do. We'll do more if
1665 * this is part of a write sequence for a layered image.
1666 */
c47f9371 1667static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1668{
37206ee5 1669 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1670 obj_request_done_set(obj_request);
1671}
1672
bf0d5f50
AE
1673static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1674 struct ceph_msg *msg)
1675{
1676 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1677 u16 opcode;
1678
37206ee5 1679 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
bf0d5f50 1680 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1681 if (obj_request_img_data_test(obj_request)) {
1682 rbd_assert(obj_request->img_request);
1683 rbd_assert(obj_request->which != BAD_WHICH);
1684 } else {
1685 rbd_assert(obj_request->which == BAD_WHICH);
1686 }
bf0d5f50 1687
1b83bef2
SW
1688 if (osd_req->r_result < 0)
1689 obj_request->result = osd_req->r_result;
bf0d5f50 1690
0eefd470 1691 BUG_ON(osd_req->r_num_ops > 2);
bf0d5f50 1692
c47f9371
AE
1693 /*
1694 * We support a 64-bit length, but ultimately it has to be
1695 * passed to blk_end_request(), which takes an unsigned int.
1696 */
1b83bef2 1697 obj_request->xferred = osd_req->r_reply_op_len[0];
8b3e1a56 1698 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
79528734 1699 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1700 switch (opcode) {
1701 case CEPH_OSD_OP_READ:
c47f9371 1702 rbd_osd_read_callback(obj_request);
bf0d5f50
AE
1703 break;
1704 case CEPH_OSD_OP_WRITE:
c47f9371 1705 rbd_osd_write_callback(obj_request);
bf0d5f50 1706 break;
fbfab539 1707 case CEPH_OSD_OP_STAT:
c47f9371 1708 rbd_osd_stat_callback(obj_request);
fbfab539 1709 break;
36be9a76 1710 case CEPH_OSD_OP_CALL:
b8d70035 1711 case CEPH_OSD_OP_NOTIFY_ACK:
9969ebc5 1712 case CEPH_OSD_OP_WATCH:
c47f9371 1713 rbd_osd_trivial_callback(obj_request);
9969ebc5 1714 break;
bf0d5f50
AE
1715 default:
1716 rbd_warn(NULL, "%s: unsupported op %hu\n",
1717 obj_request->object_name, (unsigned short) opcode);
1718 break;
1719 }
1720
07741308 1721 if (obj_request_done_test(obj_request))
1722 rbd_obj_request_complete(obj_request);
1723}
1724
9d4df01f 1725static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1726{
1727 struct rbd_img_request *img_request = obj_request->img_request;
8c042b0d 1728 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1729 u64 snap_id;
430c28c3 1730
8c042b0d 1731 rbd_assert(osd_req != NULL);
430c28c3 1732
9d4df01f 1733 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
8c042b0d 1734 ceph_osdc_build_request(osd_req, obj_request->offset,
1735 NULL, snap_id, NULL);
1736}
1737
1738static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1739{
1740 struct rbd_img_request *img_request = obj_request->img_request;
1741 struct ceph_osd_request *osd_req = obj_request->osd_req;
1742 struct ceph_snap_context *snapc;
1743 struct timespec mtime = CURRENT_TIME;
1744
1745 rbd_assert(osd_req != NULL);
1746
1747 snapc = img_request ? img_request->snapc : NULL;
1748 ceph_osdc_build_request(osd_req, obj_request->offset,
1749 snapc, CEPH_NOSNAP, &mtime);
1750}
1751
1752static struct ceph_osd_request *rbd_osd_req_create(
1753 struct rbd_device *rbd_dev,
1754 bool write_request,
430c28c3 1755 struct rbd_obj_request *obj_request)
bf0d5f50 1756{
1757 struct ceph_snap_context *snapc = NULL;
1758 struct ceph_osd_client *osdc;
1759 struct ceph_osd_request *osd_req;
bf0d5f50 1760
1761 if (obj_request_img_data_test(obj_request)) {
1762 struct rbd_img_request *img_request = obj_request->img_request;
1763
1764 rbd_assert(write_request ==
1765 img_request_write_test(img_request));
1766 if (write_request)
bf0d5f50 1767 snapc = img_request->snapc;
1768 }
1769
1770 /* Allocate and initialize the request, for the single op */
1771
1772 osdc = &rbd_dev->rbd_client->client->osdc;
1773 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1774 if (!osd_req)
1775 return NULL; /* ENOMEM */
bf0d5f50 1776
430c28c3 1777 if (write_request)
bf0d5f50 1778 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
430c28c3 1779 else
bf0d5f50 1780 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1781
1782 osd_req->r_callback = rbd_osd_req_callback;
1783 osd_req->r_priv = obj_request;
1784
1785 osd_req->r_oid_len = strlen(obj_request->object_name);
1786 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1787 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1788
1789 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1790
1791 return osd_req;
1792}
1793
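/*
 * Editorial sketch (not driver code): the usual sequence around
 * rbd_osd_req_create() for a simple read, mirroring what
 * rbd_img_request_fill() and rbd_obj_read_sync() do further below.
 * Assumes "obj_request" is a fully initialized object request and
 * "osdc" is &rbd_dev->rbd_client->client->osdc; allocation is the
 * only failure mode of the create call:
 *
 *	osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
 *	if (!osd_req)
 *		return -ENOMEM;
 *	obj_request->osd_req = osd_req;
 *	osd_req_op_extent_init(osd_req, 0, CEPH_OSD_OP_READ,
 *			obj_request->offset, obj_request->length, 0, 0);
 *	rbd_osd_req_format_read(obj_request);
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 */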
1794/*
1795 * Create a copyup osd request based on the information in the
1796 * object request supplied. A copyup request has two osd ops,
1797 * a copyup method call, and a "normal" write request.
1798 */
1799static struct ceph_osd_request *
1800rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1801{
1802 struct rbd_img_request *img_request;
1803 struct ceph_snap_context *snapc;
1804 struct rbd_device *rbd_dev;
1805 struct ceph_osd_client *osdc;
1806 struct ceph_osd_request *osd_req;
1807
1808 rbd_assert(obj_request_img_data_test(obj_request));
1809 img_request = obj_request->img_request;
1810 rbd_assert(img_request);
1811 rbd_assert(img_request_write_test(img_request));
1812
1813 /* Allocate and initialize the request, for the two ops */
1814
1815 snapc = img_request->snapc;
1816 rbd_dev = img_request->rbd_dev;
1817 osdc = &rbd_dev->rbd_client->client->osdc;
1818 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1819 if (!osd_req)
1820 return NULL; /* ENOMEM */
1821
1822 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1823 osd_req->r_callback = rbd_osd_req_callback;
1824 osd_req->r_priv = obj_request;
1825
1826 osd_req->r_oid_len = strlen(obj_request->object_name);
1827 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1828 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1829
1830 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1831
1832 return osd_req;
1833}
1834
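/*
 * Editorial sketch: how the two ops of a copyup request get filled in
 * (done by rbd_img_obj_parent_read_full_callback() further below).
 * Op 0 is the "copyup" class method call carrying the parent data;
 * op 1 replays the original write:
 *
 *	osd_req = rbd_osd_req_create_copyup(orig_request);
 *	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
 *	osd_req_op_cls_request_data_pages(osd_req, 0, pages,
 *					parent_length, 0, false, false);
 *	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
 *					offset, length, 0, 0);
 *	rbd_osd_req_format_write(orig_request);
 */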
1835
1836static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1837{
1838 ceph_osdc_put_request(osd_req);
1839}
1840
1841/* object_name is assumed to be a non-null pointer and NUL-terminated */
1842
1843static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1844 u64 offset, u64 length,
1845 enum obj_request_type type)
1846{
1847 struct rbd_obj_request *obj_request;
1848 size_t size;
1849 char *name;
1850
1851 rbd_assert(obj_request_type_valid(type));
1852
1853 size = strlen(object_name) + 1;
1854 name = kmalloc(size, GFP_KERNEL);
1855 if (!name)
1856 return NULL;
1857
868311b1 1858 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1859 if (!obj_request) {
1860 kfree(name);
1861 return NULL;
1862 }
1863
1864 obj_request->object_name = memcpy(name, object_name, size);
1865 obj_request->offset = offset;
1866 obj_request->length = length;
926f9b3f 1867 obj_request->flags = 0;
1868 obj_request->which = BAD_WHICH;
1869 obj_request->type = type;
1870 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1871 init_completion(&obj_request->completion);
1872 kref_init(&obj_request->kref);
1873
1874 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1875 offset, length, (int)type, obj_request);
1876
1877 return obj_request;
1878}
1879
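/*
 * Editorial note: object requests are reference counted; the
 * kref_init() above supplies the initial reference. A bare
 * create/use/release cycle therefore looks like this, with
 * rbd_obj_request_put() eventually landing in
 * rbd_obj_request_destroy() below:
 *
 *	obj_request = rbd_obj_request_create(name, 0, 0,
 *						OBJ_REQUEST_NODATA);
 *	if (!obj_request)
 *		return -ENOMEM;
 *	... submit and wait ...
 *	rbd_obj_request_put(obj_request);
 */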
1880static void rbd_obj_request_destroy(struct kref *kref)
1881{
1882 struct rbd_obj_request *obj_request;
1883
1884 obj_request = container_of(kref, struct rbd_obj_request, kref);
1885
1886 dout("%s: obj %p\n", __func__, obj_request);
1887
1888 rbd_assert(obj_request->img_request == NULL);
1889 rbd_assert(obj_request->which == BAD_WHICH);
1890
1891 if (obj_request->osd_req)
1892 rbd_osd_req_destroy(obj_request->osd_req);
1893
1894 rbd_assert(obj_request_type_valid(obj_request->type));
1895 switch (obj_request->type) {
1896 case OBJ_REQUEST_NODATA:
1897 break; /* Nothing to do */
1898 case OBJ_REQUEST_BIO:
1899 if (obj_request->bio_list)
1900 bio_chain_put(obj_request->bio_list);
1901 break;
1902 case OBJ_REQUEST_PAGES:
1903 if (obj_request->pages)
1904 ceph_release_page_vector(obj_request->pages,
1905 obj_request->page_count);
1906 break;
1907 }
1908
f907ad55 1909 kfree(obj_request->object_name);
1910 obj_request->object_name = NULL;
1911 kmem_cache_free(rbd_obj_request_cache, obj_request);
1912}
1913
1914/* It's OK to call this for a device with no parent */
1915
1916static void rbd_spec_put(struct rbd_spec *spec);
1917static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1918{
1919 rbd_dev_remove_parent(rbd_dev);
1920 rbd_spec_put(rbd_dev->parent_spec);
1921 rbd_dev->parent_spec = NULL;
1922 rbd_dev->parent_overlap = 0;
1923}
1924
1925/*
1926 * Parent image reference counting is used to determine when an
1927 * image's parent fields can be safely torn down--after there are no
1928 * more in-flight requests to the parent image. When the last
1929 * reference is dropped, cleaning them up is safe.
1930 */
1931static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1932{
1933 int counter;
1934
1935 if (!rbd_dev->parent_spec)
1936 return;
1937
1938 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1939 if (counter > 0)
1940 return;
1941
1942 /* Last reference; clean up parent data structures */
1943
1944 if (!counter)
1945 rbd_dev_unparent(rbd_dev);
1946 else
1947 rbd_warn(rbd_dev, "parent reference underflow\n");
1948}
1949
1950/*
1951 * If an image has a non-zero parent overlap, get a reference to its
1952 * parent.
1953 *
1954 * We must get the reference before checking for the overlap to
1955 * coordinate properly with zeroing the parent overlap in
1956 * rbd_dev_v2_parent_info() when an image gets flattened. We
1957 * drop it again if there is no overlap.
1958 *
1959 * Returns true if the rbd device has a parent with a non-zero
1960 * overlap and a reference for it was successfully taken, or
1961 * false otherwise.
1962 */
1963static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1964{
1965 int counter;
1966
1967 if (!rbd_dev->parent_spec)
1968 return false;
1969
1970 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1971 if (counter > 0 && rbd_dev->parent_overlap)
1972 return true;
1973
1974 /* Image was flattened, but parent is not yet torn down */
1975
1976 if (counter < 0)
1977 rbd_warn(rbd_dev, "parent reference overflow\n");
1978
1979 return false;
1980}
1981
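/*
 * Editorial note: the get/put pair above is driven from the image
 * request path. rbd_img_request_create() takes the parent reference
 * when it marks a request layered, and rbd_img_request_destroy()
 * drops it again, both shown below:
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 */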
1982/*
1983 * Caller is responsible for filling in the list of object requests
1984 * that comprises the image request, and the Linux request pointer
1985 * (if there is one).
1986 */
1987static struct rbd_img_request *rbd_img_request_create(
1988 struct rbd_device *rbd_dev,
bf0d5f50 1989 u64 offset, u64 length,
e93f3152 1990 bool write_request)
1991{
1992 struct rbd_img_request *img_request;
bf0d5f50 1993
1c2a9dfe 1994 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1995 if (!img_request)
1996 return NULL;
1997
1998 if (write_request) {
1999 down_read(&rbd_dev->header_rwsem);
812164f8 2000 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 2001 up_read(&rbd_dev->header_rwsem);
2002 }
2003
2004 img_request->rq = NULL;
2005 img_request->rbd_dev = rbd_dev;
2006 img_request->offset = offset;
2007 img_request->length = length;
2008 img_request->flags = 0;
2009 if (write_request) {
2010 img_request_write_set(img_request);
468521c1 2011 img_request->snapc = rbd_dev->header.snapc;
0c425248 2012 } else {
bf0d5f50 2013 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2014 }
a2acd00e 2015 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2016 img_request_layered_set(img_request);
2017 spin_lock_init(&img_request->completion_lock);
2018 img_request->next_completion = 0;
2019 img_request->callback = NULL;
a5a337d4 2020 img_request->result = 0;
2021 img_request->obj_request_count = 0;
2022 INIT_LIST_HEAD(&img_request->obj_requests);
2023 kref_init(&img_request->kref);
2024
2025 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2026 write_request ? "write" : "read", offset, length,
2027 img_request);
2028
2029 return img_request;
2030}
2031
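/*
 * Editorial sketch: the life cycle of an image request as driven by
 * rbd_request_fn() further below (error handling trimmed):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request);
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *						rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *	if (result)
 *		rbd_img_request_put(img_request);
 */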
2032static void rbd_img_request_destroy(struct kref *kref)
2033{
2034 struct rbd_img_request *img_request;
2035 struct rbd_obj_request *obj_request;
2036 struct rbd_obj_request *next_obj_request;
2037
2038 img_request = container_of(kref, struct rbd_img_request, kref);
2039
2040 dout("%s: img %p\n", __func__, img_request);
2041
2042 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2043 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2044 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2045
2046 if (img_request_layered_test(img_request)) {
2047 img_request_layered_clear(img_request);
2048 rbd_dev_parent_put(img_request->rbd_dev);
2049 }
2050
0c425248 2051 if (img_request_write_test(img_request))
812164f8 2052 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2053
1c2a9dfe 2054 kmem_cache_free(rbd_img_request_cache, img_request);
2055}
2056
2057static struct rbd_img_request *rbd_parent_request_create(
2058 struct rbd_obj_request *obj_request,
2059 u64 img_offset, u64 length)
2060{
2061 struct rbd_img_request *parent_request;
2062 struct rbd_device *rbd_dev;
2063
2064 rbd_assert(obj_request->img_request);
2065 rbd_dev = obj_request->img_request->rbd_dev;
2066
2067 parent_request = rbd_img_request_create(rbd_dev->parent,
2068 img_offset, length, false);
2069 if (!parent_request)
2070 return NULL;
2071
2072 img_request_child_set(parent_request);
2073 rbd_obj_request_get(obj_request);
2074 parent_request->obj_request = obj_request;
2075
2076 return parent_request;
2077}
2078
2079static void rbd_parent_request_destroy(struct kref *kref)
2080{
2081 struct rbd_img_request *parent_request;
2082 struct rbd_obj_request *orig_request;
2083
2084 parent_request = container_of(kref, struct rbd_img_request, kref);
2085 orig_request = parent_request->obj_request;
2086
2087 parent_request->obj_request = NULL;
2088 rbd_obj_request_put(orig_request);
2089 img_request_child_clear(parent_request);
2090
2091 rbd_img_request_destroy(kref);
2092}
2093
2094static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2095{
6365d33a 2096 struct rbd_img_request *img_request;
2097 unsigned int xferred;
2098 int result;
8b3e1a56 2099 bool more;
1217857f 2100
2101 rbd_assert(obj_request_img_data_test(obj_request));
2102 img_request = obj_request->img_request;
2103
2104 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2105 xferred = (unsigned int)obj_request->xferred;
2106 result = obj_request->result;
2107 if (result) {
2108 struct rbd_device *rbd_dev = img_request->rbd_dev;
2109
2110 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2111 img_request_write_test(img_request) ? "write" : "read",
2112 obj_request->length, obj_request->img_offset,
2113 obj_request->offset);
2114 rbd_warn(rbd_dev, " result %d xferred %x\n",
2115 result, xferred);
2116 if (!img_request->result)
2117 img_request->result = result;
2118 }
2119
2120 /* Image object requests don't own their page array */
2121
2122 if (obj_request->type == OBJ_REQUEST_PAGES) {
2123 obj_request->pages = NULL;
2124 obj_request->page_count = 0;
2125 }
2126
2127 if (img_request_child_test(img_request)) {
2128 rbd_assert(img_request->obj_request != NULL);
2129 more = obj_request->which < img_request->obj_request_count - 1;
2130 } else {
2131 rbd_assert(img_request->rq != NULL);
2132 more = blk_end_request(img_request->rq, result, xferred);
2133 }
2134
2135 return more;
2136}
2137
2138static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2139{
2140 struct rbd_img_request *img_request;
2141 u32 which = obj_request->which;
2142 bool more = true;
2143
6365d33a 2144 rbd_assert(obj_request_img_data_test(obj_request));
2145 img_request = obj_request->img_request;
2146
2147 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2148 rbd_assert(img_request != NULL);
2149 rbd_assert(img_request->obj_request_count > 0);
2150 rbd_assert(which != BAD_WHICH);
2151 rbd_assert(which < img_request->obj_request_count);
2152 rbd_assert(which >= img_request->next_completion);
2153
2154 spin_lock_irq(&img_request->completion_lock);
2155 if (which != img_request->next_completion)
2156 goto out;
2157
2158 for_each_obj_request_from(img_request, obj_request) {
2159 rbd_assert(more);
2160 rbd_assert(which < img_request->obj_request_count);
2161
2162 if (!obj_request_done_test(obj_request))
2163 break;
1217857f 2164 more = rbd_img_obj_end_request(obj_request);
2165 which++;
2166 }
2167
2168 rbd_assert(more ^ (which == img_request->obj_request_count));
2169 img_request->next_completion = which;
2170out:
2171 spin_unlock_irq(&img_request->completion_lock);
dabf1f3b 2172 rbd_img_request_put(img_request);
2173
2174 if (!more)
2175 rbd_img_request_complete(img_request);
2176}
2177
2178/*
2179 * Split up an image request into one or more object requests, each
2180 * to a different object. The "type" parameter indicates whether
2181 * "data_desc" is the pointer to the head of a list of bio
2182 * structures, or the base of a page array. In either case this
2183 * function assumes data_desc describes memory sufficient to hold
2184 * all data described by the image request.
2185 */
2186static int rbd_img_request_fill(struct rbd_img_request *img_request,
2187 enum obj_request_type type,
2188 void *data_desc)
2189{
2190 struct rbd_device *rbd_dev = img_request->rbd_dev;
2191 struct rbd_obj_request *obj_request = NULL;
2192 struct rbd_obj_request *next_obj_request;
0c425248 2193 bool write_request = img_request_write_test(img_request);
9a640548 2194 struct bio *bio_list = NULL;
f1a4739f 2195 unsigned int bio_offset = 0;
9a640548 2196 struct page **pages = NULL;
7da22d29 2197 u64 img_offset;
2198 u64 resid;
2199 u16 opcode;
2200
2201 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2202 (int)type, data_desc);
37206ee5 2203
430c28c3 2204 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2205 img_offset = img_request->offset;
bf0d5f50 2206 resid = img_request->length;
4dda41d3 2207 rbd_assert(resid > 0);
2208
2209 if (type == OBJ_REQUEST_BIO) {
2210 bio_list = data_desc;
2211 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2212 } else {
2213 rbd_assert(type == OBJ_REQUEST_PAGES);
2214 pages = data_desc;
2215 }
2216
bf0d5f50 2217 while (resid) {
2fa12320 2218 struct ceph_osd_request *osd_req;
bf0d5f50 2219 const char *object_name;
2220 u64 offset;
2221 u64 length;
2222
7da22d29 2223 object_name = rbd_segment_name(rbd_dev, img_offset);
2224 if (!object_name)
2225 goto out_unwind;
2226 offset = rbd_segment_offset(rbd_dev, img_offset);
2227 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2228 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2229 offset, length, type);
2230 /* object request has its own copy of the object name */
2231 rbd_segment_name_free(object_name);
2232 if (!obj_request)
2233 goto out_unwind;
2234 /*
2235 * set obj_request->img_request before creating the
2236 * osd_request so that it gets the right snapc
2237 */
2238 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2239
2240 if (type == OBJ_REQUEST_BIO) {
2241 unsigned int clone_size;
2242
2243 rbd_assert(length <= (u64)UINT_MAX);
2244 clone_size = (unsigned int)length;
2245 obj_request->bio_list =
2246 bio_chain_clone_range(&bio_list,
2247 &bio_offset,
2248 clone_size,
2249 GFP_ATOMIC);
2250 if (!obj_request->bio_list)
2251 goto out_partial;
2252 } else {
2253 unsigned int page_count;
2254
2255 obj_request->pages = pages;
2256 page_count = (u32)calc_pages_for(offset, length);
2257 obj_request->page_count = page_count;
2258 if ((offset + length) & ~PAGE_MASK)
2259 page_count--; /* more on last page */
2260 pages += page_count;
2261 }
bf0d5f50 2262
2263 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2264 obj_request);
2265 if (!osd_req)
bf0d5f50 2266 goto out_partial;
2fa12320 2267 obj_request->osd_req = osd_req;
2169238d 2268 obj_request->callback = rbd_img_obj_callback;
dabf1f3b 2269 rbd_img_request_get(img_request);
430c28c3 2270
2271 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2272 0, 0);
2273 if (type == OBJ_REQUEST_BIO)
2274 osd_req_op_extent_osd_data_bio(osd_req, 0,
2275 obj_request->bio_list, length);
2276 else
2277 osd_req_op_extent_osd_data_pages(osd_req, 0,
2278 obj_request->pages, length,
2279 offset & ~PAGE_MASK, false, false);
2280
2281 if (write_request)
2282 rbd_osd_req_format_write(obj_request);
2283 else
2284 rbd_osd_req_format_read(obj_request);
430c28c3 2285
7da22d29 2286 obj_request->img_offset = img_offset;
bf0d5f50 2287
7da22d29 2288 img_offset += length;
2289 resid -= length;
2290 }
2291
2292 return 0;
2293
2294out_partial:
2295 rbd_obj_request_put(obj_request);
2296out_unwind:
2297 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
04168b98 2298 rbd_img_obj_request_del(img_request, obj_request);
2299
2300 return -ENOMEM;
2301}
2302
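/*
 * Editorial worked example for the loop above, assuming the default
 * object order of 22 (4 MiB objects): an 8 KiB write at image offset
 * 0x3ff000 straddles an object boundary and is split into two object
 * requests:
 *
 *	first object:  rbd_segment_offset() = 0x3ff000, length = 4 KiB
 *	second object: rbd_segment_offset() = 0,        length = 4 KiB
 *
 * Each iteration peels off at most one object's worth of data, so
 * resid here reaches zero after two passes.
 */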
2303static void
2304rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2305{
2306 struct rbd_img_request *img_request;
2307 struct rbd_device *rbd_dev;
ebda6408 2308 struct page **pages;
2309 u32 page_count;
2310
2311 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2312 rbd_assert(obj_request_img_data_test(obj_request));
2313 img_request = obj_request->img_request;
2314 rbd_assert(img_request);
2315
2316 rbd_dev = img_request->rbd_dev;
2317 rbd_assert(rbd_dev);
0eefd470 2318
2319 pages = obj_request->copyup_pages;
2320 rbd_assert(pages != NULL);
0eefd470 2321 obj_request->copyup_pages = NULL;
2322 page_count = obj_request->copyup_page_count;
2323 rbd_assert(page_count);
2324 obj_request->copyup_page_count = 0;
2325 ceph_release_page_vector(pages, page_count);
2326
2327 /*
2328 * We want the transfer count to reflect the size of the
2329 * original write request. There is no such thing as a
2330 * successful short write, so if the request was successful
2331 * we can just set it to the originally-requested length.
2332 */
2333 if (!obj_request->result)
2334 obj_request->xferred = obj_request->length;
2335
2336 /* Finish up with the normal image object callback */
2337
2338 rbd_img_obj_callback(obj_request);
2339}
2340
2341static void
2342rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2343{
2344 struct rbd_obj_request *orig_request;
2345 struct ceph_osd_request *osd_req;
2346 struct ceph_osd_client *osdc;
2347 struct rbd_device *rbd_dev;
3d7efd18 2348 struct page **pages;
ebda6408 2349 u32 page_count;
bbea1c1a 2350 int img_result;
ebda6408 2351 u64 parent_length;
2352 u64 offset;
2353 u64 length;
2354
2355 rbd_assert(img_request_child_test(img_request));
2356
2357 /* First get what we need from the image request */
2358
2359 pages = img_request->copyup_pages;
2360 rbd_assert(pages != NULL);
2361 img_request->copyup_pages = NULL;
2362 page_count = img_request->copyup_page_count;
2363 rbd_assert(page_count);
2364 img_request->copyup_page_count = 0;
2365
2366 orig_request = img_request->obj_request;
2367 rbd_assert(orig_request != NULL);
b91f09f1 2368 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2369 img_result = img_request->result;
2370 parent_length = img_request->length;
2371 rbd_assert(parent_length == img_request->xferred);
91c6febb 2372 rbd_img_request_put(img_request);
3d7efd18 2373
2374 rbd_assert(orig_request->img_request);
2375 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2376 rbd_assert(rbd_dev);
0eefd470 2377
2378 /*
2379 * If the overlap has become 0 (most likely because the
2380 * image has been flattened) we need to free the pages
2381 * and re-submit the original write request.
2382 */
2383 if (!rbd_dev->parent_overlap) {
2384 struct ceph_osd_client *osdc;
3d7efd18 2385
2386 ceph_release_page_vector(pages, page_count);
2387 osdc = &rbd_dev->rbd_client->client->osdc;
2388 img_result = rbd_obj_request_submit(osdc, orig_request);
2389 if (!img_result)
2390 return;
2391 }
0eefd470 2392
bbea1c1a 2393 if (img_result)
0eefd470 2394 goto out_err;
0eefd470 2395
2396 /*
2397 * The original osd request is of no use to us any more.
2398 * We need a new one that can hold the two ops in a copyup
2399 * request. Allocate the new copyup osd request for the
2400 * original request, and release the old one.
2401 */
bbea1c1a 2402 img_result = -ENOMEM;
2403 osd_req = rbd_osd_req_create_copyup(orig_request);
2404 if (!osd_req)
2405 goto out_err;
8785b1d4 2406 rbd_osd_req_destroy(orig_request->osd_req);
2407 orig_request->osd_req = osd_req;
2408 orig_request->copyup_pages = pages;
ebda6408 2409 orig_request->copyup_page_count = page_count;
3d7efd18 2410
0eefd470 2411 /* Initialize the copyup op */
3d7efd18 2412
0eefd470 2413 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2414 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2415 false, false);
3d7efd18 2416
2417 /* Then the original write request op */
2418
2419 offset = orig_request->offset;
2420 length = orig_request->length;
0eefd470 2421 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2422 offset, length, 0, 0);
2423 if (orig_request->type == OBJ_REQUEST_BIO)
2424 osd_req_op_extent_osd_data_bio(osd_req, 1,
2425 orig_request->bio_list, length);
2426 else
2427 osd_req_op_extent_osd_data_pages(osd_req, 1,
2428 orig_request->pages, length,
2429 offset & ~PAGE_MASK, false, false);
2430
2431 rbd_osd_req_format_write(orig_request);
2432
2433 /* All set, send it off. */
2434
2435 orig_request->callback = rbd_img_obj_copyup_callback;
2436 osdc = &rbd_dev->rbd_client->client->osdc;
2437 img_result = rbd_obj_request_submit(osdc, orig_request);
2438 if (!img_result)
2439 return;
2440out_err:
2441 /* Record the error code and complete the request */
2442
bbea1c1a 2443 orig_request->result = img_result;
2444 orig_request->xferred = 0;
2445 obj_request_done_set(orig_request);
2446 rbd_obj_request_complete(orig_request);
2447}
2448
2449/*
2450 * Read from the parent image the range of data that covers the
2451 * entire target of the given object request. This is used for
2452 * satisfying a layered image write request when the target of an
2453 * object request from the image request does not exist.
2454 *
2455 * A page array big enough to hold the returned data is allocated
2456 * and supplied to rbd_img_request_fill() as the "data descriptor."
2457 * When the read completes, this page array will be transferred to
2458 * the original object request for the copyup operation.
2459 *
2460 * If an error occurs, record it as the result of the original
2461 * object request and mark it done so it gets completed.
2462 */
2463static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2464{
2465 struct rbd_img_request *img_request = NULL;
2466 struct rbd_img_request *parent_request = NULL;
2467 struct rbd_device *rbd_dev;
2468 u64 img_offset;
2469 u64 length;
2470 struct page **pages = NULL;
2471 u32 page_count;
2472 int result;
2473
2474 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2475 rbd_assert(obj_request_type_valid(obj_request->type));
2476
2477 img_request = obj_request->img_request;
2478 rbd_assert(img_request != NULL);
2479 rbd_dev = img_request->rbd_dev;
2480 rbd_assert(rbd_dev->parent != NULL);
2481
2482 /*
2483 * Determine the byte range covered by the object in the
2484 * child image to which the original request was to be sent.
2485 */
2486 img_offset = obj_request->img_offset - obj_request->offset;
2487 length = (u64)1 << rbd_dev->header.obj_order;
2488
2489 /*
2490 * There is no defined parent data beyond the parent
2491 * overlap, so limit what we read at that boundary if
2492 * necessary.
2493 */
2494 if (img_offset + length > rbd_dev->parent_overlap) {
2495 rbd_assert(img_offset < rbd_dev->parent_overlap);
2496 length = rbd_dev->parent_overlap - img_offset;
2497 }
2498
2499 /*
2500 * Allocate a page array big enough to receive the data read
2501 * from the parent.
2502 */
2503 page_count = (u32)calc_pages_for(0, length);
2504 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2505 if (IS_ERR(pages)) {
2506 result = PTR_ERR(pages);
2507 pages = NULL;
2508 goto out_err;
2509 }
2510
2511 result = -ENOMEM;
2512 parent_request = rbd_parent_request_create(obj_request,
2513 img_offset, length);
2514 if (!parent_request)
2515 goto out_err;
2516
2517 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2518 if (result)
2519 goto out_err;
2520 parent_request->copyup_pages = pages;
ebda6408 2521 parent_request->copyup_page_count = page_count;
2522
2523 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2524 result = rbd_img_request_submit(parent_request);
2525 if (!result)
2526 return 0;
2527
2528 parent_request->copyup_pages = NULL;
ebda6408 2529 parent_request->copyup_page_count = 0;
2530 parent_request->obj_request = NULL;
2531 rbd_obj_request_put(obj_request);
2532out_err:
2533 if (pages)
2534 ceph_release_page_vector(pages, page_count);
2535 if (parent_request)
2536 rbd_img_request_put(parent_request);
2537 obj_request->result = result;
2538 obj_request->xferred = 0;
2539 obj_request_done_set(obj_request);
2540
2541 return result;
2542}
2543
2544static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2545{
c5b5ef6c 2546 struct rbd_obj_request *orig_request;
638f5abe 2547 struct rbd_device *rbd_dev;
2548 int result;
2549
2550 rbd_assert(!obj_request_img_data_test(obj_request));
2551
2552 /*
2553 * All we need from the object request is the original
2554 * request and the result of the STAT op. Grab those, then
2555 * we're done with the request.
2556 */
2557 orig_request = obj_request->obj_request;
2558 obj_request->obj_request = NULL;
2559 rbd_assert(orig_request);
2560 rbd_assert(orig_request->img_request);
2561
2562 result = obj_request->result;
2563 obj_request->result = 0;
2564
2565 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2566 obj_request, orig_request, result,
2567 obj_request->xferred, obj_request->length);
2568 rbd_obj_request_put(obj_request);
2569
2570 /*
2571 * If the overlap has become 0 (most likely because the
2572 * image has been flattened) we need to re-submit the
2573 * original write request.
2574 */
2575 rbd_dev = orig_request->img_request->rbd_dev;
2576 if (!rbd_dev->parent_overlap) {
2577 struct ceph_osd_client *osdc;
2578
2579 rbd_obj_request_put(orig_request);
2580 osdc = &rbd_dev->rbd_client->client->osdc;
2581 result = rbd_obj_request_submit(osdc, orig_request);
2582 if (!result)
2583 return;
2584 }
2585
2586 /*
2587 * Our only purpose here is to determine whether the object
2588 * exists, and we don't want to treat the non-existence as
2589 * an error. If something else comes back, transfer the
2590 * error to the original request and complete it now.
2591 */
2592 if (!result) {
2593 obj_request_existence_set(orig_request, true);
2594 } else if (result == -ENOENT) {
2595 obj_request_existence_set(orig_request, false);
2596 } else if (result) {
2597 orig_request->result = result;
3d7efd18 2598 goto out;
2599 }
2600
2601 /*
2602 * Resubmit the original request now that we have recorded
2603 * whether the target object exists.
2604 */
b454e36d 2605 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2606out:
2607 if (orig_request->result)
2608 rbd_obj_request_complete(orig_request);
2609 rbd_obj_request_put(orig_request);
2610}
2611
2612static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2613{
2614 struct rbd_obj_request *stat_request;
2615 struct rbd_device *rbd_dev;
2616 struct ceph_osd_client *osdc;
2617 struct page **pages = NULL;
2618 u32 page_count;
2619 size_t size;
2620 int ret;
2621
2622 /*
2623 * The response data for a STAT call consists of:
2624 * le64 length;
2625 * struct {
2626 * le32 tv_sec;
2627 * le32 tv_nsec;
2628 * } mtime;
2629 */
2630 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2631 page_count = (u32)calc_pages_for(0, size);
2632 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2633 if (IS_ERR(pages))
2634 return PTR_ERR(pages);
2635
2636 ret = -ENOMEM;
2637 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2638 OBJ_REQUEST_PAGES);
2639 if (!stat_request)
2640 goto out;
2641
2642 rbd_obj_request_get(obj_request);
2643 stat_request->obj_request = obj_request;
2644 stat_request->pages = pages;
2645 stat_request->page_count = page_count;
2646
2647 rbd_assert(obj_request->img_request);
2648 rbd_dev = obj_request->img_request->rbd_dev;
2649 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2650 stat_request);
2651 if (!stat_request->osd_req)
2652 goto out;
2653 stat_request->callback = rbd_img_obj_exists_callback;
2654
2655 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2656 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2657 false, false);
9d4df01f 2658 rbd_osd_req_format_read(stat_request);
2659
2660 osdc = &rbd_dev->rbd_client->client->osdc;
2661 ret = rbd_obj_request_submit(osdc, stat_request);
2662out:
2663 if (ret)
2664 rbd_obj_request_put(obj_request);
2665
2666 return ret;
2667}
2668
2669static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2670{
2671 struct rbd_img_request *img_request;
a9e8ba2c 2672 struct rbd_device *rbd_dev;
3d7efd18 2673 bool known;
2674
2675 rbd_assert(obj_request_img_data_test(obj_request));
2676
2677 img_request = obj_request->img_request;
2678 rbd_assert(img_request);
a9e8ba2c 2679 rbd_dev = img_request->rbd_dev;
b454e36d 2680
b454e36d 2681 /*
2682 * Only writes to layered images need special handling.
2683 * Reads and non-layered writes are simple object requests.
2684 * Layered writes that start beyond the end of the overlap
2685 * with the parent have no parent data, so they too are
2686 * simple object requests. Finally, if the target object is
2687 * known to already exist, its parent data has already been
2688 * copied, so a write to the object can also be handled as a
2689 * simple object request.
2690 */
2691 if (!img_request_write_test(img_request) ||
2692 !img_request_layered_test(img_request) ||
7029f064 2693 !obj_request_overlaps_parent(obj_request) ||
2694 ((known = obj_request_known_test(obj_request)) &&
2695 obj_request_exists_test(obj_request))) {
2696
2697 struct rbd_device *rbd_dev;
2698 struct ceph_osd_client *osdc;
2699
2700 rbd_dev = obj_request->img_request->rbd_dev;
2701 osdc = &rbd_dev->rbd_client->client->osdc;
2702
2703 return rbd_obj_request_submit(osdc, obj_request);
2704 }
2705
2706 /*
2707 * It's a layered write. The target object might exist but
2708 * we may not know that yet. If we know it doesn't exist,
2709 * start by reading the data for the full target object from
2710 * the parent so we can use it for a copyup to the target.
b454e36d 2711 */
2712 if (known)
2713 return rbd_img_obj_parent_read_full(obj_request);
2714
2715 /* We don't know whether the target exists. Go find out. */
2716
2717 return rbd_img_obj_exists_submit(obj_request);
2718}
2719
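/*
 * Editorial summary of the dispatch above, in condition form (the
 * helper names are the real ones; the conditions are paraphrased):
 *
 *	if (read || !layered || beyond parent overlap ||
 *	    known to already exist)
 *		rbd_obj_request_submit();	  simple object request
 *	else if (known not to exist)
 *		rbd_img_obj_parent_read_full();	  read parent, then copyup
 *	else
 *		rbd_img_obj_exists_submit();	  STAT first, then resubmit
 */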
2720static int rbd_img_request_submit(struct rbd_img_request *img_request)
2721{
bf0d5f50 2722 struct rbd_obj_request *obj_request;
46faeed4 2723 struct rbd_obj_request *next_obj_request;
bf0d5f50 2724
37206ee5 2725 dout("%s: img %p\n", __func__, img_request);
46faeed4 2726 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2727 int ret;
2728
b454e36d 2729 ret = rbd_img_obj_request_submit(obj_request);
2730 if (ret)
2731 return ret;
2732 }
2733
2734 return 0;
2735}
2736
2737static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2738{
2739 struct rbd_obj_request *obj_request;
2740 struct rbd_device *rbd_dev;
2741 u64 obj_end;
2742 u64 img_xferred;
2743 int img_result;
2744
2745 rbd_assert(img_request_child_test(img_request));
2746
2747 /* First get what we need from the image request and release it */
2748
8b3e1a56 2749 obj_request = img_request->obj_request;
2750 img_xferred = img_request->xferred;
2751 img_result = img_request->result;
2752 rbd_img_request_put(img_request);
2753
2754 /*
2755 * If the overlap has become 0 (most likely because the
2756 * image has been flattened) we need to re-submit the
2757 * original request.
2758 */
2759 rbd_assert(obj_request);
2760 rbd_assert(obj_request->img_request);
2761 rbd_dev = obj_request->img_request->rbd_dev;
2762 if (!rbd_dev->parent_overlap) {
2763 struct ceph_osd_client *osdc;
2764
2765 osdc = &rbd_dev->rbd_client->client->osdc;
2766 img_result = rbd_obj_request_submit(osdc, obj_request);
2767 if (!img_result)
2768 return;
2769 }
a9e8ba2c 2770
02c74fba 2771 obj_request->result = img_result;
2772 if (obj_request->result)
2773 goto out;
2774
2775 /*
2776 * We need to zero anything beyond the parent overlap
2777 * boundary. Since rbd_img_obj_request_read_callback()
2778 * will zero anything beyond the end of a short read, an
2779 * easy way to do this is to pretend the data from the
2780 * parent came up short--ending at the overlap boundary.
2781 */
2782 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2783 obj_end = obj_request->img_offset + obj_request->length;
2784 if (obj_end > rbd_dev->parent_overlap) {
2785 u64 xferred = 0;
2786
2787 if (obj_request->img_offset < rbd_dev->parent_overlap)
2788 xferred = rbd_dev->parent_overlap -
2789 obj_request->img_offset;
8b3e1a56 2790
02c74fba 2791 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2792 } else {
02c74fba 2793 obj_request->xferred = img_xferred;
2794 }
2795out:
2796 rbd_img_obj_request_read_callback(obj_request);
2797 rbd_obj_request_complete(obj_request);
2798}
2799
2800static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2801{
2802 struct rbd_img_request *img_request;
2803 int result;
2804
2805 rbd_assert(obj_request_img_data_test(obj_request));
2806 rbd_assert(obj_request->img_request != NULL);
2807 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2808 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 2809
8b3e1a56 2810 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 2811 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 2812 obj_request->img_offset,
e93f3152 2813 obj_request->length);
2814 result = -ENOMEM;
2815 if (!img_request)
2816 goto out_err;
2817
2818 if (obj_request->type == OBJ_REQUEST_BIO)
2819 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2820 obj_request->bio_list);
2821 else
2822 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2823 obj_request->pages);
2824 if (result)
2825 goto out_err;
2826
2827 img_request->callback = rbd_img_parent_read_callback;
2828 result = rbd_img_request_submit(img_request);
2829 if (result)
2830 goto out_err;
2831
2832 return;
2833out_err:
2834 if (img_request)
2835 rbd_img_request_put(img_request);
2836 obj_request->result = result;
2837 obj_request->xferred = 0;
2838 obj_request_done_set(obj_request);
2839}
bf0d5f50 2840
b10f19aa 2841static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2842{
2843 struct rbd_obj_request *obj_request;
2169238d 2844 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2845 int ret;
2846
2847 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2848 OBJ_REQUEST_NODATA);
2849 if (!obj_request)
2850 return -ENOMEM;
2851
2852 ret = -ENOMEM;
430c28c3 2853 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2854 if (!obj_request->osd_req)
2855 goto out;
2856
c99d2d4a 2857 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2858 notify_id, 0, 0);
9d4df01f 2859 rbd_osd_req_format_read(obj_request);
430c28c3 2860
b8d70035 2861 ret = rbd_obj_request_submit(osdc, obj_request);
cf81b60e 2862 if (ret)
2863 goto out;
2864 ret = rbd_obj_request_wait(obj_request);
2865out:
2866 rbd_obj_request_put(obj_request);
2867
2868 return ret;
2869}
2870
2871static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2872{
2873 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2874 int ret;
2875
2876 if (!rbd_dev)
2877 return;
2878
37206ee5 2879 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2880 rbd_dev->header_name, (unsigned long long)notify_id,
2881 (unsigned int)opcode);
2882 ret = rbd_dev_refresh(rbd_dev);
2883 if (ret)
2884 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2885
b10f19aa 2886 rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2887}
2888
2889/*
2890 * Request sync osd watch/unwatch. The value of "start" determines
2891 * whether a watch request is being initiated or torn down.
2892 */
1f3ef788 2893static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
2894{
2895 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2896 struct rbd_obj_request *obj_request;
2897 int ret;
2898
2899 rbd_assert(start ^ !!rbd_dev->watch_event);
2900 rbd_assert(start ^ !!rbd_dev->watch_request);
2901
2902 if (start) {
3c663bbd 2903 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2904 &rbd_dev->watch_event);
2905 if (ret < 0)
2906 return ret;
8eb87565 2907 rbd_assert(rbd_dev->watch_event != NULL);
2908 }
2909
2910 ret = -ENOMEM;
2911 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2912 OBJ_REQUEST_NODATA);
2913 if (!obj_request)
2914 goto out_cancel;
2915
2916 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2917 if (!obj_request->osd_req)
2918 goto out_cancel;
2919
8eb87565 2920 if (start)
975241af 2921 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2922 else
6977c3f9 2923 ceph_osdc_unregister_linger_request(osdc,
975241af 2924 rbd_dev->watch_request->osd_req);
2925
2926 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1f3ef788 2927 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
9d4df01f 2928 rbd_osd_req_format_write(obj_request);
2169238d 2929
2930 ret = rbd_obj_request_submit(osdc, obj_request);
2931 if (ret)
2932 goto out_cancel;
2933 ret = rbd_obj_request_wait(obj_request);
2934 if (ret)
2935 goto out_cancel;
2936 ret = obj_request->result;
2937 if (ret)
2938 goto out_cancel;
2939
2940 /*
2941 * A watch request is set to linger, so the underlying osd
2942 * request won't go away until we unregister it. We retain
2943 * a pointer to the object request during that time (in
2944 * rbd_dev->watch_request), so we'll keep a reference to
2945 * it. We'll drop that reference (below) after we've
2946 * unregistered it.
2947 */
2948 if (start) {
2949 rbd_dev->watch_request = obj_request;
2950
2951 return 0;
2952 }
2953
2954 /* We have successfully torn down the watch request */
2955
2956 rbd_obj_request_put(rbd_dev->watch_request);
2957 rbd_dev->watch_request = NULL;
2958out_cancel:
2959 /* Cancel the event if we're tearing down, or on error */
2960 ceph_osdc_cancel_event(rbd_dev->watch_event);
2961 rbd_dev->watch_event = NULL;
2962 if (obj_request)
2963 rbd_obj_request_put(obj_request);
2964
2965 return ret;
2966}
2967
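/*
 * Editorial note: the watch call above is symmetric, so callers are
 * expected to pair it, with rbd_watch_cb() firing in between whenever
 * the header object changes:
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, true);		register
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, false);	tear down
 */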
36be9a76 2968/*
2969 * Synchronous osd object method call. Returns the number of bytes
2970 * returned in the inbound buffer, or a negative error code.
2971 */
2972static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2973 const char *object_name,
2974 const char *class_name,
2975 const char *method_name,
4157976b 2976 const void *outbound,
36be9a76 2977 size_t outbound_size,
4157976b 2978 void *inbound,
e2a58ee5 2979 size_t inbound_size)
36be9a76 2980{
2169238d 2981 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2982 struct rbd_obj_request *obj_request;
2983 struct page **pages;
2984 u32 page_count;
2985 int ret;
2986
2987 /*
2988 * Method calls are ultimately read operations. The result
2989 * should be placed into the inbound buffer provided. They
2990 * also supply outbound data--parameters for the object
2991 * method. Currently if this is present it will be a
2992 * snapshot id.
36be9a76 2993 */
57385b51 2994 page_count = (u32)calc_pages_for(0, inbound_size);
2995 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2996 if (IS_ERR(pages))
2997 return PTR_ERR(pages);
2998
2999 ret = -ENOMEM;
6010a451 3000 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3001 OBJ_REQUEST_PAGES);
3002 if (!obj_request)
3003 goto out;
3004
3005 obj_request->pages = pages;
3006 obj_request->page_count = page_count;
3007
430c28c3 3008 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3009 if (!obj_request->osd_req)
3010 goto out;
3011
c99d2d4a 3012 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3013 class_name, method_name);
3014 if (outbound_size) {
3015 struct ceph_pagelist *pagelist;
3016
3017 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3018 if (!pagelist)
3019 goto out;
3020
3021 ceph_pagelist_init(pagelist);
3022 ceph_pagelist_append(pagelist, outbound, outbound_size);
3023 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3024 pagelist);
3025 }
3026 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3027 obj_request->pages, inbound_size,
44cd188d 3028 0, false, false);
9d4df01f 3029 rbd_osd_req_format_read(obj_request);
430c28c3 3030
3031 ret = rbd_obj_request_submit(osdc, obj_request);
3032 if (ret)
3033 goto out;
3034 ret = rbd_obj_request_wait(obj_request);
3035 if (ret)
3036 goto out;
3037
3038 ret = obj_request->result;
3039 if (ret < 0)
3040 goto out;
3041
3042 rbd_assert(obj_request->xferred < (u64)INT_MAX);
3043 ret = (int)obj_request->xferred;
903bb32e 3044 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3045out:
3046 if (obj_request)
3047 rbd_obj_request_put(obj_request);
3048 else
3049 ceph_release_page_vector(pages, page_count);
3050
3051 return ret;
3052}
3053
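/*
 * Editorial example: the v2 image-size probe elsewhere in this file is
 * a typical rbd_obj_method_sync() caller. The outbound data is a
 * snapshot id; the packed method result arrives in the inbound buffer
 * (a sketch; layout per the "rbd" class get_size interface):
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 */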
bf0d5f50 3054static void rbd_request_fn(struct request_queue *q)
cc344fa1 3055 __releases(q->queue_lock) __acquires(q->queue_lock)
3056{
3057 struct rbd_device *rbd_dev = q->queuedata;
3058 bool read_only = rbd_dev->mapping.read_only;
3059 struct request *rq;
3060 int result;
3061
3062 while ((rq = blk_fetch_request(q))) {
3063 bool write_request = rq_data_dir(rq) == WRITE;
3064 struct rbd_img_request *img_request;
3065 u64 offset;
3066 u64 length;
3067
3068 /* Ignore any non-FS requests that filter through. */
3069
3070 if (rq->cmd_type != REQ_TYPE_FS) {
3071 dout("%s: non-fs request type %d\n", __func__,
3072 (int) rq->cmd_type);
3073 __blk_end_request_all(rq, 0);
3074 continue;
3075 }
3076
3077 /* Ignore/skip any zero-length requests */
3078
3079 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3080 length = (u64) blk_rq_bytes(rq);
3081
3082 if (!length) {
3083 dout("%s: zero-length request\n", __func__);
3084 __blk_end_request_all(rq, 0);
3085 continue;
3086 }
3087
3088 spin_unlock_irq(q->queue_lock);
3089
3090 /* Disallow writes to a read-only device */
3091
3092 if (write_request) {
3093 result = -EROFS;
3094 if (read_only)
3095 goto end_request;
3096 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3097 }
3098
3099 /*
3100 * Quit early if the mapped snapshot no longer
3101 * exists. It's still possible the snapshot will
3102 * have disappeared by the time our request arrives
3103 * at the osd, but there's no sense in sending it if
3104 * we already know.
3105 */
3106 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3107 dout("request for non-existent snapshot");
3108 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3109 result = -ENXIO;
3110 goto end_request;
3111 }
3112
bf0d5f50 3113 result = -EINVAL;
3114 if (offset && length > U64_MAX - offset + 1) {
3115 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3116 offset, length);
bf0d5f50 3117 goto end_request; /* Shouldn't happen */
c0cd10db 3118 }
bf0d5f50 3119
3120 result = -EIO;
3121 if (offset + length > rbd_dev->mapping.size) {
3122 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3123 offset, length, rbd_dev->mapping.size);
3124 goto end_request;
3125 }
3126
3127 result = -ENOMEM;
3128 img_request = rbd_img_request_create(rbd_dev, offset, length,
e93f3152 3129 write_request);
3130 if (!img_request)
3131 goto end_request;
3132
3133 img_request->rq = rq;
3134
3135 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3136 rq->bio);
3137 if (!result)
3138 result = rbd_img_request_submit(img_request);
3139 if (result)
3140 rbd_img_request_put(img_request);
3141end_request:
3142 spin_lock_irq(q->queue_lock);
3143 if (result < 0) {
3144 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3145 write_request ? "write" : "read",
3146 length, offset, result);
3147
3148 __blk_end_request_all(rq, result);
3149 }
3150 }
3151}
3152
3153/*
3154 * A queue callback. Makes sure that we don't create a bio that spans across
3155 * multiple osd objects. One exception would be a single-page bio,
f7760dad 3156 * which we handle later at bio_chain_clone_range()
3157 */
3158static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3159 struct bio_vec *bvec)
3160{
3161 struct rbd_device *rbd_dev = q->queuedata;
3162 sector_t sector_offset;
3163 sector_t sectors_per_obj;
3164 sector_t obj_sector_offset;
3165 int ret;
3166
3167 /*
3168 * Convert the partition-relative bio start sector into an offset
3169 * relative to the enclosing device, then find how far into its
3170 * rbd object that offset falls.
3171 */
3172 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3173 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3174 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3175
3176 /*
3177 * Compute the number of bytes from that offset to the end
3178 * of the object. Account for what's already used by the bio.
3179 */
3180 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3181 if (ret > bmd->bi_size)
3182 ret -= bmd->bi_size;
3183 else
3184 ret = 0;
3185
3186 /*
3187 * Don't send back more than was asked for. And if the bio
3188 * was empty, let the whole thing through because: "Note
3189 * that a block device *must* allow a single page to be
3190 * added to an empty bio."
3191 */
3192 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3193 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3194 ret = (int) bvec->bv_len;
3195
3196 return ret;
3197}
3198
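/*
 * Editorial worked example for the arithmetic above, assuming 4 MiB
 * objects (sectors_per_obj == 8192): a bio that already holds 512
 * bytes and starts at device sector 8188 sits 4 sectors (2048 bytes)
 * from the end of its object, so ret = 2048 - 512 = 1536, the most
 * that may be added without spilling into the next object. An empty
 * bio is always granted one full bvec, whatever the math says.
 */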
3199static void rbd_free_disk(struct rbd_device *rbd_dev)
3200{
3201 struct gendisk *disk = rbd_dev->disk;
3202
3203 if (!disk)
3204 return;
3205
3206 rbd_dev->disk = NULL;
3207 if (disk->flags & GENHD_FL_UP) {
602adf40 3208 del_gendisk(disk);
3209 if (disk->queue)
3210 blk_cleanup_queue(disk->queue);
3211 }
3212 put_disk(disk);
3213}
3214
3215static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3216 const char *object_name,
7097f8df 3217 u64 offset, u64 length, void *buf)
3218
3219{
2169238d 3220 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 3221 struct rbd_obj_request *obj_request;
3222 struct page **pages = NULL;
3223 u32 page_count;
1ceae7ef 3224 size_t size;
3225 int ret;
3226
3227 page_count = (u32) calc_pages_for(offset, length);
3228 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3229 if (IS_ERR(pages))
f693fddf 3230 return PTR_ERR(pages);
3231
3232 ret = -ENOMEM;
3233 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3234 OBJ_REQUEST_PAGES);
3235 if (!obj_request)
3236 goto out;
3237
3238 obj_request->pages = pages;
3239 obj_request->page_count = page_count;
3240
430c28c3 3241 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
3242 if (!obj_request->osd_req)
3243 goto out;
3244
3245 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3246 offset, length, 0, 0);
406e2c9f 3247 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3248 obj_request->pages,
3249 obj_request->length,
3250 obj_request->offset & ~PAGE_MASK,
3251 false, false);
9d4df01f 3252 rbd_osd_req_format_read(obj_request);
430c28c3 3253
3254 ret = rbd_obj_request_submit(osdc, obj_request);
3255 if (ret)
3256 goto out;
3257 ret = rbd_obj_request_wait(obj_request);
3258 if (ret)
3259 goto out;
3260
3261 ret = obj_request->result;
3262 if (ret < 0)
3263 goto out;
3264
3265 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3266 size = (size_t) obj_request->xferred;
903bb32e 3267 ceph_copy_from_page_vector(pages, buf, 0, size);
3268 rbd_assert(size <= (size_t)INT_MAX);
3269 ret = (int)size;
3270out:
3271 if (obj_request)
3272 rbd_obj_request_put(obj_request);
3273 else
3274 ceph_release_page_vector(pages, page_count);
3275
3276 return ret;
3277}
3278
602adf40 3279/*
3280 * Read the complete header for the given rbd device. On successful
3281 * return, the rbd_dev->header field will contain up-to-date
3282 * information about the image.
602adf40 3283 */
99a41ebc 3284static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3285{
4156d998 3286 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3287 u32 snap_count = 0;
3288 u64 names_size = 0;
3289 u32 want_count;
3290 int ret;
602adf40 3291
00f1f36f 3292 /*
3293 * The complete header will include an array of its 64-bit
3294 * snapshot ids, followed by the names of those snapshots as
3295 * a contiguous block of NUL-terminated strings. Note that
3296 * the number of snapshots could change by the time we read
3297 * it in, in which case we re-read it.
00f1f36f 3298 */
3299 do {
3300 size_t size;
3301
3302 kfree(ondisk);
3303
3304 size = sizeof (*ondisk);
3305 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3306 size += names_size;
3307 ondisk = kmalloc(size, GFP_KERNEL);
3308 if (!ondisk)
662518b1 3309 return -ENOMEM;
4156d998 3310
788e2df3 3311 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3312 0, size, ondisk);
4156d998 3313 if (ret < 0)
662518b1 3314 goto out;
c0cd10db 3315 if ((size_t)ret < size) {
4156d998 3316 ret = -ENXIO;
3317 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3318 size, ret);
662518b1 3319 goto out;
3320 }
3321 if (!rbd_dev_ondisk_valid(ondisk)) {
3322 ret = -ENXIO;
06ecc6cb 3323 rbd_warn(rbd_dev, "invalid header");
662518b1 3324 goto out;
81e759fb 3325 }
602adf40 3326
3327 names_size = le64_to_cpu(ondisk->snap_names_len);
3328 want_count = snap_count;
3329 snap_count = le32_to_cpu(ondisk->snap_count);
3330 } while (snap_count != want_count);
00f1f36f 3331
3332 ret = rbd_header_from_disk(rbd_dev, ondisk);
3333out:
3334 kfree(ondisk);
3335
3336 return ret;
3337}
3338
3339/*
3340 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3341 * has disappeared from the (just updated) snapshot context.
3342 */
3343static void rbd_exists_validate(struct rbd_device *rbd_dev)
3344{
3345 u64 snap_id;
3346
3347 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3348 return;
3349
3350 snap_id = rbd_dev->spec->snap_id;
3351 if (snap_id == CEPH_NOSNAP)
3352 return;
3353
3354 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3355 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3356}
3357
3358static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3359{
3360 sector_t size;
3361 bool removing;
3362
3363 /*
3364 * Don't hold the lock while doing disk operations,
3365 * or lock ordering will conflict with the bdev mutex via:
3366 * rbd_add() -> blkdev_get() -> rbd_open()
3367 */
3368 spin_lock_irq(&rbd_dev->lock);
3369 removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3370 spin_unlock_irq(&rbd_dev->lock);
3371 /*
3372 * If the device is being removed, rbd_dev->disk has
3373 * been destroyed, so don't try to update its size
3374 */
3375 if (!removing) {
3376 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3377 dout("setting size to %llu sectors", (unsigned long long)size);
3378 set_capacity(rbd_dev->disk, size);
3379 revalidate_disk(rbd_dev->disk);
3380 }
3381}
3382
cc4a38bd 3383static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3384{
e627db08 3385 u64 mapping_size;
3386 int ret;
3387
117973fb 3388 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3389 mapping_size = rbd_dev->mapping.size;
1fe5e993 3390 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3391 if (rbd_dev->image_format == 1)
99a41ebc 3392 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3393 else
2df3fac7 3394 ret = rbd_dev_v2_header_info(rbd_dev);
15228ede
AE
3395
3396 /* If it's a mapped snapshot, validate its EXISTS flag */
3397
3398 rbd_exists_validate(rbd_dev);
1fe5e993 3399 mutex_unlock(&ctl_mutex);
00a653e2 3400 if (mapping_size != rbd_dev->mapping.size) {
5b213542 3401 rbd_dev_update_size(rbd_dev);
00a653e2 3402 }
1fe5e993
AE
3403
3404 return ret;
3405}
3406
602adf40
YS
3407static int rbd_init_disk(struct rbd_device *rbd_dev)
3408{
3409 struct gendisk *disk;
3410 struct request_queue *q;
593a9e7b 3411 u64 segment_size;
602adf40 3412
602adf40 3413 /* create gendisk info */
602adf40
YS
3414 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3415 if (!disk)
1fcdb8aa 3416 return -ENOMEM;
602adf40 3417
f0f8cef5 3418 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3419 rbd_dev->dev_id);
602adf40
YS
3420 disk->major = rbd_dev->major;
3421 disk->first_minor = 0;
3422 disk->fops = &rbd_bd_ops;
3423 disk->private_data = rbd_dev;
3424
bf0d5f50 3425 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3426 if (!q)
3427 goto out_disk;
029bcbd8 3428
593a9e7b
AE
3429 /* We use the default size, but let's be explicit about it. */
3430 blk_queue_physical_block_size(q, SECTOR_SIZE);
3431
029bcbd8 3432 /* set io sizes to object size */
593a9e7b
AE
3433 segment_size = rbd_obj_bytes(&rbd_dev->header);
3434 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3435 blk_queue_max_segment_size(q, segment_size);
3436 blk_queue_io_min(q, segment_size);
3437 blk_queue_io_opt(q, segment_size);
029bcbd8 3438
602adf40
YS
3439 blk_queue_merge_bvec(q, rbd_merge_bvec);
3440 disk->queue = q;
3441
3442 q->queuedata = rbd_dev;
3443
3444 rbd_dev->disk = disk;
602adf40 3445
602adf40 3446 return 0;
602adf40
YS
3447out_disk:
3448 put_disk(disk);
1fcdb8aa
AE
3449
3450 return -ENOMEM;
602adf40
YS
3451}
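/*
 * Worked example (illustrative): with the common object order of 22,
 * rbd_obj_bytes() yields 4 MiB objects, so the queue limits set above
 * become
 *
 *   blk_queue_max_hw_sectors(q, 4194304 / 512);  -> 8192 sectors
 *   blk_queue_max_segment_size(q, 4194304);
 *
 * keeping request sizes aligned with RADOS object boundaries.
 */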
3452
dfc5606d
YS
3453/*
3454 sysfs
3455*/
3456
593a9e7b
AE
3457static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3458{
3459 return container_of(dev, struct rbd_device, dev);
3460}
3461
dfc5606d
YS
3462static ssize_t rbd_size_show(struct device *dev,
3463 struct device_attribute *attr, char *buf)
3464{
593a9e7b 3465 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3466
fc71d833
AE
3467 return sprintf(buf, "%llu\n",
3468 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3469}
3470
34b13184
AE
3471/*
3472 * Note this shows the features for whatever's mapped, which is not
3473 * necessarily the base image.
3474 */
3475static ssize_t rbd_features_show(struct device *dev,
3476 struct device_attribute *attr, char *buf)
3477{
3478 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3479
3480 return sprintf(buf, "0x%016llx\n",
fc71d833 3481 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3482}
3483
dfc5606d
YS
3484static ssize_t rbd_major_show(struct device *dev,
3485 struct device_attribute *attr, char *buf)
3486{
593a9e7b 3487 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3488
fc71d833
AE
3489 if (rbd_dev->major)
3490 return sprintf(buf, "%d\n", rbd_dev->major);
3491
3492 return sprintf(buf, "(none)\n");
3493
dfc5606d
YS
3494}
3495
3496static ssize_t rbd_client_id_show(struct device *dev,
3497 struct device_attribute *attr, char *buf)
602adf40 3498{
593a9e7b 3499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3500
1dbb4399
AE
3501 return sprintf(buf, "client%lld\n",
3502 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3503}
3504
dfc5606d
YS
3505static ssize_t rbd_pool_show(struct device *dev,
3506 struct device_attribute *attr, char *buf)
602adf40 3507{
593a9e7b 3508 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3509
0d7dbfce 3510 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3511}
3512
9bb2f334
AE
3513static ssize_t rbd_pool_id_show(struct device *dev,
3514 struct device_attribute *attr, char *buf)
3515{
3516 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3517
0d7dbfce 3518 return sprintf(buf, "%llu\n",
fc71d833 3519 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3520}
3521
dfc5606d
YS
3522static ssize_t rbd_name_show(struct device *dev,
3523 struct device_attribute *attr, char *buf)
3524{
593a9e7b 3525 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3526
a92ffdf8
AE
3527 if (rbd_dev->spec->image_name)
3528 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3529
3530 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3531}
3532
589d30e0
AE
3533static ssize_t rbd_image_id_show(struct device *dev,
3534 struct device_attribute *attr, char *buf)
3535{
3536 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3537
0d7dbfce 3538 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3539}
3540
34b13184
AE
3541/*
3542 * Shows the name of the currently-mapped snapshot (or
3543 * RBD_SNAP_HEAD_NAME for the base image).
3544 */
dfc5606d
YS
3545static ssize_t rbd_snap_show(struct device *dev,
3546 struct device_attribute *attr,
3547 char *buf)
3548{
593a9e7b 3549 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3550
0d7dbfce 3551 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3552}
3553
86b00e0d
AE
3554/*
3555 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3556 * for the parent image. If there is no parent, simply shows
3557 * "(no parent image)".
3558 */
3559static ssize_t rbd_parent_show(struct device *dev,
3560 struct device_attribute *attr,
3561 char *buf)
3562{
3563 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3564 struct rbd_spec *spec = rbd_dev->parent_spec;
3565 int count;
3566 char *bufp = buf;
3567
3568 if (!spec)
3569 return sprintf(buf, "(no parent image)\n");
3570
3571 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3572 (unsigned long long) spec->pool_id, spec->pool_name);
3573 if (count < 0)
3574 return count;
3575 bufp += count;
3576
3577 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3578 spec->image_name ? spec->image_name : "(unknown)");
3579 if (count < 0)
3580 return count;
3581 bufp += count;
3582
3583 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3584 (unsigned long long) spec->snap_id, spec->snap_name);
3585 if (count < 0)
3586 return count;
3587 bufp += count;
3588
3589 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3590 if (count < 0)
3591 return count;
3592 bufp += count;
3593
3594 return (ssize_t) (bufp - buf);
3595}
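/*
 * Example of the "parent" attribute output (all values hypothetical):
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1014b2ae8944a
 *   image_name base-image
 *   snap_id 4
 *   snap_name gold
 *   overlap 10737418240
 */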
3596
dfc5606d
YS
3597static ssize_t rbd_image_refresh(struct device *dev,
3598 struct device_attribute *attr,
3599 const char *buf,
3600 size_t size)
3601{
593a9e7b 3602 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3603 int ret;
602adf40 3604
cc4a38bd 3605 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3606 if (ret)
 3607 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
b813623a
AE
3608
3609 return ret < 0 ? ret : size;
dfc5606d 3610}
602adf40 3611
dfc5606d 3612static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3613static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3614static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3615static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3616static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3617static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3618static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3619static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3620static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3621static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3622static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3623
3624static struct attribute *rbd_attrs[] = {
3625 &dev_attr_size.attr,
34b13184 3626 &dev_attr_features.attr,
dfc5606d
YS
3627 &dev_attr_major.attr,
3628 &dev_attr_client_id.attr,
3629 &dev_attr_pool.attr,
9bb2f334 3630 &dev_attr_pool_id.attr,
dfc5606d 3631 &dev_attr_name.attr,
589d30e0 3632 &dev_attr_image_id.attr,
dfc5606d 3633 &dev_attr_current_snap.attr,
86b00e0d 3634 &dev_attr_parent.attr,
dfc5606d 3635 &dev_attr_refresh.attr,
dfc5606d
YS
3636 NULL
3637};
3638
3639static struct attribute_group rbd_attr_group = {
3640 .attrs = rbd_attrs,
3641};
3642
3643static const struct attribute_group *rbd_attr_groups[] = {
3644 &rbd_attr_group,
3645 NULL
3646};
3647
3648static void rbd_sysfs_dev_release(struct device *dev)
3649{
3650}
3651
3652static struct device_type rbd_device_type = {
3653 .name = "rbd",
3654 .groups = rbd_attr_groups,
3655 .release = rbd_sysfs_dev_release,
3656};
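/*
 * The attribute group above yields one read-only file per attribute
 * under /sys/bus/rbd/devices/<id>/, plus the write-only "refresh"
 * trigger.  A hypothetical session:
 *
 *   $ cat /sys/bus/rbd/devices/0/size
 *   10737418240
 *   $ cat /sys/bus/rbd/devices/0/current_snap
 *   -
 *
 * where "-" is RBD_SNAP_HEAD_NAME, i.e. the base image is mapped.
 */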
3657
8b8fb99c
AE
3658static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3659{
3660 kref_get(&spec->kref);
3661
3662 return spec;
3663}
3664
3665static void rbd_spec_free(struct kref *kref);
3666static void rbd_spec_put(struct rbd_spec *spec)
3667{
3668 if (spec)
3669 kref_put(&spec->kref, rbd_spec_free);
3670}
3671
3672static struct rbd_spec *rbd_spec_alloc(void)
3673{
3674 struct rbd_spec *spec;
3675
3676 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3677 if (!spec)
3678 return NULL;
3679 kref_init(&spec->kref);
3680
8b8fb99c
AE
3681 return spec;
3682}
3683
3684static void rbd_spec_free(struct kref *kref)
3685{
3686 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3687
3688 kfree(spec->pool_name);
3689 kfree(spec->image_id);
3690 kfree(spec->image_name);
3691 kfree(spec->snap_name);
3692 kfree(spec);
3693}
3694
cc344fa1 3695static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3696 struct rbd_spec *spec)
3697{
3698 struct rbd_device *rbd_dev;
3699
3700 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3701 if (!rbd_dev)
3702 return NULL;
3703
3704 spin_lock_init(&rbd_dev->lock);
6d292906 3705 rbd_dev->flags = 0;
a2acd00e 3706 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3707 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3708 init_rwsem(&rbd_dev->header_rwsem);
3709
3710 rbd_dev->spec = spec;
3711 rbd_dev->rbd_client = rbdc;
3712
0903e875
AE
3713 /* Initialize the layout used for all rbd requests */
3714
3715 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3716 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3717 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3718 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3719
c53d5893
AE
3720 return rbd_dev;
3721}
3722
3723static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3724{
c53d5893
AE
3725 rbd_put_client(rbd_dev->rbd_client);
3726 rbd_spec_put(rbd_dev->spec);
3727 kfree(rbd_dev);
3728}
3729
9d475de5
AE
3730/*
3731 * Get the size and object order for an image snapshot, or if
3732 * snap_id is CEPH_NOSNAP, gets this information for the base
3733 * image.
3734 */
3735static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3736 u8 *order, u64 *snap_size)
3737{
3738 __le64 snapid = cpu_to_le64(snap_id);
3739 int ret;
3740 struct {
3741 u8 order;
3742 __le64 size;
3743 } __attribute__ ((packed)) size_buf = { 0 };
3744
36be9a76 3745 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3746 "rbd", "get_size",
4157976b 3747 &snapid, sizeof (snapid),
e2a58ee5 3748 &size_buf, sizeof (size_buf));
36be9a76 3749 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3750 if (ret < 0)
3751 return ret;
57385b51
AE
3752 if (ret < sizeof (size_buf))
3753 return -ERANGE;
9d475de5 3754
dd932ee7 3755 if (order) {
c86f86e9 3756 *order = size_buf.order;
dd932ee7
JD
3757 dout(" order %u", (unsigned int)*order);
3758 }
9d475de5
AE
3759 *snap_size = le64_to_cpu(size_buf.size);
3760
dd932ee7
JD
3761 dout(" snap_id 0x%016llx snap_size = %llu\n",
3762 (unsigned long long)snap_id,
57385b51 3763 (unsigned long long)*snap_size);
9d475de5
AE
3764
3765 return 0;
3766}
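/*
 * Reply layout for the "get_size" class method, as implied by the
 * packed struct decoded above (illustrative): a u8 order followed by
 * an __le64 size, 9 bytes total.  For example, the byte sequence
 *
 *   16 00 00 00 40 00 00 00 00   (hex, little endian)
 *
 * decodes to order 22 (4 MiB objects) and a 0x40000000-byte (1 GiB)
 * snapshot size.
 */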
3767
3768static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3769{
3770 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3771 &rbd_dev->header.obj_order,
3772 &rbd_dev->header.image_size);
3773}
3774
1e130199
AE
3775static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3776{
3777 void *reply_buf;
3778 int ret;
3779 void *p;
3780
3781 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3782 if (!reply_buf)
3783 return -ENOMEM;
3784
36be9a76 3785 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3786 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3787 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3788 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3789 if (ret < 0)
3790 goto out;
3791
3792 p = reply_buf;
3793 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3794 p + ret, NULL, GFP_NOIO);
3795 ret = 0;
1e130199
AE
3796
3797 if (IS_ERR(rbd_dev->header.object_prefix)) {
3798 ret = PTR_ERR(rbd_dev->header.object_prefix);
3799 rbd_dev->header.object_prefix = NULL;
3800 } else {
3801 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3802 }
1e130199
AE
3803out:
3804 kfree(reply_buf);
3805
3806 return ret;
3807}
3808
b1b5402a
AE
3809static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3810 u64 *snap_features)
3811{
3812 __le64 snapid = cpu_to_le64(snap_id);
3813 struct {
3814 __le64 features;
3815 __le64 incompat;
4157976b 3816 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3817 u64 incompat;
b1b5402a
AE
3818 int ret;
3819
36be9a76 3820 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3821 "rbd", "get_features",
4157976b 3822 &snapid, sizeof (snapid),
e2a58ee5 3823 &features_buf, sizeof (features_buf));
36be9a76 3824 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3825 if (ret < 0)
3826 return ret;
57385b51
AE
3827 if (ret < sizeof (features_buf))
3828 return -ERANGE;
d889140c
AE
3829
3830 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3831 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3832 return -ENXIO;
d889140c 3833
b1b5402a
AE
3834 *snap_features = le64_to_cpu(features_buf.features);
3835
3836 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3837 (unsigned long long)snap_id,
3838 (unsigned long long)*snap_features,
3839 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3840
3841 return 0;
3842}
3843
3844static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3845{
3846 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3847 &rbd_dev->header.features);
3848}
3849
86b00e0d
AE
3850static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3851{
3852 struct rbd_spec *parent_spec;
3853 size_t size;
3854 void *reply_buf = NULL;
3855 __le64 snapid;
3856 void *p;
3857 void *end;
642a2537 3858 u64 pool_id;
86b00e0d
AE
3859 char *image_id;
3860 u64 overlap;
86b00e0d
AE
3861 int ret;
3862
3863 parent_spec = rbd_spec_alloc();
3864 if (!parent_spec)
3865 return -ENOMEM;
3866
3867 size = sizeof (__le64) + /* pool_id */
3868 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3869 sizeof (__le64) + /* snap_id */
3870 sizeof (__le64); /* overlap */
3871 reply_buf = kmalloc(size, GFP_KERNEL);
3872 if (!reply_buf) {
3873 ret = -ENOMEM;
3874 goto out_err;
3875 }
3876
3877 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3878 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3879 "rbd", "get_parent",
4157976b 3880 &snapid, sizeof (snapid),
e2a58ee5 3881 reply_buf, size);
36be9a76 3882 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3883 if (ret < 0)
3884 goto out_err;
3885
86b00e0d 3886 p = reply_buf;
57385b51
AE
3887 end = reply_buf + ret;
3888 ret = -ERANGE;
642a2537 3889 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3890 if (pool_id == CEPH_NOPOOL) {
3891 /*
3892 * Either the parent never existed, or we have
 3893 * a record of it, but the image got flattened so it no
3894 * longer has a parent. When the parent of a
3895 * layered image disappears we immediately set the
3896 * overlap to 0. The effect of this is that all new
3897 * requests will be treated as if the image had no
3898 * parent.
3899 */
3900 if (rbd_dev->parent_overlap) {
3901 rbd_dev->parent_overlap = 0;
3902 smp_mb();
3903 rbd_dev_parent_put(rbd_dev);
3904 pr_info("%s: clone image has been flattened\n",
3905 rbd_dev->disk->disk_name);
3906 }
3907
86b00e0d 3908 goto out; /* No parent? No problem. */
392a9dad 3909 }
86b00e0d 3910
0903e875
AE
3911 /* The ceph file layout needs to fit pool id in 32 bits */
3912
3913 ret = -EIO;
642a2537 3914 if (pool_id > (u64)U32_MAX) {
c0cd10db 3915 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3916 (unsigned long long)pool_id, U32_MAX);
57385b51 3917 goto out_err;
c0cd10db 3918 }
642a2537 3919 parent_spec->pool_id = pool_id;
0903e875 3920
979ed480 3921 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3922 if (IS_ERR(image_id)) {
3923 ret = PTR_ERR(image_id);
3924 goto out_err;
3925 }
3926 parent_spec->image_id = image_id;
3927 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3928 ceph_decode_64_safe(&p, end, overlap, out_err);
3929
70cf49cf 3930 if (overlap) {
642a2537 3931 rbd_spec_put(rbd_dev->parent_spec);
70cf49cf
AE
3932 rbd_dev->parent_spec = parent_spec;
3933 parent_spec = NULL; /* rbd_dev now owns this */
3934 rbd_dev->parent_overlap = overlap;
3935 } else {
3936 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3937 }
86b00e0d
AE
3938out:
3939 ret = 0;
3940out_err:
3941 kfree(reply_buf);
3942 rbd_spec_put(parent_spec);
3943
3944 return ret;
3945}
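/*
 * "get_parent" reply layout as implied by the decode above
 * (illustrative):
 *
 *   __le64 pool_id       (CEPH_NOPOOL means no parent / flattened)
 *   __le32 image_id_len, then image_id_len bytes of image id
 *   __le64 snap_id
 *   __le64 overlap       (bytes of the child backed by the parent)
 *
 * An overlap of 0 is treated as "no usable parent" and only warned
 * about.
 */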
3946
cc070d59
AE
3947static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3948{
3949 struct {
3950 __le64 stripe_unit;
3951 __le64 stripe_count;
3952 } __attribute__ ((packed)) striping_info_buf = { 0 };
3953 size_t size = sizeof (striping_info_buf);
3954 void *p;
3955 u64 obj_size;
3956 u64 stripe_unit;
3957 u64 stripe_count;
3958 int ret;
3959
3960 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3961 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3962 (char *)&striping_info_buf, size);
cc070d59
AE
3963 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3964 if (ret < 0)
3965 return ret;
3966 if (ret < size)
3967 return -ERANGE;
3968
3969 /*
3970 * We don't actually support the "fancy striping" feature
3971 * (STRIPINGV2) yet, but if the striping sizes are the
3972 * defaults the behavior is the same as before. So find
3973 * out, and only fail if the image has non-default values.
3974 */
3975 ret = -EINVAL;
3976 obj_size = (u64)1 << rbd_dev->header.obj_order;
3977 p = &striping_info_buf;
3978 stripe_unit = ceph_decode_64(&p);
3979 if (stripe_unit != obj_size) {
3980 rbd_warn(rbd_dev, "unsupported stripe unit "
3981 "(got %llu want %llu)",
3982 stripe_unit, obj_size);
3983 return -EINVAL;
3984 }
3985 stripe_count = ceph_decode_64(&p);
3986 if (stripe_count != 1) {
3987 rbd_warn(rbd_dev, "unsupported stripe count "
3988 "(got %llu want 1)", stripe_count);
3989 return -EINVAL;
3990 }
500d0c0f
AE
3991 rbd_dev->header.stripe_unit = stripe_unit;
3992 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3993
3994 return 0;
3995}
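/*
 * Example (illustrative): for an image with object order 22, the only
 * striping parameters accepted here are stripe_unit == 4194304 (the
 * object size) and stripe_count == 1, i.e. the trivial pattern that
 * behaves identically to an image without STRIPINGV2.
 */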
3996
9e15b77d
AE
3997static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3998{
3999 size_t image_id_size;
4000 char *image_id;
4001 void *p;
4002 void *end;
4003 size_t size;
4004 void *reply_buf = NULL;
4005 size_t len = 0;
4006 char *image_name = NULL;
4007 int ret;
4008
4009 rbd_assert(!rbd_dev->spec->image_name);
4010
69e7a02f
AE
4011 len = strlen(rbd_dev->spec->image_id);
4012 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4013 image_id = kmalloc(image_id_size, GFP_KERNEL);
4014 if (!image_id)
4015 return NULL;
4016
4017 p = image_id;
4157976b 4018 end = image_id + image_id_size;
57385b51 4019 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4020
4021 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4022 reply_buf = kmalloc(size, GFP_KERNEL);
4023 if (!reply_buf)
4024 goto out;
4025
36be9a76 4026 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4027 "rbd", "dir_get_name",
4028 image_id, image_id_size,
e2a58ee5 4029 reply_buf, size);
9e15b77d
AE
4030 if (ret < 0)
4031 goto out;
4032 p = reply_buf;
f40eb349
AE
4033 end = reply_buf + ret;
4034
9e15b77d
AE
4035 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4036 if (IS_ERR(image_name))
4037 image_name = NULL;
4038 else
4039 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4040out:
4041 kfree(reply_buf);
4042 kfree(image_id);
4043
4044 return image_name;
4045}
4046
2ad3d716
AE
4047static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4048{
4049 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4050 const char *snap_name;
4051 u32 which = 0;
4052
4053 /* Skip over names until we find the one we are looking for */
4054
4055 snap_name = rbd_dev->header.snap_names;
4056 while (which < snapc->num_snaps) {
4057 if (!strcmp(name, snap_name))
4058 return snapc->snaps[which];
4059 snap_name += strlen(snap_name) + 1;
4060 which++;
4061 }
4062 return CEPH_NOSNAP;
4063}
4064
4065static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4066{
4067 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4068 u32 which;
4069 bool found = false;
4070 u64 snap_id;
4071
4072 for (which = 0; !found && which < snapc->num_snaps; which++) {
4073 const char *snap_name;
4074
4075 snap_id = snapc->snaps[which];
4076 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6fe77759
JD
4077 if (IS_ERR(snap_name)) {
4078 /* ignore no-longer existing snapshots */
4079 if (PTR_ERR(snap_name) == -ENOENT)
4080 continue;
4081 else
4082 break;
4083 }
2ad3d716
AE
4084 found = !strcmp(name, snap_name);
4085 kfree(snap_name);
4086 }
4087 return found ? snap_id : CEPH_NOSNAP;
4088}
4089
4090/*
4091 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4092 * no snapshot by that name is found, or if an error occurs.
4093 */
4094static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4095{
4096 if (rbd_dev->image_format == 1)
4097 return rbd_v1_snap_id_by_name(rbd_dev, name);
4098
4099 return rbd_v2_snap_id_by_name(rbd_dev, name);
4100}
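/*
 * v1 lookup example (hypothetical data): with snap_names containing
 * "alpha\0beta\0" and snapc->snaps[] = { 8, 5 }, searching for "beta"
 * fails to match at "alpha", advances strlen("alpha") + 1 == 6 bytes,
 * matches at index 1, and returns snapshot id 5.
 */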
4101
9e15b77d 4102/*
2e9f7f1c
AE
4103 * When an rbd image has a parent image, it is identified by the
4104 * pool, image, and snapshot ids (not names). This function fills
4105 * in the names for those ids. (It's OK if we can't figure out the
4106 * name for an image id, but the pool and snapshot ids should always
4107 * exist and have names.) All names in an rbd spec are dynamically
4108 * allocated.
e1d4213f
AE
4109 *
4110 * When an image being mapped (not a parent) is probed, we have the
4111 * pool name and pool id, image name and image id, and the snapshot
4112 * name. The only thing we're missing is the snapshot id.
9e15b77d 4113 */
2e9f7f1c 4114static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4115{
2e9f7f1c
AE
4116 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4117 struct rbd_spec *spec = rbd_dev->spec;
4118 const char *pool_name;
4119 const char *image_name;
4120 const char *snap_name;
9e15b77d
AE
4121 int ret;
4122
e1d4213f
AE
4123 /*
4124 * An image being mapped will have the pool name (etc.), but
4125 * we need to look up the snapshot id.
4126 */
2e9f7f1c
AE
4127 if (spec->pool_name) {
4128 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4129 u64 snap_id;
e1d4213f 4130
2ad3d716
AE
4131 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4132 if (snap_id == CEPH_NOSNAP)
e1d4213f 4133 return -ENOENT;
2ad3d716 4134 spec->snap_id = snap_id;
e1d4213f 4135 } else {
2e9f7f1c 4136 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4137 }
4138
4139 return 0;
4140 }
9e15b77d 4141
2e9f7f1c 4142 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4143
2e9f7f1c
AE
4144 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4145 if (!pool_name) {
4146 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4147 return -EIO;
4148 }
2e9f7f1c
AE
4149 pool_name = kstrdup(pool_name, GFP_KERNEL);
4150 if (!pool_name)
9e15b77d
AE
4151 return -ENOMEM;
4152
4153 /* Fetch the image name; tolerate failure here */
4154
2e9f7f1c
AE
4155 image_name = rbd_dev_image_name(rbd_dev);
4156 if (!image_name)
06ecc6cb 4157 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4158
2e9f7f1c 4159 /* Look up the snapshot name, and make a copy */
9e15b77d 4160
2e9f7f1c 4161 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
08518f6f
JD
4162 if (IS_ERR(snap_name)) {
4163 ret = PTR_ERR(snap_name);
9e15b77d 4164 goto out_err;
2e9f7f1c
AE
4165 }
4166
4167 spec->pool_name = pool_name;
4168 spec->image_name = image_name;
4169 spec->snap_name = snap_name;
9e15b77d
AE
4170
4171 return 0;
4172out_err:
2e9f7f1c
AE
4173 kfree(image_name);
4174 kfree(pool_name);
9e15b77d
AE
4175
4176 return ret;
4177}
4178
cc4a38bd 4179static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4180{
4181 size_t size;
4182 int ret;
4183 void *reply_buf;
4184 void *p;
4185 void *end;
4186 u64 seq;
4187 u32 snap_count;
4188 struct ceph_snap_context *snapc;
4189 u32 i;
4190
4191 /*
4192 * We'll need room for the seq value (maximum snapshot id),
4193 * snapshot count, and array of that many snapshot ids.
4194 * For now we have a fixed upper limit on the number we're
4195 * prepared to receive.
4196 */
4197 size = sizeof (__le64) + sizeof (__le32) +
4198 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4199 reply_buf = kzalloc(size, GFP_KERNEL);
4200 if (!reply_buf)
4201 return -ENOMEM;
4202
36be9a76 4203 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4204 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4205 reply_buf, size);
36be9a76 4206 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4207 if (ret < 0)
4208 goto out;
4209
35d489f9 4210 p = reply_buf;
57385b51
AE
4211 end = reply_buf + ret;
4212 ret = -ERANGE;
35d489f9
AE
4213 ceph_decode_64_safe(&p, end, seq, out);
4214 ceph_decode_32_safe(&p, end, snap_count, out);
4215
4216 /*
4217 * Make sure the reported number of snapshot ids wouldn't go
4218 * beyond the end of our buffer. But before checking that,
4219 * make sure the computed size of the snapshot context we
4220 * allocate is representable in a size_t.
4221 */
4222 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4223 / sizeof (u64)) {
4224 ret = -EINVAL;
4225 goto out;
4226 }
4227 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4228 goto out;
468521c1 4229 ret = 0;
35d489f9 4230
812164f8 4231 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4232 if (!snapc) {
4233 ret = -ENOMEM;
4234 goto out;
4235 }
35d489f9 4236 snapc->seq = seq;
35d489f9
AE
4237 for (i = 0; i < snap_count; i++)
4238 snapc->snaps[i] = ceph_decode_64(&p);
4239
49ece554 4240 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4241 rbd_dev->header.snapc = snapc;
4242
4243 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4244 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4245out:
4246 kfree(reply_buf);
4247
57385b51 4248 return ret;
35d489f9
AE
4249}
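/*
 * Buffer sizing check (illustrative arithmetic): with
 * RBD_MAX_SNAP_COUNT == 510 the reply buffer allocated above is
 *
 *   sizeof (__le64) + sizeof (__le32) + 510 * sizeof (__le64)
 *     == 8 + 4 + 4080 == 4092 bytes
 *
 * which fits within a single 4 KiB page.
 */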
4250
54cac61f
AE
4251static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4252 u64 snap_id)
b8b1e2db
AE
4253{
4254 size_t size;
4255 void *reply_buf;
54cac61f 4256 __le64 snapid;
b8b1e2db
AE
4257 int ret;
4258 void *p;
4259 void *end;
b8b1e2db
AE
4260 char *snap_name;
4261
4262 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4263 reply_buf = kmalloc(size, GFP_KERNEL);
4264 if (!reply_buf)
4265 return ERR_PTR(-ENOMEM);
4266
54cac61f 4267 snapid = cpu_to_le64(snap_id);
36be9a76 4268 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4269 "rbd", "get_snapshot_name",
54cac61f 4270 &snapid, sizeof (snapid),
e2a58ee5 4271 reply_buf, size);
36be9a76 4272 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4273 if (ret < 0) {
4274 snap_name = ERR_PTR(ret);
b8b1e2db 4275 goto out;
f40eb349 4276 }
b8b1e2db
AE
4277
4278 p = reply_buf;
f40eb349 4279 end = reply_buf + ret;
e5c35534 4280 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4281 if (IS_ERR(snap_name))
b8b1e2db 4282 goto out;
b8b1e2db 4283
f40eb349 4284 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4285 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4286out:
4287 kfree(reply_buf);
4288
f40eb349 4289 return snap_name;
b8b1e2db
AE
4290}
4291
2df3fac7 4292static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4293{
2df3fac7 4294 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4295 int ret;
117973fb
AE
4296
4297 down_write(&rbd_dev->header_rwsem);
4298
1617e40c
JD
4299 ret = rbd_dev_v2_image_size(rbd_dev);
4300 if (ret)
4301 goto out;
4302
2df3fac7
AE
4303 if (first_time) {
4304 ret = rbd_dev_v2_header_onetime(rbd_dev);
4305 if (ret)
4306 goto out;
4307 }
4308
642a2537
AE
4309 /*
4310 * If the image supports layering, get the parent info. We
4311 * need to probe the first time regardless. Thereafter we
 4312 * only need to do so if there's a parent, to see if it has
4313 * disappeared due to the mapped image getting flattened.
4314 */
4315 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4316 (first_time || rbd_dev->parent_spec)) {
4317 bool warn;
4318
4319 ret = rbd_dev_v2_parent_info(rbd_dev);
4320 if (ret)
4321 goto out;
4322
4323 /*
4324 * Print a warning if this is the initial probe and
4325 * the image has a parent. Don't print it if the
4326 * image now being probed is itself a parent. We
4327 * can tell at this point because we won't know its
4328 * pool name yet (just its pool id).
4329 */
4330 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4331 if (first_time && warn)
4332 rbd_warn(rbd_dev, "WARNING: kernel layering "
4333 "is EXPERIMENTAL!");
4334 }
4335
29334ba4
AE
4336 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4337 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4338 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4339
cc4a38bd 4340 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4341 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4342out:
4343 up_write(&rbd_dev->header_rwsem);
4344
4345 return ret;
4346}
4347
dfc5606d
YS
4348static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4349{
dfc5606d 4350 struct device *dev;
cd789ab9 4351 int ret;
dfc5606d
YS
4352
4353 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4354
cd789ab9 4355 dev = &rbd_dev->dev;
dfc5606d
YS
4356 dev->bus = &rbd_bus_type;
4357 dev->type = &rbd_device_type;
4358 dev->parent = &rbd_root_dev;
200a6a8b 4359 dev->release = rbd_dev_device_release;
de71a297 4360 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4361 ret = device_register(dev);
dfc5606d 4362
dfc5606d 4363 mutex_unlock(&ctl_mutex);
cd789ab9 4364
dfc5606d 4365 return ret;
602adf40
YS
4366}
4367
dfc5606d
YS
4368static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4369{
4370 device_unregister(&rbd_dev->dev);
4371}
4372
e2839308 4373static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4374
4375/*
499afd5b
AE
4376 * Get a unique rbd identifier for the given new rbd_dev, and add
4377 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4378 */
e2839308 4379static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4380{
e2839308 4381 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4382
4383 spin_lock(&rbd_dev_list_lock);
4384 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4385 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4386 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4387 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4388}
b7f23c36 4389
1ddbe94e 4390/*
499afd5b
AE
4391 * Remove an rbd_dev from the global list, and record that its
4392 * identifier is no longer in use.
1ddbe94e 4393 */
e2839308 4394static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4395{
d184f6bf 4396 struct list_head *tmp;
de71a297 4397 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4398 int max_id;
4399
aafb230e 4400 rbd_assert(rbd_id > 0);
499afd5b 4401
e2839308
AE
4402 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4403 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4404 spin_lock(&rbd_dev_list_lock);
4405 list_del_init(&rbd_dev->node);
d184f6bf
AE
4406
4407 /*
4408 * If the id being "put" is not the current maximum, there
4409 * is nothing special we need to do.
4410 */
e2839308 4411 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4412 spin_unlock(&rbd_dev_list_lock);
4413 return;
4414 }
4415
4416 /*
4417 * We need to update the current maximum id. Search the
4418 * list to find out what it is. We're more likely to find
4419 * the maximum at the end, so search the list backward.
4420 */
4421 max_id = 0;
4422 list_for_each_prev(tmp, &rbd_dev_list) {
4423 struct rbd_device *rbd_dev;
4424
4425 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4426 if (rbd_dev->dev_id > max_id)
4427 max_id = rbd_dev->dev_id;
d184f6bf 4428 }
499afd5b 4429 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4430
1ddbe94e 4431 /*
e2839308 4432 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4433 * which case it now accurately reflects the new maximum.
4434 * Be careful not to overwrite the maximum value in that
4435 * case.
1ddbe94e 4436 */
e2839308
AE
4437 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4438 dout(" max dev id has been reset\n");
b7f23c36
AE
4439}
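/*
 * Example (illustrative): with devices 1, 2 and 3 mapped, putting
 * id 3 rescans the list, finds max_id == 2, and executes
 * atomic64_cmpxchg(&rbd_dev_id_max, 3, 2).  If a concurrent
 * rbd_dev_id_get() had already bumped the counter to 4, the cmpxchg
 * fails harmlessly and the newer maximum survives.
 */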
4440
e28fff26
AE
4441/*
4442 * Skips over white space at *buf, and updates *buf to point to the
4443 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4444 * the token (string of non-white space characters) found. Note
4445 * that *buf must be terminated with '\0'.
e28fff26
AE
4446 */
4447static inline size_t next_token(const char **buf)
4448{
4449 /*
4450 * These are the characters that produce nonzero for
4451 * isspace() in the "C" and "POSIX" locales.
4452 */
4453 const char *spaces = " \f\n\r\t\v";
4454
4455 *buf += strspn(*buf, spaces); /* Find start of token */
4456
4457 return strcspn(*buf, spaces); /* Return token length */
4458}
4459
4460/*
4461 * Finds the next token in *buf, and if the provided token buffer is
4462 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4463 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4464 * must be terminated with '\0' on entry.
e28fff26
AE
4465 *
4466 * Returns the length of the token found (not including the '\0').
4467 * Return value will be 0 if no token is found, and it will be >=
4468 * token_size if the token would not fit.
4469 *
593a9e7b 4470 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4471 * found token. Note that this occurs even if the token buffer is
4472 * too small to hold it.
4473 */
4474static inline size_t copy_token(const char **buf,
4475 char *token,
4476 size_t token_size)
4477{
4478 size_t len;
4479
4480 len = next_token(buf);
4481 if (len < token_size) {
4482 memcpy(token, *buf, len);
4483 *(token + len) = '\0';
4484 }
4485 *buf += len;
4486
4487 return len;
4488}
4489
ea3352f4
AE
4490/*
4491 * Finds the next token in *buf, dynamically allocates a buffer big
4492 * enough to hold a copy of it, and copies the token into the new
4493 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4494 * that a duplicate buffer is created even for a zero-length token.
4495 *
4496 * Returns a pointer to the newly-allocated duplicate, or a null
4497 * pointer if memory for the duplicate was not available. If
4498 * the lenp argument is a non-null pointer, the length of the token
4499 * (not including the '\0') is returned in *lenp.
4500 *
4501 * If successful, the *buf pointer will be updated to point beyond
4502 * the end of the found token.
4503 *
4504 * Note: uses GFP_KERNEL for allocation.
4505 */
4506static inline char *dup_token(const char **buf, size_t *lenp)
4507{
4508 char *dup;
4509 size_t len;
4510
4511 len = next_token(buf);
4caf35f9 4512 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4513 if (!dup)
4514 return NULL;
ea3352f4
AE
4515 *(dup + len) = '\0';
4516 *buf += len;
4517
4518 if (lenp)
4519 *lenp = len;
4520
4521 return dup;
4522}
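/*
 * Token-parsing sketch (hypothetical input, for illustration only):
 *
 *   const char *buf = "  1.2.3.4:6789 name=admin rbd img";
 *   size_t len = next_token(&buf);      buf at "1.2.3.4:...", len == 12
 *   buf += len;                         caller steps past the token
 *   char *opts = dup_token(&buf, NULL); opts == "name=admin"
 *
 * next_token() only skips leading whitespace and reports the token
 * length; dup_token() also advances *buf past the token it copies.
 */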
4523
a725f65e 4524/*
859c31df
AE
4525 * Parse the options provided for an "rbd add" (i.e., rbd image
4526 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4527 * and the data written is passed here via a NUL-terminated buffer.
4528 * Returns 0 if successful or an error code otherwise.
d22f76e7 4529 *
859c31df
AE
4530 * The information extracted from these options is recorded in
4531 * the other parameters which return dynamically-allocated
4532 * structures:
4533 * ceph_opts
4534 * The address of a pointer that will refer to a ceph options
4535 * structure. Caller must release the returned pointer using
4536 * ceph_destroy_options() when it is no longer needed.
4537 * rbd_opts
4538 * Address of an rbd options pointer. Fully initialized by
4539 * this function; caller must release with kfree().
4540 * spec
4541 * Address of an rbd image specification pointer. Fully
4542 * initialized by this function based on parsed options.
4543 * Caller must release with rbd_spec_put().
4544 *
4545 * The options passed take this form:
 4546 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4547 * where:
4548 * <mon_addrs>
4549 * A comma-separated list of one or more monitor addresses.
4550 * A monitor address is an ip address, optionally followed
4551 * by a port number (separated by a colon).
4552 * I.e.: ip1[:port1][,ip2[:port2]...]
4553 * <options>
4554 * A comma-separated list of ceph and/or rbd options.
4555 * <pool_name>
4556 * The name of the rados pool containing the rbd image.
4557 * <image_name>
4558 * The name of the image in that pool to map.
 4559 * <snap_name>
 4560 * An optional snapshot name. If provided, the mapping will
 4561 * present data from the image at the time that snapshot was
 4562 * created. The image head is used if no snapshot name is
4563 * provided. Snapshot mappings are always read-only.
a725f65e 4564 */
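/*
 * Example "add" string in the form described above (all values
 * hypothetical):
 *
 *   1.2.3.4:6789,5.6.7.8:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * as written to /sys/bus/rbd/add.
 */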
859c31df 4565static int rbd_add_parse_args(const char *buf,
dc79b113 4566 struct ceph_options **ceph_opts,
859c31df
AE
4567 struct rbd_options **opts,
4568 struct rbd_spec **rbd_spec)
e28fff26 4569{
d22f76e7 4570 size_t len;
859c31df 4571 char *options;
0ddebc0c 4572 const char *mon_addrs;
ecb4dc22 4573 char *snap_name;
0ddebc0c 4574 size_t mon_addrs_size;
859c31df 4575 struct rbd_spec *spec = NULL;
4e9afeba 4576 struct rbd_options *rbd_opts = NULL;
859c31df 4577 struct ceph_options *copts;
dc79b113 4578 int ret;
e28fff26
AE
4579
4580 /* The first four tokens are required */
4581
7ef3214a 4582 len = next_token(&buf);
4fb5d671
AE
4583 if (!len) {
4584 rbd_warn(NULL, "no monitor address(es) provided");
4585 return -EINVAL;
4586 }
0ddebc0c 4587 mon_addrs = buf;
f28e565a 4588 mon_addrs_size = len + 1;
7ef3214a 4589 buf += len;
a725f65e 4590
dc79b113 4591 ret = -EINVAL;
f28e565a
AE
4592 options = dup_token(&buf, NULL);
4593 if (!options)
dc79b113 4594 return -ENOMEM;
4fb5d671
AE
4595 if (!*options) {
4596 rbd_warn(NULL, "no options provided");
4597 goto out_err;
4598 }
e28fff26 4599
859c31df
AE
4600 spec = rbd_spec_alloc();
4601 if (!spec)
f28e565a 4602 goto out_mem;
859c31df
AE
4603
4604 spec->pool_name = dup_token(&buf, NULL);
4605 if (!spec->pool_name)
4606 goto out_mem;
4fb5d671
AE
4607 if (!*spec->pool_name) {
4608 rbd_warn(NULL, "no pool name provided");
4609 goto out_err;
4610 }
e28fff26 4611
69e7a02f 4612 spec->image_name = dup_token(&buf, NULL);
859c31df 4613 if (!spec->image_name)
f28e565a 4614 goto out_mem;
4fb5d671
AE
4615 if (!*spec->image_name) {
4616 rbd_warn(NULL, "no image name provided");
4617 goto out_err;
4618 }
d4b125e9 4619
f28e565a
AE
4620 /*
4621 * Snapshot name is optional; default is to use "-"
4622 * (indicating the head/no snapshot).
4623 */
3feeb894 4624 len = next_token(&buf);
820a5f3e 4625 if (!len) {
3feeb894
AE
4626 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4627 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4628 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4629 ret = -ENAMETOOLONG;
f28e565a 4630 goto out_err;
849b4260 4631 }
ecb4dc22
AE
4632 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4633 if (!snap_name)
f28e565a 4634 goto out_mem;
ecb4dc22
AE
4635 *(snap_name + len) = '\0';
4636 spec->snap_name = snap_name;
e5c35534 4637
0ddebc0c 4638 /* Initialize all rbd options to the defaults */
e28fff26 4639
4e9afeba
AE
4640 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4641 if (!rbd_opts)
4642 goto out_mem;
4643
4644 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4645
859c31df 4646 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4647 mon_addrs + mon_addrs_size - 1,
4e9afeba 4648 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4649 if (IS_ERR(copts)) {
4650 ret = PTR_ERR(copts);
dc79b113
AE
4651 goto out_err;
4652 }
859c31df
AE
4653 kfree(options);
4654
4655 *ceph_opts = copts;
4e9afeba 4656 *opts = rbd_opts;
859c31df 4657 *rbd_spec = spec;
0ddebc0c 4658
dc79b113 4659 return 0;
f28e565a 4660out_mem:
dc79b113 4661 ret = -ENOMEM;
d22f76e7 4662out_err:
859c31df
AE
4663 kfree(rbd_opts);
4664 rbd_spec_put(spec);
f28e565a 4665 kfree(options);
d22f76e7 4666
dc79b113 4667 return ret;
a725f65e
AE
4668}
4669
589d30e0
AE
4670/*
4671 * An rbd format 2 image has a unique identifier, distinct from the
4672 * name given to it by the user. Internally, that identifier is
4673 * what's used to specify the names of objects related to the image.
4674 *
4675 * A special "rbd id" object is used to map an rbd image name to its
4676 * id. If that object doesn't exist, then there is no v2 rbd image
4677 * with the supplied name.
4678 *
 4679 * This function will set the given rbd_dev's image_id field if
4680 * it can be determined, and in that case will return 0. If any
4681 * errors occur a negative errno will be returned and the rbd_dev's
4682 * image_id field will be unchanged (and should be NULL).
4683 */
4684static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4685{
4686 int ret;
4687 size_t size;
4688 char *object_name;
4689 void *response;
c0fba368 4690 char *image_id;
2f82ee54 4691
2c0d0a10
AE
4692 /*
4693 * When probing a parent image, the image id is already
4694 * known (and the image name likely is not). There's no
c0fba368
AE
4695 * need to fetch the image id again in this case. We
4696 * do still need to set the image format though.
2c0d0a10 4697 */
c0fba368
AE
4698 if (rbd_dev->spec->image_id) {
4699 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4700
2c0d0a10 4701 return 0;
c0fba368 4702 }
2c0d0a10 4703
589d30e0
AE
4704 /*
4705 * First, see if the format 2 image id file exists, and if
4706 * so, get the image's persistent id from it.
4707 */
69e7a02f 4708 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4709 object_name = kmalloc(size, GFP_NOIO);
4710 if (!object_name)
4711 return -ENOMEM;
0d7dbfce 4712 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4713 dout("rbd id object name is %s\n", object_name);
4714
4715 /* Response will be an encoded string, which includes a length */
4716
4717 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4718 response = kzalloc(size, GFP_NOIO);
4719 if (!response) {
4720 ret = -ENOMEM;
4721 goto out;
4722 }
4723
c0fba368
AE
4724 /* If it doesn't exist we'll assume it's a format 1 image */
4725
36be9a76 4726 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4727 "rbd", "get_id", NULL, 0,
e2a58ee5 4728 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4729 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4730 if (ret == -ENOENT) {
4731 image_id = kstrdup("", GFP_KERNEL);
4732 ret = image_id ? 0 : -ENOMEM;
4733 if (!ret)
4734 rbd_dev->image_format = 1;
4735 } else if (ret > sizeof (__le32)) {
4736 void *p = response;
4737
4738 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4739 NULL, GFP_NOIO);
c0fba368
AE
4740 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4741 if (!ret)
4742 rbd_dev->image_format = 2;
589d30e0 4743 } else {
c0fba368
AE
4744 ret = -EINVAL;
4745 }
4746
4747 if (!ret) {
4748 rbd_dev->spec->image_id = image_id;
4749 dout("image_id is %s\n", image_id);
589d30e0
AE
4750 }
4751out:
4752 kfree(response);
4753 kfree(object_name);
4754
4755 return ret;
4756}
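/*
 * Naming example (illustrative): for a format 2 image named "foo" the
 * id object is RBD_ID_PREFIX "foo" (e.g. "rbd_id.foo", assuming the
 * usual prefix), and its "get_id" method returns an encoded string
 * such as "1014b2ae8944a" that becomes rbd_dev->spec->image_id.
 * -ENOENT here means no such object exists, hence a format 1 image.
 */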
4757
3abef3b3
AE
4758/*
4759 * Undo whatever state changes are made by v1 or v2 header info
4760 * call.
4761 */
6fd48b3b
AE
4762static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4763{
4764 struct rbd_image_header *header;
4765
392a9dad
AE
4766 /* Drop parent reference unless it's already been done (or none) */
4767
4768 if (rbd_dev->parent_overlap)
4769 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4770
4771 /* Free dynamic fields from the header, then zero it out */
4772
4773 header = &rbd_dev->header;
812164f8 4774 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4775 kfree(header->snap_sizes);
4776 kfree(header->snap_names);
4777 kfree(header->object_prefix);
4778 memset(header, 0, sizeof (*header));
4779}
4780
2df3fac7 4781static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4782{
4783 int ret;
a30b71b9 4784
1e130199 4785 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4786 if (ret)
b1b5402a
AE
4787 goto out_err;
4788
2df3fac7
AE
4789 /*
4790 * Get the and check features for the image. Currently the
4791 * features are assumed to never change.
4792 */
b1b5402a 4793 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4794 if (ret)
9d475de5 4795 goto out_err;
35d489f9 4796
cc070d59
AE
4797 /* If the image supports fancy striping, get its parameters */
4798
4799 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4800 ret = rbd_dev_v2_striping_info(rbd_dev);
4801 if (ret < 0)
4802 goto out_err;
4803 }
2df3fac7 4804 /* No support for crypto and compression type format 2 images */
a30b71b9 4805
35152979 4806 return 0;
9d475de5 4807out_err:
642a2537 4808 rbd_dev->header.features = 0;
1e130199
AE
4809 kfree(rbd_dev->header.object_prefix);
4810 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4811
4812 return ret;
a30b71b9
AE
4813}
4814
124afba2 4815static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4816{
2f82ee54 4817 struct rbd_device *parent = NULL;
124afba2
AE
4818 struct rbd_spec *parent_spec;
4819 struct rbd_client *rbdc;
4820 int ret;
4821
4822 if (!rbd_dev->parent_spec)
4823 return 0;
4824 /*
4825 * We need to pass a reference to the client and the parent
4826 * spec when creating the parent rbd_dev. Images related by
4827 * parent/child relationships always share both.
4828 */
4829 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4830 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4831
4832 ret = -ENOMEM;
4833 parent = rbd_dev_create(rbdc, parent_spec);
4834 if (!parent)
4835 goto out_err;
4836
1f3ef788 4837 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4838 if (ret < 0)
4839 goto out_err;
4840 rbd_dev->parent = parent;
a2acd00e 4841 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4842
4843 return 0;
4844out_err:
4845 if (parent) {
fb65d228 4846 rbd_dev_unparent(rbd_dev);
124afba2
AE
4847 kfree(rbd_dev->header_name);
4848 rbd_dev_destroy(parent);
4849 } else {
4850 rbd_put_client(rbdc);
4851 rbd_spec_put(parent_spec);
4852 }
4853
4854 return ret;
4855}
4856
200a6a8b 4857static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4858{
83a06263 4859 int ret;
d1cf5788 4860
83a06263
AE
4861 /* generate unique id: find highest unique id, add one */
4862 rbd_dev_id_get(rbd_dev);
4863
4864 /* Fill in the device name, now that we have its id. */
4865 BUILD_BUG_ON(DEV_NAME_LEN
4866 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4867 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4868
4869 /* Get our block major device number. */
4870
4871 ret = register_blkdev(0, rbd_dev->name);
4872 if (ret < 0)
4873 goto err_out_id;
4874 rbd_dev->major = ret;
4875
4876 /* Set up the blkdev mapping. */
4877
4878 ret = rbd_init_disk(rbd_dev);
4879 if (ret)
4880 goto err_out_blkdev;
4881
f35a4dee 4882 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4883 if (ret)
4884 goto err_out_disk;
f35a4dee
AE
4885 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4886
4887 ret = rbd_bus_add_dev(rbd_dev);
4888 if (ret)
4889 goto err_out_mapping;
83a06263 4890
83a06263
AE
4891 /* Everything's ready. Announce the disk to the world. */
4892
129b79d4 4893 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4894 add_disk(rbd_dev->disk);
4895
4896 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4897 (unsigned long long) rbd_dev->mapping.size);
4898
4899 return ret;
2f82ee54 4900
f35a4dee
AE
4901err_out_mapping:
4902 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4903err_out_disk:
4904 rbd_free_disk(rbd_dev);
4905err_out_blkdev:
4906 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4907err_out_id:
4908 rbd_dev_id_put(rbd_dev);
d1cf5788 4909 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4910
4911 return ret;
4912}
4913
332bb12d
AE
4914static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4915{
4916 struct rbd_spec *spec = rbd_dev->spec;
4917 size_t size;
4918
4919 /* Record the header object name for this rbd image. */
4920
4921 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4922
4923 if (rbd_dev->image_format == 1)
4924 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4925 else
4926 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4927
4928 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4929 if (!rbd_dev->header_name)
4930 return -ENOMEM;
4931
4932 if (rbd_dev->image_format == 1)
4933 sprintf(rbd_dev->header_name, "%s%s",
4934 spec->image_name, RBD_SUFFIX);
4935 else
4936 sprintf(rbd_dev->header_name, "%s%s",
4937 RBD_HEADER_PREFIX, spec->image_id);
4938 return 0;
4939}
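/*
 * Header-name examples (illustrative, assuming the usual RBD_SUFFIX
 * and RBD_HEADER_PREFIX values): a format 1 image "foo" gets header
 * object "foo.rbd", while a format 2 image with id "1014b2ae8944a"
 * gets "rbd_header.1014b2ae8944a".
 */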
4940
200a6a8b
AE
4941static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4942{
6fd48b3b 4943 rbd_dev_unprobe(rbd_dev);
200a6a8b 4944 kfree(rbd_dev->header_name);
6fd48b3b
AE
4945 rbd_dev->header_name = NULL;
4946 rbd_dev->image_format = 0;
4947 kfree(rbd_dev->spec->image_id);
4948 rbd_dev->spec->image_id = NULL;
4949
200a6a8b
AE
4950 rbd_dev_destroy(rbd_dev);
4951}
4952
a30b71b9
AE
4953/*
4954 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4955 * device. If this image is the one being mapped (i.e., not a
4956 * parent), initiate a watch on its header object before using that
4957 * object to get detailed information about the rbd image.
a30b71b9 4958 */
1f3ef788 4959static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4960{
4961 int ret;
b644de2b 4962 int tmp;
a30b71b9
AE
4963
4964 /*
3abef3b3
AE
4965 * Get the id from the image id object. Unless there's an
4966 * error, rbd_dev->spec->image_id will be filled in with
4967 * a dynamically-allocated string, and rbd_dev->image_format
4968 * will be set to either 1 or 2.
a30b71b9
AE
4969 */
4970 ret = rbd_dev_image_id(rbd_dev);
4971 if (ret)
c0fba368
AE
4972 return ret;
4973 rbd_assert(rbd_dev->spec->image_id);
4974 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4975
332bb12d
AE
4976 ret = rbd_dev_header_name(rbd_dev);
4977 if (ret)
4978 goto err_out_format;
4979
1f3ef788
AE
4980 if (mapping) {
4981 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4982 if (ret)
4983 goto out_header_name;
4984 }
b644de2b 4985
c0fba368 4986 if (rbd_dev->image_format == 1)
99a41ebc 4987 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4988 else
2df3fac7 4989 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4990 if (ret)
b644de2b 4991 goto err_out_watch;
83a06263 4992
9bb81c9b
AE
4993 ret = rbd_dev_spec_update(rbd_dev);
4994 if (ret)
33dca39f 4995 goto err_out_probe;
9bb81c9b
AE
4996
4997 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4998 if (ret)
4999 goto err_out_probe;
5000
5001 dout("discovered format %u image, header name is %s\n",
5002 rbd_dev->image_format, rbd_dev->header_name);
83a06263 5003
30d60ba2 5004 return 0;
6fd48b3b
AE
5005err_out_probe:
5006 rbd_dev_unprobe(rbd_dev);
b644de2b 5007err_out_watch:
1f3ef788
AE
5008 if (mapping) {
5009 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
5010 if (tmp)
5011 rbd_warn(rbd_dev, "unable to tear down "
5012 "watch request (%d)\n", tmp);
5013 }
332bb12d
AE
5014out_header_name:
5015 kfree(rbd_dev->header_name);
5016 rbd_dev->header_name = NULL;
5017err_out_format:
5018 rbd_dev->image_format = 0;
5655c4d9
AE
5019 kfree(rbd_dev->spec->image_id);
5020 rbd_dev->spec->image_id = NULL;
5021
5022 dout("probe failed, returning %d\n", ret);
5023
a30b71b9
AE
5024 return ret;
5025}
5026
59c2be1e
YS
5027static ssize_t rbd_add(struct bus_type *bus,
5028 const char *buf,
5029 size_t count)
602adf40 5030{
cb8627c7 5031 struct rbd_device *rbd_dev = NULL;
dc79b113 5032 struct ceph_options *ceph_opts = NULL;
4e9afeba 5033 struct rbd_options *rbd_opts = NULL;
859c31df 5034 struct rbd_spec *spec = NULL;
9d3997fd 5035 struct rbd_client *rbdc;
27cc2594 5036 struct ceph_osd_client *osdc;
51344a38 5037 bool read_only;
27cc2594 5038 int rc = -ENOMEM;
602adf40
YS
5039
5040 if (!try_module_get(THIS_MODULE))
5041 return -ENODEV;
5042
602adf40 5043 /* parse add command */
859c31df 5044 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5045 if (rc < 0)
bd4ba655 5046 goto err_out_module;
51344a38
AE
5047 read_only = rbd_opts->read_only;
5048 kfree(rbd_opts);
5049 rbd_opts = NULL; /* done with this */
78cea76e 5050
9d3997fd
AE
5051 rbdc = rbd_get_client(ceph_opts);
5052 if (IS_ERR(rbdc)) {
5053 rc = PTR_ERR(rbdc);
0ddebc0c 5054 goto err_out_args;
9d3997fd 5055 }
602adf40 5056
602adf40 5057 /* pick the pool */
9d3997fd 5058 osdc = &rbdc->client->osdc;
859c31df 5059 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5060 if (rc < 0)
5061 goto err_out_client;
c0cd10db 5062 spec->pool_id = (u64)rc;
859c31df 5063
0903e875
AE
5064 /* The ceph file layout needs to fit pool id in 32 bits */
5065
c0cd10db
AE
5066 if (spec->pool_id > (u64)U32_MAX) {
5067 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5068 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5069 rc = -EIO;
5070 goto err_out_client;
5071 }
5072
c53d5893 5073 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5074 if (!rbd_dev)
5075 goto err_out_client;
c53d5893
AE
5076 rbdc = NULL; /* rbd_dev now owns this */
5077 spec = NULL; /* rbd_dev now owns this */
602adf40 5078
1f3ef788 5079 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5080 if (rc < 0)
c53d5893 5081 goto err_out_rbd_dev;
05fd6f6f 5082
7ce4eef7
AE
5083 /* If we are mapping a snapshot it must be marked read-only */
5084
5085 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5086 read_only = true;
5087 rbd_dev->mapping.read_only = read_only;
5088
b536f69a 5089 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3
AE
5090 if (rc) {
5091 rbd_dev_image_release(rbd_dev);
5092 goto err_out_module;
5093 }
5094
5095 return count;
b536f69a 5096
c53d5893
AE
5097err_out_rbd_dev:
5098 rbd_dev_destroy(rbd_dev);
bd4ba655 5099err_out_client:
9d3997fd 5100 rbd_put_client(rbdc);
0ddebc0c 5101err_out_args:
859c31df 5102 rbd_spec_put(spec);
bd4ba655
AE
5103err_out_module:
5104 module_put(THIS_MODULE);
27cc2594 5105
602adf40 5106 dout("Error adding device %s\n", buf);
27cc2594 5107
c0cd10db 5108 return (ssize_t)rc;
602adf40
YS
5109}
5110
200a6a8b 5111static void rbd_dev_device_release(struct device *dev)
602adf40 5112{
593a9e7b 5113 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5114
602adf40 5115 rbd_free_disk(rbd_dev);
200a6a8b 5116 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5117 rbd_dev_mapping_clear(rbd_dev);
602adf40 5118 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5119 rbd_dev->major = 0;
e2839308 5120 rbd_dev_id_put(rbd_dev);
d1cf5788 5121 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
5122}
5123
05a46afd
AE
5124static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5125{
ad945fc1 5126 while (rbd_dev->parent) {
05a46afd
AE
5127 struct rbd_device *first = rbd_dev;
5128 struct rbd_device *second = first->parent;
5129 struct rbd_device *third;
5130
5131 /*
5132 * Follow to the parent with no grandparent and
5133 * remove it.
5134 */
5135 while (second && (third = second->parent)) {
5136 first = second;
5137 second = third;
5138 }
ad945fc1 5139 rbd_assert(second);
8ad42cd0 5140 rbd_dev_image_release(second);
ad945fc1
AE
5141 first->parent = NULL;
5142 first->parent_overlap = 0;
5143
5144 rbd_assert(first->parent_spec);
05a46afd
AE
5145 rbd_spec_put(first->parent_spec);
5146 first->parent_spec = NULL;
05a46afd
AE
5147 }
5148}
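/*
 * Teardown-order example (illustrative): for a chain
 * dev -> parent -> grandparent, each pass of the outer loop walks to
 * the deepest ancestor that has no parent of its own and releases it,
 * so the grandparent is torn down first, then the parent, and then
 * the loop exits with dev->parent == NULL.
 */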
5149
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
						&rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		goto done;

	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);

	/*
	 * flush remaining watch callbacks - these must be complete
	 * before the osd_client is shutdown
	 */
	dout("%s: flushing notifies", __func__);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed. Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
	ret = count;
done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

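/*
 * Illustrative counterpart to the add example above; "0" is the
 * device id from the /dev/rbd<id> name (an example id, not a fixed
 * value):
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the device is still open.
 */
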
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

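/*
 * Dedicated slab caches for the three objects allocated on every I/O
 * path: image requests, the object requests they fan out into, and
 * the object name strings built for each segment.
 */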
static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

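/*
 * Module teardown unwinds rbd_init() in reverse: unregister the bus
 * and root device first, so no new writes to /sys/bus/rbd/add can
 * start allocating requests, then free the slab caches.
 */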
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");