]> git.ipfire.org Git - people/arne_f/kernel.git/blame - drivers/block/rbd.c
ceph: avoid inode lookup on nfs fh reconnect
[people/arne_f/kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
/* Short and long driver names used in names and log messages. */
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Sizes of the fixed name/option buffers used by this driver. */
#define RBD_MAX_MD_NAME_LEN	(96 + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN	64
#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head instead of a snapshot. */
#define RBD_SNAP_HEAD_NAME	"-"

#define DEV_NAME_LEN		32

/* Initial value of rbd_options.notify_timeout. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
602adf40
YS
60/*
61 * block device image metadata (in-memory version)
62 */
63struct rbd_image_header {
64 u64 image_size;
65 char block_name[32];
66 __u8 obj_order;
67 __u8 crypt_type;
68 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
72 u64 snap_seq;
73 u32 total_snaps;
74
75 char *snap_names;
76 u64 *snap_sizes;
59c2be1e
YS
77
78 u64 obj_version;
79};
80
/* rbd-specific options parsed by parse_rbd_opts_token(). */
struct rbd_options {
	int notify_timeout;	/* starts as RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84
85/*
86 * an instance of the client. multiple devices may share a client.
87 */
88struct rbd_client {
89 struct ceph_client *client;
59c2be1e 90 struct rbd_options *rbd_opts;
602adf40
YS
91 struct kref kref;
92 struct list_head node;
93};
94
1fec7093
YS
95struct rbd_req_coll;
96
602adf40
YS
97/*
98 * a single io request
99 */
100struct rbd_request {
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
104 u64 len;
1fec7093
YS
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
113};
114
115/*
116 * a collection of requests
117 */
118struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
602adf40
YS
123};
124
dfc5606d
YS
125struct rbd_snap {
126 struct device dev;
127 const char *name;
128 size_t size;
129 struct list_head node;
130 u64 id;
131};
132
602adf40
YS
133/*
134 * a single device
135 */
136struct rbd_device {
137 int id; /* blkdev unique id */
138
139 int major; /* blkdev assigned major */
140 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q;
142
143 struct ceph_client *client;
144 struct rbd_client *rbd_client;
145
146 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147
148 spinlock_t lock; /* queue lock */
149
150 struct rbd_image_header header;
151 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152 int obj_len;
153 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154 char pool_name[RBD_MAX_POOL_NAME_LEN];
155 int poolid;
156
59c2be1e
YS
157 struct ceph_osd_event *watch_event;
158 struct ceph_osd_request *watch_request;
159
602adf40
YS
160 char snap_name[RBD_MAX_SNAP_NAME_LEN];
161 u32 cur_snap; /* index+1 of current snapshot within snap context
162 0 - for the head */
163 int read_only;
164
165 struct list_head node;
dfc5606d
YS
166
167 /* list of snapshots */
168 struct list_head snaps;
169
170 /* sysfs related */
171 struct device dev;
172};
173
174static struct bus_type rbd_bus_type = {
175 .name = "rbd",
602adf40
YS
176};
177
/*
 * Protects client get/put.  Statically initialized so the lock is valid
 * from load time and does not depend on a runtime spin_lock_init() call.
 */
static DEFINE_SPINLOCK(node_lock);

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);    /* devices */
static LIST_HEAD(rbd_client_list);      /* clients */
183
dfc5606d
YS
184static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185static void rbd_dev_release(struct device *dev);
186static ssize_t rbd_snap_rollback(struct device *dev,
187 struct device_attribute *attr,
188 const char *buf,
189 size_t size);
190static ssize_t rbd_snap_add(struct device *dev,
191 struct device_attribute *attr,
192 const char *buf,
193 size_t count);
194static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
195 struct rbd_snap *snap);;
196
197
198static struct rbd_device *dev_to_rbd(struct device *dev)
199{
200 return container_of(dev, struct rbd_device, dev);
201}
202
203static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
204{
205 return get_device(&rbd_dev->dev);
206}
207
208static void rbd_put_dev(struct rbd_device *rbd_dev)
209{
210 put_device(&rbd_dev->dev);
211}
602adf40 212
59c2be1e
YS
/* Defined later in this file; rbd_watch_cb() calls it under ctl_mutex. */
static int __rbd_update_snaps(struct rbd_device *rbd_dev);
214
602adf40
YS
215static int rbd_open(struct block_device *bdev, fmode_t mode)
216{
217 struct gendisk *disk = bdev->bd_disk;
218 struct rbd_device *rbd_dev = disk->private_data;
219
dfc5606d
YS
220 rbd_get_dev(rbd_dev);
221
602adf40
YS
222 set_device_ro(bdev, rbd_dev->read_only);
223
224 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
225 return -EROFS;
226
227 return 0;
228}
229
dfc5606d
YS
230static int rbd_release(struct gendisk *disk, fmode_t mode)
231{
232 struct rbd_device *rbd_dev = disk->private_data;
233
234 rbd_put_dev(rbd_dev);
235
236 return 0;
237}
238
602adf40
YS
239static const struct block_device_operations rbd_bd_ops = {
240 .owner = THIS_MODULE,
241 .open = rbd_open,
dfc5606d 242 .release = rbd_release,
602adf40
YS
243};
244
245/*
246 * Initialize an rbd client instance.
247 * We own *opt.
248 */
59c2be1e
YS
249static struct rbd_client *rbd_client_create(struct ceph_options *opt,
250 struct rbd_options *rbd_opts)
602adf40
YS
251{
252 struct rbd_client *rbdc;
253 int ret = -ENOMEM;
254
255 dout("rbd_client_create\n");
256 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257 if (!rbdc)
258 goto out_opt;
259
260 kref_init(&rbdc->kref);
261 INIT_LIST_HEAD(&rbdc->node);
262
263 rbdc->client = ceph_create_client(opt, rbdc);
264 if (IS_ERR(rbdc->client))
265 goto out_rbdc;
28f259b7 266 opt = NULL; /* Now rbdc->client is responsible for opt */
602adf40
YS
267
268 ret = ceph_open_session(rbdc->client);
269 if (ret < 0)
270 goto out_err;
271
59c2be1e
YS
272 rbdc->rbd_opts = rbd_opts;
273
602adf40
YS
274 spin_lock(&node_lock);
275 list_add_tail(&rbdc->node, &rbd_client_list);
276 spin_unlock(&node_lock);
277
278 dout("rbd_client_create created %p\n", rbdc);
279 return rbdc;
280
281out_err:
282 ceph_destroy_client(rbdc->client);
602adf40
YS
283out_rbdc:
284 kfree(rbdc);
285out_opt:
28f259b7
VK
286 if (opt)
287 ceph_destroy_options(opt);
288 return ERR_PTR(ret);
602adf40
YS
289}
290
291/*
292 * Find a ceph client with specific addr and configuration.
293 */
294static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
295{
296 struct rbd_client *client_node;
297
298 if (opt->flags & CEPH_OPT_NOSHARE)
299 return NULL;
300
301 list_for_each_entry(client_node, &rbd_client_list, node)
302 if (ceph_compare_options(opt, client_node->client) == 0)
303 return client_node;
304 return NULL;
305}
306
59c2be1e
YS
307/*
308 * mount options
309 */
310enum {
311 Opt_notify_timeout,
312 Opt_last_int,
313 /* int args above */
314 Opt_last_string,
315 /* string args above */
316};
317
318static match_table_t rbdopt_tokens = {
319 {Opt_notify_timeout, "notify_timeout=%d"},
320 /* int args above */
321 /* string args above */
322 {-1, NULL}
323};
324
325static int parse_rbd_opts_token(char *c, void *private)
326{
327 struct rbd_options *rbdopt = private;
328 substring_t argstr[MAX_OPT_ARGS];
329 int token, intval, ret;
330
331 token = match_token((char *)c, rbdopt_tokens, argstr);
332 if (token < 0)
333 return -EINVAL;
334
335 if (token < Opt_last_int) {
336 ret = match_int(&argstr[0], &intval);
337 if (ret < 0) {
338 pr_err("bad mount option arg (not int) "
339 "at '%s'\n", c);
340 return ret;
341 }
342 dout("got int token %d val %d\n", token, intval);
343 } else if (token > Opt_last_int && token < Opt_last_string) {
344 dout("got string token %d val %s\n", token,
345 argstr[0].from);
346 } else {
347 dout("got token %d\n", token);
348 }
349
350 switch (token) {
351 case Opt_notify_timeout:
352 rbdopt->notify_timeout = intval;
353 break;
354 default:
355 BUG_ON(token);
356 }
357 return 0;
358}
359
602adf40
YS
360/*
361 * Get a ceph client with specific addr and configuration, if one does
362 * not exist create it.
363 */
364static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
365 char *options)
366{
367 struct rbd_client *rbdc;
368 struct ceph_options *opt;
369 int ret;
59c2be1e
YS
370 struct rbd_options *rbd_opts;
371
372 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373 if (!rbd_opts)
374 return -ENOMEM;
375
376 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40
YS
377
378 ret = ceph_parse_options(&opt, options, mon_addr,
59c2be1e 379 mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
602adf40 380 if (ret < 0)
59c2be1e 381 goto done_err;
602adf40
YS
382
383 spin_lock(&node_lock);
384 rbdc = __rbd_client_find(opt);
385 if (rbdc) {
386 ceph_destroy_options(opt);
387
388 /* using an existing client */
389 kref_get(&rbdc->kref);
390 rbd_dev->rbd_client = rbdc;
391 rbd_dev->client = rbdc->client;
392 spin_unlock(&node_lock);
393 return 0;
394 }
395 spin_unlock(&node_lock);
396
59c2be1e
YS
397 rbdc = rbd_client_create(opt, rbd_opts);
398 if (IS_ERR(rbdc)) {
399 ret = PTR_ERR(rbdc);
400 goto done_err;
401 }
602adf40
YS
402
403 rbd_dev->rbd_client = rbdc;
404 rbd_dev->client = rbdc->client;
405 return 0;
59c2be1e
YS
406done_err:
407 kfree(rbd_opts);
408 return ret;
602adf40
YS
409}
410
411/*
412 * Destroy ceph client
413 */
414static void rbd_client_release(struct kref *kref)
415{
416 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
417
418 dout("rbd_release_client %p\n", rbdc);
419 spin_lock(&node_lock);
420 list_del(&rbdc->node);
421 spin_unlock(&node_lock);
422
423 ceph_destroy_client(rbdc->client);
59c2be1e 424 kfree(rbdc->rbd_opts);
602adf40
YS
425 kfree(rbdc);
426}
427
428/*
429 * Drop reference to ceph client node. If it's not referenced anymore, release
430 * it.
431 */
432static void rbd_put_client(struct rbd_device *rbd_dev)
433{
434 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 rbd_dev->rbd_client = NULL;
436 rbd_dev->client = NULL;
437}
438
1fec7093
YS
439/*
440 * Destroy requests collection
441 */
442static void rbd_coll_release(struct kref *kref)
443{
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
446
447 dout("rbd_coll_release %p\n", coll);
448 kfree(coll);
449}
602adf40
YS
450
451/*
452 * Create a new header structure, translate header format from the on-disk
453 * header.
454 */
455static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
457 int allocated_snaps,
458 gfp_t gfp_flags)
459{
460 int i;
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 int ret = -ENOMEM;
463
464 init_rwsem(&header->snap_rwsem);
602adf40
YS
465 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467 snap_count *
468 sizeof(struct rbd_image_snap_ondisk),
469 gfp_flags);
470 if (!header->snapc)
471 return -ENOMEM;
472 if (snap_count) {
473 header->snap_names = kmalloc(header->snap_names_len,
474 GFP_KERNEL);
475 if (!header->snap_names)
476 goto err_snapc;
477 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 GFP_KERNEL);
479 if (!header->snap_sizes)
480 goto err_names;
481 } else {
482 header->snap_names = NULL;
483 header->snap_sizes = NULL;
484 }
485 memcpy(header->block_name, ondisk->block_name,
486 sizeof(ondisk->block_name));
487
488 header->image_size = le64_to_cpu(ondisk->image_size);
489 header->obj_order = ondisk->options.order;
490 header->crypt_type = ondisk->options.crypt_type;
491 header->comp_type = ondisk->options.comp_type;
492
493 atomic_set(&header->snapc->nref, 1);
494 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 header->snapc->num_snaps = snap_count;
496 header->total_snaps = snap_count;
497
498 if (snap_count &&
499 allocated_snaps == snap_count) {
500 for (i = 0; i < snap_count; i++) {
501 header->snapc->snaps[i] =
502 le64_to_cpu(ondisk->snaps[i].id);
503 header->snap_sizes[i] =
504 le64_to_cpu(ondisk->snaps[i].image_size);
505 }
506
507 /* copy snapshot names */
508 memcpy(header->snap_names, &ondisk->snaps[i],
509 header->snap_names_len);
510 }
511
512 return 0;
513
514err_names:
515 kfree(header->snap_names);
516err_snapc:
517 kfree(header->snapc);
518 return ret;
519}
520
521static int snap_index(struct rbd_image_header *header, int snap_num)
522{
523 return header->total_snaps - snap_num;
524}
525
526static u64 cur_snap_id(struct rbd_device *rbd_dev)
527{
528 struct rbd_image_header *header = &rbd_dev->header;
529
530 if (!rbd_dev->cur_snap)
531 return 0;
532
533 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534}
535
536static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537 u64 *seq, u64 *size)
538{
539 int i;
540 char *p = header->snap_names;
541
542 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543 if (strcmp(snap_name, p) == 0)
544 break;
545 }
546 if (i == header->total_snaps)
547 return -ENOENT;
548 if (seq)
549 *seq = header->snapc->snaps[i];
550
551 if (size)
552 *size = header->snap_sizes[i];
553
554 return i;
555}
556
557static int rbd_header_set_snap(struct rbd_device *dev,
558 const char *snap_name,
559 u64 *size)
560{
561 struct rbd_image_header *header = &dev->header;
562 struct ceph_snap_context *snapc = header->snapc;
563 int ret = -ENOENT;
564
565 down_write(&header->snap_rwsem);
566
567 if (!snap_name ||
568 !*snap_name ||
569 strcmp(snap_name, "-") == 0 ||
570 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571 if (header->total_snaps)
572 snapc->seq = header->snap_seq;
573 else
574 snapc->seq = 0;
575 dev->cur_snap = 0;
576 dev->read_only = 0;
577 if (size)
578 *size = header->image_size;
579 } else {
580 ret = snap_by_name(header, snap_name, &snapc->seq, size);
581 if (ret < 0)
582 goto done;
583
584 dev->cur_snap = header->total_snaps - ret;
585 dev->read_only = 1;
586 }
587
588 ret = 0;
589done:
590 up_write(&header->snap_rwsem);
591 return ret;
592}
593
594static void rbd_header_free(struct rbd_image_header *header)
595{
596 kfree(header->snapc);
597 kfree(header->snap_names);
598 kfree(header->snap_sizes);
599}
600
601/*
602 * get the actual striped segment name, offset and length
603 */
604static u64 rbd_get_segment(struct rbd_image_header *header,
605 const char *block_name,
606 u64 ofs, u64 len,
607 char *seg_name, u64 *segofs)
608{
609 u64 seg = ofs >> header->obj_order;
610
611 if (seg_name)
612 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613 "%s.%012llx", block_name, seg);
614
615 ofs = ofs & ((1 << header->obj_order) - 1);
616 len = min_t(u64, len, (1 << header->obj_order) - ofs);
617
618 if (segofs)
619 *segofs = ofs;
620
621 return len;
622}
623
1fec7093
YS
624static int rbd_get_num_segments(struct rbd_image_header *header,
625 u64 ofs, u64 len)
626{
627 u64 start_seg = ofs >> header->obj_order;
628 u64 end_seg = (ofs + len - 1) >> header->obj_order;
629 return end_seg - start_seg + 1;
630}
631
602adf40
YS
632/*
633 * bio helpers
634 */
635
636static void bio_chain_put(struct bio *chain)
637{
638 struct bio *tmp;
639
640 while (chain) {
641 tmp = chain;
642 chain = chain->bi_next;
643 bio_put(tmp);
644 }
645}
646
647/*
648 * zeros a bio chain, starting at specific offset
649 */
650static void zero_bio_chain(struct bio *chain, int start_ofs)
651{
652 struct bio_vec *bv;
653 unsigned long flags;
654 void *buf;
655 int i;
656 int pos = 0;
657
658 while (chain) {
659 bio_for_each_segment(bv, chain, i) {
660 if (pos + bv->bv_len > start_ofs) {
661 int remainder = max(start_ofs - pos, 0);
662 buf = bvec_kmap_irq(bv, &flags);
663 memset(buf + remainder, 0,
664 bv->bv_len - remainder);
85b5aaa6 665 bvec_kunmap_irq(buf, &flags);
602adf40
YS
666 }
667 pos += bv->bv_len;
668 }
669
670 chain = chain->bi_next;
671 }
672}
673
674/*
675 * bio_chain_clone - clone a chain of bios up to a certain length.
676 * might return a bio_pair that will need to be released.
677 */
678static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
679 struct bio_pair **bp,
680 int len, gfp_t gfpmask)
681{
682 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
683 int total = 0;
684
685 if (*bp) {
686 bio_pair_release(*bp);
687 *bp = NULL;
688 }
689
690 while (old_chain && (total < len)) {
691 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
692 if (!tmp)
693 goto err_out;
694
695 if (total + old_chain->bi_size > len) {
696 struct bio_pair *bp;
697
698 /*
699 * this split can only happen with a single paged bio,
700 * split_bio will BUG_ON if this is not the case
701 */
702 dout("bio_chain_clone split! total=%d remaining=%d"
703 "bi_size=%d\n",
704 (int)total, (int)len-total,
705 (int)old_chain->bi_size);
706
707 /* split the bio. We'll release it either in the next
708 call, or it will have to be released outside */
709 bp = bio_split(old_chain, (len - total) / 512ULL);
710 if (!bp)
711 goto err_out;
712
713 __bio_clone(tmp, &bp->bio1);
714
715 *next = &bp->bio2;
716 } else {
717 __bio_clone(tmp, old_chain);
718 *next = old_chain->bi_next;
719 }
720
721 tmp->bi_bdev = NULL;
722 gfpmask &= ~__GFP_WAIT;
723 tmp->bi_next = NULL;
724
725 if (!new_chain) {
726 new_chain = tail = tmp;
727 } else {
728 tail->bi_next = tmp;
729 tail = tmp;
730 }
731 old_chain = old_chain->bi_next;
732
733 total += tmp->bi_size;
734 }
735
736 BUG_ON(total < len);
737
738 if (tail)
739 tail->bi_next = NULL;
740
741 *old = old_chain;
742
743 return new_chain;
744
745err_out:
746 dout("bio_chain_clone with err\n");
747 bio_chain_put(new_chain);
748 return NULL;
749}
750
751/*
752 * helpers for osd request op vectors.
753 */
754static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
755 int num_ops,
756 int opcode,
757 u32 payload_len)
758{
759 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
760 GFP_NOIO);
761 if (!*ops)
762 return -ENOMEM;
763 (*ops)[0].op = opcode;
764 /*
765 * op extent offset and length will be set later on
766 * in calc_raw_layout()
767 */
768 (*ops)[0].payload_len = payload_len;
769 return 0;
770}
771
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
776
1fec7093
YS
777static void rbd_coll_end_req_index(struct request *rq,
778 struct rbd_req_coll *coll,
779 int index,
780 int ret, u64 len)
781{
782 struct request_queue *q;
783 int min, max, i;
784
785 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
786 coll, index, ret, len);
787
788 if (!rq)
789 return;
790
791 if (!coll) {
792 blk_end_request(rq, ret, len);
793 return;
794 }
795
796 q = rq->q;
797
798 spin_lock_irq(q->queue_lock);
799 coll->status[index].done = 1;
800 coll->status[index].rc = ret;
801 coll->status[index].bytes = len;
802 max = min = coll->num_done;
803 while (max < coll->total && coll->status[max].done)
804 max++;
805
806 for (i = min; i<max; i++) {
807 __blk_end_request(rq, coll->status[i].rc,
808 coll->status[i].bytes);
809 coll->num_done++;
810 kref_put(&coll->kref, rbd_coll_release);
811 }
812 spin_unlock_irq(q->queue_lock);
813}
814
815static void rbd_coll_end_req(struct rbd_request *req,
816 int ret, u64 len)
817{
818 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
819}
820
602adf40
YS
821/*
822 * Send ceph osd request
823 */
824static int rbd_do_request(struct request *rq,
825 struct rbd_device *dev,
826 struct ceph_snap_context *snapc,
827 u64 snapid,
828 const char *obj, u64 ofs, u64 len,
829 struct bio *bio,
830 struct page **pages,
831 int num_pages,
832 int flags,
833 struct ceph_osd_req_op *ops,
834 int num_reply,
1fec7093
YS
835 struct rbd_req_coll *coll,
836 int coll_index,
602adf40 837 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
838 struct ceph_msg *msg),
839 struct ceph_osd_request **linger_req,
840 u64 *ver)
602adf40
YS
841{
842 struct ceph_osd_request *req;
843 struct ceph_file_layout *layout;
844 int ret;
845 u64 bno;
846 struct timespec mtime = CURRENT_TIME;
847 struct rbd_request *req_data;
848 struct ceph_osd_request_head *reqhead;
849 struct rbd_image_header *header = &dev->header;
850
602adf40 851 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
852 if (!req_data) {
853 if (coll)
854 rbd_coll_end_req_index(rq, coll, coll_index,
855 -ENOMEM, len);
856 return -ENOMEM;
857 }
858
859 if (coll) {
860 req_data->coll = coll;
861 req_data->coll_index = coll_index;
862 }
602adf40 863
1fec7093 864 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
602adf40
YS
865
866 down_read(&header->snap_rwsem);
867
868 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
869 snapc,
870 ops,
871 false,
872 GFP_NOIO, pages, bio);
4ad12621 873 if (!req) {
602adf40 874 up_read(&header->snap_rwsem);
4ad12621 875 ret = -ENOMEM;
602adf40
YS
876 goto done_pages;
877 }
878
879 req->r_callback = rbd_cb;
880
881 req_data->rq = rq;
882 req_data->bio = bio;
883 req_data->pages = pages;
884 req_data->len = len;
885
886 req->r_priv = req_data;
887
888 reqhead = req->r_request->front.iov_base;
889 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
890
891 strncpy(req->r_oid, obj, sizeof(req->r_oid));
892 req->r_oid_len = strlen(req->r_oid);
893
894 layout = &req->r_file_layout;
895 memset(layout, 0, sizeof(*layout));
896 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
897 layout->fl_stripe_count = cpu_to_le32(1);
898 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
899 layout->fl_pg_preferred = cpu_to_le32(-1);
900 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
901 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
902 ofs, &len, &bno, req, ops);
903
904 ceph_osdc_build_request(req, ofs, &len,
905 ops,
906 snapc,
907 &mtime,
908 req->r_oid, req->r_oid_len);
909 up_read(&header->snap_rwsem);
910
59c2be1e
YS
911 if (linger_req) {
912 ceph_osdc_set_request_linger(&dev->client->osdc, req);
913 *linger_req = req;
914 }
915
602adf40
YS
916 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
917 if (ret < 0)
918 goto done_err;
919
920 if (!rbd_cb) {
921 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
59c2be1e
YS
922 if (ver)
923 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
924 dout("reassert_ver=%lld\n",
925 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
926 ceph_osdc_put_request(req);
927 }
928 return ret;
929
930done_err:
931 bio_chain_put(req_data->bio);
932 ceph_osdc_put_request(req);
933done_pages:
1fec7093 934 rbd_coll_end_req(req_data, ret, len);
602adf40 935 kfree(req_data);
602adf40
YS
936 return ret;
937}
938
939/*
940 * Ceph osd op callback
941 */
942static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
943{
944 struct rbd_request *req_data = req->r_priv;
945 struct ceph_osd_reply_head *replyhead;
946 struct ceph_osd_op *op;
947 __s32 rc;
948 u64 bytes;
949 int read_op;
950
951 /* parse reply */
952 replyhead = msg->front.iov_base;
953 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
954 op = (void *)(replyhead + 1);
955 rc = le32_to_cpu(replyhead->result);
956 bytes = le64_to_cpu(op->extent.length);
957 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
958
959 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
960
961 if (rc == -ENOENT && read_op) {
962 zero_bio_chain(req_data->bio, 0);
963 rc = 0;
964 } else if (rc == 0 && read_op && bytes < req_data->len) {
965 zero_bio_chain(req_data->bio, bytes);
966 bytes = req_data->len;
967 }
968
1fec7093 969 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
970
971 if (req_data->bio)
972 bio_chain_put(req_data->bio);
973
974 ceph_osdc_put_request(req);
975 kfree(req_data);
976}
977
59c2be1e
YS
/* Osd op callback that only drops the request; the reply is ignored. */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
982
602adf40
YS
983/*
984 * Do a synchronous ceph osd operation
985 */
986static int rbd_req_sync_op(struct rbd_device *dev,
987 struct ceph_snap_context *snapc,
988 u64 snapid,
989 int opcode,
990 int flags,
991 struct ceph_osd_req_op *orig_ops,
992 int num_reply,
993 const char *obj,
994 u64 ofs, u64 len,
59c2be1e
YS
995 char *buf,
996 struct ceph_osd_request **linger_req,
997 u64 *ver)
602adf40
YS
998{
999 int ret;
1000 struct page **pages;
1001 int num_pages;
1002 struct ceph_osd_req_op *ops = orig_ops;
1003 u32 payload_len;
1004
1005 num_pages = calc_pages_for(ofs , len);
1006 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1007 if (IS_ERR(pages))
1008 return PTR_ERR(pages);
602adf40
YS
1009
1010 if (!orig_ops) {
1011 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1012 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1013 if (ret < 0)
1014 goto done;
1015
1016 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1017 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1018 if (ret < 0)
1019 goto done_ops;
1020 }
1021 }
1022
1023 ret = rbd_do_request(NULL, dev, snapc, snapid,
1024 obj, ofs, len, NULL,
1025 pages, num_pages,
1026 flags,
1027 ops,
1028 2,
1fec7093 1029 NULL, 0,
59c2be1e
YS
1030 NULL,
1031 linger_req, ver);
602adf40
YS
1032 if (ret < 0)
1033 goto done_ops;
1034
1035 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1036 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1037
1038done_ops:
1039 if (!orig_ops)
1040 rbd_destroy_ops(ops);
1041done:
1042 ceph_release_page_vector(pages, num_pages);
1043 return ret;
1044}
1045
1046/*
1047 * Do an asynchronous ceph osd operation
1048 */
1049static int rbd_do_op(struct request *rq,
1050 struct rbd_device *rbd_dev ,
1051 struct ceph_snap_context *snapc,
1052 u64 snapid,
1053 int opcode, int flags, int num_reply,
1054 u64 ofs, u64 len,
1fec7093
YS
1055 struct bio *bio,
1056 struct rbd_req_coll *coll,
1057 int coll_index)
602adf40
YS
1058{
1059 char *seg_name;
1060 u64 seg_ofs;
1061 u64 seg_len;
1062 int ret;
1063 struct ceph_osd_req_op *ops;
1064 u32 payload_len;
1065
1066 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1067 if (!seg_name)
1068 return -ENOMEM;
1069
1070 seg_len = rbd_get_segment(&rbd_dev->header,
1071 rbd_dev->header.block_name,
1072 ofs, len,
1073 seg_name, &seg_ofs);
602adf40
YS
1074
1075 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1076
1077 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1078 if (ret < 0)
1079 goto done;
1080
1081 /* we've taken care of segment sizes earlier when we
1082 cloned the bios. We should never have a segment
1083 truncated at this point */
1084 BUG_ON(seg_len < len);
1085
1086 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1087 seg_name, seg_ofs, seg_len,
1088 bio,
1089 NULL, 0,
1090 flags,
1091 ops,
1092 num_reply,
1fec7093 1093 coll, coll_index,
59c2be1e 1094 rbd_req_cb, 0, NULL);
11f77002
SW
1095
1096 rbd_destroy_ops(ops);
602adf40
YS
1097done:
1098 kfree(seg_name);
1099 return ret;
1100}
1101
1102/*
1103 * Request async osd write
1104 */
1105static int rbd_req_write(struct request *rq,
1106 struct rbd_device *rbd_dev,
1107 struct ceph_snap_context *snapc,
1108 u64 ofs, u64 len,
1fec7093
YS
1109 struct bio *bio,
1110 struct rbd_req_coll *coll,
1111 int coll_index)
602adf40
YS
1112{
1113 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1114 CEPH_OSD_OP_WRITE,
1115 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1116 2,
1fec7093 1117 ofs, len, bio, coll, coll_index);
602adf40
YS
1118}
1119
1120/*
1121 * Request async osd read
1122 */
1123static int rbd_req_read(struct request *rq,
1124 struct rbd_device *rbd_dev,
1125 u64 snapid,
1126 u64 ofs, u64 len,
1fec7093
YS
1127 struct bio *bio,
1128 struct rbd_req_coll *coll,
1129 int coll_index)
602adf40
YS
1130{
1131 return rbd_do_op(rq, rbd_dev, NULL,
1132 (snapid ? snapid : CEPH_NOSNAP),
1133 CEPH_OSD_OP_READ,
1134 CEPH_OSD_FLAG_READ,
1135 2,
1fec7093 1136 ofs, len, bio, coll, coll_index);
602adf40
YS
1137}
1138
1139/*
1140 * Request sync osd read
1141 */
1142static int rbd_req_sync_read(struct rbd_device *dev,
1143 struct ceph_snap_context *snapc,
1144 u64 snapid,
1145 const char *obj,
1146 u64 ofs, u64 len,
59c2be1e
YS
1147 char *buf,
1148 u64 *ver)
602adf40
YS
1149{
1150 return rbd_req_sync_op(dev, NULL,
1151 (snapid ? snapid : CEPH_NOSNAP),
1152 CEPH_OSD_OP_READ,
1153 CEPH_OSD_FLAG_READ,
1154 NULL,
59c2be1e 1155 1, obj, ofs, len, buf, NULL, ver);
602adf40
YS
1156}
1157
1158/*
59c2be1e
YS
1159 * Request sync osd watch
1160 */
1161static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1162 u64 ver,
1163 u64 notify_id,
1164 const char *obj)
1165{
1166 struct ceph_osd_req_op *ops;
1167 struct page **pages = NULL;
11f77002
SW
1168 int ret;
1169
1170 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
59c2be1e
YS
1171 if (ret < 0)
1172 return ret;
1173
1174 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1175 ops[0].watch.cookie = notify_id;
1176 ops[0].watch.flag = 0;
1177
1178 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1179 obj, 0, 0, NULL,
1180 pages, 0,
1181 CEPH_OSD_FLAG_READ,
1182 ops,
1183 1,
1fec7093 1184 NULL, 0,
59c2be1e
YS
1185 rbd_simple_req_cb, 0, NULL);
1186
1187 rbd_destroy_ops(ops);
1188 return ret;
1189}
1190
1191static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1192{
1193 struct rbd_device *dev = (struct rbd_device *)data;
13143d2d
SW
1194 int rc;
1195
59c2be1e
YS
1196 if (!dev)
1197 return;
1198
1199 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1200 notify_id, (int)opcode);
1201 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
13143d2d 1202 rc = __rbd_update_snaps(dev);
59c2be1e 1203 mutex_unlock(&ctl_mutex);
13143d2d
SW
1204 if (rc)
1205 pr_warning(DRV_NAME "%d got notification but failed to update"
1206 " snaps: %d\n", dev->major, rc);
59c2be1e
YS
1207
1208 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1209}
1210
1211/*
1212 * Request sync osd watch
1213 */
1214static int rbd_req_sync_watch(struct rbd_device *dev,
1215 const char *obj,
1216 u64 ver)
1217{
1218 struct ceph_osd_req_op *ops;
1219 struct ceph_osd_client *osdc = &dev->client->osdc;
1220
1221 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1222 if (ret < 0)
1223 return ret;
1224
1225 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1226 (void *)dev, &dev->watch_event);
1227 if (ret < 0)
1228 goto fail;
1229
1230 ops[0].watch.ver = cpu_to_le64(ver);
1231 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1232 ops[0].watch.flag = 1;
1233
1234 ret = rbd_req_sync_op(dev, NULL,
1235 CEPH_NOSNAP,
1236 0,
1237 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1238 ops,
1239 1, obj, 0, 0, NULL,
1240 &dev->watch_request, NULL);
1241
1242 if (ret < 0)
1243 goto fail_event;
1244
1245 rbd_destroy_ops(ops);
1246 return 0;
1247
1248fail_event:
1249 ceph_osdc_cancel_event(dev->watch_event);
1250 dev->watch_event = NULL;
1251fail:
1252 rbd_destroy_ops(ops);
1253 return ret;
1254}
1255
/*
 * Context object whose address rbd_req_sync_notify() registers as the
 * callback data of its notify event.
 */
1256struct rbd_notify_info {
1257 struct rbd_device *dev;
1258};
1259
1260static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1261{
1262 struct rbd_device *dev = (struct rbd_device *)data;
1263 if (!dev)
1264 return;
1265
1266 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1267 notify_id, (int)opcode);
1268}
1269
1270/*
1271 * Request sync osd notify
1272 */
1273static int rbd_req_sync_notify(struct rbd_device *dev,
1274 const char *obj)
1275{
1276 struct ceph_osd_req_op *ops;
1277 struct ceph_osd_client *osdc = &dev->client->osdc;
1278 struct ceph_osd_event *event;
1279 struct rbd_notify_info info;
1280 int payload_len = sizeof(u32) + sizeof(u32);
1281 int ret;
1282
1283 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1284 if (ret < 0)
1285 return ret;
1286
1287 info.dev = dev;
1288
1289 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1290 (void *)&info, &event);
1291 if (ret < 0)
1292 goto fail;
1293
1294 ops[0].watch.ver = 1;
1295 ops[0].watch.flag = 1;
1296 ops[0].watch.cookie = event->cookie;
1297 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1298 ops[0].watch.timeout = 12;
1299
1300 ret = rbd_req_sync_op(dev, NULL,
1301 CEPH_NOSNAP,
1302 0,
1303 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1304 ops,
1305 1, obj, 0, 0, NULL, NULL, NULL);
1306 if (ret < 0)
1307 goto fail_event;
1308
1309 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1310 dout("ceph_osdc_wait_event returned %d\n", ret);
1311 rbd_destroy_ops(ops);
1312 return 0;
1313
1314fail_event:
1315 ceph_osdc_cancel_event(event);
1316fail:
1317 rbd_destroy_ops(ops);
1318 return ret;
1319}
1320
1321/*
1322 * Request sync osd rollback
602adf40
YS
1323 */
1324static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1325 u64 snapid,
1326 const char *obj)
1327{
1328 struct ceph_osd_req_op *ops;
1329 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1330 if (ret < 0)
1331 return ret;
1332
1333 ops[0].snap.snapid = snapid;
1334
1335 ret = rbd_req_sync_op(dev, NULL,
1336 CEPH_NOSNAP,
1337 0,
1338 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1339 ops,
59c2be1e 1340 1, obj, 0, 0, NULL, NULL, NULL);
602adf40
YS
1341
1342 rbd_destroy_ops(ops);
1343
602adf40
YS
1344 return ret;
1345}
1346
1347/*
1348 * Request sync osd read
1349 */
1350static int rbd_req_sync_exec(struct rbd_device *dev,
1351 const char *obj,
1352 const char *cls,
1353 const char *method,
1354 const char *data,
59c2be1e
YS
1355 int len,
1356 u64 *ver)
602adf40
YS
1357{
1358 struct ceph_osd_req_op *ops;
1359 int cls_len = strlen(cls);
1360 int method_len = strlen(method);
1361 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1362 cls_len + method_len + len);
1363 if (ret < 0)
1364 return ret;
1365
1366 ops[0].cls.class_name = cls;
1367 ops[0].cls.class_len = (__u8)cls_len;
1368 ops[0].cls.method_name = method;
1369 ops[0].cls.method_len = (__u8)method_len;
1370 ops[0].cls.argc = 0;
1371 ops[0].cls.indata = data;
1372 ops[0].cls.indata_len = len;
1373
1374 ret = rbd_req_sync_op(dev, NULL,
1375 CEPH_NOSNAP,
1376 0,
1377 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1378 ops,
59c2be1e 1379 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1380
1381 rbd_destroy_ops(ops);
1382
1383 dout("cls_exec returned %d\n", ret);
1384 return ret;
1385}
1386
1fec7093
YS
1387static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1388{
1389 struct rbd_req_coll *coll =
1390 kzalloc(sizeof(struct rbd_req_coll) +
1391 sizeof(struct rbd_req_status) * num_reqs,
1392 GFP_ATOMIC);
1393
1394 if (!coll)
1395 return NULL;
1396 coll->total = num_reqs;
1397 kref_init(&coll->kref);
1398 return coll;
1399}
1400
602adf40
YS
1401/*
1402 * block device queue callback
1403 */
1404static void rbd_rq_fn(struct request_queue *q)
1405{
1406 struct rbd_device *rbd_dev = q->queuedata;
1407 struct request *rq;
1408 struct bio_pair *bp = NULL;
1409
1410 rq = blk_fetch_request(q);
1411
1412 while (1) {
1413 struct bio *bio;
1414 struct bio *rq_bio, *next_bio = NULL;
1415 bool do_write;
1416 int size, op_size = 0;
1417 u64 ofs;
1fec7093
YS
1418 int num_segs, cur_seg = 0;
1419 struct rbd_req_coll *coll;
602adf40
YS
1420
1421 /* peek at request from block layer */
1422 if (!rq)
1423 break;
1424
1425 dout("fetched request\n");
1426
1427 /* filter out block requests we don't understand */
1428 if ((rq->cmd_type != REQ_TYPE_FS)) {
1429 __blk_end_request_all(rq, 0);
1430 goto next;
1431 }
1432
1433 /* deduce our operation (read, write) */
1434 do_write = (rq_data_dir(rq) == WRITE);
1435
1436 size = blk_rq_bytes(rq);
1437 ofs = blk_rq_pos(rq) * 512ULL;
1438 rq_bio = rq->bio;
1439 if (do_write && rbd_dev->read_only) {
1440 __blk_end_request_all(rq, -EROFS);
1441 goto next;
1442 }
1443
1444 spin_unlock_irq(q->queue_lock);
1445
1446 dout("%s 0x%x bytes at 0x%llx\n",
1447 do_write ? "write" : "read",
1448 size, blk_rq_pos(rq) * 512ULL);
1449
1fec7093
YS
1450 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1451 coll = rbd_alloc_coll(num_segs);
1452 if (!coll) {
1453 spin_lock_irq(q->queue_lock);
1454 __blk_end_request_all(rq, -ENOMEM);
1455 goto next;
1456 }
1457
602adf40
YS
1458 do {
1459 /* a bio clone to be passed down to OSD req */
1460 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1461 op_size = rbd_get_segment(&rbd_dev->header,
1462 rbd_dev->header.block_name,
1463 ofs, size,
1464 NULL, NULL);
1fec7093 1465 kref_get(&coll->kref);
602adf40
YS
1466 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1467 op_size, GFP_ATOMIC);
1468 if (!bio) {
1fec7093
YS
1469 rbd_coll_end_req_index(rq, coll, cur_seg,
1470 -ENOMEM, op_size);
1471 goto next_seg;
602adf40
YS
1472 }
1473
1fec7093 1474
602adf40
YS
1475 /* init OSD command: write or read */
1476 if (do_write)
1477 rbd_req_write(rq, rbd_dev,
1478 rbd_dev->header.snapc,
1479 ofs,
1fec7093
YS
1480 op_size, bio,
1481 coll, cur_seg);
602adf40
YS
1482 else
1483 rbd_req_read(rq, rbd_dev,
1484 cur_snap_id(rbd_dev),
1485 ofs,
1fec7093
YS
1486 op_size, bio,
1487 coll, cur_seg);
602adf40 1488
1fec7093 1489next_seg:
602adf40
YS
1490 size -= op_size;
1491 ofs += op_size;
1492
1fec7093 1493 cur_seg++;
602adf40
YS
1494 rq_bio = next_bio;
1495 } while (size > 0);
1fec7093 1496 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1497
1498 if (bp)
1499 bio_pair_release(bp);
602adf40
YS
1500 spin_lock_irq(q->queue_lock);
1501next:
1502 rq = blk_fetch_request(q);
1503 }
1504}
1505
1506/*
1507 * a queue callback. Makes sure that we don't create a bio that spans across
1508 * multiple osd objects. One exception would be with a single page bios,
1509 * which we handle later at bio_chain_clone
1510 */
1511static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1512 struct bio_vec *bvec)
1513{
1514 struct rbd_device *rbd_dev = q->queuedata;
1515 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1516 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1517 unsigned int bio_sectors = bmd->bi_size >> 9;
1518 int max;
1519
1520 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1521 + bio_sectors)) << 9;
1522 if (max < 0)
1523 max = 0; /* bio_add cannot handle a negative return */
1524 if (max <= bvec->bv_len && bio_sectors == 0)
1525 return bvec->bv_len;
1526 return max;
1527}
1528
1529static void rbd_free_disk(struct rbd_device *rbd_dev)
1530{
1531 struct gendisk *disk = rbd_dev->disk;
1532
1533 if (!disk)
1534 return;
1535
1536 rbd_header_free(&rbd_dev->header);
1537
1538 if (disk->flags & GENHD_FL_UP)
1539 del_gendisk(disk);
1540 if (disk->queue)
1541 blk_cleanup_queue(disk->queue);
1542 put_disk(disk);
1543}
1544
1545/*
1546 * reload the ondisk the header
1547 */
1548static int rbd_read_header(struct rbd_device *rbd_dev,
1549 struct rbd_image_header *header)
1550{
1551 ssize_t rc;
1552 struct rbd_image_header_ondisk *dh;
1553 int snap_count = 0;
1554 u64 snap_names_len = 0;
59c2be1e 1555 u64 ver;
602adf40
YS
1556
1557 while (1) {
1558 int len = sizeof(*dh) +
1559 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1560 snap_names_len;
1561
1562 rc = -ENOMEM;
1563 dh = kmalloc(len, GFP_KERNEL);
1564 if (!dh)
1565 return -ENOMEM;
1566
1567 rc = rbd_req_sync_read(rbd_dev,
1568 NULL, CEPH_NOSNAP,
1569 rbd_dev->obj_md_name,
1570 0, len,
59c2be1e 1571 (char *)dh, &ver);
602adf40
YS
1572 if (rc < 0)
1573 goto out_dh;
1574
1575 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1576 if (rc < 0)
1577 goto out_dh;
1578
1579 if (snap_count != header->total_snaps) {
1580 snap_count = header->total_snaps;
1581 snap_names_len = header->snap_names_len;
1582 rbd_header_free(header);
1583 kfree(dh);
1584 continue;
1585 }
1586 break;
1587 }
59c2be1e 1588 header->obj_version = ver;
602adf40
YS
1589
1590out_dh:
1591 kfree(dh);
1592 return rc;
1593}
1594
1595/*
1596 * create a snapshot
1597 */
1598static int rbd_header_add_snap(struct rbd_device *dev,
1599 const char *snap_name,
1600 gfp_t gfp_flags)
1601{
1602 int name_len = strlen(snap_name);
1603 u64 new_snapid;
1604 int ret;
916d4d67 1605 void *data, *p, *e;
59c2be1e 1606 u64 ver;
602adf40
YS
1607
1608 /* we should create a snapshot only if we're pointing at the head */
1609 if (dev->cur_snap)
1610 return -EINVAL;
1611
1612 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1613 &new_snapid);
1614 dout("created snapid=%lld\n", new_snapid);
1615 if (ret < 0)
1616 return ret;
1617
1618 data = kmalloc(name_len + 16, gfp_flags);
1619 if (!data)
1620 return -ENOMEM;
1621
916d4d67
SW
1622 p = data;
1623 e = data + name_len + 16;
602adf40 1624
916d4d67
SW
1625 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1626 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1627
1628 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1629 data, p - data, &ver);
602adf40 1630
916d4d67 1631 kfree(data);
602adf40
YS
1632
1633 if (ret < 0)
1634 return ret;
1635
1636 dev->header.snapc->seq = new_snapid;
1637
1638 return 0;
1639bad:
1640 return -ERANGE;
1641}
1642
dfc5606d
YS
1643static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1644{
1645 struct rbd_snap *snap;
1646
1647 while (!list_empty(&rbd_dev->snaps)) {
1648 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1649 __rbd_remove_snap_dev(rbd_dev, snap);
1650 }
1651}
1652
602adf40
YS
1653/*
1654 * only read the first part of the ondisk header, without the snaps info
1655 */
dfc5606d 1656static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1657{
1658 int ret;
1659 struct rbd_image_header h;
1660 u64 snap_seq;
59c2be1e 1661 int follow_seq = 0;
602adf40
YS
1662
1663 ret = rbd_read_header(rbd_dev, &h);
1664 if (ret < 0)
1665 return ret;
1666
1667 down_write(&rbd_dev->header.snap_rwsem);
1668
1669 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1670 if (rbd_dev->header.total_snaps &&
1671 rbd_dev->header.snapc->snaps[0] == snap_seq)
1672 /* pointing at the head, will need to follow that
1673 if head moves */
1674 follow_seq = 1;
602adf40
YS
1675
1676 kfree(rbd_dev->header.snapc);
1677 kfree(rbd_dev->header.snap_names);
1678 kfree(rbd_dev->header.snap_sizes);
1679
1680 rbd_dev->header.total_snaps = h.total_snaps;
1681 rbd_dev->header.snapc = h.snapc;
1682 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1683 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1684 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1685 if (follow_seq)
1686 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1687 else
1688 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1689
dfc5606d
YS
1690 ret = __rbd_init_snaps_header(rbd_dev);
1691
602adf40
YS
1692 up_write(&rbd_dev->header.snap_rwsem);
1693
dfc5606d 1694 return ret;
602adf40
YS
1695}
1696
1697static int rbd_init_disk(struct rbd_device *rbd_dev)
1698{
1699 struct gendisk *disk;
1700 struct request_queue *q;
1701 int rc;
1702 u64 total_size = 0;
1703
1704 /* contact OSD, request size info about the object being mapped */
1705 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1706 if (rc)
1707 return rc;
1708
dfc5606d
YS
1709 /* no need to lock here, as rbd_dev is not registered yet */
1710 rc = __rbd_init_snaps_header(rbd_dev);
1711 if (rc)
1712 return rc;
1713
602adf40
YS
1714 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1715 if (rc)
1716 return rc;
1717
1718 /* create gendisk info */
1719 rc = -ENOMEM;
1720 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1721 if (!disk)
1722 goto out;
1723
aedfec59
SW
1724 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1725 rbd_dev->id);
602adf40
YS
1726 disk->major = rbd_dev->major;
1727 disk->first_minor = 0;
1728 disk->fops = &rbd_bd_ops;
1729 disk->private_data = rbd_dev;
1730
1731 /* init rq */
1732 rc = -ENOMEM;
1733 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1734 if (!q)
1735 goto out_disk;
1736 blk_queue_merge_bvec(q, rbd_merge_bvec);
1737 disk->queue = q;
1738
1739 q->queuedata = rbd_dev;
1740
1741 rbd_dev->disk = disk;
1742 rbd_dev->q = q;
1743
1744 /* finally, announce the disk to the world */
1745 set_capacity(disk, total_size / 512ULL);
1746 add_disk(disk);
1747
1748 pr_info("%s: added with size 0x%llx\n",
1749 disk->disk_name, (unsigned long long)total_size);
1750 return 0;
1751
1752out_disk:
1753 put_disk(disk);
1754out:
1755 return rc;
1756}
1757
dfc5606d
YS
1758/*
1759 sysfs
1760*/
1761
1762static ssize_t rbd_size_show(struct device *dev,
1763 struct device_attribute *attr, char *buf)
1764{
1765 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1766
1767 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1768}
1769
1770static ssize_t rbd_major_show(struct device *dev,
1771 struct device_attribute *attr, char *buf)
1772{
1773 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1774
dfc5606d
YS
1775 return sprintf(buf, "%d\n", rbd_dev->major);
1776}
1777
1778static ssize_t rbd_client_id_show(struct device *dev,
1779 struct device_attribute *attr, char *buf)
602adf40 1780{
dfc5606d
YS
1781 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1782
1783 return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
602adf40
YS
1784}
1785
dfc5606d
YS
1786static ssize_t rbd_pool_show(struct device *dev,
1787 struct device_attribute *attr, char *buf)
602adf40 1788{
dfc5606d
YS
1789 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1790
1791 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1792}
1793
1794static ssize_t rbd_name_show(struct device *dev,
1795 struct device_attribute *attr, char *buf)
1796{
1797 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1798
1799 return sprintf(buf, "%s\n", rbd_dev->obj);
1800}
1801
1802static ssize_t rbd_snap_show(struct device *dev,
1803 struct device_attribute *attr,
1804 char *buf)
1805{
1806 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1807
1808 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1809}
1810
1811static ssize_t rbd_image_refresh(struct device *dev,
1812 struct device_attribute *attr,
1813 const char *buf,
1814 size_t size)
1815{
1816 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1817 int rc;
1818 int ret = size;
602adf40
YS
1819
1820 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1821
dfc5606d
YS
1822 rc = __rbd_update_snaps(rbd_dev);
1823 if (rc < 0)
1824 ret = rc;
602adf40 1825
dfc5606d
YS
1826 mutex_unlock(&ctl_mutex);
1827 return ret;
1828}
602adf40 1829
dfc5606d
YS
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * backed by the *_show() helpers; the S_IWUSR entries are write-only
 * triggers (refresh, create_snap, rollback_snap).
 */
1830static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1831static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1832static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1833static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1834static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1835static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1836static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1837static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1838static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1839
/* NULL-terminated list of the attributes declared above */
1840static struct attribute *rbd_attrs[] = {
1841 &dev_attr_size.attr,
1842 &dev_attr_major.attr,
1843 &dev_attr_client_id.attr,
1844 &dev_attr_pool.attr,
1845 &dev_attr_name.attr,
1846 &dev_attr_current_snap.attr,
1847 &dev_attr_refresh.attr,
1848 &dev_attr_create_snap.attr,
1849 &dev_attr_rollback_snap.attr,
1850 NULL
1851};
1852
/* unnamed group holding rbd_attrs */
1853static struct attribute_group rbd_attr_group = {
1854 .attrs = rbd_attrs,
1855};
1856
/* NULL-terminated group list referenced by rbd_device_type */
1857static const struct attribute_group *rbd_attr_groups[] = {
1858 &rbd_attr_group,
1859 NULL
1860};
1861
/*
 * Release hook of rbd_device_type; intentionally does nothing.
 * rbd_bus_add_dev() installs rbd_dev_release() as the per-device
 * release callback.
 */
1862static void rbd_sysfs_dev_release(struct device *dev)
1863{
1864}
1865
/* device type of a mapped rbd device: attaches the attribute groups above */
1866static struct device_type rbd_device_type = {
1867 .name = "rbd",
1868 .groups = rbd_attr_groups,
1869 .release = rbd_sysfs_dev_release,
1870};
1871
1872
1873/*
1874 sysfs - snapshots
1875*/
1876
1877static ssize_t rbd_snap_size_show(struct device *dev,
1878 struct device_attribute *attr,
1879 char *buf)
1880{
1881 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1882
1883 return sprintf(buf, "%lld\n", (long long)snap->size);
1884}
1885
1886static ssize_t rbd_snap_id_show(struct device *dev,
1887 struct device_attribute *attr,
1888 char *buf)
1889{
1890 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1891
1892 return sprintf(buf, "%lld\n", (long long)snap->id);
1893}
1894
/* Read-only sysfs attributes exposed by every snapshot device. */
1895static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1896static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1897
/* NULL-terminated list of the snapshot attributes */
1898static struct attribute *rbd_snap_attrs[] = {
1899 &dev_attr_snap_size.attr,
1900 &dev_attr_snap_id.attr,
1901 NULL,
1902};
1903
/* unnamed group holding rbd_snap_attrs */
1904static struct attribute_group rbd_snap_attr_group = {
1905 .attrs = rbd_snap_attrs,
1906};
1907
1908static void rbd_snap_dev_release(struct device *dev)
1909{
1910 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1911 kfree(snap->name);
1912 kfree(snap);
1913}
1914
/* NULL-terminated group list referenced by rbd_snap_device_type */
1915static const struct attribute_group *rbd_snap_attr_groups[] = {
1916 &rbd_snap_attr_group,
1917 NULL
1918};
1919
/*
 * Device type of a snapshot device: attaches the snapshot attributes and
 * frees the rbd_snap through rbd_snap_dev_release().
 */
1920static struct device_type rbd_snap_device_type = {
1921 .groups = rbd_snap_attr_groups,
1922 .release = rbd_snap_dev_release,
1923};
1924
1925static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1926 struct rbd_snap *snap)
1927{
1928 list_del(&snap->node);
1929 device_unregister(&snap->dev);
1930}
1931
1932static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1933 struct rbd_snap *snap,
1934 struct device *parent)
1935{
1936 struct device *dev = &snap->dev;
1937 int ret;
1938
1939 dev->type = &rbd_snap_device_type;
1940 dev->parent = parent;
1941 dev->release = rbd_snap_dev_release;
1942 dev_set_name(dev, "snap_%s", snap->name);
1943 ret = device_register(dev);
1944
1945 return ret;
1946}
1947
1948static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1949 int i, const char *name,
1950 struct rbd_snap **snapp)
1951{
1952 int ret;
1953 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1954 if (!snap)
1955 return -ENOMEM;
1956 snap->name = kstrdup(name, GFP_KERNEL);
1957 snap->size = rbd_dev->header.snap_sizes[i];
1958 snap->id = rbd_dev->header.snapc->snaps[i];
1959 if (device_is_registered(&rbd_dev->dev)) {
1960 ret = rbd_register_snap_dev(rbd_dev, snap,
1961 &rbd_dev->dev);
1962 if (ret < 0)
1963 goto err;
1964 }
1965 *snapp = snap;
1966 return 0;
1967err:
1968 kfree(snap->name);
1969 kfree(snap);
1970 return ret;
1971}
1972
/*
 * search for the previous snap in a null delimited string list
 *
 * @name points at the start of an entry (or one past the end of the
 * list) and @start at the beginning of the list.  Returns the start of
 * the entry preceding @name, or NULL when fewer than two bytes lie
 * between @start and @name.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* skip the terminator just before @name, then walk back */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* cur sits on the terminator of the entry before the wanted one */
	return cur + 1;
}
1989
1990/*
1991 * compare the old list of snapshots that we have to what's in the header
1992 * and update it accordingly. Note that the header holds the snapshots
1993 * in a reverse order (from newest to oldest) and we need to go from
1994 * older to new so that we don't get a duplicate snap name when
1995 * doing the process (e.g., removed snapshot and recreated a new
1996 * one with the same name.
1997 */
/*
 * Reconcile rbd_dev->snaps with the snapshot ids and names held in
 * rbd_dev->header.
 *
 * The existing list is walked backwards (from its tail) while "i" counts
 * down from header.total_snaps and "name" steps backwards through the
 * NUL-delimited snap_names buffer.  Returns 0 on success, -EINVAL if the
 * name buffer runs out, or the error from __rbd_add_snap_dev().
 */
1998static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1999{
2000 const char *name, *first_name;
2001 int i = rbd_dev->header.total_snaps;
2002 struct rbd_snap *snap, *old_snap = NULL;
2003 int ret;
2004 struct list_head *p, *n;
2005
2006 first_name = rbd_dev->header.snap_names;
/* start one past the end of the names buffer */
2007 name = first_name + rbd_dev->header.snap_names_len;
2008
2009 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2010 u64 cur_id;
2011
2012 old_snap = list_entry(p, struct rbd_snap, node);
2013
/* cur_id is only assigned, and only read below, when i != 0 */
2014 if (i)
2015 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2016
2017 if (!i || old_snap->id < cur_id) {
2018 /* old_snap->id was skipped, thus was removed */
2019 __rbd_remove_snap_dev(rbd_dev, old_snap);
2020 continue;
2021 }
2022 if (old_snap->id == cur_id) {
2023 /* we have this snapshot already */
2024 i--;
2025 name = rbd_prev_snap_name(name, first_name);
2026 continue;
2027 }
/* reached only when old_snap->id is greater than snaps[i - 1] */
2028 for (; i > 0;
2029 i--, name = rbd_prev_snap_name(name, first_name)) {
2030 if (!name) {
2031 WARN_ON(1);
2032 return -EINVAL;
2033 }
/*
 * NOTE(review): this reads snaps[i] while every other access in this
 * function uses snaps[i - 1]; with i == total_snaps that is one entry
 * past the count. "name" is also used here without first stepping back
 * as the final loop does - confirm both are intended.
 */
2034 cur_id = rbd_dev->header.snapc->snaps[i];
2035 /* snapshot removal? handle it above */
2036 if (cur_id >= old_snap->id)
2037 break;
2038 /* a new snapshot */
2039 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2040 if (ret < 0)
2041 return ret;
2042
2043 /* note that we add it backward so using n and not p */
2044 list_add(&snap->node, n);
2045 p = &snap->node;
2046 }
2047 }
2048 /* we're done going over the old snap list, just add what's left */
2049 for (; i > 0; i--) {
2050 name = rbd_prev_snap_name(name, first_name);
2051 if (!name) {
2052 WARN_ON(1);
2053 return -EINVAL;
2054 }
2055 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2056 if (ret < 0)
2057 return ret;
/* inserted at the head of the list */
2058 list_add(&snap->node, &rbd_dev->snaps);
2059 }
2060
2061 return 0;
2062}
2063
2064
/* Release hook of the static root device; there is nothing to free. */
2065static void rbd_root_dev_release(struct device *dev)
2066{
2067}
2068
/*
 * Statically allocated device named "rbd"; rbd_bus_add_dev() uses it as
 * the parent of every mapped rbd device.
 */
2069static struct device rbd_root_dev = {
2070 .init_name = "rbd",
2071 .release = rbd_root_dev_release,
2072};
2073
2074static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2075{
2076 int ret = -ENOMEM;
2077 struct device *dev;
2078 struct rbd_snap *snap;
2079
2080 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2081 dev = &rbd_dev->dev;
2082
2083 dev->bus = &rbd_bus_type;
2084 dev->type = &rbd_device_type;
2085 dev->parent = &rbd_root_dev;
2086 dev->release = rbd_dev_release;
2087 dev_set_name(dev, "%d", rbd_dev->id);
2088 ret = device_register(dev);
2089 if (ret < 0)
2090 goto done_free;
2091
2092 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2093 ret = rbd_register_snap_dev(rbd_dev, snap,
2094 &rbd_dev->dev);
2095 if (ret < 0)
602adf40
YS
2096 break;
2097 }
2098
2099 mutex_unlock(&ctl_mutex);
dfc5606d
YS
2100 return 0;
2101done_free:
2102 mutex_unlock(&ctl_mutex);
2103 return ret;
602adf40
YS
2104}
2105
dfc5606d
YS
2106static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2107{
2108 device_unregister(&rbd_dev->dev);
2109}
2110
59c2be1e
YS
2111static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2112{
2113 int ret, rc;
2114
2115 do {
2116 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2117 rbd_dev->header.obj_version);
2118 if (ret == -ERANGE) {
2119 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2120 rc = __rbd_update_snaps(rbd_dev);
2121 mutex_unlock(&ctl_mutex);
2122 if (rc < 0)
2123 return rc;
2124 }
2125 } while (ret == -ERANGE);
2126
2127 return ret;
2128}
2129
2130static ssize_t rbd_add(struct bus_type *bus,
2131 const char *buf,
2132 size_t count)
602adf40
YS
2133{
2134 struct ceph_osd_client *osdc;
2135 struct rbd_device *rbd_dev;
2136 ssize_t rc = -ENOMEM;
2137 int irc, new_id = 0;
2138 struct list_head *tmp;
2139 char *mon_dev_name;
2140 char *options;
2141
2142 if (!try_module_get(THIS_MODULE))
2143 return -ENODEV;
2144
2145 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2146 if (!mon_dev_name)
2147 goto err_out_mod;
2148
2149 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2150 if (!options)
2151 goto err_mon_dev;
2152
2153 /* new rbd_device object */
2154 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2155 if (!rbd_dev)
2156 goto err_out_opt;
2157
2158 /* static rbd_device initialization */
2159 spin_lock_init(&rbd_dev->lock);
2160 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2161 INIT_LIST_HEAD(&rbd_dev->snaps);
602adf40
YS
2162
2163 /* generate unique id: find highest unique id, add one */
2164 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2165
2166 list_for_each(tmp, &rbd_dev_list) {
2167 struct rbd_device *rbd_dev;
2168
2169 rbd_dev = list_entry(tmp, struct rbd_device, node);
2170 if (rbd_dev->id >= new_id)
2171 new_id = rbd_dev->id + 1;
2172 }
2173
2174 rbd_dev->id = new_id;
2175
2176 /* add to global list */
2177 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2178
2179 /* parse add command */
2180 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2181 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2182 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2183 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2184 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2185 mon_dev_name, options, rbd_dev->pool_name,
2186 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2187 rc = -EINVAL;
2188 goto err_out_slot;
2189 }
2190
2191 if (rbd_dev->snap_name[0] == 0)
2192 rbd_dev->snap_name[0] = '-';
2193
2194 rbd_dev->obj_len = strlen(rbd_dev->obj);
2195 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2196 rbd_dev->obj, RBD_SUFFIX);
2197
2198 /* initialize rest of new object */
2199 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2200 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2201 if (rc < 0)
2202 goto err_out_slot;
2203
2204 mutex_unlock(&ctl_mutex);
2205
2206 /* pick the pool */
2207 osdc = &rbd_dev->client->osdc;
2208 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2209 if (rc < 0)
2210 goto err_out_client;
2211 rbd_dev->poolid = rc;
2212
2213 /* register our block device */
2214 irc = register_blkdev(0, rbd_dev->name);
2215 if (irc < 0) {
2216 rc = irc;
2217 goto err_out_client;
2218 }
2219 rbd_dev->major = irc;
2220
dfc5606d
YS
2221 rc = rbd_bus_add_dev(rbd_dev);
2222 if (rc)
766fc439
YS
2223 goto err_out_blkdev;
2224
602adf40
YS
2225 /* set up and announce blkdev mapping */
2226 rc = rbd_init_disk(rbd_dev);
2227 if (rc)
766fc439 2228 goto err_out_bus;
602adf40 2229
59c2be1e
YS
2230 rc = rbd_init_watch_dev(rbd_dev);
2231 if (rc)
2232 goto err_out_bus;
2233
602adf40
YS
2234 return count;
2235
766fc439
YS
2236err_out_bus:
2237 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2238 list_del_init(&rbd_dev->node);
2239 mutex_unlock(&ctl_mutex);
2240
2241 /* this will also clean up rest of rbd_dev stuff */
2242
2243 rbd_bus_del_dev(rbd_dev);
2244 kfree(options);
2245 kfree(mon_dev_name);
2246 return rc;
2247
602adf40
YS
2248err_out_blkdev:
2249 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2250err_out_client:
2251 rbd_put_client(rbd_dev);
2252 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2253err_out_slot:
2254 list_del_init(&rbd_dev->node);
2255 mutex_unlock(&ctl_mutex);
2256
2257 kfree(rbd_dev);
2258err_out_opt:
2259 kfree(options);
2260err_mon_dev:
2261 kfree(mon_dev_name);
2262err_out_mod:
2263 dout("Error adding device %s\n", buf);
2264 module_put(THIS_MODULE);
2265 return rc;
2266}
2267
2268static struct rbd_device *__rbd_get_dev(unsigned long id)
2269{
2270 struct list_head *tmp;
2271 struct rbd_device *rbd_dev;
2272
2273 list_for_each(tmp, &rbd_dev_list) {
2274 rbd_dev = list_entry(tmp, struct rbd_device, node);
2275 if (rbd_dev->id == id)
2276 return rbd_dev;
2277 }
2278 return NULL;
2279}
2280
dfc5606d 2281static void rbd_dev_release(struct device *dev)
602adf40 2282{
dfc5606d
YS
2283 struct rbd_device *rbd_dev =
2284 container_of(dev, struct rbd_device, dev);
602adf40 2285
59c2be1e
YS
2286 if (rbd_dev->watch_request)
2287 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2288 rbd_dev->watch_request);
2289 if (rbd_dev->watch_event)
2290 ceph_osdc_cancel_event(rbd_dev->watch_event);
2291
602adf40
YS
2292 rbd_put_client(rbd_dev);
2293
2294 /* clean up and free blkdev */
2295 rbd_free_disk(rbd_dev);
2296 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2297 kfree(rbd_dev);
2298
2299 /* release module ref */
2300 module_put(THIS_MODULE);
602adf40
YS
2301}
2302
dfc5606d
YS
2303static ssize_t rbd_remove(struct bus_type *bus,
2304 const char *buf,
2305 size_t count)
602adf40
YS
2306{
2307 struct rbd_device *rbd_dev = NULL;
2308 int target_id, rc;
2309 unsigned long ul;
2310 int ret = count;
2311
2312 rc = strict_strtoul(buf, 10, &ul);
2313 if (rc)
2314 return rc;
2315
2316 /* convert to int; abort if we lost anything in the conversion */
2317 target_id = (int) ul;
2318 if (target_id != ul)
2319 return -EINVAL;
2320
2321 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2322
2323 rbd_dev = __rbd_get_dev(target_id);
2324 if (!rbd_dev) {
2325 ret = -ENOENT;
2326 goto done;
2327 }
2328
dfc5606d
YS
2329 list_del_init(&rbd_dev->node);
2330
2331 __rbd_remove_all_snaps(rbd_dev);
2332 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2333
2334done:
2335 mutex_unlock(&ctl_mutex);
2336 return ret;
2337}
2338
dfc5606d
YS
2339static ssize_t rbd_snap_add(struct device *dev,
2340 struct device_attribute *attr,
2341 const char *buf,
2342 size_t count)
602adf40 2343{
dfc5606d
YS
2344 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2345 int ret;
2346 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2347 if (!name)
2348 return -ENOMEM;
2349
dfc5606d 2350 snprintf(name, count, "%s", buf);
602adf40
YS
2351
2352 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2353
602adf40
YS
2354 ret = rbd_header_add_snap(rbd_dev,
2355 name, GFP_KERNEL);
2356 if (ret < 0)
59c2be1e 2357 goto err_unlock;
602adf40 2358
dfc5606d 2359 ret = __rbd_update_snaps(rbd_dev);
602adf40 2360 if (ret < 0)
59c2be1e
YS
2361 goto err_unlock;
2362
2363 /* shouldn't hold ctl_mutex when notifying.. notify might
2364 trigger a watch callback that would need to get that mutex */
2365 mutex_unlock(&ctl_mutex);
2366
2367 /* make a best effort, don't error if failed */
2368 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2369
2370 ret = count;
59c2be1e
YS
2371 kfree(name);
2372 return ret;
2373
2374err_unlock:
602adf40 2375 mutex_unlock(&ctl_mutex);
602adf40
YS
2376 kfree(name);
2377 return ret;
2378}
2379
dfc5606d
YS
2380static ssize_t rbd_snap_rollback(struct device *dev,
2381 struct device_attribute *attr,
2382 const char *buf,
2383 size_t count)
602adf40 2384{
dfc5606d
YS
2385 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2386 int ret;
602adf40 2387 u64 snapid;
602adf40 2388 u64 cur_ofs;
dfc5606d
YS
2389 char *seg_name = NULL;
2390 char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2391 ret = -ENOMEM;
2392 if (!snap_name)
2393 return ret;
602adf40
YS
2394
2395 /* parse snaps add command */
dfc5606d 2396 snprintf(snap_name, count, "%s", buf);
602adf40
YS
2397 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2398 if (!seg_name)
dfc5606d 2399 goto done;
602adf40
YS
2400
2401 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2402
602adf40
YS
2403 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2404 if (ret < 0)
2405 goto done_unlock;
2406
2407 dout("snapid=%lld\n", snapid);
2408
2409 cur_ofs = 0;
2410 while (cur_ofs < rbd_dev->header.image_size) {
2411 cur_ofs += rbd_get_segment(&rbd_dev->header,
2412 rbd_dev->obj,
2413 cur_ofs, (u64)-1,
2414 seg_name, NULL);
2415 dout("seg_name=%s\n", seg_name);
2416
2417 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2418 if (ret < 0)
2419 pr_warning("could not roll back obj %s err=%d\n",
2420 seg_name, ret);
2421 }
2422
dfc5606d 2423 ret = __rbd_update_snaps(rbd_dev);
602adf40
YS
2424 if (ret < 0)
2425 goto done_unlock;
2426
2427 ret = count;
2428
2429done_unlock:
2430 mutex_unlock(&ctl_mutex);
dfc5606d 2431done:
602adf40 2432 kfree(seg_name);
dfc5606d 2433 kfree(snap_name);
602adf40
YS
2434
2435 return ret;
2436}
2437
dfc5606d
YS
/*
 * Write-only bus control files: "add" maps a device through rbd_add()
 * and "remove" unmaps one through rbd_remove().
 */
2438static struct bus_attribute rbd_bus_attrs[] = {
2439 __ATTR(add, S_IWUSR, NULL, rbd_add),
2440 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
602adf40
YS
2441 __ATTR_NULL
2442};
2443
2444/*
2445 * create control files in sysfs
dfc5606d 2446 * /sys/bus/rbd/...
602adf40
YS
2447 */
2448static int rbd_sysfs_init(void)
2449{
dfc5606d 2450 int ret;
602adf40 2451
dfc5606d 2452 rbd_bus_type.bus_attrs = rbd_bus_attrs;
602adf40 2453
dfc5606d
YS
2454 ret = bus_register(&rbd_bus_type);
2455 if (ret < 0)
2456 return ret;
602adf40 2457
dfc5606d 2458 ret = device_register(&rbd_root_dev);
602adf40 2459
602adf40
YS
2460 return ret;
2461}
2462
2463static void rbd_sysfs_cleanup(void)
2464{
dfc5606d
YS
2465 device_unregister(&rbd_root_dev);
2466 bus_unregister(&rbd_bus_type);
602adf40
YS
2467}
2468
2469int __init rbd_init(void)
2470{
2471 int rc;
2472
2473 rc = rbd_sysfs_init();
2474 if (rc)
2475 return rc;
2476 spin_lock_init(&node_lock);
2477 pr_info("loaded " DRV_NAME_LONG "\n");
2478 return 0;
2479}
2480
/* Module exit point: removes the sysfs bus and root device. */
2481void __exit rbd_exit(void)
2482{
2483 rbd_sysfs_cleanup();
2484}
2485
/* Module entry/exit hooks and metadata. */
2486module_init(rbd_init);
2487module_exit(rbd_exit);
2488
2489MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2490MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2491MODULE_DESCRIPTION("rados block device");
2492
2493/* following authorship retained from original osdblk.c */
2494MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2495
2496MODULE_LICENSE("GPL");