]> git.ipfire.org Git - people/arne_f/kernel.git/blame - drivers/block/rbd.c
rbd: rename "node_lock"
[people/arne_f/kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44#define DRV_NAME "rbd"
45#define DRV_NAME_LONG "rbd (rados block device)"
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
56#define DEV_NAME_LEN 32
57
59c2be1e
YS
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char block_name[32];		/* prefix for data object names */
	__u8 obj_order;			/* each object is 2^obj_order bytes */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* protects the snapshot state below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;		/* total bytes in snap_names */
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;		/* '\0'-separated snapshot names */
	u64 *snap_sizes;		/* per-snapshot image sizes, parallel
					 * to snapc->snaps[] */

	u64 obj_version;		/* version of the on-disk header object */
};

/* per-mapping options parsed from the "add" command line */
struct rbd_options {
	int notify_timeout;
};

/*
 * an instance of the client. multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned here; freed on release */
	struct kref kref;
	struct list_head node;		/* entry in rbd_client_list */
};

struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;
	int coll_index;			/* slot in coll->status[] */
	struct rbd_req_coll *coll;	/* parent collection, if any */
};

/* completion state of one request belonging to a collection */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;			/* number of status slots */
	int num_done;			/* slots completed so far */
	struct kref kref;
	struct rbd_req_status status[0];	/* trailing variable-size array */
};

/* one snapshot, exported as a sysfs device */
struct rbd_snap {
	struct device dev;
	const char *name;
	size_t size;
	struct list_head node;		/* entry in rbd_device->snaps */
	u64 id;
};
132
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* (possibly shared) ceph client */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	struct ceph_osd_event *watch_event;	/* header-object watch */
	struct ceph_osd_request *watch_request;

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;

	struct list_head node;	/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};

/* sysfs bus all mapped rbd devices hang off */
static struct bus_type rbd_bus_type = {
	.name = "rbd",
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);	/* protects rbd_dev_list */

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);	/* protects rbd_client_list */
dfc5606d
YS
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);


/* Map an embedded sysfs device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

/* Take a reference on the device via its embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

/* Drop a reference taken with rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int __rbd_update_snaps(struct rbd_device *rbd_dev);
602adf40
YS
212static int rbd_open(struct block_device *bdev, fmode_t mode)
213{
214 struct gendisk *disk = bdev->bd_disk;
215 struct rbd_device *rbd_dev = disk->private_data;
216
dfc5606d
YS
217 rbd_get_dev(rbd_dev);
218
602adf40
YS
219 set_device_ro(bdev, rbd_dev->read_only);
220
221 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
222 return -EROFS;
223
224 return 0;
225}
226
dfc5606d
YS
227static int rbd_release(struct gendisk *disk, fmode_t mode)
228{
229 struct rbd_device *rbd_dev = disk->private_data;
230
231 rbd_put_dev(rbd_dev);
232
233 return 0;
234}
235
602adf40
YS
/* Block-device entry points; open/release manage the device ref. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
241
/*
 * Initialize an rbd client instance.
 * We own *opt.
 *
 * Creates a ceph client from opt, opens a session, takes ownership
 * of rbd_opts and links the new client into rbd_client_list.  On
 * success ownership of opt has passed to the ceph client.  On
 * failure opt is destroyed here unless the ceph client already owns
 * it, and an ERR_PTR is returned (caller frees rbd_opts).
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		/* NOTE(review): ret is still -ENOMEM here even when
		 * ceph_create_client failed for another reason */
		goto out_mutex;
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}
292
293/*
294 * Find a ceph client with specific addr and configuration.
295 */
296static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
297{
298 struct rbd_client *client_node;
299
300 if (opt->flags & CEPH_OPT_NOSHARE)
301 return NULL;
302
303 list_for_each_entry(client_node, &rbd_client_list, node)
304 if (ceph_compare_options(opt, client_node->client) == 0)
305 return client_node;
306 return NULL;
307}
308
59c2be1e
YS
/*
 * mount options
 */
enum {
	Opt_notify_timeout,	/* "notify_timeout=%d" */
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

/* token table for match_token(); must stay in sync with the enum */
static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
326
327static int parse_rbd_opts_token(char *c, void *private)
328{
329 struct rbd_options *rbdopt = private;
330 substring_t argstr[MAX_OPT_ARGS];
331 int token, intval, ret;
332
21079786 333 token = match_token(c, rbdopt_tokens, argstr);
59c2be1e
YS
334 if (token < 0)
335 return -EINVAL;
336
337 if (token < Opt_last_int) {
338 ret = match_int(&argstr[0], &intval);
339 if (ret < 0) {
340 pr_err("bad mount option arg (not int) "
341 "at '%s'\n", c);
342 return ret;
343 }
344 dout("got int token %d val %d\n", token, intval);
345 } else if (token > Opt_last_int && token < Opt_last_string) {
346 dout("got string token %d val %s\n", token,
347 argstr[0].from);
348 } else {
349 dout("got token %d\n", token);
350 }
351
352 switch (token) {
353 case Opt_notify_timeout:
354 rbdopt->notify_timeout = intval;
355 break;
356 default:
357 BUG_ON(token);
358 }
359 return 0;
360}
361
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client holds a reference on a (possibly
 * shared) client.  Intermediate allocations (opt, rbd_opts) are
 * either consumed by the new client or freed before returning.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* parse_rbd_opts_token fills rbd_opts from the option string */
	opt = ceph_parse_options(options, mon_addr,
				 mon_addr + strlen(mon_addr),
				 parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		ret = PTR_ERR(opt);
		goto done_err;
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		rbd_dev->rbd_client = rbdc;

		/* the shared client keeps its own opt/rbd_opts */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		return 0;
	}
	spin_unlock(&rbd_client_list_lock);

	/* no match: create one (consumes opt; owns rbd_opts on success) */
	rbdc = rbd_client_create(opt, rbd_opts);

	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
417
/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* unlink from rbd_client_list — hence the locking requirement */
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
434
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* the list lock is held across kref_put because the release
	 * function unlinks the client from rbd_client_list */
	spin_lock(&rbd_client_list_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&rbd_client_list_lock);
	rbd_dev->rbd_client = NULL;
}
446
1fec7093
YS
447/*
448 * Destroy requests collection
449 */
450static void rbd_coll_release(struct kref *kref)
451{
452 struct rbd_req_coll *coll =
453 container_of(kref, struct rbd_req_coll, kref);
454
455 dout("rbd_coll_release %p\n", coll);
456 kfree(coll);
457}
602adf40
YS
458
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * Validates the on-disk magic text, then copies sizes and snapshot
 * ids/sizes/names into freshly allocated in-memory arrays.  The
 * per-snapshot data is only filled in when the caller allocated room
 * for exactly snap_count snapshots (allocated_snaps == snap_count);
 * presumably callers retry with the right size otherwise — verify
 * against the callers.  Returns 0, -ENXIO on bad magic, or -ENOMEM.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				int allocated_snaps,
				gfp_t gfp_flags)
{
	int i;
	u32 snap_count = le32_to_cpu(ondisk->snap_count);
	int ret = -ENOMEM;

	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	init_rwsem(&header->snap_rwsem);
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	/* NOTE(review): sizing each snap entry by sizeof (*ondisk)
	 * over-allocates; snapc->snaps[] holds u64 ids, so
	 * sizeof(u64) looks intended — confirm before changing */
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof (*ondisk),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}
	memcpy(header->block_name, ondisk->block_name,
	       sizeof(ondisk->block_name));

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (they follow the snaps array) */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return ret;
}
529
530static int snap_index(struct rbd_image_header *header, int snap_num)
531{
532 return header->total_snaps - snap_num;
533}
534
535static u64 cur_snap_id(struct rbd_device *rbd_dev)
536{
537 struct rbd_image_header *header = &rbd_dev->header;
538
539 if (!rbd_dev->cur_snap)
540 return 0;
541
542 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
543}
544
545static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
546 u64 *seq, u64 *size)
547{
548 int i;
549 char *p = header->snap_names;
550
551 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
552 if (strcmp(snap_name, p) == 0)
553 break;
554 }
555 if (i == header->total_snaps)
556 return -ENOENT;
557 if (seq)
558 *seq = header->snapc->snaps[i];
559
560 if (size)
561 *size = header->snap_sizes[i];
562
563 return i;
564}
565
/*
 * Point the device at the snapshot named in dev->snap_name (or at
 * the head for RBD_SNAP_HEAD_NAME), updating the snap context's
 * seq, dev->cur_snap and dev->read_only.  If size is non-NULL it
 * receives the image size for that mapping.  Returns 0 or -ENOENT
 * when the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));

	down_write(&header->snap_rwsem);

	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the head: writable, seq follows the header */
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap is 1-based, counted from the array's end */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}
600
601static void rbd_header_free(struct rbd_image_header *header)
602{
603 kfree(header->snapc);
604 kfree(header->snap_names);
605 kfree(header->snap_sizes);
606}
607
608/*
609 * get the actual striped segment name, offset and length
610 */
611static u64 rbd_get_segment(struct rbd_image_header *header,
612 const char *block_name,
613 u64 ofs, u64 len,
614 char *seg_name, u64 *segofs)
615{
616 u64 seg = ofs >> header->obj_order;
617
618 if (seg_name)
619 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
620 "%s.%012llx", block_name, seg);
621
622 ofs = ofs & ((1 << header->obj_order) - 1);
623 len = min_t(u64, len, (1 << header->obj_order) - ofs);
624
625 if (segofs)
626 *segofs = ofs;
627
628 return len;
629}
630
1fec7093
YS
631static int rbd_get_num_segments(struct rbd_image_header *header,
632 u64 ofs, u64 len)
633{
634 u64 start_seg = ofs >> header->obj_order;
635 u64 end_seg = (ofs + len - 1) >> header->obj_order;
636 return end_seg - start_seg + 1;
637}
638
029bcbd8
JD
639/*
640 * returns the size of an object in the image
641 */
642static u64 rbd_obj_bytes(struct rbd_image_header *header)
643{
644 return 1 << header->obj_order;
645}
646
602adf40
YS
647/*
648 * bio helpers
649 */
650
651static void bio_chain_put(struct bio *chain)
652{
653 struct bio *tmp;
654
655 while (chain) {
656 tmp = chain;
657 chain = chain->bi_next;
658 bio_put(tmp);
659 }
660}
661
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain and clears the data
 * at or beyond start_ofs (an offset into the chain's total payload).
 * Used to zero-fill the tail of short or missing reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;		/* running offset of the current segment */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs within this segment,
				 * or from its start if already past it */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
688
689/*
690 * bio_chain_clone - clone a chain of bios up to a certain length.
691 * might return a bio_pair that will need to be released.
692 */
693static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
694 struct bio_pair **bp,
695 int len, gfp_t gfpmask)
696{
697 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
698 int total = 0;
699
700 if (*bp) {
701 bio_pair_release(*bp);
702 *bp = NULL;
703 }
704
705 while (old_chain && (total < len)) {
706 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
707 if (!tmp)
708 goto err_out;
709
710 if (total + old_chain->bi_size > len) {
711 struct bio_pair *bp;
712
713 /*
714 * this split can only happen with a single paged bio,
715 * split_bio will BUG_ON if this is not the case
716 */
717 dout("bio_chain_clone split! total=%d remaining=%d"
718 "bi_size=%d\n",
719 (int)total, (int)len-total,
720 (int)old_chain->bi_size);
721
722 /* split the bio. We'll release it either in the next
723 call, or it will have to be released outside */
724 bp = bio_split(old_chain, (len - total) / 512ULL);
725 if (!bp)
726 goto err_out;
727
728 __bio_clone(tmp, &bp->bio1);
729
730 *next = &bp->bio2;
731 } else {
732 __bio_clone(tmp, old_chain);
733 *next = old_chain->bi_next;
734 }
735
736 tmp->bi_bdev = NULL;
737 gfpmask &= ~__GFP_WAIT;
738 tmp->bi_next = NULL;
739
740 if (!new_chain) {
741 new_chain = tail = tmp;
742 } else {
743 tail->bi_next = tmp;
744 tail = tmp;
745 }
746 old_chain = old_chain->bi_next;
747
748 total += tmp->bi_size;
749 }
750
751 BUG_ON(total < len);
752
753 if (tail)
754 tail->bi_next = NULL;
755
756 *old = old_chain;
757
758 return new_chain;
759
760err_out:
761 dout("bio_chain_clone with err\n");
762 bio_chain_put(new_chain);
763 return NULL;
764}
765
766/*
767 * helpers for osd request op vectors.
768 */
769static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
770 int num_ops,
771 int opcode,
772 u32 payload_len)
773{
774 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
775 GFP_NOIO);
776 if (!*ops)
777 return -ENOMEM;
778 (*ops)[0].op = opcode;
779 /*
780 * op extent offset and length will be set later on
781 * in calc_raw_layout()
782 */
783 (*ops)[0].payload_len = payload_len;
784 return 0;
785}
786
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
791
1fec7093
YS
/*
 * Record completion of slot 'index' of a collection, then complete
 * (in order) the maximal run of finished slots against the block
 * request rq.  Without a collection the whole request is ended at
 * once.  One collection reference is dropped per completed slot.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* the queue lock serializes status[] updates and completion */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
829
/* Complete req's own slot in its collection (or the whole request). */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
835
602adf40
YS
/*
 * Send ceph osd request
 *
 * Builds and submits one osd request for object 'obj' covering
 * [ofs, ofs+len), carrying either a bio chain or a page vector.
 * With rbd_cb set the request completes asynchronously and the
 * callback owns req_data; without it this function waits for the
 * reply.  When linger_req is non-NULL the request is marked
 * lingering (used for watches) and returned through it; ver, if
 * non-NULL, receives the reassert version of the reply.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* still report completion so the collection drains */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	/* hold the snap context steady while the request is built */
	down_read(&header->snap_rwsem);

	osdc = &dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&header->snap_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* one object per "file": stripe unit == object size */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous path: wait here and drop our request ref */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
952
/*
 * Ceph osd op callback
 *
 * Completion handler for async requests: decodes the osd reply,
 * zero-fills missing or short reads, completes the request's slot
 * in its collection and frees the per-request state.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object: a read of it returns zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail, report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
991
59c2be1e
YS
/* Minimal async completion: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
996
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector for the data, builds a single-op request
 * (unless the caller supplied orig_ops), submits it without a
 * callback so rbd_do_request() waits, and copies data to/from buf
 * according to the read/write flags.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* for reads, ret is the number of bytes the osd returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1059
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps image byte range [ofs, ofs+len) onto its backing object and
 * submits one async request for it.  The range must not cross an
 * object boundary: the bios were already split on segment
 * boundaries (see the BUG_ON below).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1115
/*
 * Request async osd write
 *
 * Thin wrapper: one async CEPH_OSD_OP_WRITE against the head
 * (CEPH_NOSNAP), flagged for on-disk acknowledgement.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1133
/*
 * Request async osd read
 *
 * Thin wrapper: one async CEPH_OSD_OP_READ; a snapid of 0 is mapped
 * to CEPH_NOSNAP (read the head).
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1152
/*
 * Request sync osd read
 *
 * NOTE(review): the snapc argument is accepted but not forwarded
 * (NULL is passed to rbd_req_sync_op) — confirm whether that is
 * intentional.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *obj,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1171
/*
 * Request sync osd watch
 *
 * Acknowledge a watch notification (notify_id) on the given object
 * so the notifier stops waiting for us.  Fire-and-forget: the
 * completion callback (rbd_simple_req_cb) only drops the request.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1204
/*
 * Callback for notifications on the header object: refresh the
 * snapshot state under ctl_mutex, then acknowledge the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even on failure so the notifier does not wait for timeout */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1224
1225/*
1226 * Request sync osd watch
1227 */
1228static int rbd_req_sync_watch(struct rbd_device *dev,
1229 const char *obj,
1230 u64 ver)
1231{
1232 struct ceph_osd_req_op *ops;
1dbb4399 1233 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1234
1235 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1236 if (ret < 0)
1237 return ret;
1238
1239 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1240 (void *)dev, &dev->watch_event);
1241 if (ret < 0)
1242 goto fail;
1243
1244 ops[0].watch.ver = cpu_to_le64(ver);
1245 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1246 ops[0].watch.flag = 1;
1247
1248 ret = rbd_req_sync_op(dev, NULL,
1249 CEPH_NOSNAP,
1250 0,
1251 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1252 ops,
1253 1, obj, 0, 0, NULL,
1254 &dev->watch_request, NULL);
1255
1256 if (ret < 0)
1257 goto fail_event;
1258
1259 rbd_destroy_ops(ops);
1260 return 0;
1261
1262fail_event:
1263 ceph_osdc_cancel_event(dev->watch_event);
1264 dev->watch_event = NULL;
1265fail:
1266 rbd_destroy_ops(ops);
1267 return ret;
1268}
1269
79e3057c
YS
1270/*
1271 * Request sync osd unwatch
1272 */
1273static int rbd_req_sync_unwatch(struct rbd_device *dev,
1274 const char *obj)
1275{
1276 struct ceph_osd_req_op *ops;
1277
1278 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1279 if (ret < 0)
1280 return ret;
1281
1282 ops[0].watch.ver = 0;
1283 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1284 ops[0].watch.flag = 0;
1285
1286 ret = rbd_req_sync_op(dev, NULL,
1287 CEPH_NOSNAP,
1288 0,
1289 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1290 ops,
1291 1, obj, 0, 0, NULL, NULL, NULL);
1292
1293 rbd_destroy_ops(ops);
1294 ceph_osdc_cancel_event(dev->watch_event);
1295 dev->watch_event = NULL;
1296 return ret;
1297}
1298
59c2be1e
YS
1299struct rbd_notify_info {
1300 struct rbd_device *dev;
1301};
1302
1303static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1304{
1305 struct rbd_device *dev = (struct rbd_device *)data;
1306 if (!dev)
1307 return;
1308
1309 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1310 notify_id, (int)opcode);
1311}
1312
1313/*
1314 * Request sync osd notify
1315 */
1316static int rbd_req_sync_notify(struct rbd_device *dev,
1317 const char *obj)
1318{
1319 struct ceph_osd_req_op *ops;
1dbb4399 1320 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1321 struct ceph_osd_event *event;
1322 struct rbd_notify_info info;
1323 int payload_len = sizeof(u32) + sizeof(u32);
1324 int ret;
1325
1326 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1327 if (ret < 0)
1328 return ret;
1329
1330 info.dev = dev;
1331
1332 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1333 (void *)&info, &event);
1334 if (ret < 0)
1335 goto fail;
1336
1337 ops[0].watch.ver = 1;
1338 ops[0].watch.flag = 1;
1339 ops[0].watch.cookie = event->cookie;
1340 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1341 ops[0].watch.timeout = 12;
1342
1343 ret = rbd_req_sync_op(dev, NULL,
1344 CEPH_NOSNAP,
1345 0,
1346 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1347 ops,
1348 1, obj, 0, 0, NULL, NULL, NULL);
1349 if (ret < 0)
1350 goto fail_event;
1351
1352 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1353 dout("ceph_osdc_wait_event returned %d\n", ret);
1354 rbd_destroy_ops(ops);
1355 return 0;
1356
1357fail_event:
1358 ceph_osdc_cancel_event(event);
1359fail:
1360 rbd_destroy_ops(ops);
1361 return ret;
1362}
1363
602adf40
YS
1364/*
1365 * Request sync osd read
1366 */
1367static int rbd_req_sync_exec(struct rbd_device *dev,
1368 const char *obj,
1369 const char *cls,
1370 const char *method,
1371 const char *data,
59c2be1e
YS
1372 int len,
1373 u64 *ver)
602adf40
YS
1374{
1375 struct ceph_osd_req_op *ops;
1376 int cls_len = strlen(cls);
1377 int method_len = strlen(method);
1378 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1379 cls_len + method_len + len);
1380 if (ret < 0)
1381 return ret;
1382
1383 ops[0].cls.class_name = cls;
1384 ops[0].cls.class_len = (__u8)cls_len;
1385 ops[0].cls.method_name = method;
1386 ops[0].cls.method_len = (__u8)method_len;
1387 ops[0].cls.argc = 0;
1388 ops[0].cls.indata = data;
1389 ops[0].cls.indata_len = len;
1390
1391 ret = rbd_req_sync_op(dev, NULL,
1392 CEPH_NOSNAP,
1393 0,
1394 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1395 ops,
59c2be1e 1396 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1397
1398 rbd_destroy_ops(ops);
1399
1400 dout("cls_exec returned %d\n", ret);
1401 return ret;
1402}
1403
1fec7093
YS
1404static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1405{
1406 struct rbd_req_coll *coll =
1407 kzalloc(sizeof(struct rbd_req_coll) +
1408 sizeof(struct rbd_req_status) * num_reqs,
1409 GFP_ATOMIC);
1410
1411 if (!coll)
1412 return NULL;
1413 coll->total = num_reqs;
1414 kref_init(&coll->kref);
1415 return coll;
1416}
1417
602adf40
YS
1418/*
1419 * block device queue callback
1420 */
1421static void rbd_rq_fn(struct request_queue *q)
1422{
1423 struct rbd_device *rbd_dev = q->queuedata;
1424 struct request *rq;
1425 struct bio_pair *bp = NULL;
1426
1427 rq = blk_fetch_request(q);
1428
1429 while (1) {
1430 struct bio *bio;
1431 struct bio *rq_bio, *next_bio = NULL;
1432 bool do_write;
1433 int size, op_size = 0;
1434 u64 ofs;
1fec7093
YS
1435 int num_segs, cur_seg = 0;
1436 struct rbd_req_coll *coll;
602adf40
YS
1437
1438 /* peek at request from block layer */
1439 if (!rq)
1440 break;
1441
1442 dout("fetched request\n");
1443
1444 /* filter out block requests we don't understand */
1445 if ((rq->cmd_type != REQ_TYPE_FS)) {
1446 __blk_end_request_all(rq, 0);
1447 goto next;
1448 }
1449
1450 /* deduce our operation (read, write) */
1451 do_write = (rq_data_dir(rq) == WRITE);
1452
1453 size = blk_rq_bytes(rq);
1454 ofs = blk_rq_pos(rq) * 512ULL;
1455 rq_bio = rq->bio;
1456 if (do_write && rbd_dev->read_only) {
1457 __blk_end_request_all(rq, -EROFS);
1458 goto next;
1459 }
1460
1461 spin_unlock_irq(q->queue_lock);
1462
1463 dout("%s 0x%x bytes at 0x%llx\n",
1464 do_write ? "write" : "read",
1465 size, blk_rq_pos(rq) * 512ULL);
1466
1fec7093
YS
1467 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1468 coll = rbd_alloc_coll(num_segs);
1469 if (!coll) {
1470 spin_lock_irq(q->queue_lock);
1471 __blk_end_request_all(rq, -ENOMEM);
1472 goto next;
1473 }
1474
602adf40
YS
1475 do {
1476 /* a bio clone to be passed down to OSD req */
1477 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1478 op_size = rbd_get_segment(&rbd_dev->header,
1479 rbd_dev->header.block_name,
1480 ofs, size,
1481 NULL, NULL);
1fec7093 1482 kref_get(&coll->kref);
602adf40
YS
1483 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1484 op_size, GFP_ATOMIC);
1485 if (!bio) {
1fec7093
YS
1486 rbd_coll_end_req_index(rq, coll, cur_seg,
1487 -ENOMEM, op_size);
1488 goto next_seg;
602adf40
YS
1489 }
1490
1fec7093 1491
602adf40
YS
1492 /* init OSD command: write or read */
1493 if (do_write)
1494 rbd_req_write(rq, rbd_dev,
1495 rbd_dev->header.snapc,
1496 ofs,
1fec7093
YS
1497 op_size, bio,
1498 coll, cur_seg);
602adf40
YS
1499 else
1500 rbd_req_read(rq, rbd_dev,
1501 cur_snap_id(rbd_dev),
1502 ofs,
1fec7093
YS
1503 op_size, bio,
1504 coll, cur_seg);
602adf40 1505
1fec7093 1506next_seg:
602adf40
YS
1507 size -= op_size;
1508 ofs += op_size;
1509
1fec7093 1510 cur_seg++;
602adf40
YS
1511 rq_bio = next_bio;
1512 } while (size > 0);
1fec7093 1513 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1514
1515 if (bp)
1516 bio_pair_release(bp);
602adf40
YS
1517 spin_lock_irq(q->queue_lock);
1518next:
1519 rq = blk_fetch_request(q);
1520 }
1521}
1522
1523/*
1524 * a queue callback. Makes sure that we don't create a bio that spans across
1525 * multiple osd objects. One exception would be with a single page bios,
1526 * which we handle later at bio_chain_clone
1527 */
1528static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1529 struct bio_vec *bvec)
1530{
1531 struct rbd_device *rbd_dev = q->queuedata;
1532 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1533 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1534 unsigned int bio_sectors = bmd->bi_size >> 9;
1535 int max;
1536
1537 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1538 + bio_sectors)) << 9;
1539 if (max < 0)
1540 max = 0; /* bio_add cannot handle a negative return */
1541 if (max <= bvec->bv_len && bio_sectors == 0)
1542 return bvec->bv_len;
1543 return max;
1544}
1545
1546static void rbd_free_disk(struct rbd_device *rbd_dev)
1547{
1548 struct gendisk *disk = rbd_dev->disk;
1549
1550 if (!disk)
1551 return;
1552
1553 rbd_header_free(&rbd_dev->header);
1554
1555 if (disk->flags & GENHD_FL_UP)
1556 del_gendisk(disk);
1557 if (disk->queue)
1558 blk_cleanup_queue(disk->queue);
1559 put_disk(disk);
1560}
1561
1562/*
1563 * reload the ondisk the header
1564 */
1565static int rbd_read_header(struct rbd_device *rbd_dev,
1566 struct rbd_image_header *header)
1567{
1568 ssize_t rc;
1569 struct rbd_image_header_ondisk *dh;
1570 int snap_count = 0;
1571 u64 snap_names_len = 0;
59c2be1e 1572 u64 ver;
602adf40
YS
1573
1574 while (1) {
1575 int len = sizeof(*dh) +
1576 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1577 snap_names_len;
1578
1579 rc = -ENOMEM;
1580 dh = kmalloc(len, GFP_KERNEL);
1581 if (!dh)
1582 return -ENOMEM;
1583
1584 rc = rbd_req_sync_read(rbd_dev,
1585 NULL, CEPH_NOSNAP,
1586 rbd_dev->obj_md_name,
1587 0, len,
59c2be1e 1588 (char *)dh, &ver);
602adf40
YS
1589 if (rc < 0)
1590 goto out_dh;
1591
1592 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
81e759fb
JD
1593 if (rc < 0) {
1594 if (rc == -ENXIO) {
1595 pr_warning("unrecognized header format"
1596 " for image %s", rbd_dev->obj);
1597 }
602adf40 1598 goto out_dh;
81e759fb 1599 }
602adf40
YS
1600
1601 if (snap_count != header->total_snaps) {
1602 snap_count = header->total_snaps;
1603 snap_names_len = header->snap_names_len;
1604 rbd_header_free(header);
1605 kfree(dh);
1606 continue;
1607 }
1608 break;
1609 }
59c2be1e 1610 header->obj_version = ver;
602adf40
YS
1611
1612out_dh:
1613 kfree(dh);
1614 return rc;
1615}
1616
1617/*
1618 * create a snapshot
1619 */
1620static int rbd_header_add_snap(struct rbd_device *dev,
1621 const char *snap_name,
1622 gfp_t gfp_flags)
1623{
1624 int name_len = strlen(snap_name);
1625 u64 new_snapid;
1626 int ret;
916d4d67 1627 void *data, *p, *e;
59c2be1e 1628 u64 ver;
1dbb4399 1629 struct ceph_mon_client *monc;
602adf40
YS
1630
1631 /* we should create a snapshot only if we're pointing at the head */
1632 if (dev->cur_snap)
1633 return -EINVAL;
1634
1dbb4399
AE
1635 monc = &dev->rbd_client->client->monc;
1636 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1637 dout("created snapid=%lld\n", new_snapid);
1638 if (ret < 0)
1639 return ret;
1640
1641 data = kmalloc(name_len + 16, gfp_flags);
1642 if (!data)
1643 return -ENOMEM;
1644
916d4d67
SW
1645 p = data;
1646 e = data + name_len + 16;
602adf40 1647
916d4d67
SW
1648 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1649 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1650
1651 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1652 data, p - data, &ver);
602adf40 1653
916d4d67 1654 kfree(data);
602adf40
YS
1655
1656 if (ret < 0)
1657 return ret;
1658
1659 dev->header.snapc->seq = new_snapid;
1660
1661 return 0;
1662bad:
1663 return -ERANGE;
1664}
1665
dfc5606d
YS
1666static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1667{
1668 struct rbd_snap *snap;
1669
1670 while (!list_empty(&rbd_dev->snaps)) {
1671 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1672 __rbd_remove_snap_dev(rbd_dev, snap);
1673 }
1674}
1675
602adf40
YS
1676/*
1677 * only read the first part of the ondisk header, without the snaps info
1678 */
dfc5606d 1679static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1680{
1681 int ret;
1682 struct rbd_image_header h;
1683 u64 snap_seq;
59c2be1e 1684 int follow_seq = 0;
602adf40
YS
1685
1686 ret = rbd_read_header(rbd_dev, &h);
1687 if (ret < 0)
1688 return ret;
1689
9db4b3e3
SW
1690 /* resized? */
1691 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1692
602adf40
YS
1693 down_write(&rbd_dev->header.snap_rwsem);
1694
1695 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1696 if (rbd_dev->header.total_snaps &&
1697 rbd_dev->header.snapc->snaps[0] == snap_seq)
1698 /* pointing at the head, will need to follow that
1699 if head moves */
1700 follow_seq = 1;
602adf40
YS
1701
1702 kfree(rbd_dev->header.snapc);
1703 kfree(rbd_dev->header.snap_names);
1704 kfree(rbd_dev->header.snap_sizes);
1705
1706 rbd_dev->header.total_snaps = h.total_snaps;
1707 rbd_dev->header.snapc = h.snapc;
1708 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1709 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1710 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1711 if (follow_seq)
1712 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1713 else
1714 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1715
dfc5606d
YS
1716 ret = __rbd_init_snaps_header(rbd_dev);
1717
602adf40
YS
1718 up_write(&rbd_dev->header.snap_rwsem);
1719
dfc5606d 1720 return ret;
602adf40
YS
1721}
1722
1723static int rbd_init_disk(struct rbd_device *rbd_dev)
1724{
1725 struct gendisk *disk;
1726 struct request_queue *q;
1727 int rc;
1728 u64 total_size = 0;
1729
1730 /* contact OSD, request size info about the object being mapped */
1731 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1732 if (rc)
1733 return rc;
1734
dfc5606d
YS
1735 /* no need to lock here, as rbd_dev is not registered yet */
1736 rc = __rbd_init_snaps_header(rbd_dev);
1737 if (rc)
1738 return rc;
1739
cc9d734c 1740 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1741 if (rc)
1742 return rc;
1743
1744 /* create gendisk info */
1745 rc = -ENOMEM;
1746 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1747 if (!disk)
1748 goto out;
1749
aedfec59
SW
1750 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1751 rbd_dev->id);
602adf40
YS
1752 disk->major = rbd_dev->major;
1753 disk->first_minor = 0;
1754 disk->fops = &rbd_bd_ops;
1755 disk->private_data = rbd_dev;
1756
1757 /* init rq */
1758 rc = -ENOMEM;
1759 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1760 if (!q)
1761 goto out_disk;
029bcbd8
JD
1762
1763 /* set io sizes to object size */
1764 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1765 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1766 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1767 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1768
602adf40
YS
1769 blk_queue_merge_bvec(q, rbd_merge_bvec);
1770 disk->queue = q;
1771
1772 q->queuedata = rbd_dev;
1773
1774 rbd_dev->disk = disk;
1775 rbd_dev->q = q;
1776
1777 /* finally, announce the disk to the world */
1778 set_capacity(disk, total_size / 512ULL);
1779 add_disk(disk);
1780
1781 pr_info("%s: added with size 0x%llx\n",
1782 disk->disk_name, (unsigned long long)total_size);
1783 return 0;
1784
1785out_disk:
1786 put_disk(disk);
1787out:
1788 return rc;
1789}
1790
dfc5606d
YS
1791/*
1792 sysfs
1793*/
1794
1795static ssize_t rbd_size_show(struct device *dev,
1796 struct device_attribute *attr, char *buf)
1797{
1798 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1799
1800 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1801}
1802
1803static ssize_t rbd_major_show(struct device *dev,
1804 struct device_attribute *attr, char *buf)
1805{
1806 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1807
dfc5606d
YS
1808 return sprintf(buf, "%d\n", rbd_dev->major);
1809}
1810
1811static ssize_t rbd_client_id_show(struct device *dev,
1812 struct device_attribute *attr, char *buf)
602adf40 1813{
dfc5606d
YS
1814 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1815
1dbb4399
AE
1816 return sprintf(buf, "client%lld\n",
1817 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1818}
1819
dfc5606d
YS
1820static ssize_t rbd_pool_show(struct device *dev,
1821 struct device_attribute *attr, char *buf)
602adf40 1822{
dfc5606d
YS
1823 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1824
1825 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1826}
1827
1828static ssize_t rbd_name_show(struct device *dev,
1829 struct device_attribute *attr, char *buf)
1830{
1831 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1832
1833 return sprintf(buf, "%s\n", rbd_dev->obj);
1834}
1835
1836static ssize_t rbd_snap_show(struct device *dev,
1837 struct device_attribute *attr,
1838 char *buf)
1839{
1840 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1841
1842 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1843}
1844
1845static ssize_t rbd_image_refresh(struct device *dev,
1846 struct device_attribute *attr,
1847 const char *buf,
1848 size_t size)
1849{
1850 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1851 int rc;
1852 int ret = size;
602adf40
YS
1853
1854 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1855
dfc5606d
YS
1856 rc = __rbd_update_snaps(rbd_dev);
1857 if (rc < 0)
1858 ret = rc;
602adf40 1859
dfc5606d
YS
1860 mutex_unlock(&ctl_mutex);
1861 return ret;
1862}
602adf40 1863
dfc5606d
YS
1864static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1865static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1866static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1867static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1868static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1869static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1870static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1871static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1872
1873static struct attribute *rbd_attrs[] = {
1874 &dev_attr_size.attr,
1875 &dev_attr_major.attr,
1876 &dev_attr_client_id.attr,
1877 &dev_attr_pool.attr,
1878 &dev_attr_name.attr,
1879 &dev_attr_current_snap.attr,
1880 &dev_attr_refresh.attr,
1881 &dev_attr_create_snap.attr,
dfc5606d
YS
1882 NULL
1883};
1884
1885static struct attribute_group rbd_attr_group = {
1886 .attrs = rbd_attrs,
1887};
1888
1889static const struct attribute_group *rbd_attr_groups[] = {
1890 &rbd_attr_group,
1891 NULL
1892};
1893
1894static void rbd_sysfs_dev_release(struct device *dev)
1895{
1896}
1897
1898static struct device_type rbd_device_type = {
1899 .name = "rbd",
1900 .groups = rbd_attr_groups,
1901 .release = rbd_sysfs_dev_release,
1902};
1903
1904
1905/*
1906 sysfs - snapshots
1907*/
1908
1909static ssize_t rbd_snap_size_show(struct device *dev,
1910 struct device_attribute *attr,
1911 char *buf)
1912{
1913 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1914
1915 return sprintf(buf, "%lld\n", (long long)snap->size);
1916}
1917
1918static ssize_t rbd_snap_id_show(struct device *dev,
1919 struct device_attribute *attr,
1920 char *buf)
1921{
1922 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1923
1924 return sprintf(buf, "%lld\n", (long long)snap->id);
1925}
1926
1927static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1928static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1929
1930static struct attribute *rbd_snap_attrs[] = {
1931 &dev_attr_snap_size.attr,
1932 &dev_attr_snap_id.attr,
1933 NULL,
1934};
1935
1936static struct attribute_group rbd_snap_attr_group = {
1937 .attrs = rbd_snap_attrs,
1938};
1939
1940static void rbd_snap_dev_release(struct device *dev)
1941{
1942 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1943 kfree(snap->name);
1944 kfree(snap);
1945}
1946
1947static const struct attribute_group *rbd_snap_attr_groups[] = {
1948 &rbd_snap_attr_group,
1949 NULL
1950};
1951
1952static struct device_type rbd_snap_device_type = {
1953 .groups = rbd_snap_attr_groups,
1954 .release = rbd_snap_dev_release,
1955};
1956
1957static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1958 struct rbd_snap *snap)
1959{
1960 list_del(&snap->node);
1961 device_unregister(&snap->dev);
1962}
1963
1964static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1965 struct rbd_snap *snap,
1966 struct device *parent)
1967{
1968 struct device *dev = &snap->dev;
1969 int ret;
1970
1971 dev->type = &rbd_snap_device_type;
1972 dev->parent = parent;
1973 dev->release = rbd_snap_dev_release;
1974 dev_set_name(dev, "snap_%s", snap->name);
1975 ret = device_register(dev);
1976
1977 return ret;
1978}
1979
1980static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1981 int i, const char *name,
1982 struct rbd_snap **snapp)
1983{
1984 int ret;
1985 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1986 if (!snap)
1987 return -ENOMEM;
1988 snap->name = kstrdup(name, GFP_KERNEL);
1989 snap->size = rbd_dev->header.snap_sizes[i];
1990 snap->id = rbd_dev->header.snapc->snaps[i];
1991 if (device_is_registered(&rbd_dev->dev)) {
1992 ret = rbd_register_snap_dev(rbd_dev, snap,
1993 &rbd_dev->dev);
1994 if (ret < 0)
1995 goto err;
1996 }
1997 *snapp = snap;
1998 return 0;
1999err:
2000 kfree(snap->name);
2001 kfree(snap);
2002 return ret;
2003}
2004
/*
 * Given @name pointing at a snapshot name inside the NUL-delimited name
 * blob starting at @start, return a pointer to the previous name in the
 * blob, or NULL if @name is the first one.
 *
 * Fix: this file-local helper was missing `static`, polluting the kernel
 * global namespace.
 */
static const char *rbd_prev_snap_name(const char *name, const char *start)
{
	/* need at least "x\0" before @name for a previous entry to exist */
	if (name < start + 2)
		return NULL;

	/* step over the terminating NUL of the previous name ... */
	name -= 2;
	/* ... then walk back to the byte after the NUL before it */
	while (*name) {
		if (name == start)
			return start;
		name--;
	}
	return name + 1;
}
2021
2022/*
2023 * compare the old list of snapshots that we have to what's in the header
2024 * and update it accordingly. Note that the header holds the snapshots
2025 * in a reverse order (from newest to oldest) and we need to go from
2026 * older to new so that we don't get a duplicate snap name when
2027 * doing the process (e.g., removed snapshot and recreated a new
2028 * one with the same name.
2029 */
2030static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2031{
2032 const char *name, *first_name;
2033 int i = rbd_dev->header.total_snaps;
2034 struct rbd_snap *snap, *old_snap = NULL;
2035 int ret;
2036 struct list_head *p, *n;
2037
2038 first_name = rbd_dev->header.snap_names;
2039 name = first_name + rbd_dev->header.snap_names_len;
2040
2041 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2042 u64 cur_id;
2043
2044 old_snap = list_entry(p, struct rbd_snap, node);
2045
2046 if (i)
2047 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2048
2049 if (!i || old_snap->id < cur_id) {
2050 /* old_snap->id was skipped, thus was removed */
2051 __rbd_remove_snap_dev(rbd_dev, old_snap);
2052 continue;
2053 }
2054 if (old_snap->id == cur_id) {
2055 /* we have this snapshot already */
2056 i--;
2057 name = rbd_prev_snap_name(name, first_name);
2058 continue;
2059 }
2060 for (; i > 0;
2061 i--, name = rbd_prev_snap_name(name, first_name)) {
2062 if (!name) {
2063 WARN_ON(1);
2064 return -EINVAL;
2065 }
2066 cur_id = rbd_dev->header.snapc->snaps[i];
2067 /* snapshot removal? handle it above */
2068 if (cur_id >= old_snap->id)
2069 break;
2070 /* a new snapshot */
2071 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2072 if (ret < 0)
2073 return ret;
2074
2075 /* note that we add it backward so using n and not p */
2076 list_add(&snap->node, n);
2077 p = &snap->node;
2078 }
2079 }
2080 /* we're done going over the old snap list, just add what's left */
2081 for (; i > 0; i--) {
2082 name = rbd_prev_snap_name(name, first_name);
2083 if (!name) {
2084 WARN_ON(1);
2085 return -EINVAL;
2086 }
2087 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2088 if (ret < 0)
2089 return ret;
2090 list_add(&snap->node, &rbd_dev->snaps);
2091 }
2092
2093 return 0;
2094}
2095
2096
2097static void rbd_root_dev_release(struct device *dev)
2098{
2099}
2100
2101static struct device rbd_root_dev = {
2102 .init_name = "rbd",
2103 .release = rbd_root_dev_release,
2104};
2105
2106static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2107{
2108 int ret = -ENOMEM;
2109 struct device *dev;
2110 struct rbd_snap *snap;
2111
2112 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2113 dev = &rbd_dev->dev;
2114
2115 dev->bus = &rbd_bus_type;
2116 dev->type = &rbd_device_type;
2117 dev->parent = &rbd_root_dev;
2118 dev->release = rbd_dev_release;
2119 dev_set_name(dev, "%d", rbd_dev->id);
2120 ret = device_register(dev);
2121 if (ret < 0)
2122 goto done_free;
2123
2124 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2125 ret = rbd_register_snap_dev(rbd_dev, snap,
2126 &rbd_dev->dev);
2127 if (ret < 0)
602adf40
YS
2128 break;
2129 }
2130
2131 mutex_unlock(&ctl_mutex);
dfc5606d
YS
2132 return 0;
2133done_free:
2134 mutex_unlock(&ctl_mutex);
2135 return ret;
602adf40
YS
2136}
2137
dfc5606d
YS
2138static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2139{
2140 device_unregister(&rbd_dev->dev);
2141}
2142
59c2be1e
YS
2143static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2144{
2145 int ret, rc;
2146
2147 do {
2148 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2149 rbd_dev->header.obj_version);
2150 if (ret == -ERANGE) {
2151 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2152 rc = __rbd_update_snaps(rbd_dev);
2153 mutex_unlock(&ctl_mutex);
2154 if (rc < 0)
2155 return rc;
2156 }
2157 } while (ret == -ERANGE);
2158
2159 return ret;
2160}
2161
1ddbe94e
AE
2162static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2163
2164/*
499afd5b
AE
2165 * Get a unique rbd identifier for the given new rbd_dev, and add
2166 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 2167 */
499afd5b 2168static void rbd_id_get(struct rbd_device *rbd_dev)
b7f23c36 2169{
499afd5b
AE
2170 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2171
2172 spin_lock(&rbd_dev_list_lock);
2173 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2174 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2175}
b7f23c36 2176
1ddbe94e 2177/*
499afd5b
AE
2178 * Remove an rbd_dev from the global list, and record that its
2179 * identifier is no longer in use.
1ddbe94e 2180 */
499afd5b 2181static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2182{
d184f6bf
AE
2183 struct list_head *tmp;
2184 int rbd_id = rbd_dev->id;
2185 int max_id;
2186
2187 BUG_ON(rbd_id < 1);
499afd5b
AE
2188
2189 spin_lock(&rbd_dev_list_lock);
2190 list_del_init(&rbd_dev->node);
d184f6bf
AE
2191
2192 /*
2193 * If the id being "put" is not the current maximum, there
2194 * is nothing special we need to do.
2195 */
2196 if (rbd_id != atomic64_read(&rbd_id_max)) {
2197 spin_unlock(&rbd_dev_list_lock);
2198 return;
2199 }
2200
2201 /*
2202 * We need to update the current maximum id. Search the
2203 * list to find out what it is. We're more likely to find
2204 * the maximum at the end, so search the list backward.
2205 */
2206 max_id = 0;
2207 list_for_each_prev(tmp, &rbd_dev_list) {
2208 struct rbd_device *rbd_dev;
2209
2210 rbd_dev = list_entry(tmp, struct rbd_device, node);
2211 if (rbd_id > max_id)
2212 max_id = rbd_id;
2213 }
499afd5b 2214 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2215
1ddbe94e 2216 /*
d184f6bf
AE
2217 * The max id could have been updated by rbd_id_get(), in
2218 * which case it now accurately reflects the new maximum.
2219 * Be careful not to overwrite the maximum value in that
2220 * case.
1ddbe94e 2221 */
d184f6bf 2222 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2223}
2224
59c2be1e
YS
2225static ssize_t rbd_add(struct bus_type *bus,
2226 const char *buf,
2227 size_t count)
602adf40
YS
2228{
2229 struct ceph_osd_client *osdc;
2230 struct rbd_device *rbd_dev;
2231 ssize_t rc = -ENOMEM;
b7f23c36 2232 int irc;
602adf40
YS
2233 char *mon_dev_name;
2234 char *options;
2235
2236 if (!try_module_get(THIS_MODULE))
2237 return -ENODEV;
2238
2239 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2240 if (!mon_dev_name)
2241 goto err_out_mod;
2242
2243 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2244 if (!options)
2245 goto err_mon_dev;
2246
2247 /* new rbd_device object */
2248 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2249 if (!rbd_dev)
2250 goto err_out_opt;
2251
2252 /* static rbd_device initialization */
2253 spin_lock_init(&rbd_dev->lock);
2254 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2255 INIT_LIST_HEAD(&rbd_dev->snaps);
602adf40 2256
0e805a1d
AE
2257 init_rwsem(&rbd_dev->header.snap_rwsem);
2258
d184f6bf 2259 /* generate unique id: find highest unique id, add one */
499afd5b 2260 rbd_id_get(rbd_dev);
602adf40
YS
2261
2262 /* parse add command */
2263 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2264 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2265 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2266 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2267 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2268 mon_dev_name, options, rbd_dev->pool_name,
2269 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2270 rc = -EINVAL;
2271 goto err_out_slot;
2272 }
2273
2274 if (rbd_dev->snap_name[0] == 0)
cc9d734c
JD
2275 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2276 sizeof (RBD_SNAP_HEAD_NAME));
602adf40
YS
2277
2278 rbd_dev->obj_len = strlen(rbd_dev->obj);
2279 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2280 rbd_dev->obj, RBD_SUFFIX);
2281
2282 /* initialize rest of new object */
2283 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
e124a82f 2284
602adf40
YS
2285 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2286 if (rc < 0)
2287 goto err_out_slot;
2288
602adf40 2289 /* pick the pool */
1dbb4399 2290 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2291 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2292 if (rc < 0)
2293 goto err_out_client;
2294 rbd_dev->poolid = rc;
2295
2296 /* register our block device */
2297 irc = register_blkdev(0, rbd_dev->name);
2298 if (irc < 0) {
2299 rc = irc;
2300 goto err_out_client;
2301 }
2302 rbd_dev->major = irc;
2303
dfc5606d
YS
2304 rc = rbd_bus_add_dev(rbd_dev);
2305 if (rc)
766fc439
YS
2306 goto err_out_blkdev;
2307
602adf40
YS
2308 /* set up and announce blkdev mapping */
2309 rc = rbd_init_disk(rbd_dev);
2310 if (rc)
766fc439 2311 goto err_out_bus;
602adf40 2312
59c2be1e
YS
2313 rc = rbd_init_watch_dev(rbd_dev);
2314 if (rc)
2315 goto err_out_bus;
2316
602adf40
YS
2317 return count;
2318
766fc439 2319err_out_bus:
499afd5b 2320 rbd_id_put(rbd_dev);
766fc439
YS
2321
2322 /* this will also clean up rest of rbd_dev stuff */
2323
2324 rbd_bus_del_dev(rbd_dev);
2325 kfree(options);
2326 kfree(mon_dev_name);
2327 return rc;
2328
602adf40
YS
2329err_out_blkdev:
2330 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2331err_out_client:
2332 rbd_put_client(rbd_dev);
602adf40 2333err_out_slot:
499afd5b 2334 rbd_id_put(rbd_dev);
602adf40
YS
2335
2336 kfree(rbd_dev);
2337err_out_opt:
2338 kfree(options);
2339err_mon_dev:
2340 kfree(mon_dev_name);
2341err_out_mod:
2342 dout("Error adding device %s\n", buf);
2343 module_put(THIS_MODULE);
2344 return rc;
2345}
2346
2347static struct rbd_device *__rbd_get_dev(unsigned long id)
2348{
2349 struct list_head *tmp;
2350 struct rbd_device *rbd_dev;
2351
e124a82f 2352 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2353 list_for_each(tmp, &rbd_dev_list) {
2354 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2355 if (rbd_dev->id == id) {
2356 spin_unlock(&rbd_dev_list_lock);
602adf40 2357 return rbd_dev;
e124a82f 2358 }
602adf40 2359 }
e124a82f 2360 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2361 return NULL;
2362}
2363
/*
 * Final teardown for an rbd device, invoked by the driver core when
 * the embedded struct device's last reference is dropped (e.g. after
 * rbd_bus_del_dev()).
 *
 * Teardown order matters here: the lingering watch request is
 * unregistered from the OSD client first, then the watch itself is
 * torn down on the OSDs, and only then is the ceph client reference
 * released and the block device dismantled.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	/* Drop the lingering (auto-resent) watch request, if any. */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	/* Tell the OSDs we are no longer watching the header object. */
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2388
dfc5606d
YS
2389static ssize_t rbd_remove(struct bus_type *bus,
2390 const char *buf,
2391 size_t count)
602adf40
YS
2392{
2393 struct rbd_device *rbd_dev = NULL;
2394 int target_id, rc;
2395 unsigned long ul;
2396 int ret = count;
2397
2398 rc = strict_strtoul(buf, 10, &ul);
2399 if (rc)
2400 return rc;
2401
2402 /* convert to int; abort if we lost anything in the conversion */
2403 target_id = (int) ul;
2404 if (target_id != ul)
2405 return -EINVAL;
2406
2407 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2408
2409 rbd_dev = __rbd_get_dev(target_id);
2410 if (!rbd_dev) {
2411 ret = -ENOENT;
2412 goto done;
2413 }
2414
499afd5b 2415 rbd_id_put(rbd_dev);
dfc5606d
YS
2416
2417 __rbd_remove_all_snaps(rbd_dev);
2418 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2419
2420done:
2421 mutex_unlock(&ctl_mutex);
2422 return ret;
2423}
/*
 * sysfs handler: create a new snapshot with the name written to the
 * device's "snap_create" attribute.
 *
 * Adds the snapshot to the image header, refreshes the in-memory
 * snapshot list, and then (best effort) notifies other watchers of
 * the header object.  Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* Copies at most count-1 bytes plus NUL, which drops the final
	 * character of buf — presumably the trailing sysfs newline.
	 * NOTE(review): this also truncates a name written without a
	 * newline; confirm that is intended. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	/* Re-read the header so our snapshot list includes the new snap. */
	ret = __rbd_update_snaps(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2465
/*
 * Bus-level control files: /sys/bus/rbd/add and /sys/bus/rbd/remove.
 * Both are write-only and restricted to root (S_IWUSR).
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2471
2472/*
2473 * create control files in sysfs
dfc5606d 2474 * /sys/bus/rbd/...
602adf40
YS
2475 */
2476static int rbd_sysfs_init(void)
2477{
dfc5606d 2478 int ret;
602adf40 2479
dfc5606d 2480 rbd_bus_type.bus_attrs = rbd_bus_attrs;
602adf40 2481
dfc5606d 2482 ret = bus_register(&rbd_bus_type);
21079786 2483 if (ret < 0)
dfc5606d 2484 return ret;
602adf40 2485
dfc5606d 2486 ret = device_register(&rbd_root_dev);
602adf40 2487
602adf40
YS
2488 return ret;
2489}
/* Remove the sysfs control interface, in reverse order of creation. */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2496
2497int __init rbd_init(void)
2498{
2499 int rc;
2500
2501 rc = rbd_sysfs_init();
2502 if (rc)
2503 return rc;
602adf40
YS
2504 pr_info("loaded " DRV_NAME_LONG "\n");
2505 return 0;
2506}
/* Module exit: tear down the sysfs control interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
/* Module entry/exit hooks and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");