/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
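/*
 * These "safe" wrappers are used below for the parent image reference
 * count (rbd_dev->parent_ref): a counter that has dropped to 0 stays
 * at 0, and -EINVAL is reported rather than letting the count wrap in
 * either direction.
 */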

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64 pool_id;
	const char *pool_name;

	const char *image_id;
	const char *image_name;

	u64 snap_id;
	const char *snap_name;

	struct kref kref;
};
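/*
 * For illustration (names hypothetical): mapping image "foo" at
 * snapshot "snap1" in pool "rbd" yields a spec whose pool_name,
 * image_name and snap_name are "rbd", "foo" and "snap1"; the
 * corresponding ids are then filled in by lookup.  A parent (layered)
 * image works the other way around: the ids come from the child's
 * header and the names are looked up from them.
 */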

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;
	struct list_head node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX	/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */
	unsigned long flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request *obj_request;	/* STAT op */
		struct {
			struct rbd_img_request *img_request;
			u64 img_offset;
			/* links for img_request->obj_requests list */
			struct list_head links;
		};
	};
	u32 which;		/* posn in image request list */

	enum obj_request_type type;
	union {
		struct bio *bio_list;
		struct {
			struct page **pages;
			u32 page_count;
		};
	};
	struct page **copyup_pages;
	u32 copyup_page_count;

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	int result;

	rbd_obj_callback_t callback;
	struct completion completion;

	struct kref kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device *rbd_dev;
	u64 offset;		/* starting image byte offset */
	u64 length;		/* byte count from offset */
	unsigned long flags;
	union {
		u64 snap_id;				/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request *rq;			/* block request */
		struct rbd_obj_request *obj_request;	/* obj req initiator */
	};
	struct page **copyup_pages;
	u32 copyup_page_count;
	spinlock_t completion_lock;	/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;
	u64 xferred;	/* aggregate bytes transferred */
	int result;	/* first nonzero obj_request result */

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
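/*
 * Typical use, as in rbd_img_request_complete() below:
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 *
 * The _safe variant iterates in reverse and tolerates removal of the
 * current entry.
 */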

struct rbd_mapping {
	u64 size;
	u64 features;
	bool read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue, flags, open_count */

	struct rbd_image_header header;
	unsigned long flags;	/* possibly lock protected */
	struct rbd_spec *spec;

	char *header_name;

	struct ceph_file_layout layout;

	struct ceph_osd_event *watch_event;
	struct rbd_obj_request *watch_request;

	struct rbd_spec *parent_spec;
	u64 parent_overlap;
	atomic_t parent_ref;
	struct rbd_device *parent;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache *rbd_img_request_cache;
static struct kmem_cache *rbd_obj_request_cache;
static struct kmem_cache *rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
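/*
 * Example (hypothetical message):
 *
 *	rbd_warn(rbd_dev, "capacity mismatch (%llu)", size);
 *
 * The message is prefixed with the most specific identification
 * available: disk name, image name, image id, or as a last resort
 * the rbd_dev pointer itself.
 */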

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
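/*
 * Example: a "read_only" (or "ro") token in the options string maps
 * the device read-only; "read_write"/"rw" restores the default.  Any
 * unrecognized token makes match_token() return a negative value and
 * parsing fails with -EINVAL.
 */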

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock to unlink the
 * client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
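/*
 * Worked example (ids hypothetical): with snapc->snaps = { 12, 7, 3 },
 * a lookup of snap_id 7 returns index 1; a lookup of snap_id 5 returns
 * BAD_SNAP_INDEX because no such snapshot exists.
 */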

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
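/*
 * Example (prefix hypothetical): with object_prefix "rb.0.abcd" and
 * obj_order 22 (4 MiB objects), image offset 0x900000 lies in segment
 * 2, giving the object name "rb.0.abcd.000000000002".  Format 2
 * images pad the segment number to 16 hex digits instead of 12.
 */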

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
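/*
 * Worked example (sizes illustrative): with obj_order 22 each object
 * covers 4 MiB, so an 8 KiB request beginning 4 KiB before a segment
 * boundary is clipped by rbd_segment_length() to 4 KiB; the remainder
 * must be issued against the following object.
 */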

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
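/*
 * Callers typically peel an image request apart with repeated calls
 * (a sketch; variable names are illustrative):
 *
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio_list, &offset, length,
 *					GFP_ATOMIC);
 *
 * Each call clones exactly 'length' bytes and advances bio_list and
 * offset past them, so successive calls yield consecutive ranges.
 */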
1319
926f9b3f
AE
1320/*
1321 * The default/initial value for all object request flags is 0. For
1322 * each flag, once its value is set to 1 it is never reset to 0
1323 * again.
1324 */
57acbaa7 1325static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
926f9b3f 1326{
57acbaa7 1327 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
926f9b3f
AE
1328 struct rbd_device *rbd_dev;
1329
57acbaa7
AE
1330 rbd_dev = obj_request->img_request->rbd_dev;
1331 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
926f9b3f
AE
1332 obj_request);
1333 }
1334}
1335
57acbaa7 1336static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
926f9b3f
AE
1337{
1338 smp_mb();
57acbaa7 1339 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
926f9b3f
AE
1340}
1341
57acbaa7 1342static void obj_request_done_set(struct rbd_obj_request *obj_request)
6365d33a 1343{
57acbaa7
AE
1344 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1345 struct rbd_device *rbd_dev = NULL;
6365d33a 1346
57acbaa7
AE
1347 if (obj_request_img_data_test(obj_request))
1348 rbd_dev = obj_request->img_request->rbd_dev;
1349 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
6365d33a
AE
1350 obj_request);
1351 }
1352}
1353
57acbaa7 1354static bool obj_request_done_test(struct rbd_obj_request *obj_request)
6365d33a
AE
1355{
1356 smp_mb();
57acbaa7 1357 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
6365d33a
AE
1358}
1359
5679c59f
AE
1360/*
1361 * This sets the KNOWN flag after (possibly) setting the EXISTS
1362 * flag. The latter is set based on the "exists" value provided.
1363 *
1364 * Note that for our purposes once an object exists it never goes
1365 * away again. It's possible that the response from two existence
1366 * checks are separated by the creation of the target object, and
1367 * the first ("doesn't exist") response arrives *after* the second
1368 * ("does exist"). In that case we ignore the second one.
1369 */
1370static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1371 bool exists)
1372{
1373 if (exists)
1374 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1375 set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1376 smp_mb();
1377}
1378
1379static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1380{
1381 smp_mb();
1382 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1383}
1384
1385static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1386{
1387 smp_mb();
1388 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1389}
1390
7029f064
ID
1391static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1392{
1393 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1394
1395 return obj_request->img_offset <
1396 round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1397}
1398
bf0d5f50
AE
1399static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1400{
37206ee5
AE
1401 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1402 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1403 kref_get(&obj_request->kref);
1404}
1405
1406static void rbd_obj_request_destroy(struct kref *kref);
1407static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1408{
1409 rbd_assert(obj_request != NULL);
37206ee5
AE
1410 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1411 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1412 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1413}
1414
dabf1f3b
AE
1415static void rbd_img_request_get(struct rbd_img_request *img_request)
1416{
1417 dout("%s: img %p (was %d)\n", __func__, img_request,
1418 atomic_read(&img_request->kref.refcount));
1419 kref_get(&img_request->kref);
1420}
1421
e93f3152
AE
1422static bool img_request_child_test(struct rbd_img_request *img_request);
1423static void rbd_parent_request_destroy(struct kref *kref);
bf0d5f50
AE
1424static void rbd_img_request_destroy(struct kref *kref);
1425static void rbd_img_request_put(struct rbd_img_request *img_request)
1426{
1427 rbd_assert(img_request != NULL);
37206ee5
AE
1428 dout("%s: img %p (was %d)\n", __func__, img_request,
1429 atomic_read(&img_request->kref.refcount));
e93f3152
AE
1430 if (img_request_child_test(img_request))
1431 kref_put(&img_request->kref, rbd_parent_request_destroy);
1432 else
1433 kref_put(&img_request->kref, rbd_img_request_destroy);
bf0d5f50
AE
1434}
1435
1436static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1437 struct rbd_obj_request *obj_request)
1438{
25dcf954
AE
1439 rbd_assert(obj_request->img_request == NULL);
1440
b155e86c 1441 /* Image request now owns object's original reference */
bf0d5f50 1442 obj_request->img_request = img_request;
25dcf954 1443 obj_request->which = img_request->obj_request_count;
6365d33a
AE
1444 rbd_assert(!obj_request_img_data_test(obj_request));
1445 obj_request_img_data_set(obj_request);
bf0d5f50 1446 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1447 img_request->obj_request_count++;
1448 list_add_tail(&obj_request->links, &img_request->obj_requests);
37206ee5
AE
1449 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1450 obj_request->which);
bf0d5f50
AE
1451}
1452
1453static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1454 struct rbd_obj_request *obj_request)
1455{
1456 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1457
37206ee5
AE
1458 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1459 obj_request->which);
bf0d5f50 1460 list_del(&obj_request->links);
25dcf954
AE
1461 rbd_assert(img_request->obj_request_count > 0);
1462 img_request->obj_request_count--;
1463 rbd_assert(obj_request->which == img_request->obj_request_count);
1464 obj_request->which = BAD_WHICH;
6365d33a 1465 rbd_assert(obj_request_img_data_test(obj_request));
bf0d5f50 1466 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1467 obj_request->img_request = NULL;
25dcf954 1468 obj_request->callback = NULL;
bf0d5f50
AE
1469 rbd_obj_request_put(obj_request);
1470}
1471
1472static bool obj_request_type_valid(enum obj_request_type type)
1473{
1474 switch (type) {
9969ebc5 1475 case OBJ_REQUEST_NODATA:
bf0d5f50 1476 case OBJ_REQUEST_BIO:
788e2df3 1477 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1478 return true;
1479 default:
1480 return false;
1481 }
1482}
1483
bf0d5f50
AE
1484static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1485 struct rbd_obj_request *obj_request)
1486{
37206ee5
AE
1487 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1488
bf0d5f50
AE
1489 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1490}
1491
1492static void rbd_img_request_complete(struct rbd_img_request *img_request)
1493{
55f27e09 1494
37206ee5 1495 dout("%s: img %p\n", __func__, img_request);
55f27e09
AE
1496
1497 /*
1498 * If no error occurred, compute the aggregate transfer
1499 * count for the image request. We could instead use
1500 * atomic64_cmpxchg() to update it as each object request
1501 * completes; not clear which way is better off hand.
1502 */
1503 if (!img_request->result) {
1504 struct rbd_obj_request *obj_request;
1505 u64 xferred = 0;
1506
1507 for_each_obj_request(img_request, obj_request)
1508 xferred += obj_request->xferred;
1509 img_request->xferred = xferred;
1510 }
1511
bf0d5f50
AE
1512 if (img_request->callback)
1513 img_request->callback(img_request);
1514 else
1515 rbd_img_request_put(img_request);
1516}
1517
788e2df3
AE
1518/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1519
1520static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1521{
37206ee5
AE
1522 dout("%s: obj %p\n", __func__, obj_request);
1523
788e2df3
AE
1524 return wait_for_completion_interruptible(&obj_request->completion);
1525}
1526
0c425248
AE
1527/*
1528 * The default/initial value for all image request flags is 0. Each
1529 * is conditionally set to 1 at image request initialization time
1530 * and currently never change thereafter.
1531 */
1532static void img_request_write_set(struct rbd_img_request *img_request)
1533{
1534 set_bit(IMG_REQ_WRITE, &img_request->flags);
1535 smp_mb();
1536}
1537
1538static bool img_request_write_test(struct rbd_img_request *img_request)
1539{
1540 smp_mb();
1541 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1542}
1543
9849e986
AE
1544static void img_request_child_set(struct rbd_img_request *img_request)
1545{
1546 set_bit(IMG_REQ_CHILD, &img_request->flags);
1547 smp_mb();
1548}
1549
e93f3152
AE
1550static void img_request_child_clear(struct rbd_img_request *img_request)
1551{
1552 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1553 smp_mb();
1554}
1555
9849e986
AE
1556static bool img_request_child_test(struct rbd_img_request *img_request)
1557{
1558 smp_mb();
1559 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1560}
1561
d0b2e944
AE
1562static void img_request_layered_set(struct rbd_img_request *img_request)
1563{
1564 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1565 smp_mb();
1566}
1567
a2acd00e
AE
1568static void img_request_layered_clear(struct rbd_img_request *img_request)
1569{
1570 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1571 smp_mb();
1572}
1573
d0b2e944
AE
1574static bool img_request_layered_test(struct rbd_img_request *img_request)
1575{
1576 smp_mb();
1577 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1578}
1579
6e2a4505
AE
1580static void
1581rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1582{
b9434c5b
AE
1583 u64 xferred = obj_request->xferred;
1584 u64 length = obj_request->length;
1585
6e2a4505
AE
1586 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1587 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1588 xferred, length);
6e2a4505 1589 /*
be4c4b85
JD
1590 * ENOENT means a hole in the image. We zero-fill the entire
1591 * length of the request. A short read also implies zero-fill
1592 * to the end of the request. An error requires the whole
1593 * length of the request to be reported finished with an error
1594 * to the block layer. In each case we update the xferred
1595 * count to indicate the whole request was satisfied.
6e2a4505 1596 */
b9434c5b 1597 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1598 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1599 if (obj_request->type == OBJ_REQUEST_BIO)
1600 zero_bio_chain(obj_request->bio_list, 0);
1601 else
1602 zero_pages(obj_request->pages, 0, length);
6e2a4505 1603 obj_request->result = 0;
b9434c5b
AE
1604 } else if (xferred < length && !obj_request->result) {
1605 if (obj_request->type == OBJ_REQUEST_BIO)
1606 zero_bio_chain(obj_request->bio_list, xferred);
1607 else
1608 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1609 }
be4c4b85 1610 obj_request->xferred = length;
6e2a4505
AE
1611 obj_request_done_set(obj_request);
1612}
1613
bf0d5f50
AE
1614static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1615{
37206ee5
AE
1616 dout("%s: obj %p cb %p\n", __func__, obj_request,
1617 obj_request->callback);
bf0d5f50
AE
1618 if (obj_request->callback)
1619 obj_request->callback(obj_request);
788e2df3
AE
1620 else
1621 complete_all(&obj_request->completion);
bf0d5f50
AE
1622}
1623
c47f9371 1624static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
39bf2c5d
AE
1625{
1626 dout("%s: obj %p\n", __func__, obj_request);
1627 obj_request_done_set(obj_request);
1628}
1629
c47f9371 1630static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1631{
57acbaa7 1632 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1633 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1634 bool layered = false;
1635
1636 if (obj_request_img_data_test(obj_request)) {
1637 img_request = obj_request->img_request;
1638 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1639 rbd_dev = img_request->rbd_dev;
57acbaa7 1640 }
8b3e1a56
AE
1641
1642 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1643 obj_request, img_request, obj_request->result,
1644 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1645 if (layered && obj_request->result == -ENOENT &&
1646 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1647 rbd_img_parent_read(obj_request);
1648 else if (img_request)
6e2a4505
AE
1649 rbd_img_obj_request_read_callback(obj_request);
1650 else
1651 obj_request_done_set(obj_request);
bf0d5f50
AE
1652}
1653
c47f9371 1654static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1655{
1b83bef2
SW
1656 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1657 obj_request->result, obj_request->length);
1658 /*
8b3e1a56
AE
1659 * There is no such thing as a successful short write. Set
1660 * it to our originally-requested length.
1b83bef2
SW
1661 */
1662 obj_request->xferred = obj_request->length;
07741308 1663 obj_request_done_set(obj_request);
bf0d5f50
AE
1664}
1665
fbfab539
AE
1666/*
1667 * For a simple stat call there's nothing to do. We'll do more if
1668 * this is part of a write sequence for a layered image.
1669 */
c47f9371 1670static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1671{
37206ee5 1672 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1673 obj_request_done_set(obj_request);
1674}
1675
dff252b8
ID
1676static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1677{
1678 dout("%s: obj %p\n", __func__, obj_request);
1679
1680 if (obj_request_img_data_test(obj_request))
1681 rbd_osd_copyup_callback(obj_request);
1682 else
1683 obj_request_done_set(obj_request);
1684}
1685
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
		rbd_osd_call_callback(obj_request);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

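/*
 * Create an osd request with room for a single op on behalf of the
 * given object request.  A write gets the snapshot context of its
 * image request; a read is allocated with no snapshot context.
 */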
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_NOIO);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

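/*
 * Free an object request, along with the bio chain or page vector
 * it owns.  Called once the request's last reference is dropped.
 */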
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

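/*
 * Tear down an image request when its last reference is dropped.
 * Any object requests still on its list are deleted, and the
 * references it holds on the parent image and on the snapshot
 * context (for writes) are released.
 */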
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

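/*
 * Create an image request aimed at the parent image, on behalf of
 * an object request in the child.  The parent request keeps a
 * reference to the originating object request until it is torn
 * down by rbd_parent_request_destroy().
 */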
static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}

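/*
 * Finish one object request within an image request.  For the
 * topmost image request this ends the corresponding portion of the
 * block layer request.  Returns true if more object requests remain
 * to be completed.
 */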
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
		/*
		 * Need to end I/O on the entire obj_request worth of
		 * bytes in case of error.
		 */
		xferred = obj_request->length;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

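/*
 * Per-object completion callback for an image request.  Object
 * requests may complete in any order, but they are retired in
 * order, so completion only advances while the next expected
 * object request is done.
 */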
2169238d
AE
2158static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2159{
2160 struct rbd_img_request *img_request;
2161 u32 which = obj_request->which;
2162 bool more = true;
2163
6365d33a 2164 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2165 img_request = obj_request->img_request;
2166
2167 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2168 rbd_assert(img_request != NULL);
2169238d
AE
2169 rbd_assert(img_request->obj_request_count > 0);
2170 rbd_assert(which != BAD_WHICH);
2171 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2172
2173 spin_lock_irq(&img_request->completion_lock);
2174 if (which != img_request->next_completion)
2175 goto out;
2176
2177 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2178 rbd_assert(more);
2179 rbd_assert(which < img_request->obj_request_count);
2180
2181 if (!obj_request_done_test(obj_request))
2182 break;
1217857f 2183 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2184 which++;
2185 }
2186
2187 rbd_assert(more ^ (which == img_request->obj_request_count));
2188 img_request->next_completion = which;
2189out:
2190 spin_unlock_irq(&img_request->completion_lock);
dabf1f3b 2191 rbd_img_request_put(img_request);
2169238d
AE
2192
2193 if (!more)
2194 rbd_img_request_complete(img_request);
2195}
2196
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;
		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;
		rbd_img_request_get(img_request);

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);

	return -ENOMEM;
}

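/*
 * Handle completion of a copyup request: the pages that carried
 * the parent data are released, and a successful write is
 * accounted at its full originally-requested length.
 */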
static void
rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);

	pages = obj_request->copyup_pages;
	rbd_assert(pages != NULL);
	obj_request->copyup_pages = NULL;
	page_count = obj_request->copyup_page_count;
	rbd_assert(page_count);
	obj_request->copyup_page_count = 0;
	ceph_release_page_vector(pages, page_count);

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	obj_request_done_set(obj_request);
}

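/*
 * Called when the read of the full parent object has completed.
 * The parent data is used to build a two-op copyup request (the
 * "copyup" method call followed by the original write), which
 * replaces the original osd request and is submitted in its place.
 */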
static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	u32 page_count;
	int img_result;
	u64 parent_length;
	u64 offset;
	u64 length;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;
	page_count = img_request->copyup_page_count;
	rbd_assert(page_count);
	img_request->copyup_page_count = 0;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(obj_request_type_valid(orig_request->type));
	img_result = img_request->result;
	parent_length = img_request->length;
	rbd_assert(parent_length == img_request->xferred);
	rbd_img_request_put(img_request);

	rbd_assert(orig_request->img_request);
	rbd_dev = orig_request->img_request->rbd_dev;
	rbd_assert(rbd_dev);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		ceph_release_page_vector(pages, page_count);
		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, orig_request);
		if (!img_result)
			return;
	}

	if (img_result)
		goto out_err;

	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two ops in a copyup
	 * request.  Allocate the new copyup osd request for the
	 * original request, and release the old one.
	 */
	img_result = -ENOMEM;
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	rbd_osd_req_destroy(orig_request->osd_req);
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;
	orig_request->copyup_page_count = page_count;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
						false, false);

	/* Then the original write request op */

	offset = orig_request->offset;
	length = orig_request->length;
	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					offset, length, 0, 0);
	if (orig_request->type == OBJ_REQUEST_BIO)
		osd_req_op_extent_osd_data_bio(osd_req, 1,
					orig_request->bio_list, length);
	else
		osd_req_op_extent_osd_data_pages(osd_req, 1,
					orig_request->pages, length,
					offset & ~PAGE_MASK, false, false);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	osdc = &rbd_dev->rbd_client->client->osdc;
	img_result = rbd_obj_request_submit(osdc, orig_request);
	if (!img_result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = img_result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to free the pages
	 * and re-submit the original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		rbd_obj_request_put(orig_request);
		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		!obj_request_overlaps_parent(obj_request) ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

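/*
 * Completion callback for a read satisfied by the parent image.
 * Anything beyond the parent overlap boundary is made to look like
 * a short read, so the read callback zeroes it.
 */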
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

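/*
 * Synchronously acknowledge a notification received on the rbd
 * header object.
 */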
static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
out:
	rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);

	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

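/*
 * The block layer request function.  Each request fetched from the
 * queue is validated, turned into an image request spanning one or
 * more objects, and submitted; the queue lock is dropped while the
 * request is being processed.
 */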
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * a queue callback.  Makes sure that we don't create a bio that spans across
 * multiple osd objects.  One exception would be a single-page bio,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the bio's start sector
	 * falls; the partition-relative start sector is first made
	 * relative to the enclosing device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

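/*
 * Synchronously read "length" bytes at "offset" from the named
 * object into the buffer provided.  Returns the number of bytes
 * read, or a negative error code.
 */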
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;
	bool removing;

	/*
	 * Don't hold the lock while doing disk operations,
	 * or lock ordering will conflict with the bdev mutex via:
	 * rbd_add() -> blkdev_get() -> rbd_open()
	 */
	spin_lock_irq(&rbd_dev->lock);
	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	/*
	 * If the device is being removed, rbd_dev->disk has
	 * been destroyed, so don't try to update its size
	 */
	if (!removing) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}

cc4a38bd 3401static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3402{
e627db08 3403 u64 mapping_size;
1fe5e993
AE
3404 int ret;
3405
117973fb 3406 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3407 mapping_size = rbd_dev->mapping.size;
1fe5e993 3408 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3409 if (rbd_dev->image_format == 1)
99a41ebc 3410 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3411 else
2df3fac7 3412 ret = rbd_dev_v2_header_info(rbd_dev);
15228ede
AE
3413
3414 /* If it's a mapped snapshot, validate its EXISTS flag */
3415
3416 rbd_exists_validate(rbd_dev);
1fe5e993 3417 mutex_unlock(&ctl_mutex);
00a653e2 3418 if (mapping_size != rbd_dev->mapping.size) {
5b213542 3419 rbd_dev_update_size(rbd_dev);
00a653e2 3420 }
1fe5e993
AE
3421
3422 return ret;
3423}
3424
602adf40
YS
3425static int rbd_init_disk(struct rbd_device *rbd_dev)
3426{
3427 struct gendisk *disk;
3428 struct request_queue *q;
593a9e7b 3429 u64 segment_size;
602adf40 3430
602adf40 3431 /* create gendisk info */
602adf40
YS
3432 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3433 if (!disk)
1fcdb8aa 3434 return -ENOMEM;
602adf40 3435
f0f8cef5 3436 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3437 rbd_dev->dev_id);
602adf40
YS
3438 disk->major = rbd_dev->major;
3439 disk->first_minor = 0;
3440 disk->fops = &rbd_bd_ops;
3441 disk->private_data = rbd_dev;
3442
bf0d5f50 3443 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3444 if (!q)
3445 goto out_disk;
029bcbd8 3446
593a9e7b
AE
3447 /* We use the default size, but let's be explicit about it. */
3448 blk_queue_physical_block_size(q, SECTOR_SIZE);
3449
029bcbd8 3450 /* set io sizes to object size */
593a9e7b
AE
3451 segment_size = rbd_obj_bytes(&rbd_dev->header);
3452 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3453 blk_queue_max_segment_size(q, segment_size);
3454 blk_queue_io_min(q, segment_size);
3455 blk_queue_io_opt(q, segment_size);
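	/*
	 * Worked example (assuming the common default object order of
	 * 22, i.e. 4MB objects): segment_size is 4194304 bytes, so the
	 * queue caps I/Os at 8192 sectors and reports 4MB as both the
	 * minimum and optimal I/O size.
	 */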
029bcbd8 3456
602adf40 3457 blk_queue_merge_bvec(q, rbd_merge_bvec);
76cfab64
RH
3458 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
3459 q->backing_dev_info.capabilities |= BDI_CAP_STABLE_WRITES;
3460
602adf40
YS
3461 disk->queue = q;
3462
3463 q->queuedata = rbd_dev;
3464
3465 rbd_dev->disk = disk;
602adf40 3466
602adf40 3467 return 0;
602adf40
YS
3468out_disk:
3469 put_disk(disk);
1fcdb8aa
AE
3470
3471 return -ENOMEM;
602adf40
YS
3472}
3473
dfc5606d
YS
3474/*
3475 sysfs
3476*/
3477
593a9e7b
AE
3478static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3479{
3480 return container_of(dev, struct rbd_device, dev);
3481}
3482
dfc5606d
YS
3483static ssize_t rbd_size_show(struct device *dev,
3484 struct device_attribute *attr, char *buf)
3485{
593a9e7b 3486 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3487
fc71d833
AE
3488 return sprintf(buf, "%llu\n",
3489 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3490}
3491
34b13184
AE
3492/*
3493 * Note this shows the features for whatever's mapped, which is not
3494 * necessarily the base image.
3495 */
3496static ssize_t rbd_features_show(struct device *dev,
3497 struct device_attribute *attr, char *buf)
3498{
3499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3500
3501 return sprintf(buf, "0x%016llx\n",
fc71d833 3502 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3503}
3504
dfc5606d
YS
3505static ssize_t rbd_major_show(struct device *dev,
3506 struct device_attribute *attr, char *buf)
3507{
593a9e7b 3508 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3509
fc71d833
AE
3510 if (rbd_dev->major)
3511 return sprintf(buf, "%d\n", rbd_dev->major);
3512
3513 return sprintf(buf, "(none)\n");
3514
dfc5606d
YS
3515}
3516
3517static ssize_t rbd_client_id_show(struct device *dev,
3518 struct device_attribute *attr, char *buf)
602adf40 3519{
593a9e7b 3520 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3521
1dbb4399
AE
3522 return sprintf(buf, "client%lld\n",
3523 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3524}
3525
dfc5606d
YS
3526static ssize_t rbd_pool_show(struct device *dev,
3527 struct device_attribute *attr, char *buf)
602adf40 3528{
593a9e7b 3529 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3530
0d7dbfce 3531 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3532}
3533
9bb2f334
AE
3534static ssize_t rbd_pool_id_show(struct device *dev,
3535 struct device_attribute *attr, char *buf)
3536{
3537 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3538
0d7dbfce 3539 return sprintf(buf, "%llu\n",
fc71d833 3540 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3541}
3542
dfc5606d
YS
3543static ssize_t rbd_name_show(struct device *dev,
3544 struct device_attribute *attr, char *buf)
3545{
593a9e7b 3546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3547
a92ffdf8
AE
3548 if (rbd_dev->spec->image_name)
3549 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3550
3551 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3552}
3553
589d30e0
AE
3554static ssize_t rbd_image_id_show(struct device *dev,
3555 struct device_attribute *attr, char *buf)
3556{
3557 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3558
0d7dbfce 3559 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3560}
3561
34b13184
AE
3562/*
3563 * Shows the name of the currently-mapped snapshot (or
3564 * RBD_SNAP_HEAD_NAME for the base image).
3565 */
dfc5606d
YS
3566static ssize_t rbd_snap_show(struct device *dev,
3567 struct device_attribute *attr,
3568 char *buf)
3569{
593a9e7b 3570 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3571
0d7dbfce 3572 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3573}
3574
86b00e0d
AE
3575/*
3576 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3577 * for the parent image. If there is no parent, simply shows
3578 * "(no parent image)".
3579 */
3580static ssize_t rbd_parent_show(struct device *dev,
3581 struct device_attribute *attr,
3582 char *buf)
3583{
3584 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3585 struct rbd_spec *spec = rbd_dev->parent_spec;
3586 int count;
3587 char *bufp = buf;
3588
3589 if (!spec)
3590 return sprintf(buf, "(no parent image)\n");
3591
3592 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3593 (unsigned long long) spec->pool_id, spec->pool_name);
3594 if (count < 0)
3595 return count;
3596 bufp += count;
3597
3598 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3599 spec->image_name ? spec->image_name : "(unknown)");
3600 if (count < 0)
3601 return count;
3602 bufp += count;
3603
3604 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3605 (unsigned long long) spec->snap_id, spec->snap_name);
3606 if (count < 0)
3607 return count;
3608 bufp += count;
3609
3610 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3611 if (count < 0)
3612 return count;
3613 bufp += count;
3614
3615 return (ssize_t) (bufp - buf);
3616}
3617
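/*
 * For illustration only (every value below is hypothetical), reading
 * the "parent" attribute of a mapped clone might produce:
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b2ae8944a
 *	image_name parent-image
 *	snap_id 4
 *	snap_name parent-snap
 *	overlap 10737418240
 */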
dfc5606d
YS
3618static ssize_t rbd_image_refresh(struct device *dev,
3619 struct device_attribute *attr,
3620 const char *buf,
3621 size_t size)
3622{
593a9e7b 3623 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3624 int ret;
602adf40 3625
cc4a38bd 3626 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3627 if (ret)
3628 rbd_warn(rbd_dev, "manual header refresh error (%d)\n", ret);
b813623a
AE
3629
3630 return ret < 0 ? ret : size;
dfc5606d 3631}
602adf40 3632
dfc5606d 3633static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3634static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3635static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3636static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3637static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3638static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3639static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3640static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3641static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3642static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3643static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3644
3645static struct attribute *rbd_attrs[] = {
3646 &dev_attr_size.attr,
34b13184 3647 &dev_attr_features.attr,
dfc5606d
YS
3648 &dev_attr_major.attr,
3649 &dev_attr_client_id.attr,
3650 &dev_attr_pool.attr,
9bb2f334 3651 &dev_attr_pool_id.attr,
dfc5606d 3652 &dev_attr_name.attr,
589d30e0 3653 &dev_attr_image_id.attr,
dfc5606d 3654 &dev_attr_current_snap.attr,
86b00e0d 3655 &dev_attr_parent.attr,
dfc5606d 3656 &dev_attr_refresh.attr,
dfc5606d
YS
3657 NULL
3658};
3659
3660static struct attribute_group rbd_attr_group = {
3661 .attrs = rbd_attrs,
3662};
3663
3664static const struct attribute_group *rbd_attr_groups[] = {
3665 &rbd_attr_group,
3666 NULL
3667};
3668
3669static void rbd_sysfs_dev_release(struct device *dev)
3670{
3671}
3672
3673static struct device_type rbd_device_type = {
3674 .name = "rbd",
3675 .groups = rbd_attr_groups,
3676 .release = rbd_sysfs_dev_release,
3677};
3678
8b8fb99c
AE
3679static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3680{
3681 kref_get(&spec->kref);
3682
3683 return spec;
3684}
3685
3686static void rbd_spec_free(struct kref *kref);
3687static void rbd_spec_put(struct rbd_spec *spec)
3688{
3689 if (spec)
3690 kref_put(&spec->kref, rbd_spec_free);
3691}
3692
3693static struct rbd_spec *rbd_spec_alloc(void)
3694{
3695 struct rbd_spec *spec;
3696
3697 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3698 if (!spec)
3699 return NULL;
3700 kref_init(&spec->kref);
3701
8b8fb99c
AE
3702 return spec;
3703}
3704
3705static void rbd_spec_free(struct kref *kref)
3706{
3707 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3708
3709 kfree(spec->pool_name);
3710 kfree(spec->image_id);
3711 kfree(spec->image_name);
3712 kfree(spec->snap_name);
3713 kfree(spec);
3714}
3715
cc344fa1 3716static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3717 struct rbd_spec *spec)
3718{
3719 struct rbd_device *rbd_dev;
3720
3721 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3722 if (!rbd_dev)
3723 return NULL;
3724
3725 spin_lock_init(&rbd_dev->lock);
6d292906 3726 rbd_dev->flags = 0;
a2acd00e 3727 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3728 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3729 init_rwsem(&rbd_dev->header_rwsem);
3730
3731 rbd_dev->spec = spec;
3732 rbd_dev->rbd_client = rbdc;
3733
0903e875
AE
3734 /* Initialize the layout used for all rbd requests */
3735
3736 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3737 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3738 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3739 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3740
c53d5893
AE
3741 return rbd_dev;
3742}
3743
3744static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3745{
c53d5893
AE
3746 rbd_put_client(rbd_dev->rbd_client);
3747 rbd_spec_put(rbd_dev->spec);
3748 kfree(rbd_dev);
3749}
3750
9d475de5
AE
3751/*
3752 * Get the size and object order for an image snapshot, or if
3753 * snap_id is CEPH_NOSNAP, gets this information for the base
3754 * image.
3755 */
3756static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3757 u8 *order, u64 *snap_size)
3758{
3759 __le64 snapid = cpu_to_le64(snap_id);
3760 int ret;
3761 struct {
3762 u8 order;
3763 __le64 size;
3764 } __attribute__ ((packed)) size_buf = { 0 };
3765
36be9a76 3766 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3767 "rbd", "get_size",
4157976b 3768 &snapid, sizeof (snapid),
e2a58ee5 3769 &size_buf, sizeof (size_buf));
36be9a76 3770 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3771 if (ret < 0)
3772 return ret;
57385b51
AE
3773 if (ret < sizeof (size_buf))
3774 return -ERANGE;
9d475de5 3775
dd932ee7 3776 if (order) {
c86f86e9 3777 *order = size_buf.order;
dd932ee7
JD
3778 dout(" order %u", (unsigned int)*order);
3779 }
9d475de5
AE
3780 *snap_size = le64_to_cpu(size_buf.size);
3781
dd932ee7
JD
3782 dout(" snap_id 0x%016llx snap_size = %llu\n",
3783 (unsigned long long)snap_id,
57385b51 3784 (unsigned long long)*snap_size);
9d475de5
AE
3785
3786 return 0;
3787}
3788
3789static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3790{
3791 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3792 &rbd_dev->header.obj_order,
3793 &rbd_dev->header.image_size);
3794}
3795
1e130199
AE
3796static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3797{
3798 void *reply_buf;
3799 int ret;
3800 void *p;
3801
3802 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3803 if (!reply_buf)
3804 return -ENOMEM;
3805
36be9a76 3806 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3807 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3808 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3809 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3810 if (ret < 0)
3811 goto out;
3812
3813 p = reply_buf;
3814 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3815 p + ret, NULL, GFP_NOIO);
3816 ret = 0;
1e130199
AE
3817
3818 if (IS_ERR(rbd_dev->header.object_prefix)) {
3819 ret = PTR_ERR(rbd_dev->header.object_prefix);
3820 rbd_dev->header.object_prefix = NULL;
3821 } else {
3822 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3823 }
1e130199
AE
3824out:
3825 kfree(reply_buf);
3826
3827 return ret;
3828}
3829
b1b5402a
AE
3830static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3831 u64 *snap_features)
3832{
3833 __le64 snapid = cpu_to_le64(snap_id);
3834 struct {
3835 __le64 features;
3836 __le64 incompat;
4157976b 3837 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3838 u64 incompat;
b1b5402a
AE
3839 int ret;
3840
36be9a76 3841 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3842 "rbd", "get_features",
4157976b 3843 &snapid, sizeof (snapid),
e2a58ee5 3844 &features_buf, sizeof (features_buf));
36be9a76 3845 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3846 if (ret < 0)
3847 return ret;
57385b51
AE
3848 if (ret < sizeof (features_buf))
3849 return -ERANGE;
d889140c
AE
3850
3851 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3852 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3853 return -ENXIO;
d889140c 3854
b1b5402a
AE
3855 *snap_features = le64_to_cpu(features_buf.features);
3856
3857 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3858 (unsigned long long)snap_id,
3859 (unsigned long long)*snap_features,
3860 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3861
3862 return 0;
3863}
3864
3865static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3866{
3867 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3868 &rbd_dev->header.features);
3869}
3870
86b00e0d
AE
3871static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3872{
3873 struct rbd_spec *parent_spec;
3874 size_t size;
3875 void *reply_buf = NULL;
3876 __le64 snapid;
3877 void *p;
3878 void *end;
642a2537 3879 u64 pool_id;
86b00e0d
AE
3880 char *image_id;
3881 u64 overlap;
86b00e0d
AE
3882 int ret;
3883
3884 parent_spec = rbd_spec_alloc();
3885 if (!parent_spec)
3886 return -ENOMEM;
3887
3888 size = sizeof (__le64) + /* pool_id */
3889 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3890 sizeof (__le64) + /* snap_id */
3891 sizeof (__le64); /* overlap */
3892 reply_buf = kmalloc(size, GFP_KERNEL);
3893 if (!reply_buf) {
3894 ret = -ENOMEM;
3895 goto out_err;
3896 }
3897
3898 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3899 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3900 "rbd", "get_parent",
4157976b 3901 &snapid, sizeof (snapid),
e2a58ee5 3902 reply_buf, size);
36be9a76 3903 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3904 if (ret < 0)
3905 goto out_err;
3906
86b00e0d 3907 p = reply_buf;
57385b51
AE
3908 end = reply_buf + ret;
3909 ret = -ERANGE;
642a2537 3910 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3911 if (pool_id == CEPH_NOPOOL) {
3912 /*
3913 * Either the parent never existed, or we have a
3914 * record of it but the image got flattened, so it no
3915 * longer has a parent. When the parent of a
3916 * layered image disappears we immediately set the
3917 * overlap to 0. The effect of this is that all new
3918 * requests will be treated as if the image had no
3919 * parent.
3920 */
3921 if (rbd_dev->parent_overlap) {
3922 rbd_dev->parent_overlap = 0;
3923 smp_mb();
3924 rbd_dev_parent_put(rbd_dev);
3925 pr_info("%s: clone image has been flattened\n",
3926 rbd_dev->disk->disk_name);
3927 }
3928
86b00e0d 3929 goto out; /* No parent? No problem. */
392a9dad 3930 }
86b00e0d 3931
0903e875
AE
3932 /* The ceph file layout needs to fit pool id in 32 bits */
3933
3934 ret = -EIO;
642a2537 3935 if (pool_id > (u64)U32_MAX) {
c0cd10db 3936 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3937 (unsigned long long)pool_id, U32_MAX);
57385b51 3938 goto out_err;
c0cd10db 3939 }
642a2537 3940 parent_spec->pool_id = pool_id;
0903e875 3941
979ed480 3942 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3943 if (IS_ERR(image_id)) {
3944 ret = PTR_ERR(image_id);
3945 goto out_err;
3946 }
3947 parent_spec->image_id = image_id;
3948 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3949 ceph_decode_64_safe(&p, end, overlap, out_err);
3950
70cf49cf 3951 if (overlap) {
642a2537 3952 rbd_spec_put(rbd_dev->parent_spec);
70cf49cf
AE
3953 rbd_dev->parent_spec = parent_spec;
3954 parent_spec = NULL; /* rbd_dev now owns this */
3955 rbd_dev->parent_overlap = overlap;
3956 } else {
3957 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3958 }
86b00e0d
AE
3959out:
3960 ret = 0;
3961out_err:
3962 kfree(reply_buf);
3963 rbd_spec_put(parent_spec);
3964
3965 return ret;
3966}
3967
cc070d59
AE
3968static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3969{
3970 struct {
3971 __le64 stripe_unit;
3972 __le64 stripe_count;
3973 } __attribute__ ((packed)) striping_info_buf = { 0 };
3974 size_t size = sizeof (striping_info_buf);
3975 void *p;
3976 u64 obj_size;
3977 u64 stripe_unit;
3978 u64 stripe_count;
3979 int ret;
3980
3981 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3982 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3983 (char *)&striping_info_buf, size);
cc070d59
AE
3984 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3985 if (ret < 0)
3986 return ret;
3987 if (ret < size)
3988 return -ERANGE;
3989
3990 /*
3991 * We don't actually support the "fancy striping" feature
3992 * (STRIPINGV2) yet, but if the striping sizes are the
3993 * defaults the behavior is the same as before. So find
3994 * out, and only fail if the image has non-default values.
3995 */
3996 ret = -EINVAL;
3997 obj_size = (u64)1 << rbd_dev->header.obj_order;
3998 p = &striping_info_buf;
3999 stripe_unit = ceph_decode_64(&p);
4000 if (stripe_unit != obj_size) {
4001 rbd_warn(rbd_dev, "unsupported stripe unit "
4002 "(got %llu want %llu)",
4003 stripe_unit, obj_size);
4004 return -EINVAL;
4005 }
4006 stripe_count = ceph_decode_64(&p);
4007 if (stripe_count != 1) {
4008 rbd_warn(rbd_dev, "unsupported stripe count "
4009 "(got %llu want 1)", stripe_count);
4010 return -EINVAL;
4011 }
500d0c0f
AE
4012 rbd_dev->header.stripe_unit = stripe_unit;
4013 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
4014
4015 return 0;
4016}
4017
9e15b77d
AE
4018static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4019{
4020 size_t image_id_size;
4021 char *image_id;
4022 void *p;
4023 void *end;
4024 size_t size;
4025 void *reply_buf = NULL;
4026 size_t len = 0;
4027 char *image_name = NULL;
4028 int ret;
4029
4030 rbd_assert(!rbd_dev->spec->image_name);
4031
69e7a02f
AE
4032 len = strlen(rbd_dev->spec->image_id);
4033 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4034 image_id = kmalloc(image_id_size, GFP_KERNEL);
4035 if (!image_id)
4036 return NULL;
4037
4038 p = image_id;
4157976b 4039 end = image_id + image_id_size;
57385b51 4040 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4041
4042 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4043 reply_buf = kmalloc(size, GFP_KERNEL);
4044 if (!reply_buf)
4045 goto out;
4046
36be9a76 4047 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4048 "rbd", "dir_get_name",
4049 image_id, image_id_size,
e2a58ee5 4050 reply_buf, size);
9e15b77d
AE
4051 if (ret < 0)
4052 goto out;
4053 p = reply_buf;
f40eb349
AE
4054 end = reply_buf + ret;
4055
9e15b77d
AE
4056 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4057 if (IS_ERR(image_name))
4058 image_name = NULL;
4059 else
4060 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4061out:
4062 kfree(reply_buf);
4063 kfree(image_id);
4064
4065 return image_name;
4066}
4067
2ad3d716
AE
4068static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4069{
4070 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4071 const char *snap_name;
4072 u32 which = 0;
4073
4074 /* Skip over names until we find the one we are looking for */
4075
4076 snap_name = rbd_dev->header.snap_names;
4077 while (which < snapc->num_snaps) {
4078 if (!strcmp(name, snap_name))
4079 return snapc->snaps[which];
4080 snap_name += strlen(snap_name) + 1;
4081 which++;
4082 }
4083 return CEPH_NOSNAP;
4084}
4085
4086static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4087{
4088 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4089 u32 which;
4090 bool found = false;
4091 u64 snap_id;
4092
4093 for (which = 0; !found && which < snapc->num_snaps; which++) {
4094 const char *snap_name;
4095
4096 snap_id = snapc->snaps[which];
4097 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
6fe77759
JD
4098 if (IS_ERR(snap_name)) {
4099 /* ignore no-longer existing snapshots */
4100 if (PTR_ERR(snap_name) == -ENOENT)
4101 continue;
4102 else
4103 break;
4104 }
2ad3d716
AE
4105 found = !strcmp(name, snap_name);
4106 kfree(snap_name);
4107 }
4108 return found ? snap_id : CEPH_NOSNAP;
4109}
4110
4111/*
4112 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4113 * no snapshot by that name is found, or if an error occurs.
4114 */
4115static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4116{
4117 if (rbd_dev->image_format == 1)
4118 return rbd_v1_snap_id_by_name(rbd_dev, name);
4119
4120 return rbd_v2_snap_id_by_name(rbd_dev, name);
4121}
4122
9e15b77d 4123/*
2e9f7f1c
AE
4124 * When an rbd image has a parent image, it is identified by the
4125 * pool, image, and snapshot ids (not names). This function fills
4126 * in the names for those ids. (It's OK if we can't figure out the
4127 * name for an image id, but the pool and snapshot ids should always
4128 * exist and have names.) All names in an rbd spec are dynamically
4129 * allocated.
e1d4213f
AE
4130 *
4131 * When an image being mapped (not a parent) is probed, we have the
4132 * pool name and pool id, image name and image id, and the snapshot
4133 * name. The only thing we're missing is the snapshot id.
9e15b77d 4134 */
2e9f7f1c 4135static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4136{
2e9f7f1c
AE
4137 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4138 struct rbd_spec *spec = rbd_dev->spec;
4139 const char *pool_name;
4140 const char *image_name;
4141 const char *snap_name;
9e15b77d
AE
4142 int ret;
4143
e1d4213f
AE
4144 /*
4145 * An image being mapped will have the pool name (etc.), but
4146 * we need to look up the snapshot id.
4147 */
2e9f7f1c
AE
4148 if (spec->pool_name) {
4149 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4150 u64 snap_id;
e1d4213f 4151
2ad3d716
AE
4152 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4153 if (snap_id == CEPH_NOSNAP)
e1d4213f 4154 return -ENOENT;
2ad3d716 4155 spec->snap_id = snap_id;
e1d4213f 4156 } else {
2e9f7f1c 4157 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4158 }
4159
4160 return 0;
4161 }
9e15b77d 4162
2e9f7f1c 4163 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4164
2e9f7f1c
AE
4165 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4166 if (!pool_name) {
4167 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4168 return -EIO;
4169 }
2e9f7f1c
AE
4170 pool_name = kstrdup(pool_name, GFP_KERNEL);
4171 if (!pool_name)
9e15b77d
AE
4172 return -ENOMEM;
4173
4174 /* Fetch the image name; tolerate failure here */
4175
2e9f7f1c
AE
4176 image_name = rbd_dev_image_name(rbd_dev);
4177 if (!image_name)
06ecc6cb 4178 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4179
2e9f7f1c 4180 /* Look up the snapshot name, and make a copy */
9e15b77d 4181
2e9f7f1c 4182 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
08518f6f
JD
4183 if (IS_ERR(snap_name)) {
4184 ret = PTR_ERR(snap_name);
9e15b77d 4185 goto out_err;
2e9f7f1c
AE
4186 }
4187
4188 spec->pool_name = pool_name;
4189 spec->image_name = image_name;
4190 spec->snap_name = snap_name;
9e15b77d
AE
4191
4192 return 0;
4193out_err:
2e9f7f1c
AE
4194 kfree(image_name);
4195 kfree(pool_name);
9e15b77d
AE
4196
4197 return ret;
4198}
4199
cc4a38bd 4200static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4201{
4202 size_t size;
4203 int ret;
4204 void *reply_buf;
4205 void *p;
4206 void *end;
4207 u64 seq;
4208 u32 snap_count;
4209 struct ceph_snap_context *snapc;
4210 u32 i;
4211
4212 /*
4213 * We'll need room for the seq value (maximum snapshot id),
4214 * snapshot count, and array of that many snapshot ids.
4215 * For now we have a fixed upper limit on the number we're
4216 * prepared to receive.
4217 */
4218 size = sizeof (__le64) + sizeof (__le32) +
4219 RBD_MAX_SNAP_COUNT * sizeof (__le64);
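	/*
	 * Worked out: 8 + 4 + 510 * 8 = 4092 bytes, which is how
	 * RBD_MAX_SNAP_COUNT keeps the largest snapshot context
	 * reply within a single 4KB page.
	 */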
4220 reply_buf = kzalloc(size, GFP_KERNEL);
4221 if (!reply_buf)
4222 return -ENOMEM;
4223
36be9a76 4224 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4225 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4226 reply_buf, size);
36be9a76 4227 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4228 if (ret < 0)
4229 goto out;
4230
35d489f9 4231 p = reply_buf;
57385b51
AE
4232 end = reply_buf + ret;
4233 ret = -ERANGE;
35d489f9
AE
4234 ceph_decode_64_safe(&p, end, seq, out);
4235 ceph_decode_32_safe(&p, end, snap_count, out);
4236
4237 /*
4238 * Make sure the reported number of snapshot ids wouldn't go
4239 * beyond the end of our buffer. But before checking that,
4240 * make sure the computed size of the snapshot context we
4241 * allocate is representable in a size_t.
4242 */
4243 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4244 / sizeof (u64)) {
4245 ret = -EINVAL;
4246 goto out;
4247 }
4248 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4249 goto out;
468521c1 4250 ret = 0;
35d489f9 4251
812164f8 4252 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4253 if (!snapc) {
4254 ret = -ENOMEM;
4255 goto out;
4256 }
35d489f9 4257 snapc->seq = seq;
35d489f9
AE
4258 for (i = 0; i < snap_count; i++)
4259 snapc->snaps[i] = ceph_decode_64(&p);
4260
49ece554 4261 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4262 rbd_dev->header.snapc = snapc;
4263
4264 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4265 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4266out:
4267 kfree(reply_buf);
4268
57385b51 4269 return ret;
35d489f9
AE
4270}
4271
54cac61f
AE
4272static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4273 u64 snap_id)
b8b1e2db
AE
4274{
4275 size_t size;
4276 void *reply_buf;
54cac61f 4277 __le64 snapid;
b8b1e2db
AE
4278 int ret;
4279 void *p;
4280 void *end;
b8b1e2db
AE
4281 char *snap_name;
4282
4283 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4284 reply_buf = kmalloc(size, GFP_KERNEL);
4285 if (!reply_buf)
4286 return ERR_PTR(-ENOMEM);
4287
54cac61f 4288 snapid = cpu_to_le64(snap_id);
36be9a76 4289 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4290 "rbd", "get_snapshot_name",
54cac61f 4291 &snapid, sizeof (snapid),
e2a58ee5 4292 reply_buf, size);
36be9a76 4293 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4294 if (ret < 0) {
4295 snap_name = ERR_PTR(ret);
b8b1e2db 4296 goto out;
f40eb349 4297 }
b8b1e2db
AE
4298
4299 p = reply_buf;
f40eb349 4300 end = reply_buf + ret;
e5c35534 4301 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4302 if (IS_ERR(snap_name))
b8b1e2db 4303 goto out;
b8b1e2db 4304
f40eb349 4305 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4306 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4307out:
4308 kfree(reply_buf);
4309
f40eb349 4310 return snap_name;
b8b1e2db
AE
4311}
4312
2df3fac7 4313static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4314{
2df3fac7 4315 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4316 int ret;
117973fb
AE
4317
4318 down_write(&rbd_dev->header_rwsem);
4319
1617e40c
JD
4320 ret = rbd_dev_v2_image_size(rbd_dev);
4321 if (ret)
4322 goto out;
4323
2df3fac7
AE
4324 if (first_time) {
4325 ret = rbd_dev_v2_header_onetime(rbd_dev);
4326 if (ret)
4327 goto out;
4328 }
4329
642a2537
AE
4330 /*
4331 * If the image supports layering, get the parent info. We
4332 * need to probe the first time regardless. Thereafter we
4333 * only need to if there's a parent, to see if it has
4334 * disappeared due to the mapped image getting flattened.
4335 */
4336 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4337 (first_time || rbd_dev->parent_spec)) {
4338 bool warn;
4339
4340 ret = rbd_dev_v2_parent_info(rbd_dev);
4341 if (ret)
4342 goto out;
4343
4344 /*
4345 * Print a warning if this is the initial probe and
4346 * the image has a parent. Don't print it if the
4347 * image now being probed is itself a parent. We
4348 * can tell at this point because we won't know its
4349 * pool name yet (just its pool id).
4350 */
4351 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4352 if (first_time && warn)
4353 rbd_warn(rbd_dev, "WARNING: kernel layering "
4354 "is EXPERIMENTAL!");
4355 }
4356
29334ba4
AE
4357 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4358 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4359 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4360
cc4a38bd 4361 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4362 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4363out:
4364 up_write(&rbd_dev->header_rwsem);
4365
4366 return ret;
4367}
4368
dfc5606d
YS
4369static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4370{
dfc5606d 4371 struct device *dev;
cd789ab9 4372 int ret;
dfc5606d
YS
4373
4374 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4375
cd789ab9 4376 dev = &rbd_dev->dev;
dfc5606d
YS
4377 dev->bus = &rbd_bus_type;
4378 dev->type = &rbd_device_type;
4379 dev->parent = &rbd_root_dev;
200a6a8b 4380 dev->release = rbd_dev_device_release;
de71a297 4381 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4382 ret = device_register(dev);
dfc5606d 4383
dfc5606d 4384 mutex_unlock(&ctl_mutex);
cd789ab9 4385
dfc5606d 4386 return ret;
602adf40
YS
4387}
4388
dfc5606d
YS
4389static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4390{
4391 device_unregister(&rbd_dev->dev);
4392}
4393
e2839308 4394static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4395
4396/*
499afd5b
AE
4397 * Get a unique rbd identifier for the given new rbd_dev, and add
4398 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4399 */
e2839308 4400static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4401{
e2839308 4402 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4403
4404 spin_lock(&rbd_dev_list_lock);
4405 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4406 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4407 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4408 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4409}
b7f23c36 4410
1ddbe94e 4411/*
499afd5b
AE
4412 * Remove an rbd_dev from the global list, and record that its
4413 * identifier is no longer in use.
1ddbe94e 4414 */
e2839308 4415static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4416{
d184f6bf 4417 struct list_head *tmp;
de71a297 4418 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4419 int max_id;
4420
aafb230e 4421 rbd_assert(rbd_id > 0);
499afd5b 4422
e2839308
AE
4423 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4424 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4425 spin_lock(&rbd_dev_list_lock);
4426 list_del_init(&rbd_dev->node);
d184f6bf
AE
4427
4428 /*
4429 * If the id being "put" is not the current maximum, there
4430 * is nothing special we need to do.
4431 */
e2839308 4432 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4433 spin_unlock(&rbd_dev_list_lock);
4434 return;
4435 }
4436
4437 /*
4438 * We need to update the current maximum id. Search the
4439 * list to find out what it is. We're more likely to find
4440 * the maximum at the end, so search the list backward.
4441 */
4442 max_id = 0;
4443 list_for_each_prev(tmp, &rbd_dev_list) {
4444 struct rbd_device *rbd_dev;
4445
4446 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4447 if (rbd_dev->dev_id > max_id)
4448 max_id = rbd_dev->dev_id;
d184f6bf 4449 }
499afd5b 4450 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4451
1ddbe94e 4452 /*
e2839308 4453 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4454 * which case it now accurately reflects the new maximum.
4455 * Be careful not to overwrite the maximum value in that
4456 * case.
1ddbe94e 4457 */
e2839308
AE
4458 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4459 dout(" max dev id has been reset\n");
b7f23c36
AE
4460}
4461
e28fff26
AE
4462/*
4463 * Skips over white space at *buf, and updates *buf to point to the
4464 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4465 * the token (string of non-white space characters) found. Note
4466 * that *buf must be terminated with '\0'.
e28fff26
AE
4467 */
4468static inline size_t next_token(const char **buf)
4469{
4470 /*
4471 * These are the characters that produce nonzero for
4472 * isspace() in the "C" and "POSIX" locales.
4473 */
4474 const char *spaces = " \f\n\r\t\v";
4475
4476 *buf += strspn(*buf, spaces); /* Find start of token */
4477
4478 return strcspn(*buf, spaces); /* Return token length */
4479}
4480
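/*
 * A quick example (hypothetical input): with *buf pointing at
 * "  pool image", next_token() advances *buf to "pool image" and
 * returns 4, the length of "pool".
 */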
4481/*
4482 * Finds the next token in *buf, and if the provided token buffer is
4483 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4484 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4485 * must be terminated with '\0' on entry.
e28fff26
AE
4486 *
4487 * Returns the length of the token found (not including the '\0').
4488 * Return value will be 0 if no token is found, and it will be >=
4489 * token_size if the token would not fit.
4490 *
593a9e7b 4491 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4492 * found token. Note that this occurs even if the token buffer is
4493 * too small to hold it.
4494 */
4495static inline size_t copy_token(const char **buf,
4496 char *token,
4497 size_t token_size)
4498{
4499 size_t len;
4500
4501 len = next_token(buf);
4502 if (len < token_size) {
4503 memcpy(token, *buf, len);
4504 *(token + len) = '\0';
4505 }
4506 *buf += len;
4507
4508 return len;
4509}
4510
ea3352f4
AE
4511/*
4512 * Finds the next token in *buf, dynamically allocates a buffer big
4513 * enough to hold a copy of it, and copies the token into the new
4514 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4515 * that a duplicate buffer is created even for a zero-length token.
4516 *
4517 * Returns a pointer to the newly-allocated duplicate, or a null
4518 * pointer if memory for the duplicate was not available. If
4519 * the lenp argument is a non-null pointer, the length of the token
4520 * (not including the '\0') is returned in *lenp.
4521 *
4522 * If successful, the *buf pointer will be updated to point beyond
4523 * the end of the found token.
4524 *
4525 * Note: uses GFP_KERNEL for allocation.
4526 */
4527static inline char *dup_token(const char **buf, size_t *lenp)
4528{
4529 char *dup;
4530 size_t len;
4531
4532 len = next_token(buf);
4caf35f9 4533 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4534 if (!dup)
4535 return NULL;
ea3352f4
AE
4536 *(dup + len) = '\0';
4537 *buf += len;
4538
4539 if (lenp)
4540 *lenp = len;
4541
4542 return dup;
4543}
4544
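/*
 * Example (hypothetical input): with *buf at "rbd foo", dup_token()
 * returns a freshly allocated "rbd", sets *lenp to 3 when lenp is
 * non-NULL, and leaves *buf pointing at " foo".
 */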
a725f65e 4545/*
859c31df
AE
4546 * Parse the options provided for an "rbd add" (i.e., rbd image
4547 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4548 * and the data written is passed here via a NUL-terminated buffer.
4549 * Returns 0 if successful or an error code otherwise.
d22f76e7 4550 *
859c31df
AE
4551 * The information extracted from these options is recorded in
4552 * the other parameters which return dynamically-allocated
4553 * structures:
4554 * ceph_opts
4555 * The address of a pointer that will refer to a ceph options
4556 * structure. Caller must release the returned pointer using
4557 * ceph_destroy_options() when it is no longer needed.
4558 * rbd_opts
4559 * Address of an rbd options pointer. Fully initialized by
4560 * this function; caller must release with kfree().
4561 * spec
4562 * Address of an rbd image specification pointer. Fully
4563 * initialized by this function based on parsed options.
4564 * Caller must release with rbd_spec_put().
4565 *
4566 * The options passed take this form:
4567 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4568 * where:
4569 * <mon_addrs>
4570 * A comma-separated list of one or more monitor addresses.
4571 * A monitor address is an ip address, optionally followed
4572 * by a port number (separated by a colon).
4573 * I.e.: ip1[:port1][,ip2[:port2]...]
4574 * <options>
4575 * A comma-separated list of ceph and/or rbd options.
4576 * <pool_name>
4577 * The name of the rados pool containing the rbd image.
4578 * <image_name>
4579 * The name of the image in that pool to map.
4580 * <snap_name>
4581 * An optional snapshot name. If provided, the mapping will
4582 * present data from the image at the time that snapshot was
4583 * created. The image head is used if no snapshot name is
4584 * provided. Snapshot mappings are always read-only.
a725f65e 4585 */
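/*
 * A concrete (made-up) example: writing the line
 *
 *	1.2.3.4:6789 name=admin rbd foo foosnap
 *
 * to /sys/bus/rbd/add maps snapshot "foosnap" of image "foo" in
 * pool "rbd", using the monitor at 1.2.3.4:6789.
 */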
859c31df 4586static int rbd_add_parse_args(const char *buf,
dc79b113 4587 struct ceph_options **ceph_opts,
859c31df
AE
4588 struct rbd_options **opts,
4589 struct rbd_spec **rbd_spec)
e28fff26 4590{
d22f76e7 4591 size_t len;
859c31df 4592 char *options;
0ddebc0c 4593 const char *mon_addrs;
ecb4dc22 4594 char *snap_name;
0ddebc0c 4595 size_t mon_addrs_size;
859c31df 4596 struct rbd_spec *spec = NULL;
4e9afeba 4597 struct rbd_options *rbd_opts = NULL;
859c31df 4598 struct ceph_options *copts;
dc79b113 4599 int ret;
e28fff26
AE
4600
4601 /* The first four tokens are required */
4602
7ef3214a 4603 len = next_token(&buf);
4fb5d671
AE
4604 if (!len) {
4605 rbd_warn(NULL, "no monitor address(es) provided");
4606 return -EINVAL;
4607 }
0ddebc0c 4608 mon_addrs = buf;
f28e565a 4609 mon_addrs_size = len + 1;
7ef3214a 4610 buf += len;
a725f65e 4611
dc79b113 4612 ret = -EINVAL;
f28e565a
AE
4613 options = dup_token(&buf, NULL);
4614 if (!options)
dc79b113 4615 return -ENOMEM;
4fb5d671
AE
4616 if (!*options) {
4617 rbd_warn(NULL, "no options provided");
4618 goto out_err;
4619 }
e28fff26 4620
859c31df
AE
4621 spec = rbd_spec_alloc();
4622 if (!spec)
f28e565a 4623 goto out_mem;
859c31df
AE
4624
4625 spec->pool_name = dup_token(&buf, NULL);
4626 if (!spec->pool_name)
4627 goto out_mem;
4fb5d671
AE
4628 if (!*spec->pool_name) {
4629 rbd_warn(NULL, "no pool name provided");
4630 goto out_err;
4631 }
e28fff26 4632
69e7a02f 4633 spec->image_name = dup_token(&buf, NULL);
859c31df 4634 if (!spec->image_name)
f28e565a 4635 goto out_mem;
4fb5d671
AE
4636 if (!*spec->image_name) {
4637 rbd_warn(NULL, "no image name provided");
4638 goto out_err;
4639 }
d4b125e9 4640
f28e565a
AE
4641 /*
4642 * Snapshot name is optional; default is to use "-"
4643 * (indicating the head/no snapshot).
4644 */
3feeb894 4645 len = next_token(&buf);
820a5f3e 4646 if (!len) {
3feeb894
AE
4647 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4648 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4649 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4650 ret = -ENAMETOOLONG;
f28e565a 4651 goto out_err;
849b4260 4652 }
ecb4dc22
AE
4653 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4654 if (!snap_name)
f28e565a 4655 goto out_mem;
ecb4dc22
AE
4656 *(snap_name + len) = '\0';
4657 spec->snap_name = snap_name;
e5c35534 4658
0ddebc0c 4659 /* Initialize all rbd options to the defaults */
e28fff26 4660
4e9afeba
AE
4661 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4662 if (!rbd_opts)
4663 goto out_mem;
4664
4665 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4666
859c31df 4667 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4668 mon_addrs + mon_addrs_size - 1,
4e9afeba 4669 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4670 if (IS_ERR(copts)) {
4671 ret = PTR_ERR(copts);
dc79b113
AE
4672 goto out_err;
4673 }
859c31df
AE
4674 kfree(options);
4675
4676 *ceph_opts = copts;
4e9afeba 4677 *opts = rbd_opts;
859c31df 4678 *rbd_spec = spec;
0ddebc0c 4679
dc79b113 4680 return 0;
f28e565a 4681out_mem:
dc79b113 4682 ret = -ENOMEM;
d22f76e7 4683out_err:
859c31df
AE
4684 kfree(rbd_opts);
4685 rbd_spec_put(spec);
f28e565a 4686 kfree(options);
d22f76e7 4687
dc79b113 4688 return ret;
a725f65e
AE
4689}
4690
589d30e0
AE
4691/*
4692 * An rbd format 2 image has a unique identifier, distinct from the
4693 * name given to it by the user. Internally, that identifier is
4694 * what's used to specify the names of objects related to the image.
4695 *
4696 * A special "rbd id" object is used to map an rbd image name to its
4697 * id. If that object doesn't exist, then there is no v2 rbd image
4698 * with the supplied name.
4699 *
4700 * This function will fill in the given rbd_dev's image_id field if
4701 * the id can be determined, and in that case will return 0. If any
4702 * errors occur a negative errno will be returned and the rbd_dev's
4703 * image_id field will be unchanged (and should be NULL).
4704 */
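/*
 * For example (image name hypothetical): probing image "foo" looks
 * for an object named "rbd_id.foo" (RBD_ID_PREFIX plus the image
 * name) and calls its "get_id" class method; -ENOENT from that call
 * is what marks the image as format 1.
 */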
4705static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4706{
4707 int ret;
4708 size_t size;
4709 char *object_name;
4710 void *response;
c0fba368 4711 char *image_id;
2f82ee54 4712
2c0d0a10
AE
4713 /*
4714 * When probing a parent image, the image id is already
4715 * known (and the image name likely is not). There's no
c0fba368
AE
4716 * need to fetch the image id again in this case. We
4717 * do still need to set the image format though.
2c0d0a10 4718 */
c0fba368
AE
4719 if (rbd_dev->spec->image_id) {
4720 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4721
2c0d0a10 4722 return 0;
c0fba368 4723 }
2c0d0a10 4724
589d30e0
AE
4725 /*
4726 * First, see if the format 2 image id file exists, and if
4727 * so, get the image's persistent id from it.
4728 */
69e7a02f 4729 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4730 object_name = kmalloc(size, GFP_NOIO);
4731 if (!object_name)
4732 return -ENOMEM;
0d7dbfce 4733 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4734 dout("rbd id object name is %s\n", object_name);
4735
4736 /* Response will be an encoded string, which includes a length */
4737
4738 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4739 response = kzalloc(size, GFP_NOIO);
4740 if (!response) {
4741 ret = -ENOMEM;
4742 goto out;
4743 }
4744
c0fba368
AE
4745 /* If it doesn't exist we'll assume it's a format 1 image */
4746
36be9a76 4747 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4748 "rbd", "get_id", NULL, 0,
e2a58ee5 4749 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4750 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4751 if (ret == -ENOENT) {
4752 image_id = kstrdup("", GFP_KERNEL);
4753 ret = image_id ? 0 : -ENOMEM;
4754 if (!ret)
4755 rbd_dev->image_format = 1;
4756 } else if (ret > sizeof (__le32)) {
4757 void *p = response;
4758
4759 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4760 NULL, GFP_NOIO);
c0fba368
AE
4761 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4762 if (!ret)
4763 rbd_dev->image_format = 2;
589d30e0 4764 } else {
c0fba368
AE
4765 ret = -EINVAL;
4766 }
4767
4768 if (!ret) {
4769 rbd_dev->spec->image_id = image_id;
4770 dout("image_id is %s\n", image_id);
589d30e0
AE
4771 }
4772out:
4773 kfree(response);
4774 kfree(object_name);
4775
4776 return ret;
4777}
4778
3abef3b3
AE
4779/*
4780 * Undo whatever state changes are made by v1 or v2 header info
4781 * call.
4782 */
6fd48b3b
AE
4783static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4784{
4785 struct rbd_image_header *header;
4786
392a9dad
AE
4787 /* Drop parent reference unless it's already been done (or none) */
4788
4789 if (rbd_dev->parent_overlap)
4790 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4791
4792 /* Free dynamic fields from the header, then zero it out */
4793
4794 header = &rbd_dev->header;
812164f8 4795 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4796 kfree(header->snap_sizes);
4797 kfree(header->snap_names);
4798 kfree(header->object_prefix);
4799 memset(header, 0, sizeof (*header));
4800}
4801
2df3fac7 4802static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4803{
4804 int ret;
a30b71b9 4805
1e130199 4806 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4807 if (ret)
b1b5402a
AE
4808 goto out_err;
4809
2df3fac7
AE
4810 /*
4811 * Get and check the features for the image. Currently the
4812 * features are assumed to never change.
4813 */
b1b5402a 4814 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4815 if (ret)
9d475de5 4816 goto out_err;
35d489f9 4817
cc070d59
AE
4818 /* If the image supports fancy striping, get its parameters */
4819
4820 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4821 ret = rbd_dev_v2_striping_info(rbd_dev);
4822 if (ret < 0)
4823 goto out_err;
4824 }
2df3fac7 4825 /* No support for crypto and compression type format 2 images */
a30b71b9 4826
35152979 4827 return 0;
9d475de5 4828out_err:
642a2537 4829 rbd_dev->header.features = 0;
1e130199
AE
4830 kfree(rbd_dev->header.object_prefix);
4831 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4832
4833 return ret;
a30b71b9
AE
4834}
4835
5ca84dac
ID
4836/*
4837 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
4838 * rbd_dev_image_probe() recursion depth, which means it's also the
4839 * length of the already discovered part of the parent chain.
4840 */
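/*
 * E.g. (hypothetical chain) mapping clone3 -> clone2 -> clone1 ->
 * base recurses with depth 0, 1, 2, 3; RBD_MAX_PARENT_CHAIN_LEN
 * (16) bounds that recursion.
 */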
4841static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
83a06263 4842{
2f82ee54 4843 struct rbd_device *parent = NULL;
124afba2
AE
4844 int ret;
4845
4846 if (!rbd_dev->parent_spec)
4847 return 0;
124afba2 4848
5ca84dac
ID
4849 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
4850 pr_info("parent chain is too long (%d)\n", depth);
4851 ret = -EINVAL;
4852 goto out_err;
4853 }
4854
110dd0f8
ID
4855 parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
4856 if (!parent) {
4857 ret = -ENOMEM;
124afba2 4858 goto out_err;
110dd0f8
ID
4859 }
4860
4861 /*
4862 * Images related by parent/child relationships always share
4863 * rbd_client and spec/parent_spec, so bump their refcounts.
4864 */
4865 __rbd_get_client(rbd_dev->rbd_client);
4866 rbd_spec_get(rbd_dev->parent_spec);
124afba2 4867
5ca84dac 4868 ret = rbd_dev_image_probe(parent, depth);
124afba2
AE
4869 if (ret < 0)
4870 goto out_err;
110dd0f8 4871
124afba2 4872 rbd_dev->parent = parent;
a2acd00e 4873 atomic_set(&rbd_dev->parent_ref, 1);
124afba2 4874 return 0;
110dd0f8 4875
124afba2 4876out_err:
110dd0f8
ID
4877 rbd_dev_unparent(rbd_dev);
4878 if (parent)
124afba2 4879 rbd_dev_destroy(parent);
124afba2
AE
4880 return ret;
4881}
4882
200a6a8b 4883static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4884{
83a06263 4885 int ret;
d1cf5788 4886
83a06263
AE
4887 /* generate unique id: find highest unique id, add one */
4888 rbd_dev_id_get(rbd_dev);
4889
4890 /* Fill in the device name, now that we have its id. */
4891 BUILD_BUG_ON(DEV_NAME_LEN
4892 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4893 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4894
4895 /* Get our block major device number. */
4896
4897 ret = register_blkdev(0, rbd_dev->name);
4898 if (ret < 0)
4899 goto err_out_id;
4900 rbd_dev->major = ret;
4901
4902 /* Set up the blkdev mapping. */
4903
4904 ret = rbd_init_disk(rbd_dev);
4905 if (ret)
4906 goto err_out_blkdev;
4907
f35a4dee 4908 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4909 if (ret)
4910 goto err_out_disk;
f35a4dee
AE
4911 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4912
4913 ret = rbd_bus_add_dev(rbd_dev);
4914 if (ret)
4915 goto err_out_mapping;
83a06263 4916
83a06263
AE
4917 /* Everything's ready. Announce the disk to the world. */
4918
129b79d4 4919 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4920 add_disk(rbd_dev->disk);
4921
4922 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4923 (unsigned long long) rbd_dev->mapping.size);
4924
4925 return ret;
2f82ee54 4926
f35a4dee
AE
4927err_out_mapping:
4928 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4929err_out_disk:
4930 rbd_free_disk(rbd_dev);
4931err_out_blkdev:
4932 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4933err_out_id:
4934 rbd_dev_id_put(rbd_dev);
83a06263
AE
4936
4937 return ret;
4938}
4939
332bb12d
AE
4940static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4941{
4942 struct rbd_spec *spec = rbd_dev->spec;
4943 size_t size;
4944
4945 /* Record the header object name for this rbd image. */
4946
4947 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4948
4949 if (rbd_dev->image_format == 1)
4950 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4951 else
4952 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4953
4954 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4955 if (!rbd_dev->header_name)
4956 return -ENOMEM;
4957
4958 if (rbd_dev->image_format == 1)
4959 sprintf(rbd_dev->header_name, "%s%s",
4960 spec->image_name, RBD_SUFFIX);
4961 else
4962 sprintf(rbd_dev->header_name, "%s%s",
4963 RBD_HEADER_PREFIX, spec->image_id);
4964 return 0;
4965}
4966
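/*
 * Example (names hypothetical): a format 1 image "foo" gets header
 * object "foo.rbd", while a format 2 image whose id is "a2b3c4"
 * gets "rbd_header.a2b3c4"; RBD_SUFFIX and RBD_HEADER_PREFIX are
 * defined in rbd_types.h.
 */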
200a6a8b
AE
4967static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4968{
6fd48b3b 4969 rbd_dev_unprobe(rbd_dev);
200a6a8b 4970 kfree(rbd_dev->header_name);
6fd48b3b
AE
4971 rbd_dev->header_name = NULL;
4972 rbd_dev->image_format = 0;
4973 kfree(rbd_dev->spec->image_id);
4974 rbd_dev->spec->image_id = NULL;
4975
200a6a8b
AE
4976 rbd_dev_destroy(rbd_dev);
4977}
4978
a30b71b9
AE
4979/*
4980 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4981 * device. If this image is the one being mapped (i.e., not a
4982 * parent), initiate a watch on its header object before using that
4983 * object to get detailed information about the rbd image.
a30b71b9 4984 */
5ca84dac 4985static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
a30b71b9
AE
4986{
4987 int ret;
b644de2b 4988 int tmp;
a30b71b9
AE
4989
4990 /*
3abef3b3
AE
4991 * Get the id from the image id object. Unless there's an
4992 * error, rbd_dev->spec->image_id will be filled in with
4993 * a dynamically-allocated string, and rbd_dev->image_format
4994 * will be set to either 1 or 2.
a30b71b9
AE
4995 */
4996 ret = rbd_dev_image_id(rbd_dev);
4997 if (ret)
c0fba368
AE
4998 return ret;
4999 rbd_assert(rbd_dev->spec->image_id);
5000 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5001
332bb12d
AE
5002 ret = rbd_dev_header_name(rbd_dev);
5003 if (ret)
5004 goto err_out_format;
5005
5ca84dac 5006 if (!depth) {
1f3ef788
AE
5007 ret = rbd_dev_header_watch_sync(rbd_dev, true);
5008 if (ret)
5009 goto out_header_name;
5010 }
b644de2b 5011
c0fba368 5012 if (rbd_dev->image_format == 1)
99a41ebc 5013 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 5014 else
2df3fac7 5015 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 5016 if (ret)
b644de2b 5017 goto err_out_watch;
83a06263 5018
9bb81c9b
AE
5019 ret = rbd_dev_spec_update(rbd_dev);
5020 if (ret)
33dca39f 5021 goto err_out_probe;
9bb81c9b 5022
5ca84dac 5023 ret = rbd_dev_probe_parent(rbd_dev, depth);
30d60ba2
AE
5024 if (ret)
5025 goto err_out_probe;
5026
5027 dout("discovered format %u image, header name is %s\n",
5028 rbd_dev->image_format, rbd_dev->header_name);
83a06263 5029
30d60ba2 5030 return 0;
6fd48b3b
AE
5031err_out_probe:
5032 rbd_dev_unprobe(rbd_dev);
b644de2b 5033err_out_watch:
5ca84dac 5034 if (!depth) {
1f3ef788
AE
5035 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
5036 if (tmp)
5037 rbd_warn(rbd_dev, "unable to tear down "
5038 "watch request (%d)\n", tmp);
5039 }
332bb12d
AE
5040out_header_name:
5041 kfree(rbd_dev->header_name);
5042 rbd_dev->header_name = NULL;
5043err_out_format:
5044 rbd_dev->image_format = 0;
5655c4d9
AE
5045 kfree(rbd_dev->spec->image_id);
5046 rbd_dev->spec->image_id = NULL;
5047
5048 dout("probe failed, returning %d\n", ret);
5049
a30b71b9
AE
5050 return ret;
5051}
5052
59c2be1e
YS
5053static ssize_t rbd_add(struct bus_type *bus,
5054 const char *buf,
5055 size_t count)
602adf40 5056{
cb8627c7 5057 struct rbd_device *rbd_dev = NULL;
dc79b113 5058 struct ceph_options *ceph_opts = NULL;
4e9afeba 5059 struct rbd_options *rbd_opts = NULL;
859c31df 5060 struct rbd_spec *spec = NULL;
9d3997fd 5061 struct rbd_client *rbdc;
27cc2594 5062 struct ceph_osd_client *osdc;
51344a38 5063 bool read_only;
27cc2594 5064 int rc = -ENOMEM;
602adf40
YS
5065
5066 if (!try_module_get(THIS_MODULE))
5067 return -ENODEV;
5068
602adf40 5069 /* parse add command */
859c31df 5070 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5071 if (rc < 0)
bd4ba655 5072 goto err_out_module;
51344a38
AE
5073 read_only = rbd_opts->read_only;
5074 kfree(rbd_opts);
5075 rbd_opts = NULL; /* done with this */
78cea76e 5076
9d3997fd
AE
5077 rbdc = rbd_get_client(ceph_opts);
5078 if (IS_ERR(rbdc)) {
5079 rc = PTR_ERR(rbdc);
0ddebc0c 5080 goto err_out_args;
9d3997fd 5081 }
602adf40 5082
602adf40 5083 /* pick the pool */
9d3997fd 5084 osdc = &rbdc->client->osdc;
859c31df 5085 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5086 if (rc < 0)
5087 goto err_out_client;
c0cd10db 5088 spec->pool_id = (u64)rc;
859c31df 5089
0903e875
AE
5090 /* The ceph file layout needs to fit pool id in 32 bits */
5091
c0cd10db
AE
5092 if (spec->pool_id > (u64)U32_MAX) {
5093 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5094 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5095 rc = -EIO;
5096 goto err_out_client;
5097 }
5098
c53d5893 5099 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5100 if (!rbd_dev)
5101 goto err_out_client;
c53d5893
AE
5102 rbdc = NULL; /* rbd_dev now owns this */
5103 spec = NULL; /* rbd_dev now owns this */
602adf40 5104
5ca84dac 5105 rc = rbd_dev_image_probe(rbd_dev, 0);
a30b71b9 5106 if (rc < 0)
c53d5893 5107 goto err_out_rbd_dev;
05fd6f6f 5108
7ce4eef7
AE
5109 /* If we are mapping a snapshot it must be marked read-only */
5110
5111 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5112 read_only = true;
5113 rbd_dev->mapping.read_only = read_only;
5114
b536f69a 5115 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3
AE
5116 if (rc) {
5117 rbd_dev_image_release(rbd_dev);
5118 goto err_out_module;
5119 }
5120
5121 return count;
b536f69a 5122
c53d5893
AE
5123err_out_rbd_dev:
5124 rbd_dev_destroy(rbd_dev);
bd4ba655 5125err_out_client:
9d3997fd 5126 rbd_put_client(rbdc);
0ddebc0c 5127err_out_args:
859c31df 5128 rbd_spec_put(spec);
bd4ba655
AE
5129err_out_module:
5130 module_put(THIS_MODULE);
27cc2594 5131
602adf40 5132 dout("Error adding device %s\n", buf);
27cc2594 5133
c0cd10db 5134 return (ssize_t)rc;
602adf40
YS
5135}
5136
200a6a8b 5137static void rbd_dev_device_release(struct device *dev)
602adf40 5138{
593a9e7b 5139 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5140
602adf40 5141 rbd_free_disk(rbd_dev);
200a6a8b 5142 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5143 rbd_dev_mapping_clear(rbd_dev);
602adf40 5144 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5145 rbd_dev->major = 0;
e2839308 5146 rbd_dev_id_put(rbd_dev);
602adf40
YS
5148}
5149

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
	while (rbd_dev->parent) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Walk down to the deepest parent (the one with no
		 * grandparent) and remove it.  Each layered image
		 * pins its parent, so teardown must proceed from the
		 * far end of the chain back toward the mapped device.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		rbd_assert(second);
		rbd_dev_image_release(second);
		first->parent = NULL;
		first->parent_overlap = 0;

		rbd_assert(first->parent_spec);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
	}
}

static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct list_head *tmp;
	int dev_id;
	unsigned long ul;
	bool already = false;
	int ret;

	ret = strict_strtoul(buf, 10, &ul);
	if (ret)
		return ret;

	/* convert to int; abort if we lost anything in the conversion */
	dev_id = (int)ul;
	if (dev_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = -ENOENT;
	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			ret = 0;
			break;
		}
	}
	if (!ret) {
		spin_lock_irq(&rbd_dev->lock);
		if (rbd_dev->open_count)
			ret = -EBUSY;
		else
			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
							&rbd_dev->flags);
		spin_unlock_irq(&rbd_dev->lock);
	}
	spin_unlock(&rbd_dev_list_lock);
	if (ret < 0 || already)
		goto done;

	ret = rbd_dev_header_watch_sync(rbd_dev, false);
	if (ret)
		rbd_warn(rbd_dev, "failed to cancel watch event (%d)", ret);

	/*
	 * Flush remaining watch callbacks - these must be complete
	 * before the osd_client is shut down.
	 */
	dout("%s: flushing notifies\n", __func__);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
	/*
	 * Don't free anything from rbd_dev->disk until after all
	 * notifies are completely processed.  Otherwise
	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
	 * in a potential use after free of rbd_dev->disk or rbd_dev.
	 */
	rbd_bus_del_dev(rbd_dev);
	rbd_dev_image_release(rbd_dev);
	module_put(THIS_MODULE);
	ret = count;

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
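
/*
 * The -EBUSY/REMOVING handshake above pairs with the block device open
 * path defined earlier in this file: both sides take rbd_dev->lock, so
 * an opener either sees the REMOVING flag and backs off, or bumps
 * open_count before rbd_remove() can observe it.  A sketch of the
 * open-side half of that protocol (simplified from rbd_open()):
 *
 *	bool removing = false;
 *
 *	spin_lock_irq(&rbd_dev->lock);
 *	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 *		removing = true;
 *	else
 *		rbd_dev->open_count++;
 *	spin_unlock_irq(&rbd_dev->lock);
 *	if (removing)
 *		return -ENOENT;
 */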

/*
 * Create the control files in sysfs:
 *	/sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
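
/*
 * Once both registrations succeed, the control surface visible to
 * userspace looks roughly like the following (paths follow the
 * standard driver model; the exact attribute set comes from
 * rbd_bus_type and rbd_root_dev as defined earlier in this file):
 *
 *	/sys/bus/rbd/add	write-only, handled by rbd_add()
 *	/sys/bus/rbd/remove	write-only, handled by rbd_remove()
 *	/sys/devices/rbd/	parent for per-device directories
 */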

static int rbd_slab_init(void)
{
	rbd_assert(!rbd_img_request_cache);
	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
					sizeof (struct rbd_img_request),
					__alignof__(struct rbd_img_request),
					0, NULL);
	if (!rbd_img_request_cache)
		return -ENOMEM;

	rbd_assert(!rbd_obj_request_cache);
	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
					sizeof (struct rbd_obj_request),
					__alignof__(struct rbd_obj_request),
					0, NULL);
	if (!rbd_obj_request_cache)
		goto out_err;

	rbd_assert(!rbd_segment_name_cache);
	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
					MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
	if (rbd_segment_name_cache)
		return 0;
out_err:
	if (rbd_obj_request_cache) {
		kmem_cache_destroy(rbd_obj_request_cache);
		rbd_obj_request_cache = NULL;
	}

	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;

	return -ENOMEM;
}
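
/*
 * The three caches back the I/O fast path.  A sketch of the intended
 * alloc/free pairing (the real allocation sites live in the request
 * code earlier in this file; GFP_NOIO is illustrative here, chosen
 * because these allocations happen on the block I/O path):
 *
 *	struct rbd_img_request *img_request;
 *
 *	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
 *	if (!img_request)
 *		return NULL;
 *	...
 *	kmem_cache_free(rbd_img_request_cache, img_request);
 */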

static void rbd_slab_exit(void)
{
	rbd_assert(rbd_segment_name_cache);
	kmem_cache_destroy(rbd_segment_name_cache);
	rbd_segment_name_cache = NULL;

	rbd_assert(rbd_obj_request_cache);
	kmem_cache_destroy(rbd_obj_request_cache);
	rbd_obj_request_cache = NULL;

	rbd_assert(rbd_img_request_cache);
	kmem_cache_destroy(rbd_img_request_cache);
	rbd_img_request_cache = NULL;
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");
		return -EINVAL;
	}
	rc = rbd_slab_init();
	if (rc)
		return rc;
	rc = rbd_sysfs_init();
	if (rc)
		rbd_slab_exit();
	else
		pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return rc;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
	rbd_slab_exit();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");