rbd: do a few checks at build time
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
f0f8cef5
AE
44#define RBD_DRV_NAME "rbd"
45#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
81a89793
AE
56/*
57 * An RBD device name will be "rbd#", where the "rbd" comes from
58 * RBD_DRV_NAME above, and # is a unique integer identifier.
59 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
60 * enough to hold all possible device names.
61 */
602adf40 62#define DEV_NAME_LEN 32
81a89793 63#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 64
59c2be1e
YS
65#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
66
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* image size in bytes */
	char block_name[32];	/* prefix for data object names ("<name>.%012llx") */
	__u8 obj_order;		/* object size is 1 << obj_order bytes */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* protects the snapshot state below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;	/* total size of the snap_names buffer */
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;	/* NUL-terminated names, packed back to back */
	u64 *snap_sizes;	/* per-snapshot image size, same order as snapc */

	u64 obj_version;	/* header object version, used for watch acks */
};
87
88struct rbd_options {
89 int notify_timeout;
602adf40
YS
90};
91
92/*
f0f8cef5 93 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
94 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned by this client; freed on release */
	struct kref kref;		/* shared by all devices using this client */
	struct list_head node;		/* link on rbd_client_list */
};
101
102/*
f0f8cef5 103 * a request completion status
602adf40 104 */
1fec7093
YS
struct rbd_req_status {
	int done;	/* nonzero once this sub-request completed */
	int rc;		/* completion status of the sub-request */
	u64 bytes;	/* bytes transferred by the sub-request */
};
110
111/*
112 * a collection of requests
113 */
struct rbd_req_coll {
	int total;			/* number of sub-requests in the collection */
	int num_done;			/* sub-requests completed so far, in order */
	struct kref kref;
	/* NOTE(review): pre-C99 zero-length trailing array; a flexible array
	 * member (status[]) is the modern equivalent */
	struct rbd_req_status status[0];
};
120
f0f8cef5
AE
121/*
122 * a single io request
123 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* slot in coll->status[], if coll set */
	struct rbd_req_coll *coll;	/* optional completion collection */
};
132
dfc5606d
YS
/* one snapshot of an rbd image, exposed as a sysfs device node */
struct rbd_snap {
	struct device dev;
	const char *name;
	size_t size;			/* image size at the snapshot */
	struct list_head node;		/* link on rbd_device->snaps */
	u64 id;				/* snapshot id within the snap context */
};
140
602adf40
YS
141/*
142 * a single device
143 */
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared ceph connection (refcounted) */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* header object name */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	/* watch/notify state used to learn about header updates */
	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;

	struct list_head node;	/* link on rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
180
602adf40 181static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 182
602adf40 183static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
184static DEFINE_SPINLOCK(rbd_dev_list_lock);
185
432b8587
AE
186static LIST_HEAD(rbd_client_list); /* clients */
187static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 188
dfc5606d
YS
189static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
190static void rbd_dev_release(struct device *dev);
dfc5606d
YS
191static ssize_t rbd_snap_add(struct device *dev,
192 struct device_attribute *attr,
193 const char *buf,
194 size_t count);
195static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 196 struct rbd_snap *snap);
dfc5606d 197
f0f8cef5
AE
198static ssize_t rbd_add(struct bus_type *bus, const char *buf,
199 size_t count);
200static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
201 size_t count);
202
/* /sys/bus/rbd: write-only "add" and "remove" control files */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
213
/* rbd_root_dev is static; nothing to free when its refcount drops */
static void rbd_root_dev_release(struct device *dev)
{
}
217
/* parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
222
dfc5606d
YS
223
/* map an embedded struct device back to its rbd_device */
static struct rbd_device *dev_to_rbd(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
228
/* take a reference on the rbd device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
233
/* drop a reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
602adf40 238
59c2be1e
YS
239static int __rbd_update_snaps(struct rbd_device *rbd_dev);
240
602adf40
YS
241static int rbd_open(struct block_device *bdev, fmode_t mode)
242{
f0f8cef5 243 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 244
dfc5606d
YS
245 rbd_get_dev(rbd_dev);
246
602adf40
YS
247 set_device_ro(bdev, rbd_dev->read_only);
248
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250 return -EROFS;
251
252 return 0;
253}
254
dfc5606d
YS
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
263
602adf40
YS
264static const struct block_device_operations rbd_bd_ops = {
265 .owner = THIS_MODULE,
266 .open = rbd_open,
dfc5606d 267 .release = rbd_release,
602adf40
YS
268};
269
270/*
271 * Initialize an rbd client instance.
272 * We own *opt.
273 */
59c2be1e
YS
274static struct rbd_client *rbd_client_create(struct ceph_options *opt,
275 struct rbd_options *rbd_opts)
602adf40
YS
276{
277 struct rbd_client *rbdc;
278 int ret = -ENOMEM;
279
280 dout("rbd_client_create\n");
281 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
282 if (!rbdc)
283 goto out_opt;
284
285 kref_init(&rbdc->kref);
286 INIT_LIST_HEAD(&rbdc->node);
287
bc534d86
AE
288 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
289
6ab00d46 290 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
602adf40 291 if (IS_ERR(rbdc->client))
bc534d86 292 goto out_mutex;
28f259b7 293 opt = NULL; /* Now rbdc->client is responsible for opt */
602adf40
YS
294
295 ret = ceph_open_session(rbdc->client);
296 if (ret < 0)
297 goto out_err;
298
59c2be1e
YS
299 rbdc->rbd_opts = rbd_opts;
300
432b8587 301 spin_lock(&rbd_client_list_lock);
602adf40 302 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 303 spin_unlock(&rbd_client_list_lock);
602adf40 304
bc534d86
AE
305 mutex_unlock(&ctl_mutex);
306
602adf40
YS
307 dout("rbd_client_create created %p\n", rbdc);
308 return rbdc;
309
310out_err:
311 ceph_destroy_client(rbdc->client);
bc534d86
AE
312out_mutex:
313 mutex_unlock(&ctl_mutex);
602adf40
YS
314 kfree(rbdc);
315out_opt:
28f259b7
VK
316 if (opt)
317 ceph_destroy_options(opt);
318 return ERR_PTR(ret);
602adf40
YS
319}
320
321/*
322 * Find a ceph client with specific addr and configuration.
323 */
324static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
325{
326 struct rbd_client *client_node;
327
328 if (opt->flags & CEPH_OPT_NOSHARE)
329 return NULL;
330
331 list_for_each_entry(client_node, &rbd_client_list, node)
332 if (ceph_compare_options(opt, client_node->client) == 0)
333 return client_node;
334 return NULL;
335}
336
59c2be1e
YS
337/*
338 * mount options
339 */
/* rbd-specific mount option tokens; ranges mark the argument type */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
354
/*
 * Parse one rbd-specific mount option (callback for ceph_parse_options()).
 * @c is a single "key=value" string; @private is the struct rbd_options
 * being filled in.  Returns 0 on success, negative errno on bad input.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* tokens below Opt_last_int carry an integer argument */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		/* table and enum out of sync — programming error */
		BUG_ON(token);
	}
	return 0;
}
389
602adf40
YS
390/*
391 * Get a ceph client with specific addr and configuration, if one does
392 * not exist create it.
393 */
d720bcb0 394static struct rbd_client *rbd_get_client(const char *mon_addr, char *options)
602adf40
YS
395{
396 struct rbd_client *rbdc;
397 struct ceph_options *opt;
59c2be1e
YS
398 struct rbd_options *rbd_opts;
399
400 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
401 if (!rbd_opts)
d720bcb0 402 return ERR_PTR(-ENOMEM);
59c2be1e
YS
403
404 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 405
ee57741c 406 opt = ceph_parse_options(options, mon_addr,
21079786
AE
407 mon_addr + strlen(mon_addr),
408 parse_rbd_opts_token, rbd_opts);
ee57741c 409 if (IS_ERR(opt)) {
d720bcb0
AE
410 kfree(rbd_opts);
411 return ERR_CAST(opt);
ee57741c 412 }
602adf40 413
432b8587 414 spin_lock(&rbd_client_list_lock);
602adf40
YS
415 rbdc = __rbd_client_find(opt);
416 if (rbdc) {
602adf40
YS
417 /* using an existing client */
418 kref_get(&rbdc->kref);
432b8587 419 spin_unlock(&rbd_client_list_lock);
e6994d3d 420
e6994d3d
AE
421 ceph_destroy_options(opt);
422 kfree(rbd_opts);
423
d720bcb0 424 return rbdc;
602adf40 425 }
432b8587 426 spin_unlock(&rbd_client_list_lock);
602adf40 427
59c2be1e 428 rbdc = rbd_client_create(opt, rbd_opts);
d97081b0 429
d720bcb0
AE
430 if (IS_ERR(rbdc))
431 kfree(rbd_opts);
602adf40 432
d720bcb0 433 return rbdc;
602adf40
YS
434}
435
436/*
437 * Destroy ceph client
d23a4b3f 438 *
432b8587 439 * Caller must hold rbd_client_list_lock.
602adf40
YS
440 */
/*
 * Destroy ceph client (kref release callback)
 *
 * Caller must hold rbd_client_list_lock (list_del below).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
452
453/*
454 * Drop reference to ceph client node. If it's not referenced anymore, release
455 * it.
456 */
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.  The list lock is held across kref_put because the release callback
 * unlinks the client from rbd_client_list.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	spin_lock(&rbd_client_list_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&rbd_client_list_lock);
	rbd_dev->rbd_client = NULL;
}
464
1fec7093
YS
465/*
466 * Destroy requests collection
467 */
468static void rbd_coll_release(struct kref *kref)
469{
470 struct rbd_req_coll *coll =
471 container_of(kref, struct rbd_req_coll, kref);
472
473 dout("rbd_coll_release %p\n", coll);
474 kfree(coll);
475}
602adf40
YS
476
477/*
478 * Create a new header structure, translate header format from the on-disk
479 * header.
480 */
481static int rbd_header_from_disk(struct rbd_image_header *header,
482 struct rbd_image_header_ondisk *ondisk,
483 int allocated_snaps,
484 gfp_t gfp_flags)
485{
486 int i;
487 u32 snap_count = le32_to_cpu(ondisk->snap_count);
488 int ret = -ENOMEM;
489
21079786 490 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 491 return -ENXIO;
81e759fb 492
602adf40 493 init_rwsem(&header->snap_rwsem);
602adf40
YS
494 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
495 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
21079786 496 snap_count * sizeof (*ondisk),
602adf40
YS
497 gfp_flags);
498 if (!header->snapc)
499 return -ENOMEM;
500 if (snap_count) {
501 header->snap_names = kmalloc(header->snap_names_len,
502 GFP_KERNEL);
503 if (!header->snap_names)
504 goto err_snapc;
505 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
506 GFP_KERNEL);
507 if (!header->snap_sizes)
508 goto err_names;
509 } else {
510 header->snap_names = NULL;
511 header->snap_sizes = NULL;
512 }
513 memcpy(header->block_name, ondisk->block_name,
514 sizeof(ondisk->block_name));
515
516 header->image_size = le64_to_cpu(ondisk->image_size);
517 header->obj_order = ondisk->options.order;
518 header->crypt_type = ondisk->options.crypt_type;
519 header->comp_type = ondisk->options.comp_type;
520
521 atomic_set(&header->snapc->nref, 1);
522 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
523 header->snapc->num_snaps = snap_count;
524 header->total_snaps = snap_count;
525
21079786 526 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
527 for (i = 0; i < snap_count; i++) {
528 header->snapc->snaps[i] =
529 le64_to_cpu(ondisk->snaps[i].id);
530 header->snap_sizes[i] =
531 le64_to_cpu(ondisk->snaps[i].image_size);
532 }
533
534 /* copy snapshot names */
535 memcpy(header->snap_names, &ondisk->snaps[i],
536 header->snap_names_len);
537 }
538
539 return 0;
540
541err_names:
542 kfree(header->snap_names);
543err_snapc:
544 kfree(header->snapc);
545 return ret;
546}
547
/*
 * Map a 1-based snapshot number (rbd_device->cur_snap style, counted from
 * the newest) to an index into the snap context arrays.
 */
static int snap_index(struct rbd_image_header *header, int snap_num)
{
	return header->total_snaps - snap_num;
}
552
/*
 * Return the snapshot id the device is mapped at, or 0 when mapped at
 * the head (cur_snap == 0).
 */
static u64 cur_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)
		return 0;

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
}
562
563static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
564 u64 *seq, u64 *size)
565{
566 int i;
567 char *p = header->snap_names;
568
569 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
570 if (strcmp(snap_name, p) == 0)
571 break;
572 }
573 if (i == header->total_snaps)
574 return -ENOENT;
575 if (seq)
576 *seq = header->snapc->snaps[i];
577
578 if (size)
579 *size = header->snap_sizes[i];
580
581 return i;
582}
583
cc9d734c 584static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
602adf40
YS
585{
586 struct rbd_image_header *header = &dev->header;
587 struct ceph_snap_context *snapc = header->snapc;
588 int ret = -ENOENT;
589
cc9d734c
JD
590 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
591
602adf40
YS
592 down_write(&header->snap_rwsem);
593
cc9d734c
JD
594 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
595 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
596 if (header->total_snaps)
597 snapc->seq = header->snap_seq;
598 else
599 snapc->seq = 0;
600 dev->cur_snap = 0;
601 dev->read_only = 0;
602 if (size)
603 *size = header->image_size;
604 } else {
cc9d734c 605 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
602adf40
YS
606 if (ret < 0)
607 goto done;
608
609 dev->cur_snap = header->total_snaps - ret;
610 dev->read_only = 1;
611 }
612
613 ret = 0;
614done:
615 up_write(&header->snap_rwsem);
616 return ret;
617}
618
/* free everything rbd_header_from_disk() allocated */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
}
625
626/*
627 * get the actual striped segment name, offset and length
628 */
629static u64 rbd_get_segment(struct rbd_image_header *header,
630 const char *block_name,
631 u64 ofs, u64 len,
632 char *seg_name, u64 *segofs)
633{
634 u64 seg = ofs >> header->obj_order;
635
636 if (seg_name)
637 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
638 "%s.%012llx", block_name, seg);
639
640 ofs = ofs & ((1 << header->obj_order) - 1);
641 len = min_t(u64, len, (1 << header->obj_order) - ofs);
642
643 if (segofs)
644 *segofs = ofs;
645
646 return len;
647}
648
1fec7093
YS
649static int rbd_get_num_segments(struct rbd_image_header *header,
650 u64 ofs, u64 len)
651{
652 u64 start_seg = ofs >> header->obj_order;
653 u64 end_seg = (ofs + len - 1) >> header->obj_order;
654 return end_seg - start_seg + 1;
655}
656
029bcbd8
JD
657/*
658 * returns the size of an object in the image
659 */
660static u64 rbd_obj_bytes(struct rbd_image_header *header)
661{
662 return 1 << header->obj_order;
663}
664
602adf40
YS
665/*
666 * bio helpers
667 */
668
669static void bio_chain_put(struct bio *chain)
670{
671 struct bio *tmp;
672
673 while (chain) {
674 tmp = chain;
675 chain = chain->bi_next;
676 bio_put(tmp);
677 }
678}
679
680/*
681 * zeros a bio chain, starting at specific offset
682 */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; bytes at chain offset
 * >= start_ofs are cleared.  Used to zero-fill short reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running offset of bv within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs if it falls inside
				 * this segment, else from its beginning */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
706
707/*
708 * bio_chain_clone - clone a chain of bios up to a certain length.
709 * might return a bio_pair that will need to be released.
710 */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first unconsumed bio, *next at where the
 * caller should resume (either the remainder of a split bio or the next
 * bio in the chain), and *bp at a split pair the caller must eventually
 * release.  Returns the cloned chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release the split left over from the previous call, if any */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation may block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
783
784/*
785 * helpers for osd request op vectors.
786 */
/*
 * helpers for osd request op vectors.
 *
 * Allocates num_ops+1 zeroed ops (the extra zeroed entry terminates the
 * vector) and initializes the first with @opcode and @payload_len.
 * Returns 0 or -ENOMEM.
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
			     int num_ops,
			     int opcode,
			     u32 payload_len)
{
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
		       GFP_NOIO);
	if (!*ops)
		return -ENOMEM;
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;
	return 0;
}
804
/* free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
809
1fec7093
YS
/*
 * Record completion of sub-request @index of collection @coll and push
 * completed bytes to the block layer *in order*: the blk request is only
 * advanced through the longest prefix of done sub-requests.
 * With no collection the whole request completes at once.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* queue lock serializes status[] updates from request callbacks */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	/* complete the contiguous run of finished sub-requests */
	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
847
/* complete a sub-request using the collection info stored in @req */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
853
602adf40
YS
854/*
855 * Send ceph osd request
856 */
/*
 * Send ceph osd request
 *
 * Core request dispatcher.  Builds a ceph_osd_request for @obj covering
 * (ofs, len), attaches either @bio or @pages as the data payload, and
 * starts it.  With a callback (@rbd_cb) the request completes
 * asynchronously and the callback owns the cleanup; without one this
 * waits for completion, optionally returning the reassert version in
 * *ver.  @linger_req, when non-NULL, marks the request lingering (used
 * for watch) and returns it to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* still must report completion to the collection */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	/* hold the snap context stable while building the request */
	down_read(&header->snap_rwsem);

	osdc = &dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&header->snap_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* one object per "stripe": unit == object size, count == 1 */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous: wait here, async cleanup happens in rbd_cb */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
970
971/*
972 * Ceph osd op callback
973 */
/*
 * Ceph osd op callback
 *
 * Completion handler for async data requests: decodes the reply, treats
 * short/missing reads as zero-filled success, completes the sub-request,
 * and releases the bio chain, osd request and tracking struct.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* reading a hole: object doesn't exist, return zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the tail */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1009
59c2be1e
YS
/* minimal async callback: just drop the request reference */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1014
602adf40
YS
1015/*
1016 * Do a synchronous ceph osd operation
1017 */
/*
 * Do a synchronous ceph osd operation
 *
 * Transfers data through a temporary page vector: @buf is copied in for
 * writes and copied out for reads.  When @orig_ops is NULL a single op
 * of @opcode is built here (and freed again); otherwise the caller's op
 * vector is used unchanged.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* no callback: rbd_do_request waits for completion */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* on read, ret is the number of bytes returned by the osd */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1077
1078/*
1079 * Do an asynchronous ceph osd operation
1080 */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image-relative (ofs, len) to a single object segment and
 * dispatches an async request for it, completing via rbd_req_cb.
 * The bio chain must already be segment-aligned (see BUG_ON below).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1133
1134/*
1135 * Request async osd write
1136 */
1137static int rbd_req_write(struct request *rq,
1138 struct rbd_device *rbd_dev,
1139 struct ceph_snap_context *snapc,
1140 u64 ofs, u64 len,
1fec7093
YS
1141 struct bio *bio,
1142 struct rbd_req_coll *coll,
1143 int coll_index)
602adf40
YS
1144{
1145 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1146 CEPH_OSD_OP_WRITE,
1147 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1148 2,
1fec7093 1149 ofs, len, bio, coll, coll_index);
602adf40
YS
1150}
1151
1152/*
1153 * Request async osd read
1154 */
1155static int rbd_req_read(struct request *rq,
1156 struct rbd_device *rbd_dev,
1157 u64 snapid,
1158 u64 ofs, u64 len,
1fec7093
YS
1159 struct bio *bio,
1160 struct rbd_req_coll *coll,
1161 int coll_index)
602adf40
YS
1162{
1163 return rbd_do_op(rq, rbd_dev, NULL,
1164 (snapid ? snapid : CEPH_NOSNAP),
1165 CEPH_OSD_OP_READ,
1166 CEPH_OSD_FLAG_READ,
1167 2,
1fec7093 1168 ofs, len, bio, coll, coll_index);
602adf40
YS
1169}
1170
1171/*
1172 * Request sync osd read
1173 */
1174static int rbd_req_sync_read(struct rbd_device *dev,
1175 struct ceph_snap_context *snapc,
1176 u64 snapid,
1177 const char *obj,
1178 u64 ofs, u64 len,
59c2be1e
YS
1179 char *buf,
1180 u64 *ver)
602adf40
YS
1181{
1182 return rbd_req_sync_op(dev, NULL,
1183 (snapid ? snapid : CEPH_NOSNAP),
1184 CEPH_OSD_OP_READ,
1185 CEPH_OSD_FLAG_READ,
1186 NULL,
59c2be1e 1187 1, obj, ofs, len, buf, NULL, ver);
602adf40
YS
1188}
1189
1190/*
59c2be1e
YS
1191 * Request sync osd watch
1192 */
/*
 * Acknowledge a notification on the header object (fire-and-forget:
 * completion is handled by rbd_simple_req_cb).
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/* NOTE(review): cookie is not byte-swapped here, unlike .ver and
	 * the cookie in rbd_req_sync_watch() — confirm intended wire format */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1222
1223static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1224{
1225 struct rbd_device *dev = (struct rbd_device *)data;
13143d2d
SW
1226 int rc;
1227
59c2be1e
YS
1228 if (!dev)
1229 return;
1230
1231 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1232 notify_id, (int)opcode);
1233 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
13143d2d 1234 rc = __rbd_update_snaps(dev);
59c2be1e 1235 mutex_unlock(&ctl_mutex);
13143d2d 1236 if (rc)
f0f8cef5
AE
1237 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1238 " update snaps: %d\n", dev->major, rc);
59c2be1e
YS
1239
1240 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1241}
1242
1243/*
1244 * Request sync osd watch
1245 */
1246static int rbd_req_sync_watch(struct rbd_device *dev,
1247 const char *obj,
1248 u64 ver)
1249{
1250 struct ceph_osd_req_op *ops;
1dbb4399 1251 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1252
1253 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1254 if (ret < 0)
1255 return ret;
1256
1257 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1258 (void *)dev, &dev->watch_event);
1259 if (ret < 0)
1260 goto fail;
1261
1262 ops[0].watch.ver = cpu_to_le64(ver);
1263 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1264 ops[0].watch.flag = 1;
1265
1266 ret = rbd_req_sync_op(dev, NULL,
1267 CEPH_NOSNAP,
1268 0,
1269 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1270 ops,
1271 1, obj, 0, 0, NULL,
1272 &dev->watch_request, NULL);
1273
1274 if (ret < 0)
1275 goto fail_event;
1276
1277 rbd_destroy_ops(ops);
1278 return 0;
1279
1280fail_event:
1281 ceph_osdc_cancel_event(dev->watch_event);
1282 dev->watch_event = NULL;
1283fail:
1284 rbd_destroy_ops(ops);
1285 return ret;
1286}
1287
79e3057c
YS
1288/*
1289 * Request sync osd unwatch
1290 */
1291static int rbd_req_sync_unwatch(struct rbd_device *dev,
1292 const char *obj)
1293{
1294 struct ceph_osd_req_op *ops;
1295
1296 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1297 if (ret < 0)
1298 return ret;
1299
1300 ops[0].watch.ver = 0;
1301 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1302 ops[0].watch.flag = 0;
1303
1304 ret = rbd_req_sync_op(dev, NULL,
1305 CEPH_NOSNAP,
1306 0,
1307 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1308 ops,
1309 1, obj, 0, 0, NULL, NULL, NULL);
1310
1311 rbd_destroy_ops(ops);
1312 ceph_osdc_cancel_event(dev->watch_event);
1313 dev->watch_event = NULL;
1314 return ret;
1315}
1316
59c2be1e
YS
1317struct rbd_notify_info {
1318 struct rbd_device *dev;
1319};
1320
1321static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1322{
1323 struct rbd_device *dev = (struct rbd_device *)data;
1324 if (!dev)
1325 return;
1326
1327 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1328 notify_id, (int)opcode);
1329}
1330
1331/*
1332 * Request sync osd notify
1333 */
1334static int rbd_req_sync_notify(struct rbd_device *dev,
1335 const char *obj)
1336{
1337 struct ceph_osd_req_op *ops;
1dbb4399 1338 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1339 struct ceph_osd_event *event;
1340 struct rbd_notify_info info;
1341 int payload_len = sizeof(u32) + sizeof(u32);
1342 int ret;
1343
1344 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1345 if (ret < 0)
1346 return ret;
1347
1348 info.dev = dev;
1349
1350 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1351 (void *)&info, &event);
1352 if (ret < 0)
1353 goto fail;
1354
1355 ops[0].watch.ver = 1;
1356 ops[0].watch.flag = 1;
1357 ops[0].watch.cookie = event->cookie;
1358 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1359 ops[0].watch.timeout = 12;
1360
1361 ret = rbd_req_sync_op(dev, NULL,
1362 CEPH_NOSNAP,
1363 0,
1364 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1365 ops,
1366 1, obj, 0, 0, NULL, NULL, NULL);
1367 if (ret < 0)
1368 goto fail_event;
1369
1370 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1371 dout("ceph_osdc_wait_event returned %d\n", ret);
1372 rbd_destroy_ops(ops);
1373 return 0;
1374
1375fail_event:
1376 ceph_osdc_cancel_event(event);
1377fail:
1378 rbd_destroy_ops(ops);
1379 return ret;
1380}
1381
602adf40
YS
1382/*
1383 * Request sync osd read
1384 */
1385static int rbd_req_sync_exec(struct rbd_device *dev,
1386 const char *obj,
1387 const char *cls,
1388 const char *method,
1389 const char *data,
59c2be1e
YS
1390 int len,
1391 u64 *ver)
602adf40
YS
1392{
1393 struct ceph_osd_req_op *ops;
1394 int cls_len = strlen(cls);
1395 int method_len = strlen(method);
1396 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1397 cls_len + method_len + len);
1398 if (ret < 0)
1399 return ret;
1400
1401 ops[0].cls.class_name = cls;
1402 ops[0].cls.class_len = (__u8)cls_len;
1403 ops[0].cls.method_name = method;
1404 ops[0].cls.method_len = (__u8)method_len;
1405 ops[0].cls.argc = 0;
1406 ops[0].cls.indata = data;
1407 ops[0].cls.indata_len = len;
1408
1409 ret = rbd_req_sync_op(dev, NULL,
1410 CEPH_NOSNAP,
1411 0,
1412 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1413 ops,
59c2be1e 1414 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1415
1416 rbd_destroy_ops(ops);
1417
1418 dout("cls_exec returned %d\n", ret);
1419 return ret;
1420}
1421
1fec7093
YS
1422static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1423{
1424 struct rbd_req_coll *coll =
1425 kzalloc(sizeof(struct rbd_req_coll) +
1426 sizeof(struct rbd_req_status) * num_reqs,
1427 GFP_ATOMIC);
1428
1429 if (!coll)
1430 return NULL;
1431 coll->total = num_reqs;
1432 kref_init(&coll->kref);
1433 return coll;
1434}
1435
602adf40
YS
1436/*
1437 * block device queue callback
1438 */
1439static void rbd_rq_fn(struct request_queue *q)
1440{
1441 struct rbd_device *rbd_dev = q->queuedata;
1442 struct request *rq;
1443 struct bio_pair *bp = NULL;
1444
1445 rq = blk_fetch_request(q);
1446
1447 while (1) {
1448 struct bio *bio;
1449 struct bio *rq_bio, *next_bio = NULL;
1450 bool do_write;
1451 int size, op_size = 0;
1452 u64 ofs;
1fec7093
YS
1453 int num_segs, cur_seg = 0;
1454 struct rbd_req_coll *coll;
602adf40
YS
1455
1456 /* peek at request from block layer */
1457 if (!rq)
1458 break;
1459
1460 dout("fetched request\n");
1461
1462 /* filter out block requests we don't understand */
1463 if ((rq->cmd_type != REQ_TYPE_FS)) {
1464 __blk_end_request_all(rq, 0);
1465 goto next;
1466 }
1467
1468 /* deduce our operation (read, write) */
1469 do_write = (rq_data_dir(rq) == WRITE);
1470
1471 size = blk_rq_bytes(rq);
1472 ofs = blk_rq_pos(rq) * 512ULL;
1473 rq_bio = rq->bio;
1474 if (do_write && rbd_dev->read_only) {
1475 __blk_end_request_all(rq, -EROFS);
1476 goto next;
1477 }
1478
1479 spin_unlock_irq(q->queue_lock);
1480
1481 dout("%s 0x%x bytes at 0x%llx\n",
1482 do_write ? "write" : "read",
1483 size, blk_rq_pos(rq) * 512ULL);
1484
1fec7093
YS
1485 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1486 coll = rbd_alloc_coll(num_segs);
1487 if (!coll) {
1488 spin_lock_irq(q->queue_lock);
1489 __blk_end_request_all(rq, -ENOMEM);
1490 goto next;
1491 }
1492
602adf40
YS
1493 do {
1494 /* a bio clone to be passed down to OSD req */
1495 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1496 op_size = rbd_get_segment(&rbd_dev->header,
1497 rbd_dev->header.block_name,
1498 ofs, size,
1499 NULL, NULL);
1fec7093 1500 kref_get(&coll->kref);
602adf40
YS
1501 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1502 op_size, GFP_ATOMIC);
1503 if (!bio) {
1fec7093
YS
1504 rbd_coll_end_req_index(rq, coll, cur_seg,
1505 -ENOMEM, op_size);
1506 goto next_seg;
602adf40
YS
1507 }
1508
1fec7093 1509
602adf40
YS
1510 /* init OSD command: write or read */
1511 if (do_write)
1512 rbd_req_write(rq, rbd_dev,
1513 rbd_dev->header.snapc,
1514 ofs,
1fec7093
YS
1515 op_size, bio,
1516 coll, cur_seg);
602adf40
YS
1517 else
1518 rbd_req_read(rq, rbd_dev,
1519 cur_snap_id(rbd_dev),
1520 ofs,
1fec7093
YS
1521 op_size, bio,
1522 coll, cur_seg);
602adf40 1523
1fec7093 1524next_seg:
602adf40
YS
1525 size -= op_size;
1526 ofs += op_size;
1527
1fec7093 1528 cur_seg++;
602adf40
YS
1529 rq_bio = next_bio;
1530 } while (size > 0);
1fec7093 1531 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1532
1533 if (bp)
1534 bio_pair_release(bp);
602adf40
YS
1535 spin_lock_irq(q->queue_lock);
1536next:
1537 rq = blk_fetch_request(q);
1538 }
1539}
1540
1541/*
1542 * a queue callback. Makes sure that we don't create a bio that spans across
1543 * multiple osd objects. One exception would be with a single page bios,
1544 * which we handle later at bio_chain_clone
1545 */
1546static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1547 struct bio_vec *bvec)
1548{
1549 struct rbd_device *rbd_dev = q->queuedata;
1550 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1551 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1552 unsigned int bio_sectors = bmd->bi_size >> 9;
1553 int max;
1554
1555 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1556 + bio_sectors)) << 9;
1557 if (max < 0)
1558 max = 0; /* bio_add cannot handle a negative return */
1559 if (max <= bvec->bv_len && bio_sectors == 0)
1560 return bvec->bv_len;
1561 return max;
1562}
1563
1564static void rbd_free_disk(struct rbd_device *rbd_dev)
1565{
1566 struct gendisk *disk = rbd_dev->disk;
1567
1568 if (!disk)
1569 return;
1570
1571 rbd_header_free(&rbd_dev->header);
1572
1573 if (disk->flags & GENHD_FL_UP)
1574 del_gendisk(disk);
1575 if (disk->queue)
1576 blk_cleanup_queue(disk->queue);
1577 put_disk(disk);
1578}
1579
1580/*
1581 * reload the ondisk the header
1582 */
1583static int rbd_read_header(struct rbd_device *rbd_dev,
1584 struct rbd_image_header *header)
1585{
1586 ssize_t rc;
1587 struct rbd_image_header_ondisk *dh;
1588 int snap_count = 0;
1589 u64 snap_names_len = 0;
59c2be1e 1590 u64 ver;
602adf40
YS
1591
1592 while (1) {
1593 int len = sizeof(*dh) +
1594 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1595 snap_names_len;
1596
1597 rc = -ENOMEM;
1598 dh = kmalloc(len, GFP_KERNEL);
1599 if (!dh)
1600 return -ENOMEM;
1601
1602 rc = rbd_req_sync_read(rbd_dev,
1603 NULL, CEPH_NOSNAP,
1604 rbd_dev->obj_md_name,
1605 0, len,
59c2be1e 1606 (char *)dh, &ver);
602adf40
YS
1607 if (rc < 0)
1608 goto out_dh;
1609
1610 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
81e759fb
JD
1611 if (rc < 0) {
1612 if (rc == -ENXIO) {
1613 pr_warning("unrecognized header format"
1614 " for image %s", rbd_dev->obj);
1615 }
602adf40 1616 goto out_dh;
81e759fb 1617 }
602adf40
YS
1618
1619 if (snap_count != header->total_snaps) {
1620 snap_count = header->total_snaps;
1621 snap_names_len = header->snap_names_len;
1622 rbd_header_free(header);
1623 kfree(dh);
1624 continue;
1625 }
1626 break;
1627 }
59c2be1e 1628 header->obj_version = ver;
602adf40
YS
1629
1630out_dh:
1631 kfree(dh);
1632 return rc;
1633}
1634
1635/*
1636 * create a snapshot
1637 */
1638static int rbd_header_add_snap(struct rbd_device *dev,
1639 const char *snap_name,
1640 gfp_t gfp_flags)
1641{
1642 int name_len = strlen(snap_name);
1643 u64 new_snapid;
1644 int ret;
916d4d67 1645 void *data, *p, *e;
59c2be1e 1646 u64 ver;
1dbb4399 1647 struct ceph_mon_client *monc;
602adf40
YS
1648
1649 /* we should create a snapshot only if we're pointing at the head */
1650 if (dev->cur_snap)
1651 return -EINVAL;
1652
1dbb4399
AE
1653 monc = &dev->rbd_client->client->monc;
1654 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1655 dout("created snapid=%lld\n", new_snapid);
1656 if (ret < 0)
1657 return ret;
1658
1659 data = kmalloc(name_len + 16, gfp_flags);
1660 if (!data)
1661 return -ENOMEM;
1662
916d4d67
SW
1663 p = data;
1664 e = data + name_len + 16;
602adf40 1665
916d4d67
SW
1666 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1667 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1668
1669 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1670 data, p - data, &ver);
602adf40 1671
916d4d67 1672 kfree(data);
602adf40
YS
1673
1674 if (ret < 0)
1675 return ret;
1676
1677 dev->header.snapc->seq = new_snapid;
1678
1679 return 0;
1680bad:
1681 return -ERANGE;
1682}
1683
dfc5606d
YS
1684static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1685{
1686 struct rbd_snap *snap;
1687
1688 while (!list_empty(&rbd_dev->snaps)) {
1689 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1690 __rbd_remove_snap_dev(rbd_dev, snap);
1691 }
1692}
1693
602adf40
YS
1694/*
1695 * only read the first part of the ondisk header, without the snaps info
1696 */
dfc5606d 1697static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1698{
1699 int ret;
1700 struct rbd_image_header h;
1701 u64 snap_seq;
59c2be1e 1702 int follow_seq = 0;
602adf40
YS
1703
1704 ret = rbd_read_header(rbd_dev, &h);
1705 if (ret < 0)
1706 return ret;
1707
9db4b3e3
SW
1708 /* resized? */
1709 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1710
602adf40
YS
1711 down_write(&rbd_dev->header.snap_rwsem);
1712
1713 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1714 if (rbd_dev->header.total_snaps &&
1715 rbd_dev->header.snapc->snaps[0] == snap_seq)
1716 /* pointing at the head, will need to follow that
1717 if head moves */
1718 follow_seq = 1;
602adf40
YS
1719
1720 kfree(rbd_dev->header.snapc);
1721 kfree(rbd_dev->header.snap_names);
1722 kfree(rbd_dev->header.snap_sizes);
1723
1724 rbd_dev->header.total_snaps = h.total_snaps;
1725 rbd_dev->header.snapc = h.snapc;
1726 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1727 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1728 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1729 if (follow_seq)
1730 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1731 else
1732 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1733
dfc5606d
YS
1734 ret = __rbd_init_snaps_header(rbd_dev);
1735
602adf40
YS
1736 up_write(&rbd_dev->header.snap_rwsem);
1737
dfc5606d 1738 return ret;
602adf40
YS
1739}
1740
1741static int rbd_init_disk(struct rbd_device *rbd_dev)
1742{
1743 struct gendisk *disk;
1744 struct request_queue *q;
1745 int rc;
1746 u64 total_size = 0;
1747
1748 /* contact OSD, request size info about the object being mapped */
1749 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1750 if (rc)
1751 return rc;
1752
dfc5606d
YS
1753 /* no need to lock here, as rbd_dev is not registered yet */
1754 rc = __rbd_init_snaps_header(rbd_dev);
1755 if (rc)
1756 return rc;
1757
cc9d734c 1758 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1759 if (rc)
1760 return rc;
1761
1762 /* create gendisk info */
1763 rc = -ENOMEM;
1764 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1765 if (!disk)
1766 goto out;
1767
f0f8cef5 1768 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
aedfec59 1769 rbd_dev->id);
602adf40
YS
1770 disk->major = rbd_dev->major;
1771 disk->first_minor = 0;
1772 disk->fops = &rbd_bd_ops;
1773 disk->private_data = rbd_dev;
1774
1775 /* init rq */
1776 rc = -ENOMEM;
1777 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1778 if (!q)
1779 goto out_disk;
029bcbd8
JD
1780
1781 /* set io sizes to object size */
1782 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1783 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1784 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1785 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1786
602adf40
YS
1787 blk_queue_merge_bvec(q, rbd_merge_bvec);
1788 disk->queue = q;
1789
1790 q->queuedata = rbd_dev;
1791
1792 rbd_dev->disk = disk;
1793 rbd_dev->q = q;
1794
1795 /* finally, announce the disk to the world */
1796 set_capacity(disk, total_size / 512ULL);
1797 add_disk(disk);
1798
1799 pr_info("%s: added with size 0x%llx\n",
1800 disk->disk_name, (unsigned long long)total_size);
1801 return 0;
1802
1803out_disk:
1804 put_disk(disk);
1805out:
1806 return rc;
1807}
1808
dfc5606d
YS
1809/*
1810 sysfs
1811*/
1812
1813static ssize_t rbd_size_show(struct device *dev,
1814 struct device_attribute *attr, char *buf)
1815{
1816 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1817
1818 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1819}
1820
1821static ssize_t rbd_major_show(struct device *dev,
1822 struct device_attribute *attr, char *buf)
1823{
1824 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1825
dfc5606d
YS
1826 return sprintf(buf, "%d\n", rbd_dev->major);
1827}
1828
1829static ssize_t rbd_client_id_show(struct device *dev,
1830 struct device_attribute *attr, char *buf)
602adf40 1831{
dfc5606d
YS
1832 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1833
1dbb4399
AE
1834 return sprintf(buf, "client%lld\n",
1835 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1836}
1837
dfc5606d
YS
1838static ssize_t rbd_pool_show(struct device *dev,
1839 struct device_attribute *attr, char *buf)
602adf40 1840{
dfc5606d
YS
1841 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1842
1843 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1844}
1845
1846static ssize_t rbd_name_show(struct device *dev,
1847 struct device_attribute *attr, char *buf)
1848{
1849 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1850
1851 return sprintf(buf, "%s\n", rbd_dev->obj);
1852}
1853
1854static ssize_t rbd_snap_show(struct device *dev,
1855 struct device_attribute *attr,
1856 char *buf)
1857{
1858 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1859
1860 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1861}
1862
1863static ssize_t rbd_image_refresh(struct device *dev,
1864 struct device_attribute *attr,
1865 const char *buf,
1866 size_t size)
1867{
1868 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1869 int rc;
1870 int ret = size;
602adf40
YS
1871
1872 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1873
dfc5606d
YS
1874 rc = __rbd_update_snaps(rbd_dev);
1875 if (rc < 0)
1876 ret = rc;
602adf40 1877
dfc5606d
YS
1878 mutex_unlock(&ctl_mutex);
1879 return ret;
1880}
602adf40 1881
dfc5606d
YS
1882static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1883static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1884static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1885static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1886static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1887static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1888static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1889static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1890
1891static struct attribute *rbd_attrs[] = {
1892 &dev_attr_size.attr,
1893 &dev_attr_major.attr,
1894 &dev_attr_client_id.attr,
1895 &dev_attr_pool.attr,
1896 &dev_attr_name.attr,
1897 &dev_attr_current_snap.attr,
1898 &dev_attr_refresh.attr,
1899 &dev_attr_create_snap.attr,
dfc5606d
YS
1900 NULL
1901};
1902
1903static struct attribute_group rbd_attr_group = {
1904 .attrs = rbd_attrs,
1905};
1906
1907static const struct attribute_group *rbd_attr_groups[] = {
1908 &rbd_attr_group,
1909 NULL
1910};
1911
1912static void rbd_sysfs_dev_release(struct device *dev)
1913{
1914}
1915
1916static struct device_type rbd_device_type = {
1917 .name = "rbd",
1918 .groups = rbd_attr_groups,
1919 .release = rbd_sysfs_dev_release,
1920};
1921
1922
1923/*
1924 sysfs - snapshots
1925*/
1926
1927static ssize_t rbd_snap_size_show(struct device *dev,
1928 struct device_attribute *attr,
1929 char *buf)
1930{
1931 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1932
1933 return sprintf(buf, "%lld\n", (long long)snap->size);
1934}
1935
1936static ssize_t rbd_snap_id_show(struct device *dev,
1937 struct device_attribute *attr,
1938 char *buf)
1939{
1940 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1941
1942 return sprintf(buf, "%lld\n", (long long)snap->id);
1943}
1944
1945static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1946static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1947
1948static struct attribute *rbd_snap_attrs[] = {
1949 &dev_attr_snap_size.attr,
1950 &dev_attr_snap_id.attr,
1951 NULL,
1952};
1953
1954static struct attribute_group rbd_snap_attr_group = {
1955 .attrs = rbd_snap_attrs,
1956};
1957
1958static void rbd_snap_dev_release(struct device *dev)
1959{
1960 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1961 kfree(snap->name);
1962 kfree(snap);
1963}
1964
1965static const struct attribute_group *rbd_snap_attr_groups[] = {
1966 &rbd_snap_attr_group,
1967 NULL
1968};
1969
1970static struct device_type rbd_snap_device_type = {
1971 .groups = rbd_snap_attr_groups,
1972 .release = rbd_snap_dev_release,
1973};
1974
1975static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1976 struct rbd_snap *snap)
1977{
1978 list_del(&snap->node);
1979 device_unregister(&snap->dev);
1980}
1981
1982static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1983 struct rbd_snap *snap,
1984 struct device *parent)
1985{
1986 struct device *dev = &snap->dev;
1987 int ret;
1988
1989 dev->type = &rbd_snap_device_type;
1990 dev->parent = parent;
1991 dev->release = rbd_snap_dev_release;
1992 dev_set_name(dev, "snap_%s", snap->name);
1993 ret = device_register(dev);
1994
1995 return ret;
1996}
1997
1998static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1999 int i, const char *name,
2000 struct rbd_snap **snapp)
2001{
2002 int ret;
2003 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2004 if (!snap)
2005 return -ENOMEM;
2006 snap->name = kstrdup(name, GFP_KERNEL);
2007 snap->size = rbd_dev->header.snap_sizes[i];
2008 snap->id = rbd_dev->header.snapc->snaps[i];
2009 if (device_is_registered(&rbd_dev->dev)) {
2010 ret = rbd_register_snap_dev(rbd_dev, snap,
2011 &rbd_dev->dev);
2012 if (ret < 0)
2013 goto err;
2014 }
2015 *snapp = snap;
2016 return 0;
2017err:
2018 kfree(snap->name);
2019 kfree(snap);
2020 return ret;
2021}
2022
/*
 * Step backward to the previous name in a NUL-delimited string list.
 *
 * @name points just past a name's terminating NUL (or to @start);
 * @start is the beginning of the list.  Returns a pointer to the
 * start of the preceding name, or NULL when there is none.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	if (name < start + 2)
		return NULL;

	name -= 2;	/* step back over the previous name's NUL */
	while (*name) {
		if (name == start)
			return start;
		name--;
	}
	return name + 1;	/* landed on a NUL: name begins after it */
}
2039
2040/*
2041 * compare the old list of snapshots that we have to what's in the header
2042 * and update it accordingly. Note that the header holds the snapshots
2043 * in a reverse order (from newest to oldest) and we need to go from
2044 * older to new so that we don't get a duplicate snap name when
2045 * doing the process (e.g., removed snapshot and recreated a new
2046 * one with the same name.
2047 */
2048static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2049{
2050 const char *name, *first_name;
2051 int i = rbd_dev->header.total_snaps;
2052 struct rbd_snap *snap, *old_snap = NULL;
2053 int ret;
2054 struct list_head *p, *n;
2055
2056 first_name = rbd_dev->header.snap_names;
2057 name = first_name + rbd_dev->header.snap_names_len;
2058
2059 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2060 u64 cur_id;
2061
2062 old_snap = list_entry(p, struct rbd_snap, node);
2063
2064 if (i)
2065 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2066
2067 if (!i || old_snap->id < cur_id) {
2068 /* old_snap->id was skipped, thus was removed */
2069 __rbd_remove_snap_dev(rbd_dev, old_snap);
2070 continue;
2071 }
2072 if (old_snap->id == cur_id) {
2073 /* we have this snapshot already */
2074 i--;
2075 name = rbd_prev_snap_name(name, first_name);
2076 continue;
2077 }
2078 for (; i > 0;
2079 i--, name = rbd_prev_snap_name(name, first_name)) {
2080 if (!name) {
2081 WARN_ON(1);
2082 return -EINVAL;
2083 }
2084 cur_id = rbd_dev->header.snapc->snaps[i];
2085 /* snapshot removal? handle it above */
2086 if (cur_id >= old_snap->id)
2087 break;
2088 /* a new snapshot */
2089 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2090 if (ret < 0)
2091 return ret;
2092
2093 /* note that we add it backward so using n and not p */
2094 list_add(&snap->node, n);
2095 p = &snap->node;
2096 }
2097 }
2098 /* we're done going over the old snap list, just add what's left */
2099 for (; i > 0; i--) {
2100 name = rbd_prev_snap_name(name, first_name);
2101 if (!name) {
2102 WARN_ON(1);
2103 return -EINVAL;
2104 }
2105 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2106 if (ret < 0)
2107 return ret;
2108 list_add(&snap->node, &rbd_dev->snaps);
2109 }
2110
2111 return 0;
2112}
2113
dfc5606d
YS
2114static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2115{
f0f8cef5 2116 int ret;
dfc5606d
YS
2117 struct device *dev;
2118 struct rbd_snap *snap;
2119
2120 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2121 dev = &rbd_dev->dev;
2122
2123 dev->bus = &rbd_bus_type;
2124 dev->type = &rbd_device_type;
2125 dev->parent = &rbd_root_dev;
2126 dev->release = rbd_dev_release;
2127 dev_set_name(dev, "%d", rbd_dev->id);
2128 ret = device_register(dev);
2129 if (ret < 0)
f0f8cef5 2130 goto out;
dfc5606d
YS
2131
2132 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2133 ret = rbd_register_snap_dev(rbd_dev, snap,
2134 &rbd_dev->dev);
2135 if (ret < 0)
602adf40
YS
2136 break;
2137 }
f0f8cef5 2138out:
dfc5606d
YS
2139 mutex_unlock(&ctl_mutex);
2140 return ret;
602adf40
YS
2141}
2142
dfc5606d
YS
2143static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2144{
2145 device_unregister(&rbd_dev->dev);
2146}
2147
59c2be1e
YS
2148static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2149{
2150 int ret, rc;
2151
2152 do {
2153 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2154 rbd_dev->header.obj_version);
2155 if (ret == -ERANGE) {
2156 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2157 rc = __rbd_update_snaps(rbd_dev);
2158 mutex_unlock(&ctl_mutex);
2159 if (rc < 0)
2160 return rc;
2161 }
2162 } while (ret == -ERANGE);
2163
2164 return ret;
2165}
2166
1ddbe94e
AE
2167static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2168
2169/*
499afd5b
AE
2170 * Get a unique rbd identifier for the given new rbd_dev, and add
2171 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2172 */
499afd5b 2173static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2174{
499afd5b
AE
2175 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2176
2177 spin_lock(&rbd_dev_list_lock);
2178 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2179 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2180}
b7f23c36 2181
1ddbe94e 2182/*
499afd5b
AE
2183 * Remove an rbd_dev from the global list, and record that its
2184 * identifier is no longer in use.
1ddbe94e 2185 */
499afd5b 2186static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2187{
d184f6bf
AE
2188 struct list_head *tmp;
2189 int rbd_id = rbd_dev->id;
2190 int max_id;
2191
2192 BUG_ON(rbd_id < 1);
499afd5b
AE
2193
2194 spin_lock(&rbd_dev_list_lock);
2195 list_del_init(&rbd_dev->node);
d184f6bf
AE
2196
2197 /*
2198 * If the id being "put" is not the current maximum, there
2199 * is nothing special we need to do.
2200 */
2201 if (rbd_id != atomic64_read(&rbd_id_max)) {
2202 spin_unlock(&rbd_dev_list_lock);
2203 return;
2204 }
2205
2206 /*
2207 * We need to update the current maximum id. Search the
2208 * list to find out what it is. We're more likely to find
2209 * the maximum at the end, so search the list backward.
2210 */
2211 max_id = 0;
2212 list_for_each_prev(tmp, &rbd_dev_list) {
2213 struct rbd_device *rbd_dev;
2214
2215 rbd_dev = list_entry(tmp, struct rbd_device, node);
2216 if (rbd_id > max_id)
2217 max_id = rbd_id;
2218 }
499afd5b 2219 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2220
1ddbe94e 2221 /*
d184f6bf
AE
2222 * The max id could have been updated by rbd_id_get(), in
2223 * which case it now accurately reflects the new maximum.
2224 * Be careful not to overwrite the maximum value in that
2225 * case.
1ddbe94e 2226 */
d184f6bf 2227 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2228}
2229
e28fff26
AE
/*
 * Skip white space at *buf (the characters isspace() accepts in the
 * "C"/"POSIX" locales), leaving *buf at the first non-space
 * character, and return the length of the token that follows.
 */
static inline size_t next_token(const char **buf)
{
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* find start of token */

	return strcspn(*buf, spaces);	/* return token length */
}

/*
 * Find the next token in *buf and, if @token_size is big enough,
 * copy it NUL-terminated into @token.
 *
 * Returns the token length (without the '\0'): 0 if no token was
 * found, >= @token_size if the token would not fit (in which case
 * @token is left untouched).  *buf is advanced past the token either
 * way.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2276
a725f65e
AE
2277/*
2278 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2279 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2280 * on the list of monitor addresses and other options provided via
2281 * /sys/bus/rbd/add.
2282 */
2283static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2284 const char *buf,
2285 char *mon_addrs,
e28fff26
AE
2286 size_t mon_addrs_size,
2287 char *options,
2288 size_t options_size)
2289{
2290 size_t len;
2291
2292 /* The first four tokens are required */
2293
2294 len = copy_token(&buf, mon_addrs, mon_addrs_size);
2295 if (!len || len >= mon_addrs_size)
a725f65e
AE
2296 return -EINVAL;
2297
e28fff26
AE
2298 len = copy_token(&buf, options, options_size);
2299 if (!len || len >= options_size)
2300 return -EINVAL;
2301
2302 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2303 if (!len || len >= sizeof (rbd_dev->pool_name))
2304 return -EINVAL;
2305
2306 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2307 if (!len || len >= sizeof (rbd_dev->obj))
2308 return -EINVAL;
2309
2310 /* We have the object length in hand, save it. */
2311
2312 rbd_dev->obj_len = len;
a725f65e 2313
81a89793
AE
2314 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2315 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2316 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
a725f65e 2317
e28fff26
AE
2318 /*
2319 * The snapshot name is optional, but it's an error if it's
2320 * too long. If no snapshot is supplied, fill in the default.
2321 */
2322 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2323 if (!len)
2324 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2325 sizeof (RBD_SNAP_HEAD_NAME));
2326 else if (len >= sizeof (rbd_dev->snap_name))
2327 return -EINVAL;
2328
a725f65e
AE
2329 return 0;
2330}
2331
59c2be1e
YS
2332static ssize_t rbd_add(struct bus_type *bus,
2333 const char *buf,
2334 size_t count)
602adf40 2335{
602adf40 2336 struct rbd_device *rbd_dev;
a725f65e 2337 char *mon_addrs = NULL;
27cc2594
AE
2338 char *options = NULL;
2339 struct ceph_osd_client *osdc;
2340 int rc = -ENOMEM;
602adf40
YS
2341
2342 if (!try_module_get(THIS_MODULE))
2343 return -ENODEV;
2344
27cc2594
AE
2345 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2346 if (!rbd_dev)
2347 goto err_nomem;
a725f65e
AE
2348 mon_addrs = kmalloc(count, GFP_KERNEL);
2349 if (!mon_addrs)
27cc2594 2350 goto err_nomem;
60571c7d 2351 options = kmalloc(count, GFP_KERNEL);
602adf40 2352 if (!options)
27cc2594 2353 goto err_nomem;
602adf40
YS
2354
2355 /* static rbd_device initialization */
2356 spin_lock_init(&rbd_dev->lock);
2357 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2358 INIT_LIST_HEAD(&rbd_dev->snaps);
602adf40 2359
0e805a1d
AE
2360 init_rwsem(&rbd_dev->header.snap_rwsem);
2361
d184f6bf 2362 /* generate unique id: find highest unique id, add one */
499afd5b 2363 rbd_id_get(rbd_dev);
602adf40 2364
a725f65e 2365 /* Fill in the device name, now that we have its id. */
81a89793
AE
2366 BUILD_BUG_ON(DEV_NAME_LEN
2367 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2368 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2369
602adf40 2370 /* parse add command */
e28fff26
AE
2371 rc = rbd_add_parse_args(rbd_dev, buf, mon_addrs, count,
2372 options, count);
a725f65e 2373 if (rc)
f0f8cef5 2374 goto err_put_id;
e124a82f 2375
a725f65e 2376 rbd_dev->rbd_client = rbd_get_client(mon_addrs, options);
d720bcb0
AE
2377 if (IS_ERR(rbd_dev->rbd_client)) {
2378 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2379 goto err_put_id;
d720bcb0 2380 }
602adf40 2381
602adf40 2382 /* pick the pool */
1dbb4399 2383 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2384 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2385 if (rc < 0)
2386 goto err_out_client;
2387 rbd_dev->poolid = rc;
2388
2389 /* register our block device */
27cc2594
AE
2390 rc = register_blkdev(0, rbd_dev->name);
2391 if (rc < 0)
602adf40 2392 goto err_out_client;
27cc2594 2393 rbd_dev->major = rc;
602adf40 2394
dfc5606d
YS
2395 rc = rbd_bus_add_dev(rbd_dev);
2396 if (rc)
766fc439
YS
2397 goto err_out_blkdev;
2398
602adf40
YS
2399 /* set up and announce blkdev mapping */
2400 rc = rbd_init_disk(rbd_dev);
2401 if (rc)
766fc439 2402 goto err_out_bus;
602adf40 2403
59c2be1e
YS
2404 rc = rbd_init_watch_dev(rbd_dev);
2405 if (rc)
2406 goto err_out_bus;
2407
602adf40
YS
2408 return count;
2409
766fc439 2410err_out_bus:
499afd5b 2411 rbd_id_put(rbd_dev);
766fc439
YS
2412
2413 /* this will also clean up rest of rbd_dev stuff */
2414
2415 rbd_bus_del_dev(rbd_dev);
2416 kfree(options);
a725f65e 2417 kfree(mon_addrs);
766fc439
YS
2418 return rc;
2419
602adf40
YS
2420err_out_blkdev:
2421 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2422err_out_client:
2423 rbd_put_client(rbd_dev);
f0f8cef5 2424err_put_id:
499afd5b 2425 rbd_id_put(rbd_dev);
27cc2594 2426err_nomem:
602adf40 2427 kfree(options);
a725f65e 2428 kfree(mon_addrs);
27cc2594
AE
2429 kfree(rbd_dev);
2430
602adf40
YS
2431 dout("Error adding device %s\n", buf);
2432 module_put(THIS_MODULE);
27cc2594
AE
2433
2434 return (ssize_t) rc;
602adf40
YS
2435}
2436
2437static struct rbd_device *__rbd_get_dev(unsigned long id)
2438{
2439 struct list_head *tmp;
2440 struct rbd_device *rbd_dev;
2441
e124a82f 2442 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2443 list_for_each(tmp, &rbd_dev_list) {
2444 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2445 if (rbd_dev->id == id) {
2446 spin_unlock(&rbd_dev_list_lock);
602adf40 2447 return rbd_dev;
e124a82f 2448 }
602adf40 2449 }
e124a82f 2450 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2451 return NULL;
2452}
2453
/*
 * Final teardown of an rbd device, called by the driver core when
 * the last reference to rbd_dev->dev is dropped (via
 * rbd_bus_del_dev()).  Undoes rbd_add() in reverse order: cancel
 * the lingering watch request, tear down the watch, drop the ceph
 * client, free the disk, release the block major, free the device,
 * and finally drop the module reference taken in rbd_add().
 * NOTE(review): the ordering here is load-bearing — the watch
 * teardown needs the client, and the client must outlive it.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	/* Cancel the lingering watch request, if one was registered */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2478
dfc5606d
YS
2479static ssize_t rbd_remove(struct bus_type *bus,
2480 const char *buf,
2481 size_t count)
602adf40
YS
2482{
2483 struct rbd_device *rbd_dev = NULL;
2484 int target_id, rc;
2485 unsigned long ul;
2486 int ret = count;
2487
2488 rc = strict_strtoul(buf, 10, &ul);
2489 if (rc)
2490 return rc;
2491
2492 /* convert to int; abort if we lost anything in the conversion */
2493 target_id = (int) ul;
2494 if (target_id != ul)
2495 return -EINVAL;
2496
2497 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2498
2499 rbd_dev = __rbd_get_dev(target_id);
2500 if (!rbd_dev) {
2501 ret = -ENOENT;
2502 goto done;
2503 }
2504
499afd5b 2505 rbd_id_put(rbd_dev);
dfc5606d
YS
2506
2507 __rbd_remove_all_snaps(rbd_dev);
2508 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2509
2510done:
2511 mutex_unlock(&ctl_mutex);
2512 return ret;
2513}
2514
dfc5606d
YS
2515static ssize_t rbd_snap_add(struct device *dev,
2516 struct device_attribute *attr,
2517 const char *buf,
2518 size_t count)
602adf40 2519{
dfc5606d
YS
2520 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2521 int ret;
2522 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2523 if (!name)
2524 return -ENOMEM;
2525
dfc5606d 2526 snprintf(name, count, "%s", buf);
602adf40
YS
2527
2528 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2529
602adf40
YS
2530 ret = rbd_header_add_snap(rbd_dev,
2531 name, GFP_KERNEL);
2532 if (ret < 0)
59c2be1e 2533 goto err_unlock;
602adf40 2534
dfc5606d 2535 ret = __rbd_update_snaps(rbd_dev);
602adf40 2536 if (ret < 0)
59c2be1e
YS
2537 goto err_unlock;
2538
2539 /* shouldn't hold ctl_mutex when notifying.. notify might
2540 trigger a watch callback that would need to get that mutex */
2541 mutex_unlock(&ctl_mutex);
2542
2543 /* make a best effort, don't error if failed */
2544 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2545
2546 ret = count;
59c2be1e
YS
2547 kfree(name);
2548 return ret;
2549
2550err_unlock:
602adf40 2551 mutex_unlock(&ctl_mutex);
602adf40
YS
2552 kfree(name);
2553 return ret;
2554}
2555
602adf40
YS
2556/*
2557 * create control files in sysfs
dfc5606d 2558 * /sys/bus/rbd/...
602adf40
YS
2559 */
2560static int rbd_sysfs_init(void)
2561{
dfc5606d 2562 int ret;
602adf40 2563
dfc5606d 2564 ret = bus_register(&rbd_bus_type);
21079786 2565 if (ret < 0)
dfc5606d 2566 return ret;
602adf40 2567
dfc5606d 2568 ret = device_register(&rbd_root_dev);
602adf40 2569
602adf40
YS
2570 return ret;
2571}
2572
/*
 * Remove the sysfs control files, in the reverse order of
 * rbd_sysfs_init(): root device first, then the bus.
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2578
2579int __init rbd_init(void)
2580{
2581 int rc;
2582
2583 rc = rbd_sysfs_init();
2584 if (rc)
2585 return rc;
f0f8cef5 2586 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2587 return 0;
2588}
2589
/* Module exit point: tear down the sysfs interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2594
/* Module registration and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");