rbd: enforce parent overlap
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
aafb230e
AE
44#define RBD_DEBUG /* Activate rbd_assert() calls */
45
593a9e7b
AE
46/*
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
51 */
52#define SECTOR_SHIFT 9
53#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
54
f0f8cef5
AE
55#define RBD_DRV_NAME "rbd"
56#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
57
58#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
59
d4b125e9
AE
60#define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61#define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
63
35d489f9 64#define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
602adf40
YS
65
66#define RBD_SNAP_HEAD_NAME "-"
67
9e15b77d
AE
68/* This allows a single page to hold an image name sent by OSD */
69#define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
1e130199 70#define RBD_IMAGE_ID_LEN_MAX 64
9e15b77d 71
1e130199 72#define RBD_OBJ_PREFIX_LEN_MAX 64
589d30e0 73
d889140c
AE
74/* Feature bits */
75
5cbf6f12
AE
76#define RBD_FEATURE_LAYERING (1<<0)
77#define RBD_FEATURE_STRIPINGV2 (1<<1)
78#define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
d889140c
AE
80
81/* Features supported by this (client software) implementation. */
82
5cbf6f12 83#define RBD_FEATURES_SUPPORTED (0)
d889140c 84
81a89793
AE
85/*
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
90 */
602adf40 91#define DEV_NAME_LEN 32
81a89793 92#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40
YS
93
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;	/* NUL-terminated data object name prefix */
	u64 features;		/* RBD_FEATURE_* bits (always 0 for v1 images) */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;		/* image size, in bytes */
	struct ceph_snap_context *snapc;
	char *snap_names;	/* consecutive NUL-terminated snapshot names */
	u64 *snap_sizes;	/* one image size per snapshot */

	u64 obj_version;
};
113
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;	/* shared between parent and child rbd_dev */
};
151
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;	/* underlying libceph client */
	struct kref		kref;		/* dropped via rbd_put_client() */
	struct list_head	node;		/* entry on rbd_client_list */
};
160
bf0d5f50
AE
161struct rbd_img_request;
162typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
163
164#define BAD_WHICH U32_MAX /* Good which or bad which, which? */
165
166struct rbd_obj_request;
167typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
168
9969ebc5
AE
169enum obj_request_type {
170 OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
171};
bf0d5f50 172
926f9b3f
AE
173enum obj_req_flags {
174 OBJ_REQ_DONE, /* completion flag: not done = 0, done = 1 */
6365d33a 175 OBJ_REQ_IMG_DATA, /* object usage: standalone = 0, image = 1 */
5679c59f
AE
176 OBJ_REQ_KNOWN, /* EXISTS flag valid: no = 0, yes = 1 */
177 OBJ_REQ_EXISTS, /* target exists: no = 0, yes = 1 */
926f9b3f
AE
178};
179
/*
 * State for a single I/O request against one rados object.  May stand
 * alone or belong to an rbd_img_request (see the union comment below).
 */
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;		/* enum obj_req_flags bits */

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;		/* selects the union below */
	union {
		struct bio	*bio_list;	/* OBJ_REQUEST_BIO */
		struct {			/* OBJ_REQUEST_PAGES */
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;	/* called on completion */
	struct completion	completion;	/* for synchronous waiters */

	struct kref		kref;
};
234
0c425248 235enum img_req_flags {
9849e986
AE
236 IMG_REQ_WRITE, /* I/O direction: read = 0, write = 1 */
237 IMG_REQ_CHILD, /* initiator: block = 0, child image = 1 */
d0b2e944 238 IMG_REQ_LAYERED, /* ENOENT handling: normal = 0, layered = 1 */
0c425248
AE
239};
240
/*
 * State for one I/O request against an rbd image.  The request is
 * carried out by the rbd_obj_request structs on the obj_requests
 * list, one per object touched (see for_each_obj_request below).
 */
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;	/* enum img_req_flags bits */
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};
266
267#define for_each_obj_request(ireq, oreq) \
ef06f4d3 268 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
bf0d5f50 269#define for_each_obj_request_from(ireq, oreq) \
ef06f4d3 270 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
bf0d5f50 271#define for_each_obj_request_safe(ireq, oreq, n) \
ef06f4d3 272 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
bf0d5f50 273
/* In-memory record of one snapshot of an image, with its sysfs device. */
struct rbd_snap {
	struct device		dev;		/* sysfs representation */
	const char		*name;
	u64			size;		/* image size at snapshot time */
	struct list_head	node;		/* entry on rbd_dev->snaps */
	u64			id;
	u64			features;
};
282
/* Size/feature/access state of the currently mapped image or snapshot. */
struct rbd_mapping {
	u64			size;		/* bytes */
	u64			features;
	bool			read_only;	/* forced true for snapshot mappings */
};
288
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;	/* possibly shared ceph client */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* enum rbd_dev_flags; possibly lock protected */
	struct rbd_spec		*spec;		/* identity of the mapped image/snap */

	char			*header_name;	/* header object name */

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;	/* non-NULL for a layered child */
	u64			parent_overlap;	/* extent of overlap with parent */
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;		/* entry on global rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};
334
b82d167b
AE
335/*
336 * Flag bits for rbd_dev->flags. If atomicity is required,
337 * rbd_dev->lock is used to protect access.
338 *
339 * Currently, only the "removing" flag (which is coupled with the
340 * "open_count" field) requires atomic access.
341 */
6d292906
AE
342enum rbd_dev_flags {
343 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
b82d167b 344 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
6d292906
AE
345};
346
602adf40 347static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 348
602adf40 349static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
350static DEFINE_SPINLOCK(rbd_dev_list_lock);
351
432b8587
AE
352static LIST_HEAD(rbd_client_list); /* clients */
353static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 354
3d7efd18
AE
355static int rbd_img_request_submit(struct rbd_img_request *img_request);
356
304f6808
AE
357static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
358static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
359
dfc5606d 360static void rbd_dev_release(struct device *dev);
41f38c2b 361static void rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 362
f0f8cef5
AE
363static ssize_t rbd_add(struct bus_type *bus, const char *buf,
364 size_t count);
365static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
366 size_t count);
2f82ee54 367static int rbd_dev_probe(struct rbd_device *rbd_dev);
f0f8cef5
AE
368
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, write-only, root-only */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

/* Empty release: rbd_root_dev is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
388
/*
 * Emit a warning, identifying the device as specifically as the
 * available information allows: by disk name, image name, image id,
 * or bare rbd_dev pointer; just the driver name if rbd_dev is NULL.
 * Format checking enforced by __printf(2, 3).
 */
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
415
aafb230e
AE
416#ifdef RBD_DEBUG
417#define rbd_assert(expr) \
418 if (unlikely(!(expr))) { \
419 printk(KERN_ERR "\nAssertion failure in %s() " \
420 "at line %d:\n\n" \
421 "\trbd_assert(%s);\n\n", \
422 __func__, __LINE__, #expr); \
423 BUG(); \
424 }
425#else /* !RBD_DEBUG */
426# define rbd_assert(expr) ((void) 0)
427#endif /* !RBD_DEBUG */
dfc5606d 428
8b3e1a56 429static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
b454e36d 430static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
8b3e1a56 431
117973fb
AE
432static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
433static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 434
/*
 * Open the block device.  Returns -EROFS for a write open of a
 * read-only mapping and -ENOENT if the mapping is being removed.
 * Otherwise bumps open_count under rbd_dev->lock and takes a device
 * reference (dropped in rbd_release()).
 */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	/* open_count and the REMOVING flag are checked atomically together */
	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
459
/*
 * Release the block device: undo rbd_open()'s open_count increment
 * and drop the device reference it took.
 */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	/* a release without a matching open would underflow the count */
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
476
602adf40
YS
477static const struct block_device_operations rbd_bd_ops = {
478 .owner = THIS_MODULE,
479 .open = rbd_open,
dfc5606d 480 .release = rbd_release,
602adf40
YS
481};
482
483/*
484 * Initialize an rbd client instance.
43ae4701 485 * We own *ceph_opts.
602adf40 486 */
f8c38929 487static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
488{
489 struct rbd_client *rbdc;
490 int ret = -ENOMEM;
491
37206ee5 492 dout("%s:\n", __func__);
602adf40
YS
493 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494 if (!rbdc)
495 goto out_opt;
496
497 kref_init(&rbdc->kref);
498 INIT_LIST_HEAD(&rbdc->node);
499
bc534d86
AE
500 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
43ae4701 502 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 503 if (IS_ERR(rbdc->client))
bc534d86 504 goto out_mutex;
43ae4701 505 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
506
507 ret = ceph_open_session(rbdc->client);
508 if (ret < 0)
509 goto out_err;
510
432b8587 511 spin_lock(&rbd_client_list_lock);
602adf40 512 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 513 spin_unlock(&rbd_client_list_lock);
602adf40 514
bc534d86 515 mutex_unlock(&ctl_mutex);
37206ee5 516 dout("%s: rbdc %p\n", __func__, rbdc);
bc534d86 517
602adf40
YS
518 return rbdc;
519
520out_err:
521 ceph_destroy_client(rbdc->client);
bc534d86
AE
522out_mutex:
523 mutex_unlock(&ctl_mutex);
602adf40
YS
524 kfree(rbdc);
525out_opt:
43ae4701
AE
526 if (ceph_opts)
527 ceph_destroy_options(ceph_opts);
37206ee5
AE
528 dout("%s: error %d\n", __func__, ret);
529
28f259b7 530 return ERR_PTR(ret);
602adf40
YS
531}
532
2f82ee54
AE
533static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534{
535 kref_get(&rbdc->kref);
536
537 return rbdc;
538}
539
602adf40 540/*
1f7ba331
AE
541 * Find a ceph client with specific addr and configuration. If
542 * found, bump its reference count.
602adf40 543 */
1f7ba331 544static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
545{
546 struct rbd_client *client_node;
1f7ba331 547 bool found = false;
602adf40 548
43ae4701 549 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
550 return NULL;
551
1f7ba331
AE
552 spin_lock(&rbd_client_list_lock);
553 list_for_each_entry(client_node, &rbd_client_list, node) {
554 if (!ceph_compare_options(ceph_opts, client_node->client)) {
2f82ee54
AE
555 __rbd_get_client(client_node);
556
1f7ba331
AE
557 found = true;
558 break;
559 }
560 }
561 spin_unlock(&rbd_client_list_lock);
562
563 return found ? client_node : NULL;
602adf40
YS
564}
565
59c2be1e
YS
566/*
567 * mount options
568 */
569enum {
59c2be1e
YS
570 Opt_last_int,
571 /* int args above */
572 Opt_last_string,
573 /* string args above */
cc0538b6
AE
574 Opt_read_only,
575 Opt_read_write,
576 /* Boolean args above */
577 Opt_last_bool,
59c2be1e
YS
578};
579
43ae4701 580static match_table_t rbd_opts_tokens = {
59c2be1e
YS
581 /* int args above */
582 /* string args above */
be466c1c 583 {Opt_read_only, "read_only"},
cc0538b6
AE
584 {Opt_read_only, "ro"}, /* Alternate spelling */
585 {Opt_read_write, "read_write"},
586 {Opt_read_write, "rw"}, /* Alternate spelling */
587 /* Boolean args above */
59c2be1e
YS
588 {-1, NULL}
589};
590
98571b5a
AE
591struct rbd_options {
592 bool read_only;
593};
594
595#define RBD_READ_ONLY_DEFAULT false
596
/*
 * Parse one mount option token.  @private points to the struct
 * rbd_options being filled in.  Integer and string option classes are
 * recognized by token range, but only Boolean options are currently
 * defined (read_only/read_write and their aliases).  Returns 0 on
 * success, -EINVAL for an unrecognized token, or the match_int()
 * error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		/* match_token() returned a token we never defined */
		rbd_assert(false);
		break;
	}
	return 0;
}
637
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Consumes ceph_opts either way: an existing
 * client makes them redundant, a new client takes them over.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc = rbd_client_find(ceph_opts);

	if (rbdc) {
		/* Reusing an existing client; the options are not needed */
		ceph_destroy_options(ceph_opts);
		return rbdc;
	}

	return rbd_client_create(ceph_opts);
}
654
/*
 * Destroy ceph client (final kref release callback).
 *
 * Takes rbd_client_list_lock itself to unlink the client, so the
 * caller must NOT already hold that lock when dropping the last
 * reference.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}
672
673/*
674 * Drop reference to ceph client node. If it's not referenced anymore, release
675 * it.
676 */
9d3997fd 677static void rbd_put_client(struct rbd_client *rbdc)
602adf40 678{
c53d5893
AE
679 if (rbdc)
680 kref_put(&rbdc->kref, rbd_client_release);
602adf40
YS
681}
682
a30b71b9
AE
683static bool rbd_image_format_valid(u32 image_format)
684{
685 return image_format == 1 || image_format == 2;
686}
687
/*
 * Sanity-check an on-disk format 1 image header: magic text present,
 * object order within the range we can handle, and snapshot count /
 * name length small enough that the size_t arithmetic performed in
 * rbd_header_from_disk() cannot overflow.
 */
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
726
602adf40
YS
727/*
728 * Create a new header structure, translate header format from the on-disk
729 * header.
730 */
731static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 732 struct rbd_image_header_ondisk *ondisk)
602adf40 733{
ccece235 734 u32 snap_count;
58c17b0e 735 size_t len;
d2bb24e5 736 size_t size;
621901d6 737 u32 i;
602adf40 738
6a52325f
AE
739 memset(header, 0, sizeof (*header));
740
103a150f
AE
741 snap_count = le32_to_cpu(ondisk->snap_count);
742
58c17b0e
AE
743 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 745 if (!header->object_prefix)
602adf40 746 return -ENOMEM;
58c17b0e
AE
747 memcpy(header->object_prefix, ondisk->object_prefix, len);
748 header->object_prefix[len] = '\0';
00f1f36f 749
602adf40 750 if (snap_count) {
f785cc1d
AE
751 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
621901d6
AE
753 /* Save a copy of the snapshot names */
754
f785cc1d
AE
755 if (snap_names_len > (u64) SIZE_MAX)
756 return -EIO;
757 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 758 if (!header->snap_names)
6a52325f 759 goto out_err;
f785cc1d
AE
760 /*
761 * Note that rbd_dev_v1_header_read() guarantees
762 * the ondisk buffer we're working with has
763 * snap_names_len bytes beyond the end of the
764 * snapshot id array, this memcpy() is safe.
765 */
766 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767 snap_names_len);
6a52325f 768
621901d6
AE
769 /* Record each snapshot's size */
770
d2bb24e5
AE
771 size = snap_count * sizeof (*header->snap_sizes);
772 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 773 if (!header->snap_sizes)
6a52325f 774 goto out_err;
621901d6
AE
775 for (i = 0; i < snap_count; i++)
776 header->snap_sizes[i] =
777 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 778 } else {
ccece235 779 WARN_ON(ondisk->snap_names_len);
602adf40
YS
780 header->snap_names = NULL;
781 header->snap_sizes = NULL;
782 }
849b4260 783
34b13184 784 header->features = 0; /* No features support in v1 images */
602adf40
YS
785 header->obj_order = ondisk->options.order;
786 header->crypt_type = ondisk->options.crypt_type;
787 header->comp_type = ondisk->options.comp_type;
6a52325f 788
621901d6
AE
789 /* Allocate and fill in the snapshot context */
790
f84344f3 791 header->image_size = le64_to_cpu(ondisk->image_size);
6a52325f
AE
792 size = sizeof (struct ceph_snap_context);
793 size += snap_count * sizeof (header->snapc->snaps[0]);
794 header->snapc = kzalloc(size, GFP_KERNEL);
795 if (!header->snapc)
796 goto out_err;
602adf40
YS
797
798 atomic_set(&header->snapc->nref, 1);
505cbb9b 799 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 800 header->snapc->num_snaps = snap_count;
621901d6
AE
801 for (i = 0; i < snap_count; i++)
802 header->snapc->snaps[i] =
803 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
804
805 return 0;
806
6a52325f 807out_err:
849b4260 808 kfree(header->snap_sizes);
ccece235 809 header->snap_sizes = NULL;
602adf40 810 kfree(header->snap_names);
ccece235 811 header->snap_names = NULL;
6a52325f
AE
812 kfree(header->object_prefix);
813 header->object_prefix = NULL;
ccece235 814
00f1f36f 815 return -ENOMEM;
602adf40
YS
816}
817
9e15b77d
AE
818static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
819{
820 struct rbd_snap *snap;
821
822 if (snap_id == CEPH_NOSNAP)
823 return RBD_SNAP_HEAD_NAME;
824
825 list_for_each_entry(snap, &rbd_dev->snaps, node)
826 if (snap_id == snap->id)
827 return snap->name;
828
829 return NULL;
830}
831
8836b995 832static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
602adf40 833{
602adf40 834
e86924a8 835 struct rbd_snap *snap;
602adf40 836
e86924a8
AE
837 list_for_each_entry(snap, &rbd_dev->snaps, node) {
838 if (!strcmp(snap_name, snap->name)) {
0d7dbfce 839 rbd_dev->spec->snap_id = snap->id;
e86924a8 840 rbd_dev->mapping.size = snap->size;
34b13184 841 rbd_dev->mapping.features = snap->features;
602adf40 842
e86924a8 843 return 0;
00f1f36f 844 }
00f1f36f 845 }
e86924a8 846
00f1f36f 847 return -ENOENT;
602adf40
YS
848}
849
/*
 * Establish the mapping state (snap_id, size, features) for the
 * snapshot named in the device's spec.  The special head name maps
 * the image itself; any real snapshot name forces the mapping
 * read-only.  Returns 0, or -ENOENT if the snapshot is not found
 * (in which case the EXISTS flag is not set).
 */
static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}
871
872static void rbd_header_free(struct rbd_image_header *header)
873{
849b4260 874 kfree(header->object_prefix);
d78fd7ae 875 header->object_prefix = NULL;
602adf40 876 kfree(header->snap_sizes);
d78fd7ae 877 header->snap_sizes = NULL;
849b4260 878 kfree(header->snap_names);
d78fd7ae 879 header->snap_names = NULL;
d1d25646 880 ceph_put_snap_context(header->snapc);
d78fd7ae 881 header->snapc = NULL;
602adf40
YS
882}
883
/*
 * Build the object name for the segment containing image byte
 * @offset: "<object_prefix>.<segment number as 12 hex digits>".
 * Returns a buffer the caller must kfree(), or NULL on allocation
 * or formatting failure.
 */
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		/* name would have been truncated (or snprintf failed) */
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
602adf40 905
65ccfe21
AE
906static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
907{
908 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
602adf40 909
65ccfe21
AE
910 return offset & (segment_size - 1);
911}
912
/*
 * Number of bytes of the range [@offset, @offset + @length) that lie
 * within @offset's segment, i.e. @length clipped to the segment
 * boundary.
 */
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;	/* offset within the segment */

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
926
029bcbd8
JD
927/*
928 * returns the size of an object in the image
929 */
930static u64 rbd_obj_bytes(struct rbd_image_header *header)
931{
932 return 1 << header->obj_order;
933}
934
602adf40
YS
935/*
936 * bio helpers
937 */
938
939static void bio_chain_put(struct bio *chain)
940{
941 struct bio *tmp;
942
943 while (chain) {
944 tmp = chain;
945 chain = chain->bi_next;
946 bio_put(tmp);
947 }
948}
949
/*
 * zeros a bio chain, starting at specific offset
 *
 * Every data byte at or beyond start_ofs (a byte offset from the
 * start of the chain) is cleared; bytes before it are untouched.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte position within the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero only the tail of a straddling segment */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
976
/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;	/* start position within this page */
		size_t length;		/* bytes to zero in this page */
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}
1007
/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the new bio, or NULL on allocation failure or invalid
 * offset/len (the latter triggering a one-time warning).
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;		/* offset into first affected segment */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	/* Sector start advances by the (sector-aligned) byte offset */
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* After the loop above, resid is the bytes used in the last segment */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		/* Single segment holds the entire range */
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
1088
/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;	/* where to link the next cloned bio */

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		/* Clone no more than what remains in this bio, or len */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			/* Consumed this source bio entirely; advance */
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	/* Release any clones built so far */
	bio_chain_put(chain);

	return NULL;
}
1151
926f9b3f
AE
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	/* test_and_set_bit() returning nonzero means it was already set */
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}
1167
/* Return true if the object request belongs to an image request. */
static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	/* Barrier pairs with the implicit one in test_and_set_bit() */
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}
1173
/* Mark an object request done; warn (once set it stays set). */
static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		/* Only image requests have an rbd_dev to name in the warning */
		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}
1185
/* Return true if the object request has completed. */
static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}
1191
/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the response from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	/* KNOWN is set last so EXISTS is visible once KNOWN is observed */
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}
1210
/* Return true once the object's existence has been determined. */
static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}
1216
/* Return true if the backing object is known to exist. */
static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
1222
/* Take an additional reference on an object request. */
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}
1229
static void rbd_obj_request_destroy(struct kref *kref);
/* Drop a reference; the last put destroys the object request. */
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
1238
/* Take an additional reference on an image request. */
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}
1245
static void rbd_img_request_destroy(struct kref *kref);
/* Drop a reference; the last put destroys the image request. */
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}
1254
/*
 * Append an object request to an image request's list, assigning it
 * the next "which" slot and marking it as image data.  The object
 * request's original reference is taken over by the image request.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}
1271
/*
 * Unlink an object request from its image request and drop the
 * reference the image request held.  Asserts the object being
 * removed is the last one added (LIFO removal order).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	/* Only the most recently added request may be removed */
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}
1290
1291static bool obj_request_type_valid(enum obj_request_type type)
1292{
1293 switch (type) {
9969ebc5 1294 case OBJ_REQUEST_NODATA:
bf0d5f50 1295 case OBJ_REQUEST_BIO:
788e2df3 1296 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1297 return true;
1298 default:
1299 return false;
1300 }
1301}
1302
/*
 * Hand the object request's osd request to the osd client for
 * (asynchronous) submission.  Returns what the osd client does.
 */
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
1310
/*
 * Finish an image request: total up the per-object transfer counts
 * (on success), then either run the caller's callback or drop the
 * final reference.
 */
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}
1336
/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

/*
 * Block (interruptibly) until the object request completes.
 * Returns 0 or -ERESTARTSYS if interrupted by a signal.
 */
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}
1345
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never change thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}
1356
/* Return true if this image request is a write. */
static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}
1362
/* Mark an image request as issued on behalf of a parent (layered) read. */
static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}
1368
/* Return true if this is a child (parent-image) request. */
static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}
1374
/* Mark an image request as targeting a layered (cloned) image. */
static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}
1380
/* Return true if the target image has a parent (is layered). */
static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}
1386
/*
 * Post-process a completed read that is part of an image request:
 * fill holes and short reads with zeroes so the caller always sees
 * a full-length result, then mark the request done.
 */
static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		/* Hole: the whole object is "missing"; zero everything */
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		/* Short read: zero only the unread tail */
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}
1420
/*
 * Deliver completion of an object request: invoke the registered
 * callback if there is one, otherwise wake any synchronous waiter.
 */
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}
1430
/* Completion handler for ops that need no post-processing. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1436
/*
 * Completion handler for reads.  A read of a layered image that hit
 * a hole (-ENOENT) within the parent overlap is re-issued against
 * the parent image; other image reads get zero-fill post-processing.
 */
static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	/* Only offsets below parent_overlap are backed by the parent */
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}
1460
/* Completion handler for writes. */
static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}
1472
/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}
1482
/*
 * Common completion callback for all rbd osd requests.  Records the
 * result and transfer count from the (first) op, then dispatches to
 * the per-opcode handler; completes the object request if the
 * handler marked it done (parent reads may defer completion).
 */
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	/* At most two ops (copyup + write); dispatch is on op 0 only */
	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}
1535
/*
 * Finalize a read osd request: build it against the image's snapshot
 * id (or CEPH_NOSNAP when not part of an image request).
 */
static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}
1548
/*
 * Finalize a write osd request: build it with the image request's
 * snapshot context (if any) and the current time as mtime.
 */
static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
1562
/*
 * Allocate and initialize a single-op osd request for the given
 * object request.  Writes carry the image's snapshot context.
 * Returns NULL on allocation failure.
 */
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		/* Direction must agree with the image request's */
		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1604
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	/* Copyup only happens for writes that are part of an image request */
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1645
1646
/* Release the reference on an osd request (frees it on last put). */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1651
/* object_name is assumed to be a non-null pointer and NUL-terminated */

/*
 * Allocate and initialize an object request covering [offset,
 * offset + length) of the named object.  The object name is copied
 * into the same allocation, after the structure itself.  Returns
 * NULL on allocation failure.
 */
static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	/* One allocation for the struct plus its copy of the name */
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;	/* not yet part of an image request */
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}
1685
/*
 * kref release function: free an object request along with its osd
 * request and any data (bio chain or page vector) it owns.  Must
 * already be detached from any image request.
 */
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	/* Name was allocated with the struct; one kfree covers both */
	kfree(obj_request);
}
1717
/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		/* Snapshot context is sampled under the header lock */
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}

	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		/* Reads go to the currently mapped snapshot */
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}
1779
/*
 * kref release function: detach and drop every object request,
 * release the snapshot context (writes) and the parent's object
 * request reference (child requests), then free the structure.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	/* _safe: each del unlinks the entry and may free it */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}
1802
/*
 * Account for one completed object request within its image
 * request: record a first error (if any), release any borrowed page
 * array, and advance completion of the enclosing request.  Returns
 * true while more of the image request remains outstanding.
 */
static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		/* Only the first error is preserved for the image request */
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		/* blk_end_request() reports whether the block request is done */
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}
1846
/*
 * Per-object completion callback for image requests.  Object
 * requests may complete out of order; under completion_lock, retire
 * them strictly in "which" order starting at next_completion, and
 * complete the image request once the last one is retired.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	/* An earlier request is still pending; it will retire us later */
	if (which != img_request->next_completion)
		goto out;

	/* Retire this and any consecutively-done successors in order */
	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	/* "no more" must coincide with having retired every request */
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
1885
/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 *
 * Returns 0 on success.  On failure every object request created so
 * far is dropped and -ENOMEM is returned.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	/* Only one of bio_list/pages is used, selected by "type". */
	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		/* The bio chain must start exactly at the image offset. */
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	/* Walk the image extent, one backing object per iteration. */
	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;	/* offset of this piece within the object */
		u64 length;	/* length of this piece (capped by resid) */

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			/*
			 * Clone just this object's portion of the bio
			 * chain; bio_offset advances as we consume it.
			 * GFP_ATOMIC: presumably because this can run in
			 * the block request path -- TODO confirm.
			 */
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			/*
			 * If the extent ends mid-page, the final page is
			 * shared with the next object request.
			 */
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	/* Drop the request that was created but not yet added. */
	rbd_obj_request_put(obj_request);
out_unwind:
	/* Drop every request already added to the image request. */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
2004
0eefd470
AE
2005static void
2006rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2007{
2008 struct rbd_img_request *img_request;
2009 struct rbd_device *rbd_dev;
2010 u64 length;
2011 u32 page_count;
2012
2013 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2014 rbd_assert(obj_request_img_data_test(obj_request));
2015 img_request = obj_request->img_request;
2016 rbd_assert(img_request);
2017
2018 rbd_dev = img_request->rbd_dev;
2019 rbd_assert(rbd_dev);
2020 length = (u64)1 << rbd_dev->header.obj_order;
2021 page_count = (u32)calc_pages_for(0, length);
2022
2023 rbd_assert(obj_request->copyup_pages);
2024 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2025 obj_request->copyup_pages = NULL;
2026
2027 /*
2028 * We want the transfer count to reflect the size of the
2029 * original write request. There is no such thing as a
2030 * successful short write, so if the request was successful
2031 * we can just set it to the originally-requested length.
2032 */
2033 if (!obj_request->result)
2034 obj_request->xferred = obj_request->length;
2035
2036 /* Finish up with the normal image object callback */
2037
2038 rbd_img_obj_callback(obj_request);
2039}
2040
3d7efd18
AE
2041static void
2042rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2043{
2044 struct rbd_obj_request *orig_request;
0eefd470
AE
2045 struct ceph_osd_request *osd_req;
2046 struct ceph_osd_client *osdc;
2047 struct rbd_device *rbd_dev;
3d7efd18 2048 struct page **pages;
3d7efd18
AE
2049 int result;
2050 u64 obj_size;
2051 u64 xferred;
2052
2053 rbd_assert(img_request_child_test(img_request));
2054
2055 /* First get what we need from the image request */
2056
2057 pages = img_request->copyup_pages;
2058 rbd_assert(pages != NULL);
2059 img_request->copyup_pages = NULL;
2060
2061 orig_request = img_request->obj_request;
2062 rbd_assert(orig_request != NULL);
0eefd470 2063 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
3d7efd18
AE
2064 result = img_request->result;
2065 obj_size = img_request->length;
2066 xferred = img_request->xferred;
2067
0eefd470
AE
2068 rbd_dev = img_request->rbd_dev;
2069 rbd_assert(rbd_dev);
2070 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2071
3d7efd18
AE
2072 rbd_img_request_put(img_request);
2073
0eefd470
AE
2074 if (result)
2075 goto out_err;
2076
2077 /* Allocate the new copyup osd request for the original request */
2078
2079 result = -ENOMEM;
2080 rbd_assert(!orig_request->osd_req);
2081 osd_req = rbd_osd_req_create_copyup(orig_request);
2082 if (!osd_req)
2083 goto out_err;
2084 orig_request->osd_req = osd_req;
2085 orig_request->copyup_pages = pages;
3d7efd18 2086
0eefd470 2087 /* Initialize the copyup op */
3d7efd18 2088
0eefd470
AE
2089 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2090 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2091 false, false);
3d7efd18 2092
0eefd470
AE
2093 /* Then the original write request op */
2094
2095 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2096 orig_request->offset,
2097 orig_request->length, 0, 0);
2098 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2099 orig_request->length);
2100
2101 rbd_osd_req_format_write(orig_request);
2102
2103 /* All set, send it off. */
2104
2105 orig_request->callback = rbd_img_obj_copyup_callback;
2106 osdc = &rbd_dev->rbd_client->client->osdc;
2107 result = rbd_obj_request_submit(osdc, orig_request);
2108 if (!result)
2109 return;
2110out_err:
2111 /* Record the error code and complete the request */
2112
2113 orig_request->result = result;
2114 orig_request->xferred = 0;
2115 obj_request_done_set(orig_request);
2116 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2117}
2118
/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to use any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		/* Caller guarantees the object starts inside the overlap. */
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;	/* so the error path won't release it */
		goto out_err;
	}

	result = -ENOMEM;
	/* A read (write=false) child (child=true) image request. */
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	/* Hold a ref on the original request until the read completes. */
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	/* Pages are now owned by the parent request. */
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	/* Submit failed: undo the linkage before the common cleanup. */
	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}
2223
c5b5ef6c
AE
2224static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2225{
c5b5ef6c
AE
2226 struct rbd_obj_request *orig_request;
2227 int result;
2228
2229 rbd_assert(!obj_request_img_data_test(obj_request));
2230
2231 /*
2232 * All we need from the object request is the original
2233 * request and the result of the STAT op. Grab those, then
2234 * we're done with the request.
2235 */
2236 orig_request = obj_request->obj_request;
2237 obj_request->obj_request = NULL;
2238 rbd_assert(orig_request);
2239 rbd_assert(orig_request->img_request);
2240
2241 result = obj_request->result;
2242 obj_request->result = 0;
2243
2244 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2245 obj_request, orig_request, result,
2246 obj_request->xferred, obj_request->length);
2247 rbd_obj_request_put(obj_request);
2248
2249 rbd_assert(orig_request);
2250 rbd_assert(orig_request->img_request);
c5b5ef6c
AE
2251
2252 /*
2253 * Our only purpose here is to determine whether the object
2254 * exists, and we don't want to treat the non-existence as
2255 * an error. If something else comes back, transfer the
2256 * error to the original request and complete it now.
2257 */
2258 if (!result) {
2259 obj_request_existence_set(orig_request, true);
2260 } else if (result == -ENOENT) {
2261 obj_request_existence_set(orig_request, false);
2262 } else if (result) {
2263 orig_request->result = result;
3d7efd18 2264 goto out;
c5b5ef6c
AE
2265 }
2266
2267 /*
2268 * Resubmit the original request now that we have recorded
2269 * whether the target object exists.
2270 */
b454e36d 2271 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2272out:
c5b5ef6c
AE
2273 if (orig_request->result)
2274 rbd_obj_request_complete(orig_request);
2275 rbd_obj_request_put(orig_request);
2276}
2277
2278static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2279{
2280 struct rbd_obj_request *stat_request;
2281 struct rbd_device *rbd_dev;
2282 struct ceph_osd_client *osdc;
2283 struct page **pages = NULL;
2284 u32 page_count;
2285 size_t size;
2286 int ret;
2287
2288 /*
2289 * The response data for a STAT call consists of:
2290 * le64 length;
2291 * struct {
2292 * le32 tv_sec;
2293 * le32 tv_nsec;
2294 * } mtime;
2295 */
2296 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2297 page_count = (u32)calc_pages_for(0, size);
2298 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2299 if (IS_ERR(pages))
2300 return PTR_ERR(pages);
2301
2302 ret = -ENOMEM;
2303 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2304 OBJ_REQUEST_PAGES);
2305 if (!stat_request)
2306 goto out;
2307
2308 rbd_obj_request_get(obj_request);
2309 stat_request->obj_request = obj_request;
2310 stat_request->pages = pages;
2311 stat_request->page_count = page_count;
2312
2313 rbd_assert(obj_request->img_request);
2314 rbd_dev = obj_request->img_request->rbd_dev;
2315 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2316 stat_request);
2317 if (!stat_request->osd_req)
2318 goto out;
2319 stat_request->callback = rbd_img_obj_exists_callback;
2320
2321 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2322 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2323 false, false);
9d4df01f 2324 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2325
2326 osdc = &rbd_dev->rbd_client->client->osdc;
2327 ret = rbd_obj_request_submit(osdc, stat_request);
2328out:
2329 if (ret)
2330 rbd_obj_request_put(obj_request);
2331
2332 return ret;
2333}
2334
b454e36d
AE
2335static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2336{
2337 struct rbd_img_request *img_request;
a9e8ba2c 2338 struct rbd_device *rbd_dev;
3d7efd18 2339 bool known;
b454e36d
AE
2340
2341 rbd_assert(obj_request_img_data_test(obj_request));
2342
2343 img_request = obj_request->img_request;
2344 rbd_assert(img_request);
a9e8ba2c 2345 rbd_dev = img_request->rbd_dev;
b454e36d 2346
b454e36d 2347 /*
a9e8ba2c
AE
2348 * Only writes to layered images need special handling.
2349 * Reads and non-layered writes are simple object requests.
2350 * Layered writes that start beyond the end of the overlap
2351 * with the parent have no parent data, so they too are
2352 * simple object requests. Finally, if the target object is
2353 * known to already exist, its parent data has already been
2354 * copied, so a write to the object can also be handled as a
2355 * simple object request.
b454e36d
AE
2356 */
2357 if (!img_request_write_test(img_request) ||
2358 !img_request_layered_test(img_request) ||
a9e8ba2c 2359 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2360 ((known = obj_request_known_test(obj_request)) &&
2361 obj_request_exists_test(obj_request))) {
b454e36d
AE
2362
2363 struct rbd_device *rbd_dev;
2364 struct ceph_osd_client *osdc;
2365
2366 rbd_dev = obj_request->img_request->rbd_dev;
2367 osdc = &rbd_dev->rbd_client->client->osdc;
2368
2369 return rbd_obj_request_submit(osdc, obj_request);
2370 }
2371
2372 /*
3d7efd18
AE
2373 * It's a layered write. The target object might exist but
2374 * we may not know that yet. If we know it doesn't exist,
2375 * start by reading the data for the full target object from
2376 * the parent so we can use it for a copyup to the target.
b454e36d 2377 */
3d7efd18
AE
2378 if (known)
2379 return rbd_img_obj_parent_read_full(obj_request);
2380
2381 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2382
2383 return rbd_img_obj_exists_submit(obj_request);
2384}
2385
bf0d5f50
AE
2386static int rbd_img_request_submit(struct rbd_img_request *img_request)
2387{
bf0d5f50 2388 struct rbd_obj_request *obj_request;
46faeed4 2389 struct rbd_obj_request *next_obj_request;
bf0d5f50 2390
37206ee5 2391 dout("%s: img %p\n", __func__, img_request);
46faeed4 2392 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2393 int ret;
2394
b454e36d 2395 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2396 if (ret)
2397 return ret;
bf0d5f50
AE
2398 }
2399
2400 return 0;
2401}
8b3e1a56
AE
2402
/*
 * Completion callback for the parent image read created by
 * rbd_img_parent_read().  Transfers the result to the original
 * (child) object request, trimming the transfer count at the parent
 * overlap boundary so data beyond it gets zeroed, then completes
 * the original request.
 */
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;

	rbd_assert(img_request_child_test(img_request));

	obj_request = img_request->obj_request;
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);

	obj_request->result = img_request->result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	rbd_dev = obj_request->img_request->rbd_dev;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		/* Only data up to the overlap boundary is valid. */
		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_request->xferred, xferred);
	} else {
		obj_request->xferred = img_request->xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
2444
/*
 * Satisfy a read that hit a non-existent object (-ENOENT) in a
 * layered image by reading the same range from the parent image.
 * Builds and submits a child image request whose completion
 * (rbd_img_parent_read_callback) fills in the original request.
 * On failure, records the error on the original request and marks
 * it done.
 */
static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev;
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	/* Only called when the object was found not to exist. */
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	rbd_dev = obj_request->img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);
	/* rbd_read_finish(obj_request, obj_request->length); */
	/* A read (write=false) child (child=true) image request. */
	img_request = rbd_img_request_create(rbd_dev->parent,
						obj_request->img_offset,
						obj_request->length,
						false, true);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	/* Hold a ref on the original request until the read completes. */
	rbd_obj_request_get(obj_request);
	img_request->obj_request = obj_request;

	/* Reuse the original request's bio chain as the data target. */
	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}
bf0d5f50 2488
cf81b60e 2489static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
b8d70035
AE
2490 u64 ver, u64 notify_id)
2491{
2492 struct rbd_obj_request *obj_request;
2169238d 2493 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2494 int ret;
2495
2496 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2497 OBJ_REQUEST_NODATA);
2498 if (!obj_request)
2499 return -ENOMEM;
2500
2501 ret = -ENOMEM;
430c28c3 2502 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2503 if (!obj_request->osd_req)
2504 goto out;
2169238d 2505 obj_request->callback = rbd_obj_request_put;
b8d70035 2506
c99d2d4a
AE
2507 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2508 notify_id, ver, 0);
9d4df01f 2509 rbd_osd_req_format_read(obj_request);
430c28c3 2510
b8d70035 2511 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2512out:
cf81b60e
AE
2513 if (ret)
2514 rbd_obj_request_put(obj_request);
b8d70035
AE
2515
2516 return ret;
2517}
2518
2519static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2520{
2521 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2522 u64 hver;
2523 int rc;
2524
2525 if (!rbd_dev)
2526 return;
2527
37206ee5 2528 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
b8d70035
AE
2529 rbd_dev->header_name, (unsigned long long) notify_id,
2530 (unsigned int) opcode);
2531 rc = rbd_dev_refresh(rbd_dev, &hver);
2532 if (rc)
2533 rbd_warn(rbd_dev, "got notification but failed to "
2534 " update snaps: %d\n", rc);
2535
cf81b60e 2536 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
b8d70035
AE
2537}
2538
9969ebc5
AE
2539/*
2540 * Request sync osd watch/unwatch. The value of "start" determines
2541 * whether a watch request is being initiated or torn down.
2542 */
2543static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2544{
2545 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2546 struct rbd_obj_request *obj_request;
9969ebc5
AE
2547 int ret;
2548
2549 rbd_assert(start ^ !!rbd_dev->watch_event);
2550 rbd_assert(start ^ !!rbd_dev->watch_request);
2551
2552 if (start) {
3c663bbd 2553 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2554 &rbd_dev->watch_event);
2555 if (ret < 0)
2556 return ret;
8eb87565 2557 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2558 }
2559
2560 ret = -ENOMEM;
2561 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2562 OBJ_REQUEST_NODATA);
2563 if (!obj_request)
2564 goto out_cancel;
2565
430c28c3
AE
2566 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2567 if (!obj_request->osd_req)
2568 goto out_cancel;
2569
8eb87565 2570 if (start)
975241af 2571 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2572 else
6977c3f9 2573 ceph_osdc_unregister_linger_request(osdc,
975241af 2574 rbd_dev->watch_request->osd_req);
2169238d
AE
2575
2576 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2577 rbd_dev->watch_event->cookie,
2578 rbd_dev->header.obj_version, start);
9d4df01f 2579 rbd_osd_req_format_write(obj_request);
2169238d 2580
9969ebc5
AE
2581 ret = rbd_obj_request_submit(osdc, obj_request);
2582 if (ret)
2583 goto out_cancel;
2584 ret = rbd_obj_request_wait(obj_request);
2585 if (ret)
2586 goto out_cancel;
9969ebc5
AE
2587 ret = obj_request->result;
2588 if (ret)
2589 goto out_cancel;
2590
8eb87565
AE
2591 /*
2592 * A watch request is set to linger, so the underlying osd
2593 * request won't go away until we unregister it. We retain
2594 * a pointer to the object request during that time (in
2595 * rbd_dev->watch_request), so we'll keep a reference to
2596 * it. We'll drop that reference (below) after we've
2597 * unregistered it.
2598 */
2599 if (start) {
2600 rbd_dev->watch_request = obj_request;
2601
2602 return 0;
2603 }
2604
2605 /* We have successfully torn down the watch request */
2606
2607 rbd_obj_request_put(rbd_dev->watch_request);
2608 rbd_dev->watch_request = NULL;
9969ebc5
AE
2609out_cancel:
2610 /* Cancel the event if we're tearing down, or on error */
2611 ceph_osdc_cancel_event(rbd_dev->watch_event);
2612 rbd_dev->watch_event = NULL;
9969ebc5
AE
2613 if (obj_request)
2614 rbd_obj_request_put(obj_request);
2615
2616 return ret;
2617}
2618
/*
 * Synchronous osd object method call.
 *
 * Invokes class_name.method_name on object_name, optionally passing
 * outbound/outbound_size as request data, and copies up to
 * inbound_size bytes of the reply into inbound.  Returns the number
 * of bytes... no -- returns 0 on success (reply data copied into
 * inbound), a negative errno on failure.  If version is non-NULL it
 * receives the object version from the reply.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	/* Pages are owned by the object request from here on. */
	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		/*
		 * NOTE(review): ceph_pagelist_append()'s return value
		 * is ignored here; presumably the pagelist is owned by
		 * the osd request after the call below -- confirm
		 * against the libceph osd_client implementation.
		 */
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	/* Submit and wait -- this is a synchronous interface. */
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	/* Copy only what the osd actually returned. */
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
2704
/*
 * Block request queue callback.  Pulls requests off the queue,
 * converts each into an rbd image request, and submits it.  The
 * queue lock is dropped around the (potentially sleeping) image
 * request setup and re-taken before completing errors, as declared
 * by the __releases/__acquires annotations.
 */
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Setup below may sleep; release the queue lock. */
		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			/* Writes are only valid against the head revision. */
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		/* Guard against u64 wraparound of offset + length. */
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}
2793
602adf40
YS
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 *
 * Returns the number of bytes of bvec that may be added to the bio
 * described by bmd (possibly 0, and never more than bvec->bv_len).
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	/* sectors_per_obj is a power of two, so mask extracts the offset. */
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
2839
2840static void rbd_free_disk(struct rbd_device *rbd_dev)
2841{
2842 struct gendisk *disk = rbd_dev->disk;
2843
2844 if (!disk)
2845 return;
2846
602adf40
YS
2847 if (disk->flags & GENHD_FL_UP)
2848 del_gendisk(disk);
2849 if (disk->queue)
2850 blk_cleanup_queue(disk->queue);
2851 put_disk(disk);
2852}
2853
788e2df3
AE
2854static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2855 const char *object_name,
2856 u64 offset, u64 length,
2857 char *buf, u64 *version)
2858
2859{
2169238d 2860 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2861 struct rbd_obj_request *obj_request;
788e2df3
AE
2862 struct page **pages = NULL;
2863 u32 page_count;
1ceae7ef 2864 size_t size;
788e2df3
AE
2865 int ret;
2866
2867 page_count = (u32) calc_pages_for(offset, length);
2868 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2869 if (IS_ERR(pages))
2870 ret = PTR_ERR(pages);
2871
2872 ret = -ENOMEM;
2873 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2874 OBJ_REQUEST_PAGES);
788e2df3
AE
2875 if (!obj_request)
2876 goto out;
2877
2878 obj_request->pages = pages;
2879 obj_request->page_count = page_count;
2880
430c28c3 2881 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
2882 if (!obj_request->osd_req)
2883 goto out;
2884
c99d2d4a
AE
2885 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2886 offset, length, 0, 0);
406e2c9f 2887 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 2888 obj_request->pages,
44cd188d
AE
2889 obj_request->length,
2890 obj_request->offset & ~PAGE_MASK,
2891 false, false);
9d4df01f 2892 rbd_osd_req_format_read(obj_request);
430c28c3 2893
788e2df3
AE
2894 ret = rbd_obj_request_submit(osdc, obj_request);
2895 if (ret)
2896 goto out;
2897 ret = rbd_obj_request_wait(obj_request);
2898 if (ret)
2899 goto out;
2900
2901 ret = obj_request->result;
2902 if (ret < 0)
2903 goto out;
1ceae7ef
AE
2904
2905 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2906 size = (size_t) obj_request->xferred;
903bb32e 2907 ceph_copy_from_page_vector(pages, buf, 0, size);
23ed6e13
AE
2908 rbd_assert(size <= (size_t) INT_MAX);
2909 ret = (int) size;
788e2df3
AE
2910 if (version)
2911 *version = obj_request->version;
2912out:
2913 if (obj_request)
2914 rbd_obj_request_put(obj_request);
2915 else
2916 ceph_release_page_vector(pages, page_count);
2917
2918 return ret;
2919}
2920
602adf40 2921/*
4156d998
AE
2922 * Read the complete header for the given rbd device.
2923 *
2924 * Returns a pointer to a dynamically-allocated buffer containing
2925 * the complete and validated header. Caller can pass the address
2926 * of a variable that will be filled in with the version of the
2927 * header object at the time it was read.
2928 *
2929 * Returns a pointer-coded errno if a failure occurs.
602adf40 2930 */
4156d998
AE
2931static struct rbd_image_header_ondisk *
2932rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 2933{
4156d998 2934 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 2935 u32 snap_count = 0;
4156d998
AE
2936 u64 names_size = 0;
2937 u32 want_count;
2938 int ret;
602adf40 2939
00f1f36f 2940 /*
4156d998
AE
2941 * The complete header will include an array of its 64-bit
2942 * snapshot ids, followed by the names of those snapshots as
2943 * a contiguous block of NUL-terminated strings. Note that
2944 * the number of snapshots could change by the time we read
2945 * it in, in which case we re-read it.
00f1f36f 2946 */
4156d998
AE
2947 do {
2948 size_t size;
2949
2950 kfree(ondisk);
2951
2952 size = sizeof (*ondisk);
2953 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2954 size += names_size;
2955 ondisk = kmalloc(size, GFP_KERNEL);
2956 if (!ondisk)
2957 return ERR_PTR(-ENOMEM);
2958
788e2df3 2959 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
4156d998
AE
2960 0, size,
2961 (char *) ondisk, version);
4156d998
AE
2962 if (ret < 0)
2963 goto out_err;
2964 if (WARN_ON((size_t) ret < size)) {
2965 ret = -ENXIO;
06ecc6cb
AE
2966 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2967 size, ret);
4156d998
AE
2968 goto out_err;
2969 }
2970 if (!rbd_dev_ondisk_valid(ondisk)) {
2971 ret = -ENXIO;
06ecc6cb 2972 rbd_warn(rbd_dev, "invalid header");
4156d998 2973 goto out_err;
81e759fb 2974 }
602adf40 2975
4156d998
AE
2976 names_size = le64_to_cpu(ondisk->snap_names_len);
2977 want_count = snap_count;
2978 snap_count = le32_to_cpu(ondisk->snap_count);
2979 } while (snap_count != want_count);
00f1f36f 2980
4156d998 2981 return ondisk;
00f1f36f 2982
4156d998
AE
2983out_err:
2984 kfree(ondisk);
2985
2986 return ERR_PTR(ret);
2987}
2988
2989/*
2990 * reload the ondisk the header
2991 */
2992static int rbd_read_header(struct rbd_device *rbd_dev,
2993 struct rbd_image_header *header)
2994{
2995 struct rbd_image_header_ondisk *ondisk;
2996 u64 ver = 0;
2997 int ret;
602adf40 2998
4156d998
AE
2999 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3000 if (IS_ERR(ondisk))
3001 return PTR_ERR(ondisk);
3002 ret = rbd_header_from_disk(header, ondisk);
3003 if (ret >= 0)
3004 header->obj_version = ver;
3005 kfree(ondisk);
3006
3007 return ret;
602adf40
YS
3008}
3009
41f38c2b 3010static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
dfc5606d
YS
3011{
3012 struct rbd_snap *snap;
a0593290 3013 struct rbd_snap *next;
dfc5606d 3014
a0593290 3015 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
41f38c2b 3016 rbd_remove_snap_dev(snap);
dfc5606d
YS
3017}
3018
9478554a
AE
3019static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3020{
3021 sector_t size;
3022
0d7dbfce 3023 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
3024 return;
3025
3026 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3027 dout("setting size to %llu sectors", (unsigned long long) size);
3028 rbd_dev->mapping.size = (u64) size;
3029 set_capacity(rbd_dev->disk, size);
3030}
3031
602adf40
YS
3032/*
3033 * only read the first part of the ondisk header, without the snaps info
3034 */
117973fb 3035static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
3036{
3037 int ret;
3038 struct rbd_image_header h;
602adf40
YS
3039
3040 ret = rbd_read_header(rbd_dev, &h);
3041 if (ret < 0)
3042 return ret;
3043
a51aa0c0
JD
3044 down_write(&rbd_dev->header_rwsem);
3045
9478554a
AE
3046 /* Update image size, and check for resize of mapped image */
3047 rbd_dev->header.image_size = h.image_size;
3048 rbd_update_mapping_size(rbd_dev);
9db4b3e3 3049
849b4260 3050 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 3051 kfree(rbd_dev->header.snap_sizes);
849b4260 3052 kfree(rbd_dev->header.snap_names);
d1d25646
JD
3053 /* osd requests may still refer to snapc */
3054 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 3055
b813623a
AE
3056 if (hver)
3057 *hver = h.obj_version;
a71b891b 3058 rbd_dev->header.obj_version = h.obj_version;
93a24e08 3059 rbd_dev->header.image_size = h.image_size;
602adf40
YS
3060 rbd_dev->header.snapc = h.snapc;
3061 rbd_dev->header.snap_names = h.snap_names;
3062 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
3063 /* Free the extra copy of the object prefix */
3064 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3065 kfree(h.object_prefix);
3066
304f6808
AE
3067 ret = rbd_dev_snaps_update(rbd_dev);
3068 if (!ret)
3069 ret = rbd_dev_snaps_register(rbd_dev);
dfc5606d 3070
c666601a 3071 up_write(&rbd_dev->header_rwsem);
602adf40 3072
dfc5606d 3073 return ret;
602adf40
YS
3074}
3075
117973fb 3076static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993
AE
3077{
3078 int ret;
3079
117973fb 3080 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1fe5e993 3081 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
3082 if (rbd_dev->image_format == 1)
3083 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3084 else
3085 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993 3086 mutex_unlock(&ctl_mutex);
d98df63e 3087 revalidate_disk(rbd_dev->disk);
1fe5e993
AE
3088
3089 return ret;
3090}
3091
602adf40
YS
3092static int rbd_init_disk(struct rbd_device *rbd_dev)
3093{
3094 struct gendisk *disk;
3095 struct request_queue *q;
593a9e7b 3096 u64 segment_size;
602adf40 3097
602adf40 3098 /* create gendisk info */
602adf40
YS
3099 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3100 if (!disk)
1fcdb8aa 3101 return -ENOMEM;
602adf40 3102
f0f8cef5 3103 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3104 rbd_dev->dev_id);
602adf40
YS
3105 disk->major = rbd_dev->major;
3106 disk->first_minor = 0;
3107 disk->fops = &rbd_bd_ops;
3108 disk->private_data = rbd_dev;
3109
bf0d5f50 3110 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3111 if (!q)
3112 goto out_disk;
029bcbd8 3113
593a9e7b
AE
3114 /* We use the default size, but let's be explicit about it. */
3115 blk_queue_physical_block_size(q, SECTOR_SIZE);
3116
029bcbd8 3117 /* set io sizes to object size */
593a9e7b
AE
3118 segment_size = rbd_obj_bytes(&rbd_dev->header);
3119 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3120 blk_queue_max_segment_size(q, segment_size);
3121 blk_queue_io_min(q, segment_size);
3122 blk_queue_io_opt(q, segment_size);
029bcbd8 3123
602adf40
YS
3124 blk_queue_merge_bvec(q, rbd_merge_bvec);
3125 disk->queue = q;
3126
3127 q->queuedata = rbd_dev;
3128
3129 rbd_dev->disk = disk;
602adf40 3130
12f02944
AE
3131 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3132
602adf40 3133 return 0;
602adf40
YS
3134out_disk:
3135 put_disk(disk);
1fcdb8aa
AE
3136
3137 return -ENOMEM;
602adf40
YS
3138}
3139
dfc5606d
YS
3140/*
3141 sysfs
3142*/
3143
593a9e7b
AE
3144static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3145{
3146 return container_of(dev, struct rbd_device, dev);
3147}
3148
dfc5606d
YS
3149static ssize_t rbd_size_show(struct device *dev,
3150 struct device_attribute *attr, char *buf)
3151{
593a9e7b 3152 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
3153 sector_t size;
3154
3155 down_read(&rbd_dev->header_rwsem);
3156 size = get_capacity(rbd_dev->disk);
3157 up_read(&rbd_dev->header_rwsem);
dfc5606d 3158
a51aa0c0 3159 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
3160}
3161
34b13184
AE
3162/*
3163 * Note this shows the features for whatever's mapped, which is not
3164 * necessarily the base image.
3165 */
3166static ssize_t rbd_features_show(struct device *dev,
3167 struct device_attribute *attr, char *buf)
3168{
3169 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3170
3171 return sprintf(buf, "0x%016llx\n",
3172 (unsigned long long) rbd_dev->mapping.features);
3173}
3174
dfc5606d
YS
3175static ssize_t rbd_major_show(struct device *dev,
3176 struct device_attribute *attr, char *buf)
3177{
593a9e7b 3178 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3179
dfc5606d
YS
3180 return sprintf(buf, "%d\n", rbd_dev->major);
3181}
3182
3183static ssize_t rbd_client_id_show(struct device *dev,
3184 struct device_attribute *attr, char *buf)
602adf40 3185{
593a9e7b 3186 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3187
1dbb4399
AE
3188 return sprintf(buf, "client%lld\n",
3189 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3190}
3191
dfc5606d
YS
3192static ssize_t rbd_pool_show(struct device *dev,
3193 struct device_attribute *attr, char *buf)
602adf40 3194{
593a9e7b 3195 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3196
0d7dbfce 3197 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3198}
3199
9bb2f334
AE
3200static ssize_t rbd_pool_id_show(struct device *dev,
3201 struct device_attribute *attr, char *buf)
3202{
3203 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
0d7dbfce
AE
3205 return sprintf(buf, "%llu\n",
3206 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3207}
3208
dfc5606d
YS
3209static ssize_t rbd_name_show(struct device *dev,
3210 struct device_attribute *attr, char *buf)
3211{
593a9e7b 3212 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3213
a92ffdf8
AE
3214 if (rbd_dev->spec->image_name)
3215 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3216
3217 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3218}
3219
589d30e0
AE
3220static ssize_t rbd_image_id_show(struct device *dev,
3221 struct device_attribute *attr, char *buf)
3222{
3223 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
0d7dbfce 3225 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3226}
3227
34b13184
AE
3228/*
3229 * Shows the name of the currently-mapped snapshot (or
3230 * RBD_SNAP_HEAD_NAME for the base image).
3231 */
dfc5606d
YS
3232static ssize_t rbd_snap_show(struct device *dev,
3233 struct device_attribute *attr,
3234 char *buf)
3235{
593a9e7b 3236 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3237
0d7dbfce 3238 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3239}
3240
86b00e0d
AE
3241/*
3242 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3243 * for the parent image. If there is no parent, simply shows
3244 * "(no parent image)".
3245 */
3246static ssize_t rbd_parent_show(struct device *dev,
3247 struct device_attribute *attr,
3248 char *buf)
3249{
3250 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3251 struct rbd_spec *spec = rbd_dev->parent_spec;
3252 int count;
3253 char *bufp = buf;
3254
3255 if (!spec)
3256 return sprintf(buf, "(no parent image)\n");
3257
3258 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3259 (unsigned long long) spec->pool_id, spec->pool_name);
3260 if (count < 0)
3261 return count;
3262 bufp += count;
3263
3264 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3265 spec->image_name ? spec->image_name : "(unknown)");
3266 if (count < 0)
3267 return count;
3268 bufp += count;
3269
3270 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3271 (unsigned long long) spec->snap_id, spec->snap_name);
3272 if (count < 0)
3273 return count;
3274 bufp += count;
3275
3276 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3277 if (count < 0)
3278 return count;
3279 bufp += count;
3280
3281 return (ssize_t) (bufp - buf);
3282}
3283
dfc5606d
YS
3284static ssize_t rbd_image_refresh(struct device *dev,
3285 struct device_attribute *attr,
3286 const char *buf,
3287 size_t size)
3288{
593a9e7b 3289 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3290 int ret;
602adf40 3291
117973fb 3292 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
3293
3294 return ret < 0 ? ret : size;
dfc5606d 3295}
602adf40 3296
dfc5606d 3297static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3298static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3299static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3300static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3301static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3302static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3303static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3304static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3305static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3306static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3307static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3308
3309static struct attribute *rbd_attrs[] = {
3310 &dev_attr_size.attr,
34b13184 3311 &dev_attr_features.attr,
dfc5606d
YS
3312 &dev_attr_major.attr,
3313 &dev_attr_client_id.attr,
3314 &dev_attr_pool.attr,
9bb2f334 3315 &dev_attr_pool_id.attr,
dfc5606d 3316 &dev_attr_name.attr,
589d30e0 3317 &dev_attr_image_id.attr,
dfc5606d 3318 &dev_attr_current_snap.attr,
86b00e0d 3319 &dev_attr_parent.attr,
dfc5606d 3320 &dev_attr_refresh.attr,
dfc5606d
YS
3321 NULL
3322};
3323
3324static struct attribute_group rbd_attr_group = {
3325 .attrs = rbd_attrs,
3326};
3327
3328static const struct attribute_group *rbd_attr_groups[] = {
3329 &rbd_attr_group,
3330 NULL
3331};
3332
3333static void rbd_sysfs_dev_release(struct device *dev)
3334{
3335}
3336
3337static struct device_type rbd_device_type = {
3338 .name = "rbd",
3339 .groups = rbd_attr_groups,
3340 .release = rbd_sysfs_dev_release,
3341};
3342
3343
3344/*
3345 sysfs - snapshots
3346*/
3347
3348static ssize_t rbd_snap_size_show(struct device *dev,
3349 struct device_attribute *attr,
3350 char *buf)
3351{
3352 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3353
3591538f 3354 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
3355}
3356
3357static ssize_t rbd_snap_id_show(struct device *dev,
3358 struct device_attribute *attr,
3359 char *buf)
3360{
3361 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3362
3591538f 3363 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
3364}
3365
34b13184
AE
3366static ssize_t rbd_snap_features_show(struct device *dev,
3367 struct device_attribute *attr,
3368 char *buf)
3369{
3370 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3371
3372 return sprintf(buf, "0x%016llx\n",
3373 (unsigned long long) snap->features);
3374}
3375
dfc5606d
YS
3376static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
3377static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
34b13184 3378static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
dfc5606d
YS
3379
3380static struct attribute *rbd_snap_attrs[] = {
3381 &dev_attr_snap_size.attr,
3382 &dev_attr_snap_id.attr,
34b13184 3383 &dev_attr_snap_features.attr,
dfc5606d
YS
3384 NULL,
3385};
3386
3387static struct attribute_group rbd_snap_attr_group = {
3388 .attrs = rbd_snap_attrs,
3389};
3390
3391static void rbd_snap_dev_release(struct device *dev)
3392{
3393 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3394 kfree(snap->name);
3395 kfree(snap);
3396}
3397
3398static const struct attribute_group *rbd_snap_attr_groups[] = {
3399 &rbd_snap_attr_group,
3400 NULL
3401};
3402
3403static struct device_type rbd_snap_device_type = {
3404 .groups = rbd_snap_attr_groups,
3405 .release = rbd_snap_dev_release,
3406};
3407
8b8fb99c
AE
3408static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3409{
3410 kref_get(&spec->kref);
3411
3412 return spec;
3413}
3414
3415static void rbd_spec_free(struct kref *kref);
3416static void rbd_spec_put(struct rbd_spec *spec)
3417{
3418 if (spec)
3419 kref_put(&spec->kref, rbd_spec_free);
3420}
3421
3422static struct rbd_spec *rbd_spec_alloc(void)
3423{
3424 struct rbd_spec *spec;
3425
3426 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3427 if (!spec)
3428 return NULL;
3429 kref_init(&spec->kref);
3430
8b8fb99c
AE
3431 return spec;
3432}
3433
3434static void rbd_spec_free(struct kref *kref)
3435{
3436 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3437
3438 kfree(spec->pool_name);
3439 kfree(spec->image_id);
3440 kfree(spec->image_name);
3441 kfree(spec->snap_name);
3442 kfree(spec);
3443}
3444
cc344fa1 3445static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3446 struct rbd_spec *spec)
3447{
3448 struct rbd_device *rbd_dev;
3449
3450 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3451 if (!rbd_dev)
3452 return NULL;
3453
3454 spin_lock_init(&rbd_dev->lock);
6d292906 3455 rbd_dev->flags = 0;
c53d5893
AE
3456 INIT_LIST_HEAD(&rbd_dev->node);
3457 INIT_LIST_HEAD(&rbd_dev->snaps);
3458 init_rwsem(&rbd_dev->header_rwsem);
3459
3460 rbd_dev->spec = spec;
3461 rbd_dev->rbd_client = rbdc;
3462
0903e875
AE
3463 /* Initialize the layout used for all rbd requests */
3464
3465 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3466 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3467 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3468 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3469
c53d5893
AE
3470 return rbd_dev;
3471}
3472
3473static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3474{
86b00e0d 3475 rbd_spec_put(rbd_dev->parent_spec);
c53d5893
AE
3476 kfree(rbd_dev->header_name);
3477 rbd_put_client(rbd_dev->rbd_client);
3478 rbd_spec_put(rbd_dev->spec);
3479 kfree(rbd_dev);
3480}
3481
304f6808
AE
3482static bool rbd_snap_registered(struct rbd_snap *snap)
3483{
3484 bool ret = snap->dev.type == &rbd_snap_device_type;
3485 bool reg = device_is_registered(&snap->dev);
3486
3487 rbd_assert(!ret ^ reg);
3488
3489 return ret;
3490}
3491
41f38c2b 3492static void rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
3493{
3494 list_del(&snap->node);
304f6808
AE
3495 if (device_is_registered(&snap->dev))
3496 device_unregister(&snap->dev);
dfc5606d
YS
3497}
3498
14e7085d 3499static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
3500 struct device *parent)
3501{
3502 struct device *dev = &snap->dev;
3503 int ret;
3504
3505 dev->type = &rbd_snap_device_type;
3506 dev->parent = parent;
3507 dev->release = rbd_snap_dev_release;
d4b125e9 3508 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
304f6808
AE
3509 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3510
dfc5606d
YS
3511 ret = device_register(dev);
3512
3513 return ret;
3514}
3515
4e891e0a 3516static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
c8d18425 3517 const char *snap_name,
34b13184
AE
3518 u64 snap_id, u64 snap_size,
3519 u64 snap_features)
dfc5606d 3520{
4e891e0a 3521 struct rbd_snap *snap;
dfc5606d 3522 int ret;
4e891e0a
AE
3523
3524 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 3525 if (!snap)
4e891e0a
AE
3526 return ERR_PTR(-ENOMEM);
3527
3528 ret = -ENOMEM;
c8d18425 3529 snap->name = kstrdup(snap_name, GFP_KERNEL);
4e891e0a
AE
3530 if (!snap->name)
3531 goto err;
3532
c8d18425
AE
3533 snap->id = snap_id;
3534 snap->size = snap_size;
34b13184 3535 snap->features = snap_features;
4e891e0a
AE
3536
3537 return snap;
3538
dfc5606d
YS
3539err:
3540 kfree(snap->name);
3541 kfree(snap);
4e891e0a
AE
3542
3543 return ERR_PTR(ret);
dfc5606d
YS
3544}
3545
cd892126
AE
3546static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3547 u64 *snap_size, u64 *snap_features)
3548{
3549 char *snap_name;
3550
3551 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3552
3553 *snap_size = rbd_dev->header.snap_sizes[which];
3554 *snap_features = 0; /* No features for v1 */
3555
3556 /* Skip over names until we find the one we are looking for */
3557
3558 snap_name = rbd_dev->header.snap_names;
3559 while (which--)
3560 snap_name += strlen(snap_name) + 1;
3561
3562 return snap_name;
3563}
3564
9d475de5
AE
3565/*
3566 * Get the size and object order for an image snapshot, or if
3567 * snap_id is CEPH_NOSNAP, gets this information for the base
3568 * image.
3569 */
3570static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3571 u8 *order, u64 *snap_size)
3572{
3573 __le64 snapid = cpu_to_le64(snap_id);
3574 int ret;
3575 struct {
3576 u8 order;
3577 __le64 size;
3578 } __attribute__ ((packed)) size_buf = { 0 };
3579
36be9a76 3580 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5
AE
3581 "rbd", "get_size",
3582 (char *) &snapid, sizeof (snapid),
07b2391f 3583 (char *) &size_buf, sizeof (size_buf), NULL);
36be9a76 3584 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3585 if (ret < 0)
3586 return ret;
3587
3588 *order = size_buf.order;
3589 *snap_size = le64_to_cpu(size_buf.size);
3590
3591 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3592 (unsigned long long) snap_id, (unsigned int) *order,
3593 (unsigned long long) *snap_size);
3594
3595 return 0;
3596}
3597
3598static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3599{
3600 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3601 &rbd_dev->header.obj_order,
3602 &rbd_dev->header.image_size);
3603}
3604
1e130199
AE
3605static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3606{
3607 void *reply_buf;
3608 int ret;
3609 void *p;
3610
3611 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3612 if (!reply_buf)
3613 return -ENOMEM;
3614
36be9a76 3615 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
1e130199
AE
3616 "rbd", "get_object_prefix",
3617 NULL, 0,
07b2391f 3618 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
36be9a76 3619 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3620 if (ret < 0)
3621 goto out;
3622
3623 p = reply_buf;
3624 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3625 p + RBD_OBJ_PREFIX_LEN_MAX,
3626 NULL, GFP_NOIO);
3627
3628 if (IS_ERR(rbd_dev->header.object_prefix)) {
3629 ret = PTR_ERR(rbd_dev->header.object_prefix);
3630 rbd_dev->header.object_prefix = NULL;
3631 } else {
3632 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3633 }
3634
3635out:
3636 kfree(reply_buf);
3637
3638 return ret;
3639}
3640
b1b5402a
AE
3641static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3642 u64 *snap_features)
3643{
3644 __le64 snapid = cpu_to_le64(snap_id);
3645 struct {
3646 __le64 features;
3647 __le64 incompat;
3648 } features_buf = { 0 };
d889140c 3649 u64 incompat;
b1b5402a
AE
3650 int ret;
3651
36be9a76 3652 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a
AE
3653 "rbd", "get_features",
3654 (char *) &snapid, sizeof (snapid),
3655 (char *) &features_buf, sizeof (features_buf),
07b2391f 3656 NULL);
36be9a76 3657 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3658 if (ret < 0)
3659 return ret;
d889140c
AE
3660
3661 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3662 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3663 return -ENXIO;
d889140c 3664
b1b5402a
AE
3665 *snap_features = le64_to_cpu(features_buf.features);
3666
3667 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3668 (unsigned long long) snap_id,
3669 (unsigned long long) *snap_features,
3670 (unsigned long long) le64_to_cpu(features_buf.incompat));
3671
3672 return 0;
3673}
3674
3675static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3676{
3677 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3678 &rbd_dev->header.features);
3679}
3680
86b00e0d
AE
3681static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3682{
3683 struct rbd_spec *parent_spec;
3684 size_t size;
3685 void *reply_buf = NULL;
3686 __le64 snapid;
3687 void *p;
3688 void *end;
3689 char *image_id;
3690 u64 overlap;
86b00e0d
AE
3691 int ret;
3692
3693 parent_spec = rbd_spec_alloc();
3694 if (!parent_spec)
3695 return -ENOMEM;
3696
3697 size = sizeof (__le64) + /* pool_id */
3698 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3699 sizeof (__le64) + /* snap_id */
3700 sizeof (__le64); /* overlap */
3701 reply_buf = kmalloc(size, GFP_KERNEL);
3702 if (!reply_buf) {
3703 ret = -ENOMEM;
3704 goto out_err;
3705 }
3706
3707 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3708 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d
AE
3709 "rbd", "get_parent",
3710 (char *) &snapid, sizeof (snapid),
07b2391f 3711 (char *) reply_buf, size, NULL);
36be9a76 3712 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3713 if (ret < 0)
3714 goto out_err;
3715
3716 ret = -ERANGE;
3717 p = reply_buf;
3718 end = (char *) reply_buf + size;
3719 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3720 if (parent_spec->pool_id == CEPH_NOPOOL)
3721 goto out; /* No parent? No problem. */
3722
0903e875
AE
3723 /* The ceph file layout needs to fit pool id in 32 bits */
3724
3725 ret = -EIO;
3726 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
3727 goto out;
3728
979ed480 3729 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3730 if (IS_ERR(image_id)) {
3731 ret = PTR_ERR(image_id);
3732 goto out_err;
3733 }
3734 parent_spec->image_id = image_id;
3735 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3736 ceph_decode_64_safe(&p, end, overlap, out_err);
3737
3738 rbd_dev->parent_overlap = overlap;
3739 rbd_dev->parent_spec = parent_spec;
3740 parent_spec = NULL; /* rbd_dev now owns this */
3741out:
3742 ret = 0;
3743out_err:
3744 kfree(reply_buf);
3745 rbd_spec_put(parent_spec);
3746
3747 return ret;
3748}
3749
9e15b77d
AE
3750static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3751{
3752 size_t image_id_size;
3753 char *image_id;
3754 void *p;
3755 void *end;
3756 size_t size;
3757 void *reply_buf = NULL;
3758 size_t len = 0;
3759 char *image_name = NULL;
3760 int ret;
3761
3762 rbd_assert(!rbd_dev->spec->image_name);
3763
69e7a02f
AE
3764 len = strlen(rbd_dev->spec->image_id);
3765 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3766 image_id = kmalloc(image_id_size, GFP_KERNEL);
3767 if (!image_id)
3768 return NULL;
3769
3770 p = image_id;
3771 end = (char *) image_id + image_id_size;
69e7a02f 3772 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
9e15b77d
AE
3773
3774 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3775 reply_buf = kmalloc(size, GFP_KERNEL);
3776 if (!reply_buf)
3777 goto out;
3778
36be9a76 3779 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3780 "rbd", "dir_get_name",
3781 image_id, image_id_size,
07b2391f 3782 (char *) reply_buf, size, NULL);
9e15b77d
AE
3783 if (ret < 0)
3784 goto out;
3785 p = reply_buf;
3786 end = (char *) reply_buf + size;
3787 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3788 if (IS_ERR(image_name))
3789 image_name = NULL;
3790 else
3791 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3792out:
3793 kfree(reply_buf);
3794 kfree(image_id);
3795
3796 return image_name;
3797}
3798
3799/*
3800 * When a parent image gets probed, we only have the pool, image,
3801 * and snapshot ids but not the names of any of them. This call
3802 * is made later to fill in those names. It has to be done after
3803 * rbd_dev_snaps_update() has completed because some of the
3804 * information (in particular, snapshot name) is not available
3805 * until then.
3806 */
3807static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3808{
3809 struct ceph_osd_client *osdc;
3810 const char *name;
3811 void *reply_buf = NULL;
3812 int ret;
3813
3814 if (rbd_dev->spec->pool_name)
3815 return 0; /* Already have the names */
3816
3817 /* Look up the pool name */
3818
3819 osdc = &rbd_dev->rbd_client->client->osdc;
3820 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
935dc89f
AE
3821 if (!name) {
3822 rbd_warn(rbd_dev, "there is no pool with id %llu",
3823 rbd_dev->spec->pool_id); /* Really a BUG() */
3824 return -EIO;
3825 }
9e15b77d
AE
3826
3827 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3828 if (!rbd_dev->spec->pool_name)
3829 return -ENOMEM;
3830
3831 /* Fetch the image name; tolerate failure here */
3832
3833 name = rbd_dev_image_name(rbd_dev);
69e7a02f 3834 if (name)
9e15b77d 3835 rbd_dev->spec->image_name = (char *) name;
69e7a02f 3836 else
06ecc6cb 3837 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d
AE
3838
3839 /* Look up the snapshot name. */
3840
3841 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3842 if (!name) {
935dc89f
AE
3843 rbd_warn(rbd_dev, "no snapshot with id %llu",
3844 rbd_dev->spec->snap_id); /* Really a BUG() */
9e15b77d
AE
3845 ret = -EIO;
3846 goto out_err;
3847 }
3848 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3849 if(!rbd_dev->spec->snap_name)
3850 goto out_err;
3851
3852 return 0;
3853out_err:
3854 kfree(reply_buf);
3855 kfree(rbd_dev->spec->pool_name);
3856 rbd_dev->spec->pool_name = NULL;
3857
3858 return ret;
3859}
3860
6e14b1a6 3861static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
35d489f9
AE
3862{
3863 size_t size;
3864 int ret;
3865 void *reply_buf;
3866 void *p;
3867 void *end;
3868 u64 seq;
3869 u32 snap_count;
3870 struct ceph_snap_context *snapc;
3871 u32 i;
3872
3873 /*
3874 * We'll need room for the seq value (maximum snapshot id),
3875 * snapshot count, and array of that many snapshot ids.
3876 * For now we have a fixed upper limit on the number we're
3877 * prepared to receive.
3878 */
3879 size = sizeof (__le64) + sizeof (__le32) +
3880 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3881 reply_buf = kzalloc(size, GFP_KERNEL);
3882 if (!reply_buf)
3883 return -ENOMEM;
3884
36be9a76 3885 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
35d489f9
AE
3886 "rbd", "get_snapcontext",
3887 NULL, 0,
07b2391f 3888 reply_buf, size, ver);
36be9a76 3889 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3890 if (ret < 0)
3891 goto out;
3892
3893 ret = -ERANGE;
3894 p = reply_buf;
3895 end = (char *) reply_buf + size;
3896 ceph_decode_64_safe(&p, end, seq, out);
3897 ceph_decode_32_safe(&p, end, snap_count, out);
3898
3899 /*
3900 * Make sure the reported number of snapshot ids wouldn't go
3901 * beyond the end of our buffer. But before checking that,
3902 * make sure the computed size of the snapshot context we
3903 * allocate is representable in a size_t.
3904 */
3905 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3906 / sizeof (u64)) {
3907 ret = -EINVAL;
3908 goto out;
3909 }
3910 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3911 goto out;
3912
3913 size = sizeof (struct ceph_snap_context) +
3914 snap_count * sizeof (snapc->snaps[0]);
3915 snapc = kmalloc(size, GFP_KERNEL);
3916 if (!snapc) {
3917 ret = -ENOMEM;
3918 goto out;
3919 }
3920
3921 atomic_set(&snapc->nref, 1);
3922 snapc->seq = seq;
3923 snapc->num_snaps = snap_count;
3924 for (i = 0; i < snap_count; i++)
3925 snapc->snaps[i] = ceph_decode_64(&p);
3926
3927 rbd_dev->header.snapc = snapc;
3928
3929 dout(" snap context seq = %llu, snap_count = %u\n",
3930 (unsigned long long) seq, (unsigned int) snap_count);
3931
3932out:
3933 kfree(reply_buf);
3934
3935 return 0;
3936}
3937
b8b1e2db
AE
3938static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3939{
3940 size_t size;
3941 void *reply_buf;
3942 __le64 snap_id;
3943 int ret;
3944 void *p;
3945 void *end;
b8b1e2db
AE
3946 char *snap_name;
3947
3948 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3949 reply_buf = kmalloc(size, GFP_KERNEL);
3950 if (!reply_buf)
3951 return ERR_PTR(-ENOMEM);
3952
3953 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
36be9a76 3954 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db
AE
3955 "rbd", "get_snapshot_name",
3956 (char *) &snap_id, sizeof (snap_id),
07b2391f 3957 reply_buf, size, NULL);
36be9a76 3958 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b8b1e2db
AE
3959 if (ret < 0)
3960 goto out;
3961
3962 p = reply_buf;
3963 end = (char *) reply_buf + size;
e5c35534 3964 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
b8b1e2db
AE
3965 if (IS_ERR(snap_name)) {
3966 ret = PTR_ERR(snap_name);
3967 goto out;
3968 } else {
3969 dout(" snap_id 0x%016llx snap_name = %s\n",
3970 (unsigned long long) le64_to_cpu(snap_id), snap_name);
3971 }
3972 kfree(reply_buf);
3973
3974 return snap_name;
3975out:
3976 kfree(reply_buf);
3977
3978 return ERR_PTR(ret);
3979}
3980
3981static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3982 u64 *snap_size, u64 *snap_features)
3983{
e0b49868 3984 u64 snap_id;
b8b1e2db
AE
3985 u8 order;
3986 int ret;
3987
3988 snap_id = rbd_dev->header.snapc->snaps[which];
3989 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3990 if (ret)
3991 return ERR_PTR(ret);
3992 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3993 if (ret)
3994 return ERR_PTR(ret);
3995
3996 return rbd_dev_v2_snap_name(rbd_dev, which);
3997}
3998
3999static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4000 u64 *snap_size, u64 *snap_features)
4001{
4002 if (rbd_dev->image_format == 1)
4003 return rbd_dev_v1_snap_info(rbd_dev, which,
4004 snap_size, snap_features);
4005 if (rbd_dev->image_format == 2)
4006 return rbd_dev_v2_snap_info(rbd_dev, which,
4007 snap_size, snap_features);
4008 return ERR_PTR(-EINVAL);
4009}
4010
117973fb
AE
4011static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4012{
4013 int ret;
4014 __u8 obj_order;
4015
4016 down_write(&rbd_dev->header_rwsem);
4017
4018 /* Grab old order first, to see if it changes */
4019
4020 obj_order = rbd_dev->header.obj_order,
4021 ret = rbd_dev_v2_image_size(rbd_dev);
4022 if (ret)
4023 goto out;
4024 if (rbd_dev->header.obj_order != obj_order) {
4025 ret = -EIO;
4026 goto out;
4027 }
4028 rbd_update_mapping_size(rbd_dev);
4029
4030 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4031 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4032 if (ret)
4033 goto out;
4034 ret = rbd_dev_snaps_update(rbd_dev);
4035 dout("rbd_dev_snaps_update returned %d\n", ret);
4036 if (ret)
4037 goto out;
4038 ret = rbd_dev_snaps_register(rbd_dev);
4039 dout("rbd_dev_snaps_register returned %d\n", ret);
4040out:
4041 up_write(&rbd_dev->header_rwsem);
4042
4043 return ret;
4044}
4045
dfc5606d 4046/*
35938150
AE
4047 * Scan the rbd device's current snapshot list and compare it to the
4048 * newly-received snapshot context. Remove any existing snapshots
4049 * not present in the new snapshot context. Add a new snapshot for
4050 * any snaphots in the snapshot context not in the current list.
4051 * And verify there are no changes to snapshots we already know
4052 * about.
4053 *
4054 * Assumes the snapshots in the snapshot context are sorted by
4055 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4056 * are also maintained in that order.)
dfc5606d 4057 */
304f6808 4058static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 4059{
35938150
AE
4060 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4061 const u32 snap_count = snapc->num_snaps;
35938150
AE
4062 struct list_head *head = &rbd_dev->snaps;
4063 struct list_head *links = head->next;
4064 u32 index = 0;
dfc5606d 4065
9fcbb800 4066 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
35938150
AE
4067 while (index < snap_count || links != head) {
4068 u64 snap_id;
4069 struct rbd_snap *snap;
cd892126
AE
4070 char *snap_name;
4071 u64 snap_size = 0;
4072 u64 snap_features = 0;
dfc5606d 4073
35938150
AE
4074 snap_id = index < snap_count ? snapc->snaps[index]
4075 : CEPH_NOSNAP;
4076 snap = links != head ? list_entry(links, struct rbd_snap, node)
4077 : NULL;
aafb230e 4078 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 4079
35938150
AE
4080 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4081 struct list_head *next = links->next;
dfc5606d 4082
6d292906
AE
4083 /*
4084 * A previously-existing snapshot is not in
4085 * the new snap context.
4086 *
4087 * If the now missing snapshot is the one the
4088 * image is mapped to, clear its exists flag
4089 * so we can avoid sending any more requests
4090 * to it.
4091 */
0d7dbfce 4092 if (rbd_dev->spec->snap_id == snap->id)
6d292906 4093 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
41f38c2b 4094 rbd_remove_snap_dev(snap);
9fcbb800 4095 dout("%ssnap id %llu has been removed\n",
0d7dbfce
AE
4096 rbd_dev->spec->snap_id == snap->id ?
4097 "mapped " : "",
9fcbb800 4098 (unsigned long long) snap->id);
35938150
AE
4099
4100 /* Done with this list entry; advance */
4101
4102 links = next;
dfc5606d
YS
4103 continue;
4104 }
35938150 4105
b8b1e2db
AE
4106 snap_name = rbd_dev_snap_info(rbd_dev, index,
4107 &snap_size, &snap_features);
cd892126
AE
4108 if (IS_ERR(snap_name))
4109 return PTR_ERR(snap_name);
4110
9fcbb800
AE
4111 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
4112 (unsigned long long) snap_id);
35938150
AE
4113 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4114 struct rbd_snap *new_snap;
4115
4116 /* We haven't seen this snapshot before */
4117
c8d18425 4118 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
cd892126 4119 snap_id, snap_size, snap_features);
9fcbb800
AE
4120 if (IS_ERR(new_snap)) {
4121 int err = PTR_ERR(new_snap);
4122
4123 dout(" failed to add dev, error %d\n", err);
4124
4125 return err;
4126 }
35938150
AE
4127
4128 /* New goes before existing, or at end of list */
4129
9fcbb800 4130 dout(" added dev%s\n", snap ? "" : " at end\n");
35938150
AE
4131 if (snap)
4132 list_add_tail(&new_snap->node, &snap->node);
4133 else
523f3258 4134 list_add_tail(&new_snap->node, head);
35938150
AE
4135 } else {
4136 /* Already have this one */
4137
9fcbb800
AE
4138 dout(" already present\n");
4139
cd892126 4140 rbd_assert(snap->size == snap_size);
aafb230e 4141 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 4142 rbd_assert(snap->features == snap_features);
35938150
AE
4143
4144 /* Done with this list entry; advance */
4145
4146 links = links->next;
dfc5606d 4147 }
35938150
AE
4148
4149 /* Advance to the next entry in the snapshot context */
4150
4151 index++;
dfc5606d 4152 }
9fcbb800 4153 dout("%s: done\n", __func__);
dfc5606d
YS
4154
4155 return 0;
4156}
4157
304f6808
AE
4158/*
4159 * Scan the list of snapshots and register the devices for any that
4160 * have not already been registered.
4161 */
4162static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
4163{
4164 struct rbd_snap *snap;
4165 int ret = 0;
4166
37206ee5 4167 dout("%s:\n", __func__);
86ff77bb
AE
4168 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
4169 return -EIO;
304f6808
AE
4170
4171 list_for_each_entry(snap, &rbd_dev->snaps, node) {
4172 if (!rbd_snap_registered(snap)) {
4173 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
4174 if (ret < 0)
4175 break;
4176 }
4177 }
4178 dout("%s: returning %d\n", __func__, ret);
4179
4180 return ret;
4181}
4182
dfc5606d
YS
4183static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4184{
dfc5606d 4185 struct device *dev;
cd789ab9 4186 int ret;
dfc5606d
YS
4187
4188 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4189
cd789ab9 4190 dev = &rbd_dev->dev;
dfc5606d
YS
4191 dev->bus = &rbd_bus_type;
4192 dev->type = &rbd_device_type;
4193 dev->parent = &rbd_root_dev;
4194 dev->release = rbd_dev_release;
de71a297 4195 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4196 ret = device_register(dev);
dfc5606d 4197
dfc5606d 4198 mutex_unlock(&ctl_mutex);
cd789ab9 4199
dfc5606d 4200 return ret;
602adf40
YS
4201}
4202
/*
 * Undo rbd_bus_add_dev(): remove the rbd device from sysfs and
 * drop its reference.  When the last reference goes away the
 * driver core invokes rbd_dev_release() to tear everything down.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
4207
e2839308 4208static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 *
 * Ids are handed out from the monotonically increasing
 * rbd_dev_id_max counter; rbd_dev_id_put() may lower the counter
 * again when the highest id is released.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	/* rbd_dev_list is protected by rbd_dev_list_lock */
	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
b7f23c36 4224
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 *
 * If the released id was the current maximum, the maximum is
 * recomputed by scanning the remaining devices so future ids stay
 * as small as possible.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		/* NOTE: this local intentionally shadows the parameter */
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}
4275
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, spaces);		/* Find start of token */
	*buf = p;

	return strcspn(p, spaces);	/* Return token length */
}
4294
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Copy only when the result (plus NUL) fits */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
4324
ea3352f4
AE
4325/*
4326 * Finds the next token in *buf, dynamically allocates a buffer big
4327 * enough to hold a copy of it, and copies the token into the new
4328 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4329 * that a duplicate buffer is created even for a zero-length token.
4330 *
4331 * Returns a pointer to the newly-allocated duplicate, or a null
4332 * pointer if memory for the duplicate was not available. If
4333 * the lenp argument is a non-null pointer, the length of the token
4334 * (not including the '\0') is returned in *lenp.
4335 *
4336 * If successful, the *buf pointer will be updated to point beyond
4337 * the end of the found token.
4338 *
4339 * Note: uses GFP_KERNEL for allocation.
4340 */
4341static inline char *dup_token(const char **buf, size_t *lenp)
4342{
4343 char *dup;
4344 size_t len;
4345
4346 len = next_token(buf);
4caf35f9 4347 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4348 if (!dup)
4349 return NULL;
ea3352f4
AE
4350 *(dup + len) = '\0';
4351 *buf += len;
4352
4353 if (lenp)
4354 *lenp = len;
4355
4356 return dup;
4357}
4358
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *	Address of an rbd options pointer.  Fully initialized by
 *	this function; caller must release with kfree().
 *  spec
 *	Address of an rbd image specification pointer.  Fully
 *	initialized by this function based on parsed options.
 *	Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	/* Monitor addresses are left in place; only start/size recorded */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	/* rbd-specific option tokens are picked out by the callback */
	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
4502
589d30e0
AE
4503/*
4504 * An rbd format 2 image has a unique identifier, distinct from the
4505 * name given to it by the user. Internally, that identifier is
4506 * what's used to specify the names of objects related to the image.
4507 *
4508 * A special "rbd id" object is used to map an rbd image name to its
4509 * id. If that object doesn't exist, then there is no v2 rbd image
4510 * with the supplied name.
4511 *
4512 * This function will record the given rbd_dev's image_id field if
4513 * it can be determined, and in that case will return 0. If any
4514 * errors occur a negative errno will be returned and the rbd_dev's
4515 * image_id field will be unchanged (and should be NULL).
4516 */
4517static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4518{
4519 int ret;
4520 size_t size;
4521 char *object_name;
4522 void *response;
4523 void *p;
4524
2f82ee54
AE
4525 /* If we already have it we don't need to look it up */
4526
4527 if (rbd_dev->spec->image_id)
4528 return 0;
4529
2c0d0a10
AE
4530 /*
4531 * When probing a parent image, the image id is already
4532 * known (and the image name likely is not). There's no
4533 * need to fetch the image id again in this case.
4534 */
4535 if (rbd_dev->spec->image_id)
4536 return 0;
4537
589d30e0
AE
4538 /*
4539 * First, see if the format 2 image id file exists, and if
4540 * so, get the image's persistent id from it.
4541 */
69e7a02f 4542 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4543 object_name = kmalloc(size, GFP_NOIO);
4544 if (!object_name)
4545 return -ENOMEM;
0d7dbfce 4546 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4547 dout("rbd id object name is %s\n", object_name);
4548
4549 /* Response will be an encoded string, which includes a length */
4550
4551 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4552 response = kzalloc(size, GFP_NOIO);
4553 if (!response) {
4554 ret = -ENOMEM;
4555 goto out;
4556 }
4557
36be9a76 4558 ret = rbd_obj_method_sync(rbd_dev, object_name,
589d30e0
AE
4559 "rbd", "get_id",
4560 NULL, 0,
07b2391f 4561 response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 4562 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
589d30e0
AE
4563 if (ret < 0)
4564 goto out;
4565
4566 p = response;
0d7dbfce 4567 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
589d30e0 4568 p + RBD_IMAGE_ID_LEN_MAX,
979ed480 4569 NULL, GFP_NOIO);
0d7dbfce
AE
4570 if (IS_ERR(rbd_dev->spec->image_id)) {
4571 ret = PTR_ERR(rbd_dev->spec->image_id);
4572 rbd_dev->spec->image_id = NULL;
589d30e0 4573 } else {
0d7dbfce 4574 dout("image_id is %s\n", rbd_dev->spec->image_id);
589d30e0
AE
4575 }
4576out:
4577 kfree(response);
4578 kfree(object_name);
4579
4580 return ret;
4581}
4582
/*
 * Finish probing an image known (or assumed) to be format 1:
 * record an empty image id, build the header object name
 * ("<image_name>" + RBD_SUFFIX), and read the on-disk header into
 * rbd_dev->header.  Format 1 images never have a parent.
 *
 * Returns 0 on success; on error all fields set here are undone.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
4631
/*
 * Finish probing a format 2 image whose image id has already been
 * determined: build the header object name from the id, then pull
 * the size/order, object prefix, features, (optional) parent info,
 * and snapshot context from the osd.
 *
 * Returns 0 on success; on error everything set up here is undone.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Unwind in reverse order of setup */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
4704
/*
 * Complete the setup of a freshly probed rbd device: update its
 * snapshot list and mapping, allocate a device id and block major,
 * create the disk and sysfs entries, probe any parent image, then
 * register snapshots, establish the header watch, and finally
 * announce the disk.
 *
 * Note the split cleanup: before rbd_bus_add_dev() succeeds, error
 * unwinding is done here explicitly (err_out_disk and below);
 * afterward, rbd_bus_del_dev() delegates teardown to the sysfs
 * release path.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	struct rbd_device *parent = NULL;
	struct rbd_spec *parent_spec = NULL;
	struct rbd_client *rbdc = NULL;
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	/* Probe the parent if there is one */

	if (rbd_dev->parent_spec) {
		/*
		 * We need to pass a reference to the client and the
		 * parent spec when creating the parent rbd_dev.
		 * Images related by parent/child relationships
		 * always share both.
		 */
		parent_spec = rbd_spec_get(rbd_dev->parent_spec);
		rbdc = __rbd_get_client(rbd_dev->rbd_client);

		parent = rbd_dev_create(rbdc, parent_spec);
		if (!parent) {
			ret = -ENOMEM;
			goto err_out_spec;
		}
		rbdc = NULL;		/* parent now owns reference */
		parent_spec = NULL;	/* parent now owns reference */
		ret = rbd_dev_probe(parent);
		if (ret < 0)
			goto err_out_parent;
		rbd_dev->parent = parent;
	}

	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	/* Start watching the header object for changes */
	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_parent:
	rbd_dev_destroy(parent);
err_out_spec:
	rbd_spec_put(parent_spec);
	rbd_put_client(rbdc);
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
4820
a30b71b9
AE
4821/*
4822 * Probe for the existence of the header object for the given rbd
4823 * device. For format 2 images this includes determining the image
4824 * id.
4825 */
4826static int rbd_dev_probe(struct rbd_device *rbd_dev)
4827{
4828 int ret;
4829
4830 /*
4831 * Get the id from the image id object. If it's not a
4832 * format 2 image, we'll get ENOENT back, and we'll assume
4833 * it's a format 1 image.
4834 */
4835 ret = rbd_dev_image_id(rbd_dev);
4836 if (ret)
4837 ret = rbd_dev_v1_probe(rbd_dev);
4838 else
4839 ret = rbd_dev_v2_probe(rbd_dev);
83a06263 4840 if (ret) {
a30b71b9
AE
4841 dout("probe failed, returning %d\n", ret);
4842
83a06263
AE
4843 return ret;
4844 }
4845
4846 ret = rbd_dev_probe_finish(rbd_dev);
4847 if (ret)
4848 rbd_header_free(&rbd_dev->header);
4849
a30b71b9
AE
4850 return ret;
4851}
4852
/*
 * sysfs write handler for /sys/bus/rbd/add: parse the mapping
 * request, obtain a ceph client, resolve the pool, create the
 * rbd_device, and probe/activate the image.
 *
 * Ownership transfers are tracked by NULLing the local pointer as
 * each object is handed off, so the error labels free only what is
 * still owned here.  Returns @count on success, negative errno on
 * failure.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}
4925
de71a297 4926static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4927{
4928 struct list_head *tmp;
4929 struct rbd_device *rbd_dev;
4930
e124a82f 4931 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4932 list_for_each(tmp, &rbd_dev_list) {
4933 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4934 if (rbd_dev->dev_id == dev_id) {
e124a82f 4935 spin_unlock(&rbd_dev_list_lock);
602adf40 4936 return rbd_dev;
e124a82f 4937 }
602adf40 4938 }
e124a82f 4939 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4940 return NULL;
4941}
4942
/*
 * Release callback for the rbd device's embedded struct device.
 *
 * Invoked by the driver core when the last reference to the device is
 * dropped.  Tears down everything set up at map time, in reverse
 * order; the sequence below is order-dependent, do not reorder.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Unregister the header watch, if one was set up */
	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/*
	 * release module ref — balances the reference dropped on the
	 * err_out_module path of rbd_add() (presumably taken there;
	 * the matching try_module_get is outside this view).
	 */
	module_put(THIS_MODULE);
}
4965
2f82ee54
AE
/*
 * Unregister one rbd device: drop all of its snapshot devices, then
 * remove it from the rbd bus.  NOTE(review): removing it from the bus
 * presumably drops the final device reference and thereby triggers
 * rbd_dev_release() — confirm against rbd_bus_del_dev().
 * Caller must hold ctl_mutex.
 */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);
}
4971
dfc5606d
YS
/*
 * Handle a write to the rbd sysfs "remove" control file (see the
 * /sys/bus/rbd/... comment below): parse a device id from @buf and
 * unmap the corresponding rbd device, including any layered parent
 * devices it implicitly mapped.
 *
 * Returns @count on success, or a negative errno:
 *   -EINVAL  id does not fit in an int
 *   -ENOENT  no mapped device with that id
 *   -EBUSY   device is still open
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/*
	 * Refuse removal while the block device is open; otherwise set
	 * the REMOVING flag under the lock so no new opens race past us.
	 */
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	/*
	 * Tear down the chain of layered parents, deepest ancestor
	 * first: each pass walks to the parent that has no grandparent,
	 * removes it, and detaches it from its child.  The outer loop
	 * repeats until this device has no parent left.
	 */
	while (rbd_dev->parent_spec) {
		struct rbd_device *first = rbd_dev;
		struct rbd_device *second = first->parent;
		struct rbd_device *third;

		/*
		 * Follow to the parent with no grandparent and
		 * remove it.
		 */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		__rbd_remove(second);
		rbd_spec_put(first->parent_spec);
		first->parent_spec = NULL;
		first->parent_overlap = 0;
		first->parent = NULL;
	}
	/* Finally remove the device itself */
	__rbd_remove(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
5033
602adf40
YS
5034/*
5035 * create control files in sysfs
dfc5606d 5036 * /sys/bus/rbd/...
602adf40
YS
5037 */
5038static int rbd_sysfs_init(void)
5039{
dfc5606d 5040 int ret;
602adf40 5041
fed4c143 5042 ret = device_register(&rbd_root_dev);
21079786 5043 if (ret < 0)
dfc5606d 5044 return ret;
602adf40 5045
fed4c143
AE
5046 ret = bus_register(&rbd_bus_type);
5047 if (ret < 0)
5048 device_unregister(&rbd_root_dev);
602adf40 5049
602adf40
YS
5050 return ret;
5051}
5052
/*
 * Undo rbd_sysfs_init(): unregister the bus type and the root device,
 * in reverse order of registration.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
5058
cc344fa1 5059static int __init rbd_init(void)
602adf40
YS
5060{
5061 int rc;
5062
1e32d34c
AE
5063 if (!libceph_compatible(NULL)) {
5064 rbd_warn(NULL, "libceph incompatibility (quitting)");
5065
5066 return -EINVAL;
5067 }
602adf40
YS
5068 rc = rbd_sysfs_init();
5069 if (rc)
5070 return rc;
f0f8cef5 5071 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
5072 return 0;
5073}
5074
/* Module unload entry point: tear down the rbd sysfs infrastructure. */
static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
5079
/* Module entry/exit hooks and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");