rbd: return earlier in rbd_header_from_disk()
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
59c2be1e
YS
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
74/*
75 * block device image metadata (in-memory version)
76 */
77struct rbd_image_header {
78 u64 image_size;
849b4260 79 char *object_prefix;
602adf40
YS
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
602adf40 83 struct ceph_snap_context *snapc;
0f1d3f93 84 u64 snap_names_len;
602adf40
YS
85 u32 total_snaps;
86
87 char *snap_names;
88 u64 *snap_sizes;
59c2be1e
YS
89
90 u64 obj_version;
91};
92
93struct rbd_options {
94 int notify_timeout;
602adf40
YS
95};
96
97/*
f0f8cef5 98 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
99 */
100struct rbd_client {
101 struct ceph_client *client;
59c2be1e 102 struct rbd_options *rbd_opts;
602adf40
YS
103 struct kref kref;
104 struct list_head node;
105};
106
107/*
f0f8cef5 108 * a request completion status
602adf40 109 */
1fec7093
YS
110struct rbd_req_status {
111 int done;
112 int rc;
113 u64 bytes;
114};
115
116/*
117 * a collection of requests
118 */
119struct rbd_req_coll {
120 int total;
121 int num_done;
122 struct kref kref;
123 struct rbd_req_status status[0];
602adf40
YS
124};
125
f0f8cef5
AE
126/*
127 * a single io request
128 */
129struct rbd_request {
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
133 u64 len;
134 int coll_index;
135 struct rbd_req_coll *coll;
136};
137
dfc5606d
YS
138struct rbd_snap {
139 struct device dev;
140 const char *name;
3591538f 141 u64 size;
dfc5606d
YS
142 struct list_head node;
143 u64 id;
144};
145
602adf40
YS
146/*
147 * a single device
148 */
149struct rbd_device {
de71a297 150 int dev_id; /* blkdev unique id */
602adf40
YS
151
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
155
602adf40
YS
156 struct rbd_client *rbd_client;
157
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160 spinlock_t lock; /* queue lock */
161
162 struct rbd_image_header header;
0bed54dc
AE
163 char *image_name;
164 size_t image_name_len;
165 char *header_name;
d22f76e7 166 char *pool_name;
9bb2f334 167 int pool_id;
602adf40 168
59c2be1e
YS
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
171
c666601a
JD
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
e88a36ec 174 /* name of the snapshot this device reads from */
820a5f3e 175 char *snap_name;
e88a36ec 176 /* id of the snapshot this device reads from */
77dfe99f 177 u64 snap_id; /* current snapshot id */
e88a36ec
JD
178 /* whether the snap_id this device reads from still exists */
179 bool snap_exists;
180 int read_only;
602adf40
YS
181
182 struct list_head node;
dfc5606d
YS
183
184 /* list of snapshots */
185 struct list_head snaps;
186
187 /* sysfs related */
188 struct device dev;
189};
190
602adf40 191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 192
602adf40 193static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
432b8587
AE
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 198
dfc5606d
YS
199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200static void rbd_dev_release(struct device *dev);
dfc5606d
YS
201static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
203 const char *buf,
204 size_t count);
14e7085d 205static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 206
f0f8cef5
AE
207static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 size_t count);
209static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 size_t count);
211
212static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215 __ATTR_NULL
216};
217
218static struct bus_type rbd_bus_type = {
219 .name = "rbd",
220 .bus_attrs = rbd_bus_attrs,
221};
222
/* No-op release for the statically-declared rbd_root_dev below. */
223static void rbd_root_dev_release(struct device *dev)
224{
225}
226
227static struct device rbd_root_dev = {
228 .init_name = "rbd",
229 .release = rbd_root_dev_release,
230};
231
dfc5606d 232
dfc5606d
YS
/* Take a reference on the sysfs device embedded in an rbd_device. */
233static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234{
235 return get_device(&rbd_dev->dev);
236}
237
/* Drop the reference taken by rbd_get_dev(). */
238static void rbd_put_dev(struct rbd_device *rbd_dev)
239{
240 put_device(&rbd_dev->dev);
241}
602adf40 242
1fe5e993 243static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 244
602adf40
YS
/*
 * Block-device open: refuse write opens of a read-only mapping, then
 * take a reference on the rbd device and propagate its read-only flag
 * to the block device.
 */
245static int rbd_open(struct block_device *bdev, fmode_t mode)
246{
f0f8cef5 247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 248
602adf40
YS
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250 return -EROFS;
251
340c7a2b
AE
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
254
602adf40
YS
255 return 0;
256}
257
dfc5606d
YS
/* Block-device release: drop the reference taken in rbd_open(). */
258static int rbd_release(struct gendisk *disk, fmode_t mode)
259{
260 struct rbd_device *rbd_dev = disk->private_data;
261
262 rbd_put_dev(rbd_dev);
263
264 return 0;
265}
266
602adf40
YS
267static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
269 .open = rbd_open,
dfc5606d 270 .release = rbd_release,
602adf40
YS
271};
272
273/*
274 * Initialize an rbd client instance.
43ae4701 275 * We own *ceph_opts.
602adf40 276 */
43ae4701 277static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 278 struct rbd_options *rbd_opts)
602adf40
YS
279{
280 struct rbd_client *rbdc;
281 int ret = -ENOMEM;
282
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 if (!rbdc)
286 goto out_opt;
287
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
290
bc534d86
AE
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
43ae4701 293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 294 if (IS_ERR(rbdc->client))
bc534d86 295 goto out_mutex;
43ae4701 296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
297
298 ret = ceph_open_session(rbdc->client);
299 if (ret < 0)
300 goto out_err;
301
59c2be1e
YS
302 rbdc->rbd_opts = rbd_opts;
303
432b8587 304 spin_lock(&rbd_client_list_lock);
602adf40 305 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 306 spin_unlock(&rbd_client_list_lock);
602adf40 307
bc534d86
AE
308 mutex_unlock(&ctl_mutex);
309
602adf40
YS
310 dout("rbd_client_create created %p\n", rbdc);
311 return rbdc;
312
313out_err:
314 ceph_destroy_client(rbdc->client);
bc534d86
AE
315out_mutex:
316 mutex_unlock(&ctl_mutex);
602adf40
YS
317 kfree(rbdc);
318out_opt:
43ae4701
AE
319 if (ceph_opts)
320 ceph_destroy_options(ceph_opts);
28f259b7 321 return ERR_PTR(ret);
602adf40
YS
322}
323
324/*
325 * Find a ceph client with specific addr and configuration.
326 */
/*
 * Look up an existing, shareable rbd_client whose ceph options match.
 * Returns NULL if sharing is disabled (CEPH_OPT_NOSHARE) or nothing
 * matches.  Caller must hold rbd_client_list_lock (see rbd_get_client()).
 */
43ae4701 327static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
328{
329 struct rbd_client *client_node;
330
43ae4701 331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
332 return NULL;
333
334 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 335 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
336 return client_node;
337 return NULL;
338}
339
59c2be1e
YS
340/*
341 * mount options
342 */
343enum {
344 Opt_notify_timeout,
345 Opt_last_int,
346 /* int args above */
347 Opt_last_string,
348 /* string args above */
349};
350
43ae4701 351static match_table_t rbd_opts_tokens = {
59c2be1e
YS
352 {Opt_notify_timeout, "notify_timeout=%d"},
353 /* int args above */
354 /* string args above */
355 {-1, NULL}
356};
357
358static int parse_rbd_opts_token(char *c, void *private)
359{
43ae4701 360 struct rbd_options *rbd_opts = private;
59c2be1e
YS
361 substring_t argstr[MAX_OPT_ARGS];
362 int token, intval, ret;
363
43ae4701 364 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
365 if (token < 0)
366 return -EINVAL;
367
368 if (token < Opt_last_int) {
369 ret = match_int(&argstr[0], &intval);
370 if (ret < 0) {
371 pr_err("bad mount option arg (not int) "
372 "at '%s'\n", c);
373 return ret;
374 }
375 dout("got int token %d val %d\n", token, intval);
376 } else if (token > Opt_last_int && token < Opt_last_string) {
377 dout("got string token %d val %s\n", token,
378 argstr[0].from);
379 } else {
380 dout("got token %d\n", token);
381 }
382
383 switch (token) {
384 case Opt_notify_timeout:
43ae4701 385 rbd_opts->notify_timeout = intval;
59c2be1e
YS
386 break;
387 default:
388 BUG_ON(token);
389 }
390 return 0;
391}
392
602adf40
YS
393/*
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
396 */
5214ecc4
AE
397static struct rbd_client *rbd_get_client(const char *mon_addr,
398 size_t mon_addr_len,
399 char *options)
602adf40
YS
400{
401 struct rbd_client *rbdc;
43ae4701 402 struct ceph_options *ceph_opts;
59c2be1e
YS
403 struct rbd_options *rbd_opts;
404
405 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406 if (!rbd_opts)
d720bcb0 407 return ERR_PTR(-ENOMEM);
59c2be1e
YS
408
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 410
43ae4701
AE
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts)) {
d720bcb0 415 kfree(rbd_opts);
43ae4701 416 return ERR_CAST(ceph_opts);
ee57741c 417 }
602adf40 418
432b8587 419 spin_lock(&rbd_client_list_lock);
43ae4701 420 rbdc = __rbd_client_find(ceph_opts);
602adf40 421 if (rbdc) {
602adf40
YS
422 /* using an existing client */
423 kref_get(&rbdc->kref);
432b8587 424 spin_unlock(&rbd_client_list_lock);
e6994d3d 425
43ae4701 426 ceph_destroy_options(ceph_opts);
e6994d3d
AE
427 kfree(rbd_opts);
428
d720bcb0 429 return rbdc;
602adf40 430 }
432b8587 431 spin_unlock(&rbd_client_list_lock);
602adf40 432
43ae4701 433 rbdc = rbd_client_create(ceph_opts, rbd_opts);
d97081b0 434
d720bcb0
AE
435 if (IS_ERR(rbdc))
436 kfree(rbd_opts);
602adf40 437
d720bcb0 438 return rbdc;
602adf40
YS
439}
440
441/*
442 * Destroy ceph client
d23a4b3f 443 *
432b8587 444 * Unlinks the client from rbd_client_list (takes rbd_client_list_lock itself).
602adf40
YS
445 */
/*
 * kref release callback: unlink the client from rbd_client_list (under
 * rbd_client_list_lock), tear down its ceph_client, and free both the
 * parsed rbd options and the rbd_client itself.
 */
446static void rbd_client_release(struct kref *kref)
447{
448 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449
450 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 451 spin_lock(&rbd_client_list_lock);
602adf40 452 list_del(&rbdc->node);
cd9d9f5d 453 spin_unlock(&rbd_client_list_lock);
602adf40
YS
454
455 ceph_destroy_client(rbdc->client);
59c2be1e 456 kfree(rbdc->rbd_opts);
602adf40
YS
457 kfree(rbdc);
458}
459
460/*
461 * Drop reference to ceph client node. If it's not referenced anymore, release
462 * it.
463 */
/*
 * Drop this device's reference to its (possibly shared) client; the
 * client is destroyed via rbd_client_release() when the last ref goes.
 */
464static void rbd_put_client(struct rbd_device *rbd_dev)
465{
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
602adf40
YS
468}
469
1fec7093
YS
470/*
471 * Destroy requests collection
472 */
/* kref release callback: free a request collection. */
473static void rbd_coll_release(struct kref *kref)
474{
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
477
478 dout("rbd_coll_release %p\n", coll);
479 kfree(coll);
480}
602adf40 481
8e94af8e
AE
/* True iff the on-disk header starts with the RBD_HEADER_TEXT magic. */
482static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483{
484 return !memcmp(&ondisk->text,
485 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
486}
487
602adf40
YS
488/*
489 * Create a new header structure, translate header format from the on-disk
490 * header.
491 */
/*
 * Translate an on-disk (format 1) image header into the in-memory form.
 *
 * Returns -ENXIO if the header magic is bad, -EINVAL if snap_count is
 * large enough to overflow the snap context allocation, -ENOMEM on
 * allocation failure, and 0 on success.  When the caller's
 * allocated_snaps doesn't match the on-disk snap_count the function
 * returns 0 early, without building header->snapc, so the caller can
 * retry with a freshly read snapshot context.
 */
492static int rbd_header_from_disk(struct rbd_image_header *header,
493 struct rbd_image_header_ondisk *ondisk,
ed63f4fd 494 u32 allocated_snaps)
602adf40 495{
ccece235 496 u32 snap_count;
d2bb24e5 497 size_t size;
602adf40 498
8e94af8e 499 if (!rbd_dev_ondisk_valid(ondisk))
81e759fb 500 return -ENXIO;
81e759fb 501
00f1f36f 502 snap_count = le32_to_cpu(ondisk->snap_count);
d2bb24e5
AE
503
504 /* Make sure we don't overflow below */
505 size = SIZE_MAX - sizeof (struct ceph_snap_context);
506 if (snap_count > size / sizeof (header->snapc->snaps[0]))
50f7c4c9 507 return -EINVAL;
d2bb24e5 508
6a52325f
AE
509 memset(header, 0, sizeof (*header));
510
/* Copy the object name prefix, forcing NUL termination. */
511 size = sizeof (ondisk->block_name) + 1;
512 header->object_prefix = kmalloc(size, GFP_KERNEL);
513 if (!header->object_prefix)
602adf40 514 return -ENOMEM;
6a52325f
AE
515 memcpy(header->object_prefix, ondisk->block_name, size - 1);
516 header->object_prefix[size - 1] = '\0';
00f1f36f 517
602adf40 518 if (snap_count) {
ccece235 519 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
0f1d3f93 520 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
602adf40 521 header->snap_names = kmalloc(header->snap_names_len,
ed63f4fd 522 GFP_KERNEL);
602adf40 523 if (!header->snap_names)
6a52325f
AE
524 goto out_err;
525
d2bb24e5
AE
526 size = snap_count * sizeof (*header->snap_sizes);
527 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 528 if (!header->snap_sizes)
6a52325f 529 goto out_err;
602adf40 530 } else {
ccece235
AE
531 WARN_ON(ondisk->snap_names_len);
532 header->snap_names_len = 0;
602adf40
YS
533 header->snap_names = NULL;
534 header->snap_sizes = NULL;
535 }
849b4260 536
602adf40
YS
537 header->image_size = le64_to_cpu(ondisk->image_size);
538 header->obj_order = ondisk->options.order;
539 header->crypt_type = ondisk->options.crypt_type;
540 header->comp_type = ondisk->options.comp_type;
6a52325f
AE
541 header->total_snaps = snap_count;
542
28cb775d
AE
543 /*
544 * If the number of snapshot ids provided by the caller
545 * doesn't match the number in the entire context there's
546 * no point in going further. Caller will try again after
547 * getting an updated snapshot context from the server.
548 */
/*
 * NOTE(review): snap_names/snap_sizes remain allocated on this early
 * return; presumably the caller frees them (rbd_header_free()) before
 * retrying -- confirm against callers.
 */
549 if (allocated_snaps != snap_count)
550 return 0;
6a52325f
AE
551
552 size = sizeof (struct ceph_snap_context);
553 size += snap_count * sizeof (header->snapc->snaps[0]);
554 header->snapc = kzalloc(size, GFP_KERNEL);
555 if (!header->snapc)
556 goto out_err;
602adf40
YS
557
558 atomic_set(&header->snapc->nref, 1);
505cbb9b 559 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 560 header->snapc->num_snaps = snap_count;
602adf40 561
28cb775d
AE
562 /* Fill in the snapshot information */
563
564 if (snap_count) {
565 u32 i;
ccece235 566
602adf40
YS
567 for (i = 0; i < snap_count; i++) {
568 header->snapc->snaps[i] =
569 le64_to_cpu(ondisk->snaps[i].id);
570 header->snap_sizes[i] =
571 le64_to_cpu(ondisk->snaps[i].image_size);
572 }
573
574 /* copy snapshot names */
ccece235 575 memcpy(header->snap_names, &ondisk->snaps[snap_count],
602adf40
YS
576 header->snap_names_len);
577 }
578
579 return 0;
580
/* Error path: undo all allocations and reset the pointers we own. */
6a52325f 581out_err:
849b4260 582 kfree(header->snap_sizes);
ccece235 583 header->snap_sizes = NULL;
602adf40 584 kfree(header->snap_names);
ccece235 585 header->snap_names = NULL;
d78fd7ae 586 header->snap_names_len = 0;
6a52325f
AE
587 kfree(header->object_prefix);
588 header->object_prefix = NULL;
ccece235 589
00f1f36f 590 return -ENOMEM;
602adf40
YS
591}
592
602adf40
YS
/*
 * Find a snapshot by name in the in-memory header.  snap_names is a
 * packed sequence of NUL-terminated strings, one per snapshot.  On
 * success returns the snapshot's index and optionally passes back its
 * id (*seq) and size (*size); returns -ENOENT if the name is unknown.
 */
593static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
594 u64 *seq, u64 *size)
595{
596 int i;
597 char *p = header->snap_names;
598
00f1f36f
AE
599 for (i = 0; i < header->total_snaps; i++) {
600 if (!strcmp(snap_name, p)) {
602adf40 601
00f1f36f 602 /* Found it. Pass back its id and/or size */
602adf40 603
00f1f36f
AE
604 if (seq)
605 *seq = header->snapc->snaps[i];
606 if (size)
607 *size = header->snap_sizes[i];
608 return i;
609 }
610 p += strlen(p) + 1; /* Skip ahead to the next name */
611 }
612 return -ENOENT;
602adf40
YS
613}
614
/*
 * Bind rbd_dev to the snapshot named in rbd_dev->snap_name, under the
 * header rwsem.  The special name "-" (RBD_SNAP_HEAD_NAME) selects the
 * live image: writable, snap_id = CEPH_NOSNAP.  Any other name selects
 * a read-only snapshot resolved via snap_by_name().  Optionally passes
 * back the selected image/snapshot size via *size.
 */
0ce1a794 615static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 616{
78dc447d 617 int ret;
602adf40 618
0ce1a794 619 down_write(&rbd_dev->header_rwsem);
602adf40 620
0ce1a794 621 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 622 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 623 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 624 rbd_dev->snap_exists = false;
0ce1a794 625 rbd_dev->read_only = 0;
602adf40 626 if (size)
78dc447d 627 *size = rbd_dev->header.image_size;
602adf40 628 } else {
78dc447d
AE
629 u64 snap_id = 0;
630
631 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
632 &snap_id, size);
602adf40
YS
633 if (ret < 0)
634 goto done;
78dc447d 635 rbd_dev->snap_id = snap_id;
e88a36ec 636 rbd_dev->snap_exists = true;
0ce1a794 637 rbd_dev->read_only = 1;
602adf40
YS
638 }
639
640 ret = 0;
641done:
0ce1a794 642 up_write(&rbd_dev->header_rwsem);
602adf40
YS
643 return ret;
644}
645
/*
 * Release everything rbd_header_from_disk() allocated and NULL the
 * pointers; the snap context is dropped via its own refcount.
 */
646static void rbd_header_free(struct rbd_image_header *header)
647{
849b4260 648 kfree(header->object_prefix);
d78fd7ae 649 header->object_prefix = NULL;
602adf40 650 kfree(header->snap_sizes);
d78fd7ae 651 header->snap_sizes = NULL;
849b4260 652 kfree(header->snap_names);
d78fd7ae
AE
653 header->snap_names = NULL;
654 header->snap_names_len = 0;
d1d25646 655 ceph_put_snap_context(header->snapc);
d78fd7ae 656 header->snapc = NULL;
602adf40
YS
657}
658
659/*
660 * get the actual striped segment name, offset and length
661 */
/*
 * Map an image-relative (ofs, len) onto a single backing object:
 * optionally format the object name ("<prefix>.<12-hex segment#>")
 * into seg_name, optionally pass back the in-object offset via
 * *segofs, and return how many bytes of the request fall within this
 * object (len clamped to the object boundary).
 */
662static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 663 const char *object_prefix,
602adf40
YS
664 u64 ofs, u64 len,
665 char *seg_name, u64 *segofs)
666{
667 u64 seg = ofs >> header->obj_order;
668
669 if (seg_name)
670 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 671 "%s.%012llx", object_prefix, seg);
602adf40
YS
672
673 ofs = ofs & ((1 << header->obj_order) - 1);
674 len = min_t(u64, len, (1 << header->obj_order) - ofs);
675
676 if (segofs)
677 *segofs = ofs;
678
679 return len;
680}
681
1fec7093
YS
/*
 * Number of backing objects spanned by the image extent (ofs, len).
 * NOTE(review): assumes len > 0 -- len == 0 would underflow end_seg.
 */
682static int rbd_get_num_segments(struct rbd_image_header *header,
683 u64 ofs, u64 len)
684{
685 u64 start_seg = ofs >> header->obj_order;
686 u64 end_seg = (ofs + len - 1) >> header->obj_order;
687 return end_seg - start_seg + 1;
688}
689
029bcbd8
JD
690/*
691 * returns the size of an object in the image
692 */
/*
 * Size in bytes of one backing object (2^obj_order).
 * NOTE(review): the shift is evaluated in int width before widening to
 * u64; presumably obj_order is well below 31 -- confirm.
 */
693static u64 rbd_obj_bytes(struct rbd_image_header *header)
694{
695 return 1 << header->obj_order;
696}
697
602adf40
YS
698/*
699 * bio helpers
700 */
701
/* Drop a reference on every bio in a bi_next-linked chain. */
702static void bio_chain_put(struct bio *chain)
703{
704 struct bio *tmp;
705
706 while (chain) {
707 tmp = chain;
708 chain = chain->bi_next;
709 bio_put(tmp);
710 }
711}
712
713/*
714 * zeros a bio chain, starting at specific offset
715 */
/*
 * Zero the data of a bio chain from byte offset start_ofs to its end.
 * Walks every segment of every bio; segments that straddle start_ofs
 * are zeroed only from the remainder onward.  Pages are mapped with
 * bvec_kmap_irq()/bvec_kunmap_irq().
 */
716static void zero_bio_chain(struct bio *chain, int start_ofs)
717{
718 struct bio_vec *bv;
719 unsigned long flags;
720 void *buf;
721 int i;
722 int pos = 0;
723
724 while (chain) {
725 bio_for_each_segment(bv, chain, i) {
726 if (pos + bv->bv_len > start_ofs) {
727 int remainder = max(start_ofs - pos, 0);
728 buf = bvec_kmap_irq(bv, &flags);
729 memset(buf + remainder, 0,
730 bv->bv_len - remainder);
85b5aaa6 731 bvec_kunmap_irq(buf, &flags);
602adf40
YS
732 }
733 pos += bv->bv_len;
734 }
735
736 chain = chain->bi_next;
737 }
738}
739
740/*
741 * bio_chain_clone - clone a chain of bios up to a certain length.
742 * might return a bio_pair that will need to be released.
743 */
744static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
745 struct bio_pair **bp,
746 int len, gfp_t gfpmask)
747{
748 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
749 int total = 0;
750
751 if (*bp) {
752 bio_pair_release(*bp);
753 *bp = NULL;
754 }
755
756 while (old_chain && (total < len)) {
757 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
758 if (!tmp)
759 goto err_out;
760
761 if (total + old_chain->bi_size > len) {
762 struct bio_pair *bp;
763
764 /*
765 * this split can only happen with a single paged bio,
766 * split_bio will BUG_ON if this is not the case
767 */
768 dout("bio_chain_clone split! total=%d remaining=%d"
bd919d45
AE
769 "bi_size=%u\n",
770 total, len - total, old_chain->bi_size);
602adf40
YS
771
772 /* split the bio. We'll release it either in the next
773 call, or it will have to be released outside */
593a9e7b 774 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
775 if (!bp)
776 goto err_out;
777
778 __bio_clone(tmp, &bp->bio1);
779
780 *next = &bp->bio2;
781 } else {
782 __bio_clone(tmp, old_chain);
783 *next = old_chain->bi_next;
784 }
785
786 tmp->bi_bdev = NULL;
787 gfpmask &= ~__GFP_WAIT;
788 tmp->bi_next = NULL;
789
790 if (!new_chain) {
791 new_chain = tail = tmp;
792 } else {
793 tail->bi_next = tmp;
794 tail = tmp;
795 }
796 old_chain = old_chain->bi_next;
797
798 total += tmp->bi_size;
799 }
800
801 BUG_ON(total < len);
802
803 if (tail)
804 tail->bi_next = NULL;
805
806 *old = old_chain;
807
808 return new_chain;
809
810err_out:
811 dout("bio_chain_clone with err\n");
812 bio_chain_put(new_chain);
813 return NULL;
814}
815
816/*
817 * helpers for osd request op vectors.
818 */
57cfc106
AE
/*
 * Allocate a zeroed vector of num_ops + 1 osd ops (the extra zeroed
 * entry terminates the vector) and initialize ops[0] with the given
 * opcode and payload length.  Returns NULL on allocation failure;
 * free with rbd_destroy_ops().
 */
819static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
820 int opcode, u32 payload_len)
602adf40 821{
57cfc106
AE
822 struct ceph_osd_req_op *ops;
823
824 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
825 if (!ops)
826 return NULL;
827
828 ops[0].op = opcode;
829
602adf40
YS
830 /*
831 * op extent offset and length will be set later on
832 * in calc_raw_layout()
833 */
57cfc106
AE
834 ops[0].payload_len = payload_len;
835
836 return ops;
602adf40
YS
837}
838
/* Free an op vector allocated by rbd_create_rw_ops(). */
839static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
840{
841 kfree(ops);
842}
843
1fec7093
YS
/*
 * Record completion status for slot `index` of a request collection
 * and, under the queue lock, end the blk request for every contiguous
 * run of completed slots starting at num_done (completions must be
 * reported to the block layer in order).  Each retired slot drops one
 * collection reference.  With no collection the blk request is ended
 * directly; with no rq there is nothing to complete.
 */
844static void rbd_coll_end_req_index(struct request *rq,
845 struct rbd_req_coll *coll,
846 int index,
847 int ret, u64 len)
848{
849 struct request_queue *q;
850 int min, max, i;
851
bd919d45
AE
852 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
853 coll, index, ret, (unsigned long long) len);
1fec7093
YS
854
855 if (!rq)
856 return;
857
858 if (!coll) {
859 blk_end_request(rq, ret, len);
860 return;
861 }
862
863 q = rq->q;
864
865 spin_lock_irq(q->queue_lock);
866 coll->status[index].done = 1;
867 coll->status[index].rc = ret;
868 coll->status[index].bytes = len;
869 max = min = coll->num_done;
870 while (max < coll->total && coll->status[max].done)
871 max++;
872
873 for (i = min; i<max; i++) {
874 __blk_end_request(rq, coll->status[i].rc,
875 coll->status[i].bytes);
876 coll->num_done++;
877 kref_put(&coll->kref, rbd_coll_release);
878 }
879 spin_unlock_irq(q->queue_lock);
880}
881
/* Complete a single rbd_request's slot in its collection. */
882static void rbd_coll_end_req(struct rbd_request *req,
883 int ret, u64 len)
884{
885 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
886}
887
602adf40
YS
888/*
889 * Send ceph osd request
890 */
891static int rbd_do_request(struct request *rq,
0ce1a794 892 struct rbd_device *rbd_dev,
602adf40
YS
893 struct ceph_snap_context *snapc,
894 u64 snapid,
aded07ea 895 const char *object_name, u64 ofs, u64 len,
602adf40
YS
896 struct bio *bio,
897 struct page **pages,
898 int num_pages,
899 int flags,
900 struct ceph_osd_req_op *ops,
1fec7093
YS
901 struct rbd_req_coll *coll,
902 int coll_index,
602adf40 903 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
904 struct ceph_msg *msg),
905 struct ceph_osd_request **linger_req,
906 u64 *ver)
602adf40
YS
907{
908 struct ceph_osd_request *req;
909 struct ceph_file_layout *layout;
910 int ret;
911 u64 bno;
912 struct timespec mtime = CURRENT_TIME;
913 struct rbd_request *req_data;
914 struct ceph_osd_request_head *reqhead;
1dbb4399 915 struct ceph_osd_client *osdc;
602adf40 916
602adf40 917 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
918 if (!req_data) {
919 if (coll)
920 rbd_coll_end_req_index(rq, coll, coll_index,
921 -ENOMEM, len);
922 return -ENOMEM;
923 }
924
925 if (coll) {
926 req_data->coll = coll;
927 req_data->coll_index = coll_index;
928 }
602adf40 929
bd919d45
AE
930 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
931 (unsigned long long) ofs, (unsigned long long) len);
602adf40 932
0ce1a794 933 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
934 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
935 false, GFP_NOIO, pages, bio);
4ad12621 936 if (!req) {
4ad12621 937 ret = -ENOMEM;
602adf40
YS
938 goto done_pages;
939 }
940
941 req->r_callback = rbd_cb;
942
943 req_data->rq = rq;
944 req_data->bio = bio;
945 req_data->pages = pages;
946 req_data->len = len;
947
948 req->r_priv = req_data;
949
950 reqhead = req->r_request->front.iov_base;
951 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
952
aded07ea 953 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
954 req->r_oid_len = strlen(req->r_oid);
955
956 layout = &req->r_file_layout;
957 memset(layout, 0, sizeof(*layout));
958 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
959 layout->fl_stripe_count = cpu_to_le32(1);
960 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 961 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
962 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
963 req, ops);
602adf40
YS
964
965 ceph_osdc_build_request(req, ofs, &len,
966 ops,
967 snapc,
968 &mtime,
969 req->r_oid, req->r_oid_len);
602adf40 970
59c2be1e 971 if (linger_req) {
1dbb4399 972 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
973 *linger_req = req;
974 }
975
1dbb4399 976 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
977 if (ret < 0)
978 goto done_err;
979
980 if (!rbd_cb) {
1dbb4399 981 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
982 if (ver)
983 *ver = le64_to_cpu(req->r_reassert_version.version);
bd919d45
AE
984 dout("reassert_ver=%llu\n",
985 (unsigned long long)
986 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
987 ceph_osdc_put_request(req);
988 }
989 return ret;
990
991done_err:
992 bio_chain_put(req_data->bio);
993 ceph_osdc_put_request(req);
994done_pages:
1fec7093 995 rbd_coll_end_req(req_data, ret, len);
602adf40 996 kfree(req_data);
602adf40
YS
997 return ret;
998}
999
1000/*
1001 * Ceph osd op callback
1002 */
/*
 * Async osd completion callback: parse the reply, fix up read results
 * (a read of a nonexistent object returns zeroes; a short read is
 * zero-padded to the requested length), complete the blk request via
 * the collection, then drop the bio chain, the osd request, and the
 * per-request bookkeeping.
 */
1003static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1004{
1005 struct rbd_request *req_data = req->r_priv;
1006 struct ceph_osd_reply_head *replyhead;
1007 struct ceph_osd_op *op;
1008 __s32 rc;
1009 u64 bytes;
1010 int read_op;
1011
1012 /* parse reply */
1013 replyhead = msg->front.iov_base;
1014 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1015 op = (void *)(replyhead + 1);
1016 rc = le32_to_cpu(replyhead->result);
1017 bytes = le64_to_cpu(op->extent.length);
895cfcc8 1018 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
602adf40 1019
bd919d45
AE
1020 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1021 (unsigned long long) bytes, read_op, (int) rc);
602adf40
YS
1022
1023 if (rc == -ENOENT && read_op) {
1024 zero_bio_chain(req_data->bio, 0);
1025 rc = 0;
1026 } else if (rc == 0 && read_op && bytes < req_data->len) {
1027 zero_bio_chain(req_data->bio, bytes);
1028 bytes = req_data->len;
1029 }
1030
1fec7093 1031 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1032
1033 if (req_data->bio)
1034 bio_chain_put(req_data->bio);
1035
1036 ceph_osdc_put_request(req);
1037 kfree(req_data);
1038}
1039
59c2be1e
YS
/* Minimal completion callback: just drop the osd request reference. */
1040static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1041{
1042 ceph_osdc_put_request(req);
1043}
1044
602adf40
YS
1045/*
1046 * Do a synchronous ceph osd operation
1047 */
0ce1a794 1048static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1049 struct ceph_snap_context *snapc,
1050 u64 snapid,
602adf40 1051 int flags,
913d2fdc 1052 struct ceph_osd_req_op *ops,
aded07ea 1053 const char *object_name,
602adf40 1054 u64 ofs, u64 len,
59c2be1e
YS
1055 char *buf,
1056 struct ceph_osd_request **linger_req,
1057 u64 *ver)
602adf40
YS
1058{
1059 int ret;
1060 struct page **pages;
1061 int num_pages;
913d2fdc
AE
1062
1063 BUG_ON(ops == NULL);
602adf40
YS
1064
1065 num_pages = calc_pages_for(ofs , len);
1066 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1067 if (IS_ERR(pages))
1068 return PTR_ERR(pages);
602adf40 1069
0ce1a794 1070 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1071 object_name, ofs, len, NULL,
602adf40
YS
1072 pages, num_pages,
1073 flags,
1074 ops,
1fec7093 1075 NULL, 0,
59c2be1e
YS
1076 NULL,
1077 linger_req, ver);
602adf40 1078 if (ret < 0)
913d2fdc 1079 goto done;
602adf40
YS
1080
1081 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1082 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1083
602adf40
YS
1084done:
1085 ceph_release_page_vector(pages, num_pages);
1086 return ret;
1087}
1088
1089/*
1090 * Do an asynchronous ceph osd operation
1091 */
1092static int rbd_do_op(struct request *rq,
0ce1a794 1093 struct rbd_device *rbd_dev,
602adf40
YS
1094 struct ceph_snap_context *snapc,
1095 u64 snapid,
d1f57ea6 1096 int opcode, int flags,
602adf40 1097 u64 ofs, u64 len,
1fec7093
YS
1098 struct bio *bio,
1099 struct rbd_req_coll *coll,
1100 int coll_index)
602adf40
YS
1101{
1102 char *seg_name;
1103 u64 seg_ofs;
1104 u64 seg_len;
1105 int ret;
1106 struct ceph_osd_req_op *ops;
1107 u32 payload_len;
1108
1109 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1110 if (!seg_name)
1111 return -ENOMEM;
1112
1113 seg_len = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1114 rbd_dev->header.object_prefix,
602adf40
YS
1115 ofs, len,
1116 seg_name, &seg_ofs);
602adf40
YS
1117
1118 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1119
57cfc106
AE
1120 ret = -ENOMEM;
1121 ops = rbd_create_rw_ops(1, opcode, payload_len);
1122 if (!ops)
602adf40
YS
1123 goto done;
1124
1125 /* we've taken care of segment sizes earlier when we
1126 cloned the bios. We should never have a segment
1127 truncated at this point */
1128 BUG_ON(seg_len < len);
1129
1130 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1131 seg_name, seg_ofs, seg_len,
1132 bio,
1133 NULL, 0,
1134 flags,
1135 ops,
1fec7093 1136 coll, coll_index,
59c2be1e 1137 rbd_req_cb, 0, NULL);
11f77002
SW
1138
1139 rbd_destroy_ops(ops);
602adf40
YS
1140done:
1141 kfree(seg_name);
1142 return ret;
1143}
1144
1145/*
1146 * Request async osd write
1147 */
1148static int rbd_req_write(struct request *rq,
1149 struct rbd_device *rbd_dev,
1150 struct ceph_snap_context *snapc,
1151 u64 ofs, u64 len,
1fec7093
YS
1152 struct bio *bio,
1153 struct rbd_req_coll *coll,
1154 int coll_index)
602adf40
YS
1155{
1156 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1157 CEPH_OSD_OP_WRITE,
1158 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1159 ofs, len, bio, coll, coll_index);
602adf40
YS
1160}
1161
1162/*
1163 * Request async osd read
1164 */
1165static int rbd_req_read(struct request *rq,
1166 struct rbd_device *rbd_dev,
1167 u64 snapid,
1168 u64 ofs, u64 len,
1fec7093
YS
1169 struct bio *bio,
1170 struct rbd_req_coll *coll,
1171 int coll_index)
602adf40
YS
1172{
1173 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1174 snapid,
602adf40
YS
1175 CEPH_OSD_OP_READ,
1176 CEPH_OSD_FLAG_READ,
1fec7093 1177 ofs, len, bio, coll, coll_index);
602adf40
YS
1178}
1179
1180/*
1181 * Request sync osd read
1182 */
0ce1a794 1183static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40 1184 u64 snapid,
aded07ea 1185 const char *object_name,
602adf40 1186 u64 ofs, u64 len,
59c2be1e
YS
1187 char *buf,
1188 u64 *ver)
602adf40 1189{
913d2fdc
AE
1190 struct ceph_osd_req_op *ops;
1191 int ret;
1192
1193 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1194 if (!ops)
1195 return -ENOMEM;
1196
1197 ret = rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1198 snapid,
602adf40 1199 CEPH_OSD_FLAG_READ,
913d2fdc
AE
1200 ops, object_name, ofs, len, buf, NULL, ver);
1201 rbd_destroy_ops(ops);
1202
1203 return ret;
602adf40
YS
1204}
1205
1206/*
59c2be1e
YS
1207 * Request sync osd watch
1208 */
0ce1a794 1209static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
59c2be1e 1210 u64 ver,
7f0a24d8 1211 u64 notify_id)
59c2be1e
YS
1212{
1213 struct ceph_osd_req_op *ops;
11f77002
SW
1214 int ret;
1215
57cfc106
AE
1216 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1217 if (!ops)
1218 return -ENOMEM;
59c2be1e 1219
a71b891b 1220 ops[0].watch.ver = cpu_to_le64(ver);
59c2be1e
YS
1221 ops[0].watch.cookie = notify_id;
1222 ops[0].watch.flag = 0;
1223
0ce1a794 1224 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
7f0a24d8 1225 rbd_dev->header_name, 0, 0, NULL,
ad4f232f 1226 NULL, 0,
59c2be1e
YS
1227 CEPH_OSD_FLAG_READ,
1228 ops,
1fec7093 1229 NULL, 0,
59c2be1e
YS
1230 rbd_simple_req_cb, 0, NULL);
1231
1232 rbd_destroy_ops(ops);
1233 return ret;
1234}
1235
1236static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1237{
0ce1a794 1238 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1239 u64 hver;
13143d2d
SW
1240 int rc;
1241
0ce1a794 1242 if (!rbd_dev)
59c2be1e
YS
1243 return;
1244
bd919d45
AE
1245 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1246 rbd_dev->header_name, (unsigned long long) notify_id,
1247 (unsigned int) opcode);
1fe5e993 1248 rc = rbd_refresh_header(rbd_dev, &hver);
13143d2d 1249 if (rc)
f0f8cef5 1250 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
0ce1a794 1251 " update snaps: %d\n", rbd_dev->major, rc);
59c2be1e 1252
7f0a24d8 1253 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1254}
1255
1256/*
1257 * Request sync osd watch
1258 */
0e6f322d 1259static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
59c2be1e
YS
1260{
1261 struct ceph_osd_req_op *ops;
0ce1a794 1262 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
57cfc106 1263 int ret;
59c2be1e 1264
57cfc106
AE
1265 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1266 if (!ops)
1267 return -ENOMEM;
59c2be1e
YS
1268
1269 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
0ce1a794 1270 (void *)rbd_dev, &rbd_dev->watch_event);
59c2be1e
YS
1271 if (ret < 0)
1272 goto fail;
1273
0e6f322d 1274 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
0ce1a794 1275 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
59c2be1e
YS
1276 ops[0].watch.flag = 1;
1277
0ce1a794 1278 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1279 CEPH_NOSNAP,
59c2be1e
YS
1280 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1281 ops,
0e6f322d
AE
1282 rbd_dev->header_name,
1283 0, 0, NULL,
0ce1a794 1284 &rbd_dev->watch_request, NULL);
59c2be1e
YS
1285
1286 if (ret < 0)
1287 goto fail_event;
1288
1289 rbd_destroy_ops(ops);
1290 return 0;
1291
1292fail_event:
0ce1a794
AE
1293 ceph_osdc_cancel_event(rbd_dev->watch_event);
1294 rbd_dev->watch_event = NULL;
59c2be1e
YS
1295fail:
1296 rbd_destroy_ops(ops);
1297 return ret;
1298}
1299
79e3057c
YS
1300/*
1301 * Request sync osd unwatch
1302 */
070c633f 1303static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
79e3057c
YS
1304{
1305 struct ceph_osd_req_op *ops;
57cfc106 1306 int ret;
79e3057c 1307
57cfc106
AE
1308 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1309 if (!ops)
1310 return -ENOMEM;
79e3057c
YS
1311
1312 ops[0].watch.ver = 0;
0ce1a794 1313 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
79e3057c
YS
1314 ops[0].watch.flag = 0;
1315
0ce1a794 1316 ret = rbd_req_sync_op(rbd_dev, NULL,
79e3057c 1317 CEPH_NOSNAP,
79e3057c
YS
1318 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1319 ops,
070c633f
AE
1320 rbd_dev->header_name,
1321 0, 0, NULL, NULL, NULL);
1322
79e3057c
YS
1323
1324 rbd_destroy_ops(ops);
0ce1a794
AE
1325 ceph_osdc_cancel_event(rbd_dev->watch_event);
1326 rbd_dev->watch_event = NULL;
79e3057c
YS
1327 return ret;
1328}
1329
/* Context handed to rbd_notify_cb() for a synchronous notify. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1333
1334static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1335{
0ce1a794
AE
1336 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1337 if (!rbd_dev)
59c2be1e
YS
1338 return;
1339
bd919d45
AE
1340 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1341 rbd_dev->header_name, (unsigned long long) notify_id,
1342 (unsigned int) opcode);
59c2be1e
YS
1343}
1344
1345/*
1346 * Request sync osd notify
1347 */
4cb16250 1348static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
59c2be1e
YS
1349{
1350 struct ceph_osd_req_op *ops;
0ce1a794 1351 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
59c2be1e
YS
1352 struct ceph_osd_event *event;
1353 struct rbd_notify_info info;
1354 int payload_len = sizeof(u32) + sizeof(u32);
1355 int ret;
1356
57cfc106
AE
1357 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1358 if (!ops)
1359 return -ENOMEM;
59c2be1e 1360
0ce1a794 1361 info.rbd_dev = rbd_dev;
59c2be1e
YS
1362
1363 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1364 (void *)&info, &event);
1365 if (ret < 0)
1366 goto fail;
1367
1368 ops[0].watch.ver = 1;
1369 ops[0].watch.flag = 1;
1370 ops[0].watch.cookie = event->cookie;
1371 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1372 ops[0].watch.timeout = 12;
1373
0ce1a794 1374 ret = rbd_req_sync_op(rbd_dev, NULL,
59c2be1e 1375 CEPH_NOSNAP,
59c2be1e
YS
1376 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1377 ops,
4cb16250
AE
1378 rbd_dev->header_name,
1379 0, 0, NULL, NULL, NULL);
59c2be1e
YS
1380 if (ret < 0)
1381 goto fail_event;
1382
1383 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1384 dout("ceph_osdc_wait_event returned %d\n", ret);
1385 rbd_destroy_ops(ops);
1386 return 0;
1387
1388fail_event:
1389 ceph_osdc_cancel_event(event);
1390fail:
1391 rbd_destroy_ops(ops);
1392 return ret;
1393}
1394
602adf40
YS
1395/*
1396 * Request sync osd read
1397 */
0ce1a794 1398static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
aded07ea
AE
1399 const char *object_name,
1400 const char *class_name,
1401 const char *method_name,
602adf40 1402 const char *data,
59c2be1e
YS
1403 int len,
1404 u64 *ver)
602adf40
YS
1405{
1406 struct ceph_osd_req_op *ops;
aded07ea
AE
1407 int class_name_len = strlen(class_name);
1408 int method_name_len = strlen(method_name);
57cfc106
AE
1409 int ret;
1410
1411 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
aded07ea 1412 class_name_len + method_name_len + len);
57cfc106
AE
1413 if (!ops)
1414 return -ENOMEM;
602adf40 1415
aded07ea
AE
1416 ops[0].cls.class_name = class_name;
1417 ops[0].cls.class_len = (__u8) class_name_len;
1418 ops[0].cls.method_name = method_name;
1419 ops[0].cls.method_len = (__u8) method_name_len;
602adf40
YS
1420 ops[0].cls.argc = 0;
1421 ops[0].cls.indata = data;
1422 ops[0].cls.indata_len = len;
1423
0ce1a794 1424 ret = rbd_req_sync_op(rbd_dev, NULL,
602adf40 1425 CEPH_NOSNAP,
602adf40
YS
1426 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1427 ops,
d1f57ea6 1428 object_name, 0, 0, NULL, NULL, ver);
602adf40
YS
1429
1430 rbd_destroy_ops(ops);
1431
1432 dout("cls_exec returned %d\n", ret);
1433 return ret;
1434}
1435
1fec7093
YS
1436static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1437{
1438 struct rbd_req_coll *coll =
1439 kzalloc(sizeof(struct rbd_req_coll) +
1440 sizeof(struct rbd_req_status) * num_reqs,
1441 GFP_ATOMIC);
1442
1443 if (!coll)
1444 return NULL;
1445 coll->total = num_reqs;
1446 kref_init(&coll->kref);
1447 return coll;
1448}
1449
602adf40
YS
1450/*
1451 * block device queue callback
1452 */
1453static void rbd_rq_fn(struct request_queue *q)
1454{
1455 struct rbd_device *rbd_dev = q->queuedata;
1456 struct request *rq;
1457 struct bio_pair *bp = NULL;
1458
00f1f36f 1459 while ((rq = blk_fetch_request(q))) {
602adf40
YS
1460 struct bio *bio;
1461 struct bio *rq_bio, *next_bio = NULL;
1462 bool do_write;
bd919d45
AE
1463 unsigned int size;
1464 u64 op_size = 0;
602adf40 1465 u64 ofs;
1fec7093
YS
1466 int num_segs, cur_seg = 0;
1467 struct rbd_req_coll *coll;
d1d25646 1468 struct ceph_snap_context *snapc;
602adf40
YS
1469
1470 /* peek at request from block layer */
1471 if (!rq)
1472 break;
1473
1474 dout("fetched request\n");
1475
1476 /* filter out block requests we don't understand */
1477 if ((rq->cmd_type != REQ_TYPE_FS)) {
1478 __blk_end_request_all(rq, 0);
00f1f36f 1479 continue;
602adf40
YS
1480 }
1481
1482 /* deduce our operation (read, write) */
1483 do_write = (rq_data_dir(rq) == WRITE);
1484
1485 size = blk_rq_bytes(rq);
593a9e7b 1486 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
602adf40
YS
1487 rq_bio = rq->bio;
1488 if (do_write && rbd_dev->read_only) {
1489 __blk_end_request_all(rq, -EROFS);
00f1f36f 1490 continue;
602adf40
YS
1491 }
1492
1493 spin_unlock_irq(q->queue_lock);
1494
d1d25646 1495 down_read(&rbd_dev->header_rwsem);
e88a36ec 1496
d1d25646 1497 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
e88a36ec 1498 up_read(&rbd_dev->header_rwsem);
d1d25646
JD
1499 dout("request for non-existent snapshot");
1500 spin_lock_irq(q->queue_lock);
1501 __blk_end_request_all(rq, -ENXIO);
1502 continue;
e88a36ec
JD
1503 }
1504
d1d25646
JD
1505 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1506
1507 up_read(&rbd_dev->header_rwsem);
1508
602adf40
YS
1509 dout("%s 0x%x bytes at 0x%llx\n",
1510 do_write ? "write" : "read",
bd919d45 1511 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
602adf40 1512
1fec7093
YS
1513 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1514 coll = rbd_alloc_coll(num_segs);
1515 if (!coll) {
1516 spin_lock_irq(q->queue_lock);
1517 __blk_end_request_all(rq, -ENOMEM);
d1d25646 1518 ceph_put_snap_context(snapc);
00f1f36f 1519 continue;
1fec7093
YS
1520 }
1521
602adf40
YS
1522 do {
1523 /* a bio clone to be passed down to OSD req */
bd919d45 1524 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
602adf40 1525 op_size = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1526 rbd_dev->header.object_prefix,
602adf40
YS
1527 ofs, size,
1528 NULL, NULL);
1fec7093 1529 kref_get(&coll->kref);
602adf40
YS
1530 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1531 op_size, GFP_ATOMIC);
1532 if (!bio) {
1fec7093
YS
1533 rbd_coll_end_req_index(rq, coll, cur_seg,
1534 -ENOMEM, op_size);
1535 goto next_seg;
602adf40
YS
1536 }
1537
1fec7093 1538
602adf40
YS
1539 /* init OSD command: write or read */
1540 if (do_write)
1541 rbd_req_write(rq, rbd_dev,
d1d25646 1542 snapc,
602adf40 1543 ofs,
1fec7093
YS
1544 op_size, bio,
1545 coll, cur_seg);
602adf40
YS
1546 else
1547 rbd_req_read(rq, rbd_dev,
77dfe99f 1548 rbd_dev->snap_id,
602adf40 1549 ofs,
1fec7093
YS
1550 op_size, bio,
1551 coll, cur_seg);
602adf40 1552
1fec7093 1553next_seg:
602adf40
YS
1554 size -= op_size;
1555 ofs += op_size;
1556
1fec7093 1557 cur_seg++;
602adf40
YS
1558 rq_bio = next_bio;
1559 } while (size > 0);
1fec7093 1560 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1561
1562 if (bp)
1563 bio_pair_release(bp);
602adf40 1564 spin_lock_irq(q->queue_lock);
d1d25646
JD
1565
1566 ceph_put_snap_context(snapc);
602adf40
YS
1567 }
1568}
1569
1570/*
1571 * a queue callback. Makes sure that we don't create a bio that spans across
1572 * multiple osd objects. One exception would be with a single page bios,
1573 * which we handle later at bio_chain_clone
1574 */
1575static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1576 struct bio_vec *bvec)
1577{
1578 struct rbd_device *rbd_dev = q->queuedata;
593a9e7b
AE
1579 unsigned int chunk_sectors;
1580 sector_t sector;
1581 unsigned int bio_sectors;
602adf40
YS
1582 int max;
1583
593a9e7b
AE
1584 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1585 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1586 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1587
602adf40 1588 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
593a9e7b 1589 + bio_sectors)) << SECTOR_SHIFT;
602adf40
YS
1590 if (max < 0)
1591 max = 0; /* bio_add cannot handle a negative return */
1592 if (max <= bvec->bv_len && bio_sectors == 0)
1593 return bvec->bv_len;
1594 return max;
1595}
1596
1597static void rbd_free_disk(struct rbd_device *rbd_dev)
1598{
1599 struct gendisk *disk = rbd_dev->disk;
1600
1601 if (!disk)
1602 return;
1603
1604 rbd_header_free(&rbd_dev->header);
1605
1606 if (disk->flags & GENHD_FL_UP)
1607 del_gendisk(disk);
1608 if (disk->queue)
1609 blk_cleanup_queue(disk->queue);
1610 put_disk(disk);
1611}
1612
1613/*
1614 * reload the ondisk the header
1615 */
1616static int rbd_read_header(struct rbd_device *rbd_dev,
1617 struct rbd_image_header *header)
1618{
1619 ssize_t rc;
1620 struct rbd_image_header_ondisk *dh;
50f7c4c9 1621 u32 snap_count = 0;
59c2be1e 1622 u64 ver;
00f1f36f 1623 size_t len;
602adf40 1624
00f1f36f
AE
1625 /*
1626 * First reads the fixed-size header to determine the number
1627 * of snapshots, then re-reads it, along with all snapshot
1628 * records as well as their stored names.
1629 */
1630 len = sizeof (*dh);
602adf40 1631 while (1) {
602adf40
YS
1632 dh = kmalloc(len, GFP_KERNEL);
1633 if (!dh)
1634 return -ENOMEM;
1635
1636 rc = rbd_req_sync_read(rbd_dev,
9a5d690b 1637 CEPH_NOSNAP,
0bed54dc 1638 rbd_dev->header_name,
602adf40 1639 0, len,
59c2be1e 1640 (char *)dh, &ver);
602adf40
YS
1641 if (rc < 0)
1642 goto out_dh;
1643
ed63f4fd 1644 rc = rbd_header_from_disk(header, dh, snap_count);
81e759fb 1645 if (rc < 0) {
00f1f36f 1646 if (rc == -ENXIO)
81e759fb 1647 pr_warning("unrecognized header format"
0bed54dc
AE
1648 " for image %s\n",
1649 rbd_dev->image_name);
602adf40 1650 goto out_dh;
81e759fb 1651 }
602adf40 1652
00f1f36f
AE
1653 if (snap_count == header->total_snaps)
1654 break;
1655
1656 snap_count = header->total_snaps;
1657 len = sizeof (*dh) +
1658 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1659 header->snap_names_len;
1660
1661 rbd_header_free(header);
1662 kfree(dh);
602adf40 1663 }
59c2be1e 1664 header->obj_version = ver;
602adf40
YS
1665
1666out_dh:
1667 kfree(dh);
1668 return rc;
1669}
1670
1671/*
1672 * create a snapshot
1673 */
0ce1a794 1674static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1675 const char *snap_name,
1676 gfp_t gfp_flags)
1677{
1678 int name_len = strlen(snap_name);
1679 u64 new_snapid;
1680 int ret;
916d4d67 1681 void *data, *p, *e;
1dbb4399 1682 struct ceph_mon_client *monc;
602adf40
YS
1683
1684 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1685 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1686 return -EINVAL;
1687
0ce1a794
AE
1688 monc = &rbd_dev->rbd_client->client->monc;
1689 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1690 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1691 if (ret < 0)
1692 return ret;
1693
1694 data = kmalloc(name_len + 16, gfp_flags);
1695 if (!data)
1696 return -ENOMEM;
1697
916d4d67
SW
1698 p = data;
1699 e = data + name_len + 16;
602adf40 1700
916d4d67
SW
1701 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1702 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1703
0bed54dc 1704 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1705 "rbd", "snap_add",
d67d4be5 1706 data, p - data, NULL);
602adf40 1707
916d4d67 1708 kfree(data);
602adf40 1709
505cbb9b 1710 return ret < 0 ? ret : 0;
602adf40
YS
1711bad:
1712 return -ERANGE;
1713}
1714
dfc5606d
YS
1715static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1716{
1717 struct rbd_snap *snap;
a0593290 1718 struct rbd_snap *next;
dfc5606d 1719
a0593290 1720 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1721 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1722}
1723
602adf40
YS
1724/*
1725 * only read the first part of the ondisk header, without the snaps info
1726 */
b813623a 1727static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
1728{
1729 int ret;
1730 struct rbd_image_header h;
602adf40
YS
1731
1732 ret = rbd_read_header(rbd_dev, &h);
1733 if (ret < 0)
1734 return ret;
1735
a51aa0c0
JD
1736 down_write(&rbd_dev->header_rwsem);
1737
9db4b3e3 1738 /* resized? */
474ef7ce
JD
1739 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1740 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1741
1742 dout("setting size to %llu sectors", (unsigned long long) size);
1743 set_capacity(rbd_dev->disk, size);
1744 }
9db4b3e3 1745
849b4260 1746 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 1747 kfree(rbd_dev->header.snap_sizes);
849b4260 1748 kfree(rbd_dev->header.snap_names);
d1d25646
JD
1749 /* osd requests may still refer to snapc */
1750 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 1751
b813623a
AE
1752 if (hver)
1753 *hver = h.obj_version;
a71b891b 1754 rbd_dev->header.obj_version = h.obj_version;
93a24e08 1755 rbd_dev->header.image_size = h.image_size;
602adf40
YS
1756 rbd_dev->header.total_snaps = h.total_snaps;
1757 rbd_dev->header.snapc = h.snapc;
1758 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1759 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1760 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260
AE
1761 /* Free the extra copy of the object prefix */
1762 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1763 kfree(h.object_prefix);
1764
dfc5606d
YS
1765 ret = __rbd_init_snaps_header(rbd_dev);
1766
c666601a 1767 up_write(&rbd_dev->header_rwsem);
602adf40 1768
dfc5606d 1769 return ret;
602adf40
YS
1770}
1771
1fe5e993
AE
1772static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1773{
1774 int ret;
1775
1776 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1777 ret = __rbd_refresh_header(rbd_dev, hver);
1778 mutex_unlock(&ctl_mutex);
1779
1780 return ret;
1781}
1782
602adf40
YS
1783static int rbd_init_disk(struct rbd_device *rbd_dev)
1784{
1785 struct gendisk *disk;
1786 struct request_queue *q;
1787 int rc;
593a9e7b 1788 u64 segment_size;
602adf40
YS
1789 u64 total_size = 0;
1790
1791 /* contact OSD, request size info about the object being mapped */
1792 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1793 if (rc)
1794 return rc;
1795
dfc5606d
YS
1796 /* no need to lock here, as rbd_dev is not registered yet */
1797 rc = __rbd_init_snaps_header(rbd_dev);
1798 if (rc)
1799 return rc;
1800
cc9d734c 1801 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1802 if (rc)
1803 return rc;
1804
1805 /* create gendisk info */
1806 rc = -ENOMEM;
1807 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1808 if (!disk)
1809 goto out;
1810
f0f8cef5 1811 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 1812 rbd_dev->dev_id);
602adf40
YS
1813 disk->major = rbd_dev->major;
1814 disk->first_minor = 0;
1815 disk->fops = &rbd_bd_ops;
1816 disk->private_data = rbd_dev;
1817
1818 /* init rq */
1819 rc = -ENOMEM;
1820 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1821 if (!q)
1822 goto out_disk;
029bcbd8 1823
593a9e7b
AE
1824 /* We use the default size, but let's be explicit about it. */
1825 blk_queue_physical_block_size(q, SECTOR_SIZE);
1826
029bcbd8 1827 /* set io sizes to object size */
593a9e7b
AE
1828 segment_size = rbd_obj_bytes(&rbd_dev->header);
1829 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1830 blk_queue_max_segment_size(q, segment_size);
1831 blk_queue_io_min(q, segment_size);
1832 blk_queue_io_opt(q, segment_size);
029bcbd8 1833
602adf40
YS
1834 blk_queue_merge_bvec(q, rbd_merge_bvec);
1835 disk->queue = q;
1836
1837 q->queuedata = rbd_dev;
1838
1839 rbd_dev->disk = disk;
1840 rbd_dev->q = q;
1841
1842 /* finally, announce the disk to the world */
593a9e7b 1843 set_capacity(disk, total_size / SECTOR_SIZE);
602adf40
YS
1844 add_disk(disk);
1845
1846 pr_info("%s: added with size 0x%llx\n",
1847 disk->disk_name, (unsigned long long)total_size);
1848 return 0;
1849
1850out_disk:
1851 put_disk(disk);
1852out:
1853 return rc;
1854}
1855
dfc5606d
YS
1856/*
1857 sysfs
1858*/
1859
593a9e7b
AE
1860static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1861{
1862 return container_of(dev, struct rbd_device, dev);
1863}
1864
dfc5606d
YS
1865static ssize_t rbd_size_show(struct device *dev,
1866 struct device_attribute *attr, char *buf)
1867{
593a9e7b 1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1869 sector_t size;
1870
1871 down_read(&rbd_dev->header_rwsem);
1872 size = get_capacity(rbd_dev->disk);
1873 up_read(&rbd_dev->header_rwsem);
dfc5606d 1874
a51aa0c0 1875 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1876}
1877
1878static ssize_t rbd_major_show(struct device *dev,
1879 struct device_attribute *attr, char *buf)
1880{
593a9e7b 1881 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1882
dfc5606d
YS
1883 return sprintf(buf, "%d\n", rbd_dev->major);
1884}
1885
1886static ssize_t rbd_client_id_show(struct device *dev,
1887 struct device_attribute *attr, char *buf)
602adf40 1888{
593a9e7b 1889 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1890
1dbb4399
AE
1891 return sprintf(buf, "client%lld\n",
1892 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1893}
1894
dfc5606d
YS
1895static ssize_t rbd_pool_show(struct device *dev,
1896 struct device_attribute *attr, char *buf)
602adf40 1897{
593a9e7b 1898 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1899
1900 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1901}
1902
9bb2f334
AE
1903static ssize_t rbd_pool_id_show(struct device *dev,
1904 struct device_attribute *attr, char *buf)
1905{
1906 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1907
1908 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1909}
1910
dfc5606d
YS
1911static ssize_t rbd_name_show(struct device *dev,
1912 struct device_attribute *attr, char *buf)
1913{
593a9e7b 1914 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1915
0bed54dc 1916 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1917}
1918
1919static ssize_t rbd_snap_show(struct device *dev,
1920 struct device_attribute *attr,
1921 char *buf)
1922{
593a9e7b 1923 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1924
1925 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1926}
1927
1928static ssize_t rbd_image_refresh(struct device *dev,
1929 struct device_attribute *attr,
1930 const char *buf,
1931 size_t size)
1932{
593a9e7b 1933 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 1934 int ret;
602adf40 1935
1fe5e993 1936 ret = rbd_refresh_header(rbd_dev, NULL);
b813623a
AE
1937
1938 return ret < 0 ? ret : size;
dfc5606d 1939}
602adf40 1940
dfc5606d
YS
1941static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1942static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1943static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1944static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 1945static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d
YS
1946static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1947static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1948static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1949static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1950
1951static struct attribute *rbd_attrs[] = {
1952 &dev_attr_size.attr,
1953 &dev_attr_major.attr,
1954 &dev_attr_client_id.attr,
1955 &dev_attr_pool.attr,
9bb2f334 1956 &dev_attr_pool_id.attr,
dfc5606d
YS
1957 &dev_attr_name.attr,
1958 &dev_attr_current_snap.attr,
1959 &dev_attr_refresh.attr,
1960 &dev_attr_create_snap.attr,
dfc5606d
YS
1961 NULL
1962};
1963
1964static struct attribute_group rbd_attr_group = {
1965 .attrs = rbd_attrs,
1966};
1967
1968static const struct attribute_group *rbd_attr_groups[] = {
1969 &rbd_attr_group,
1970 NULL
1971};
1972
1973static void rbd_sysfs_dev_release(struct device *dev)
1974{
1975}
1976
1977static struct device_type rbd_device_type = {
1978 .name = "rbd",
1979 .groups = rbd_attr_groups,
1980 .release = rbd_sysfs_dev_release,
1981};
1982
1983
1984/*
1985 sysfs - snapshots
1986*/
1987
1988static ssize_t rbd_snap_size_show(struct device *dev,
1989 struct device_attribute *attr,
1990 char *buf)
1991{
1992 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1993
3591538f 1994 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
1995}
1996
1997static ssize_t rbd_snap_id_show(struct device *dev,
1998 struct device_attribute *attr,
1999 char *buf)
2000{
2001 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2002
3591538f 2003 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
2004}
2005
2006static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2007static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2008
2009static struct attribute *rbd_snap_attrs[] = {
2010 &dev_attr_snap_size.attr,
2011 &dev_attr_snap_id.attr,
2012 NULL,
2013};
2014
2015static struct attribute_group rbd_snap_attr_group = {
2016 .attrs = rbd_snap_attrs,
2017};
2018
2019static void rbd_snap_dev_release(struct device *dev)
2020{
2021 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2022 kfree(snap->name);
2023 kfree(snap);
2024}
2025
2026static const struct attribute_group *rbd_snap_attr_groups[] = {
2027 &rbd_snap_attr_group,
2028 NULL
2029};
2030
2031static struct device_type rbd_snap_device_type = {
2032 .groups = rbd_snap_attr_groups,
2033 .release = rbd_snap_dev_release,
2034};
2035
14e7085d 2036static void __rbd_remove_snap_dev(struct rbd_snap *snap)
dfc5606d
YS
2037{
2038 list_del(&snap->node);
2039 device_unregister(&snap->dev);
2040}
2041
14e7085d 2042static int rbd_register_snap_dev(struct rbd_snap *snap,
dfc5606d
YS
2043 struct device *parent)
2044{
2045 struct device *dev = &snap->dev;
2046 int ret;
2047
2048 dev->type = &rbd_snap_device_type;
2049 dev->parent = parent;
2050 dev->release = rbd_snap_dev_release;
2051 dev_set_name(dev, "snap_%s", snap->name);
2052 ret = device_register(dev);
2053
2054 return ret;
2055}
2056
4e891e0a
AE
2057static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2058 int i, const char *name)
dfc5606d 2059{
4e891e0a 2060 struct rbd_snap *snap;
dfc5606d 2061 int ret;
4e891e0a
AE
2062
2063 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 2064 if (!snap)
4e891e0a
AE
2065 return ERR_PTR(-ENOMEM);
2066
2067 ret = -ENOMEM;
dfc5606d 2068 snap->name = kstrdup(name, GFP_KERNEL);
4e891e0a
AE
2069 if (!snap->name)
2070 goto err;
2071
dfc5606d
YS
2072 snap->size = rbd_dev->header.snap_sizes[i];
2073 snap->id = rbd_dev->header.snapc->snaps[i];
2074 if (device_is_registered(&rbd_dev->dev)) {
14e7085d 2075 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
dfc5606d
YS
2076 if (ret < 0)
2077 goto err;
2078 }
4e891e0a
AE
2079
2080 return snap;
2081
dfc5606d
YS
2082err:
2083 kfree(snap->name);
2084 kfree(snap);
4e891e0a
AE
2085
2086 return ERR_PTR(ret);
dfc5606d
YS
2087}
2088
2089/*
35938150
AE
2090 * Scan the rbd device's current snapshot list and compare it to the
2091 * newly-received snapshot context. Remove any existing snapshots
2092 * not present in the new snapshot context. Add a new snapshot for
2093 * any snaphots in the snapshot context not in the current list.
2094 * And verify there are no changes to snapshots we already know
2095 * about.
2096 *
2097 * Assumes the snapshots in the snapshot context are sorted by
2098 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2099 * are also maintained in that order.)
dfc5606d
YS
2100 */
2101static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2102{
35938150
AE
2103 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2104 const u32 snap_count = snapc->num_snaps;
2105 char *snap_name = rbd_dev->header.snap_names;
2106 struct list_head *head = &rbd_dev->snaps;
2107 struct list_head *links = head->next;
2108 u32 index = 0;
dfc5606d 2109
35938150
AE
2110 while (index < snap_count || links != head) {
2111 u64 snap_id;
2112 struct rbd_snap *snap;
dfc5606d 2113
35938150
AE
2114 snap_id = index < snap_count ? snapc->snaps[index]
2115 : CEPH_NOSNAP;
2116 snap = links != head ? list_entry(links, struct rbd_snap, node)
2117 : NULL;
2118 BUG_ON(snap && snap->id == CEPH_NOSNAP);
dfc5606d 2119
35938150
AE
2120 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2121 struct list_head *next = links->next;
dfc5606d 2122
35938150 2123 /* Existing snapshot not in the new snap context */
dfc5606d 2124
35938150 2125 if (rbd_dev->snap_id == snap->id)
e88a36ec 2126 rbd_dev->snap_exists = false;
35938150
AE
2127 __rbd_remove_snap_dev(snap);
2128
2129 /* Done with this list entry; advance */
2130
2131 links = next;
dfc5606d
YS
2132 continue;
2133 }
35938150
AE
2134
2135 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2136 struct rbd_snap *new_snap;
2137
2138 /* We haven't seen this snapshot before */
2139
2140 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2141 snap_name);
2142 if (IS_ERR(new_snap))
2143 return PTR_ERR(new_snap);
2144
2145 /* New goes before existing, or at end of list */
2146
2147 if (snap)
2148 list_add_tail(&new_snap->node, &snap->node);
2149 else
2150 list_add(&new_snap->node, head);
2151 } else {
2152 /* Already have this one */
2153
2154 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2155 BUG_ON(strcmp(snap->name, snap_name));
2156
2157 /* Done with this list entry; advance */
2158
2159 links = links->next;
dfc5606d 2160 }
35938150
AE
2161
2162 /* Advance to the next entry in the snapshot context */
2163
2164 index++;
2165 snap_name += strlen(snap_name) + 1;
dfc5606d
YS
2166 }
2167
2168 return 0;
2169}
2170
dfc5606d
YS
/*
 * Register rbd_dev with the driver model on the rbd bus, then
 * register a child device for each snapshot currently on its
 * snapshot list.
 *
 * Holds ctl_mutex for the duration.  device_register() must succeed
 * before the snapshot devices can be added, since they are parented
 * under &rbd_dev->dev.
 *
 * Returns 0 on success or a negative errno.  NOTE(review): if a
 * snapshot registration fails partway through, snapshots registered
 * earlier in the loop are left in place -- presumably torn down
 * later via rbd_bus_del_dev(); confirm with callers.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	/* rbd_dev_release() runs when the last device reference drops */
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2198
dfc5606d
YS
/*
 * Unregister rbd_dev from the driver model.  Actual teardown of the
 * device state happens in rbd_dev_release() once the last reference
 * to &rbd_dev->dev is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2203
59c2be1e
YS
2204static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2205{
2206 int ret, rc;
2207
2208 do {
0e6f322d 2209 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2210 if (ret == -ERANGE) {
1fe5e993 2211 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2212 if (rc < 0)
2213 return rc;
2214 }
2215 } while (ret == -ERANGE);
2216
2217 return ret;
2218}
2219
1ddbe94e
AE
2220static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2221
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 *
 * The id comes from atomically bumping rbd_id_max, so concurrent
 * callers always receive distinct ids; the list insertion is
 * serialized by rbd_dev_list_lock.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2234
1ddbe94e 2235/*
499afd5b
AE
2236 * Remove an rbd_dev from the global list, and record that its
2237 * identifier is no longer in use.
1ddbe94e 2238 */
499afd5b 2239static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2240{
d184f6bf 2241 struct list_head *tmp;
de71a297 2242 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2243 int max_id;
2244
2245 BUG_ON(rbd_id < 1);
499afd5b
AE
2246
2247 spin_lock(&rbd_dev_list_lock);
2248 list_del_init(&rbd_dev->node);
d184f6bf
AE
2249
2250 /*
2251 * If the id being "put" is not the current maximum, there
2252 * is nothing special we need to do.
2253 */
2254 if (rbd_id != atomic64_read(&rbd_id_max)) {
2255 spin_unlock(&rbd_dev_list_lock);
2256 return;
2257 }
2258
2259 /*
2260 * We need to update the current maximum id. Search the
2261 * list to find out what it is. We're more likely to find
2262 * the maximum at the end, so search the list backward.
2263 */
2264 max_id = 0;
2265 list_for_each_prev(tmp, &rbd_dev_list) {
2266 struct rbd_device *rbd_dev;
2267
2268 rbd_dev = list_entry(tmp, struct rbd_device, node);
2269 if (rbd_id > max_id)
2270 max_id = rbd_id;
2271 }
499afd5b 2272 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2273
1ddbe94e 2274 /*
d184f6bf
AE
2275 * The max id could have been updated by rbd_id_get(), in
2276 * which case it now accurately reflects the new maximum.
2277 * Be careful not to overwrite the maximum value in that
2278 * case.
1ddbe94e 2279 */
d184f6bf 2280 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2281}
2282
e28fff26
AE
/*
 * Advance *buf past any leading white space and return the length
 * of the token (maximal run of non-space characters) that follows.
 * The string at *buf must be '\0'-terminated; *buf is left pointing
 * at the first non-space character (or the terminating '\0').
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char delims[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, delims);		/* skip to start of token */
	*buf = p;

	return strcspn(p, delims);	/* token length */
}
2301
/*
 * Locate the next token in *buf and, if the supplied buffer is big
 * enough, copy it into token with a terminating '\0'.  *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the token length (not counting the '\0'): 0 means no
 * token was found, and a value >= token_size means the token would
 * not fit (nothing is copied in that case).
 *
 * *buf is advanced past the token even when it was too big to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2331
ea3352f4
AE
2332/*
2333 * Finds the next token in *buf, dynamically allocates a buffer big
2334 * enough to hold a copy of it, and copies the token into the new
2335 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2336 * that a duplicate buffer is created even for a zero-length token.
2337 *
2338 * Returns a pointer to the newly-allocated duplicate, or a null
2339 * pointer if memory for the duplicate was not available. If
2340 * the lenp argument is a non-null pointer, the length of the token
2341 * (not including the '\0') is returned in *lenp.
2342 *
2343 * If successful, the *buf pointer will be updated to point beyond
2344 * the end of the found token.
2345 *
2346 * Note: uses GFP_KERNEL for allocation.
2347 */
2348static inline char *dup_token(const char **buf, size_t *lenp)
2349{
2350 char *dup;
2351 size_t len;
2352
2353 len = next_token(buf);
2354 dup = kmalloc(len + 1, GFP_KERNEL);
2355 if (!dup)
2356 return NULL;
2357
2358 memcpy(dup, *buf, len);
2359 *(dup + len) = '\0';
2360 *buf += len;
2361
2362 if (lenp)
2363 *lenp = len;
2364
2365 return dup;
2366}
2367
/*
 * Parse the /sys/bus/rbd/add command in buf and fill in the
 * pool_name, image_name, image_name_len, header_name and snap_name
 * fields of the given rbd_dev.
 *
 * On success *mon_addrs points at the monitor-address token *inside
 * the caller's buf* (no copy is made) and *mon_addrs_size is that
 * token's length plus one; the options token is copied into the
 * caller-supplied options buffer of size options_size.
 *
 * Returns 0 on success, -EINVAL for a malformed command, or -ENOMEM
 * on allocation failure (any name fields already allocated are
 * freed and reset to NULL before returning).
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;	/* aliases buf; caller must keep buf alive */

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	/* all allocation failures below report -ENOMEM */
	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object: "<image>" RBD_SUFFIX */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
					+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free in reverse allocation order; reset pointers so that
	 * rbd_add()'s own error cleanup can run safely afterwards. */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2450
59c2be1e
YS
2451static ssize_t rbd_add(struct bus_type *bus,
2452 const char *buf,
2453 size_t count)
602adf40 2454{
cb8627c7
AE
2455 char *options;
2456 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2457 const char *mon_addrs = NULL;
2458 size_t mon_addrs_size = 0;
27cc2594
AE
2459 struct ceph_osd_client *osdc;
2460 int rc = -ENOMEM;
602adf40
YS
2461
2462 if (!try_module_get(THIS_MODULE))
2463 return -ENODEV;
2464
60571c7d 2465 options = kmalloc(count, GFP_KERNEL);
602adf40 2466 if (!options)
27cc2594 2467 goto err_nomem;
cb8627c7
AE
2468 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2469 if (!rbd_dev)
2470 goto err_nomem;
602adf40
YS
2471
2472 /* static rbd_device initialization */
2473 spin_lock_init(&rbd_dev->lock);
2474 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2475 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2476 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2477
d184f6bf 2478 /* generate unique id: find highest unique id, add one */
499afd5b 2479 rbd_id_get(rbd_dev);
602adf40 2480
a725f65e 2481 /* Fill in the device name, now that we have its id. */
81a89793
AE
2482 BUILD_BUG_ON(DEV_NAME_LEN
2483 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
de71a297 2484 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
a725f65e 2485
602adf40 2486 /* parse add command */
7ef3214a 2487 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2488 options, count);
a725f65e 2489 if (rc)
f0f8cef5 2490 goto err_put_id;
e124a82f 2491
5214ecc4
AE
2492 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2493 options);
d720bcb0
AE
2494 if (IS_ERR(rbd_dev->rbd_client)) {
2495 rc = PTR_ERR(rbd_dev->rbd_client);
d78fd7ae 2496 rbd_dev->rbd_client = NULL;
f0f8cef5 2497 goto err_put_id;
d720bcb0 2498 }
602adf40 2499
602adf40 2500 /* pick the pool */
1dbb4399 2501 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2502 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2503 if (rc < 0)
2504 goto err_out_client;
9bb2f334 2505 rbd_dev->pool_id = rc;
602adf40
YS
2506
2507 /* register our block device */
27cc2594
AE
2508 rc = register_blkdev(0, rbd_dev->name);
2509 if (rc < 0)
602adf40 2510 goto err_out_client;
27cc2594 2511 rbd_dev->major = rc;
602adf40 2512
dfc5606d
YS
2513 rc = rbd_bus_add_dev(rbd_dev);
2514 if (rc)
766fc439
YS
2515 goto err_out_blkdev;
2516
32eec68d
AE
2517 /*
2518 * At this point cleanup in the event of an error is the job
2519 * of the sysfs code (initiated by rbd_bus_del_dev()).
2520 *
2521 * Set up and announce blkdev mapping.
2522 */
602adf40
YS
2523 rc = rbd_init_disk(rbd_dev);
2524 if (rc)
766fc439 2525 goto err_out_bus;
602adf40 2526
59c2be1e
YS
2527 rc = rbd_init_watch_dev(rbd_dev);
2528 if (rc)
2529 goto err_out_bus;
2530
602adf40
YS
2531 return count;
2532
766fc439 2533err_out_bus:
766fc439
YS
2534 /* this will also clean up rest of rbd_dev stuff */
2535
2536 rbd_bus_del_dev(rbd_dev);
2537 kfree(options);
766fc439
YS
2538 return rc;
2539
602adf40
YS
2540err_out_blkdev:
2541 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2542err_out_client:
2543 rbd_put_client(rbd_dev);
f0f8cef5 2544err_put_id:
cb8627c7 2545 if (rbd_dev->pool_name) {
820a5f3e 2546 kfree(rbd_dev->snap_name);
0bed54dc
AE
2547 kfree(rbd_dev->header_name);
2548 kfree(rbd_dev->image_name);
cb8627c7
AE
2549 kfree(rbd_dev->pool_name);
2550 }
499afd5b 2551 rbd_id_put(rbd_dev);
27cc2594 2552err_nomem:
27cc2594 2553 kfree(rbd_dev);
cb8627c7 2554 kfree(options);
27cc2594 2555
602adf40
YS
2556 dout("Error adding device %s\n", buf);
2557 module_put(THIS_MODULE);
27cc2594
AE
2558
2559 return (ssize_t) rc;
602adf40
YS
2560}
2561
de71a297 2562static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2563{
2564 struct list_head *tmp;
2565 struct rbd_device *rbd_dev;
2566
e124a82f 2567 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2568 list_for_each(tmp, &rbd_dev_list) {
2569 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2570 if (rbd_dev->dev_id == dev_id) {
e124a82f 2571 spin_unlock(&rbd_dev_list_lock);
602adf40 2572 return rbd_dev;
e124a82f 2573 }
602adf40 2574 }
e124a82f 2575 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2576 return NULL;
2577}
2578
/*
 * Device-model release callback for an rbd device.  Runs when the
 * last reference to &rbd_dev->dev is dropped (after
 * rbd_bus_del_dev()): tears down the watch, releases the ceph
 * client, frees the disk and name strings, gives back the device
 * id, and finally drops the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request before unwatching */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2609
dfc5606d
YS
2610static ssize_t rbd_remove(struct bus_type *bus,
2611 const char *buf,
2612 size_t count)
602adf40
YS
2613{
2614 struct rbd_device *rbd_dev = NULL;
2615 int target_id, rc;
2616 unsigned long ul;
2617 int ret = count;
2618
2619 rc = strict_strtoul(buf, 10, &ul);
2620 if (rc)
2621 return rc;
2622
2623 /* convert to int; abort if we lost anything in the conversion */
2624 target_id = (int) ul;
2625 if (target_id != ul)
2626 return -EINVAL;
2627
2628 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2629
2630 rbd_dev = __rbd_get_dev(target_id);
2631 if (!rbd_dev) {
2632 ret = -ENOENT;
2633 goto done;
2634 }
2635
dfc5606d
YS
2636 __rbd_remove_all_snaps(rbd_dev);
2637 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2638
2639done:
2640 mutex_unlock(&ctl_mutex);
2641 return ret;
2642}
2643
dfc5606d
YS
/*
 * Handle a write to a device's snapshot-creation sysfs attribute:
 * create a snapshot named by buf on the rbd image, refresh the
 * in-core header, and notify watchers (best effort).
 *
 * Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * NOTE(review): snprintf() with size "count" copies at most
	 * count - 1 bytes, so the final byte of buf is always
	 * dropped.  That strips a trailing newline, but would also
	 * truncate a name written without one, even though the
	 * buffer is count + 1 bytes -- confirm this is intended.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2684
602adf40
YS
2685/*
2686 * create control files in sysfs
dfc5606d 2687 * /sys/bus/rbd/...
602adf40
YS
2688 */
2689static int rbd_sysfs_init(void)
2690{
dfc5606d 2691 int ret;
602adf40 2692
fed4c143 2693 ret = device_register(&rbd_root_dev);
21079786 2694 if (ret < 0)
dfc5606d 2695 return ret;
602adf40 2696
fed4c143
AE
2697 ret = bus_register(&rbd_bus_type);
2698 if (ret < 0)
2699 device_unregister(&rbd_root_dev);
602adf40 2700
602adf40
YS
2701 return ret;
2702}
2703
/* Tear down the sysfs state in the reverse order of rbd_sysfs_init(). */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2709
2710int __init rbd_init(void)
2711{
2712 int rc;
2713
2714 rc = rbd_sysfs_init();
2715 if (rc)
2716 return rc;
f0f8cef5 2717 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2718 return 0;
2719}
2720
/* Module exit: remove the rbd sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2725
2726module_init(rbd_init);
2727module_exit(rbd_exit);
2728
2729MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2730MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2731MODULE_DESCRIPTION("rados block device");
2732
2733/* following authorship retained from original osdblk.c */
2734MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2735
2736MODULE_LICENSE("GPL");