/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
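
/*
 * Illustrative sketch (not part of the driver): these helpers are
 * meant to guard optional references such as rbd_dev->parent_ref,
 * where 0 means "gone" and must never be resurrected.  A
 * hypothetical caller pairs them like this:
 *
 *	if (atomic_inc_return_safe(&rbd_dev->parent_ref) > 0) {
 *		... use the parent image ...
 *		atomic_dec_return_safe(&rbd_dev->parent_ref);
 *	}
 *
 * The increment refuses to bump a counter already at 0, and
 * saturates at INT_MAX instead of overflowing.
 */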

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
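
/*
 * Worked example (illustrative): with 4-byte ints MAX_INT_FORMAT_WIDTH
 * is (5 * 4) / 2 + 1 = 11, enough for the ten decimal digits of the
 * largest 32-bit value plus a terminating NUL.  The 2.5-digits-per-byte
 * estimate slightly overshoots log10(256) ~= 2.41, so the bound holds
 * for any int width.
 */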

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
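
/*
 * Example (illustrative): mapping the image "foo" in pool "rbd" at
 * snapshot "snap1" produces a spec whose pool_name/image_name/snap_name
 * are "rbd"/"foo"/"snap1"; the matching pool_id, image_id and snap_id
 * are filled in by lookups during image discovery.
 */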

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
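
/*
 * Illustrative use of the iterators above (hypothetical caller):
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 *
 * rbd_img_request_complete() below uses exactly this pattern; the
 * _safe variant walks in reverse and tolerates list deletion.
 */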

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
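
/*
 * Example (illustrative): option strings supplied when mapping an
 * image reach this parser one comma-separated token at a time, so
 *
 *	struct rbd_options rbd_opts = { .read_only = RBD_READ_ONLY_DEFAULT };
 *
 *	parse_rbd_opts_token("ro", &rbd_opts);
 *
 * leaves rbd_opts.read_only true.  An unrecognized token makes
 * match_token() return -1 and the parse fails with -EINVAL.
 */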

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * The client is removed from the client list under
 * rbd_client_list_lock, which is taken here (not by the caller).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	down_write(&rbd_dev->header_rwsem);
	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	up_write(&rbd_dev->header_rwsem);

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
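
/*
 * Worked example (illustrative): if snapc->snaps holds { 30, 20, 10 }
 * (descending, as the osd keeps it), looking up snap_id 20 returns
 * index 1; snapid_compare_reverse() inverts the usual ordering so
 * bsearch() can probe the descending array.  Looking up 25, which is
 * absent, yields BAD_SNAP_INDEX.
 */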

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* the buffer came from the slab cache; return it there */
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
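
/*
 * Worked example (illustrative): with the default obj_order of 22
 * (4 MiB objects), image byte offset 0x500000 (5 MiB) falls in
 * segment 1, rbd_segment_offset() returns 0x100000 (1 MiB), and a
 * 4 MiB request starting there is clipped by rbd_segment_length()
 * to the 3 MiB remaining in that object.
 */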

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
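
/*
 * Illustrative sketch (hypothetical caller): carving a block request's
 * bio chain into per-object pieces.  Each call consumes up to 'length'
 * bytes and leaves bio_list/offset pointing at the first byte it did
 * not clone, ready for the next object:
 *
 *	struct bio *bio_list = rq->bio;
 *	unsigned int offset = 0;
 *
 *	clone = bio_chain_clone_range(&bio_list, &offset,
 *					(unsigned int) length, GFP_ATOMIC);
 *
 * The image-request fill path later in this file relies on exactly
 * this resumable behavior when it splits I/O on object boundaries.
 */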

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
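
/*
 * Illustrative note on the pattern above: set_bit()/test_bit() are
 * atomic but not ordered, so each helper pairs a memory barrier with
 * the bit operation.  A hypothetical reader follows a two-step
 * protocol, checking KNOWN before trusting EXISTS:
 *
 *	if (obj_request_known_test(obj_request) &&
 *	    obj_request_exists_test(obj_request)) {
 *		... the target object is known to exist ...
 *	}
 *
 * EXISTS is meaningless until KNOWN is set, which is why the setter
 * above orders EXISTS before KNOWN.
 */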

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
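
/*
 * Contrast of the two helpers above (explanatory, based on the calls
 * they make): a read executes against a single snapshot, so it is
 * built with a snap_id and no snapshot context; a write must be
 * applied relative to every snapshot still sharing the data, so it
 * is built with the image's snapc (plus a modification time) and
 * CEPH_NOSNAP as the target.
 */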

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1771
0eefd470
AE
1772/*
1773 * Create a copyup osd request based on the information in the
1774 * object request supplied. A copyup request has two osd ops:
1775 * a copyup method call and a "normal" write request.
1776 */
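/*
 * Illustrative op layout for the request built below (see
 * rbd_img_obj_parent_read_full_callback() for where the ops are
 * actually initialized):
 *
 *	op[0]: CEPH_OSD_OP_CALL  "rbd" "copyup"  (parent object data)
 *	op[1]: CEPH_OSD_OP_WRITE                 (the original write)
 *
 * The OSD applies both ops in order within the one request, so the
 * parent data is copied up before the new write lands on top of it.
 */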
1777static struct ceph_osd_request *
1778rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1779{
1780 struct rbd_img_request *img_request;
1781 struct ceph_snap_context *snapc;
1782 struct rbd_device *rbd_dev;
1783 struct ceph_osd_client *osdc;
1784 struct ceph_osd_request *osd_req;
1785
1786 rbd_assert(obj_request_img_data_test(obj_request));
1787 img_request = obj_request->img_request;
1788 rbd_assert(img_request);
1789 rbd_assert(img_request_write_test(img_request));
1790
1791 /* Allocate and initialize the request, for the two ops */
1792
1793 snapc = img_request->snapc;
1794 rbd_dev = img_request->rbd_dev;
1795 osdc = &rbd_dev->rbd_client->client->osdc;
1796 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1797 if (!osd_req)
1798 return NULL; /* ENOMEM */
1799
1800 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1801 osd_req->r_callback = rbd_osd_req_callback;
1802 osd_req->r_priv = obj_request;
1803
1804 osd_req->r_oid_len = strlen(obj_request->object_name);
1805 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1806 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1807
1808 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1809
1810 return osd_req;
1811}
1812
1813
bf0d5f50
AE
1814static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1815{
1816 ceph_osdc_put_request(osd_req);
1817}
1818
1819/* object_name is assumed to be a non-null pointer and NUL-terminated */
1820
1821static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1822 u64 offset, u64 length,
1823 enum obj_request_type type)
1824{
1825 struct rbd_obj_request *obj_request;
1826 size_t size;
1827 char *name;
1828
1829 rbd_assert(obj_request_type_valid(type));
1830
1831 size = strlen(object_name) + 1;
f907ad55
AE
1832 name = kmalloc(size, GFP_KERNEL);
1833 if (!name)
bf0d5f50
AE
1834 return NULL;
1835
868311b1 1836 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
f907ad55
AE
1837 if (!obj_request) {
1838 kfree(name);
1839 return NULL;
1840 }
1841
bf0d5f50
AE
1842 obj_request->object_name = memcpy(name, object_name, size);
1843 obj_request->offset = offset;
1844 obj_request->length = length;
926f9b3f 1845 obj_request->flags = 0;
bf0d5f50
AE
1846 obj_request->which = BAD_WHICH;
1847 obj_request->type = type;
1848 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1849 init_completion(&obj_request->completion);
bf0d5f50
AE
1850 kref_init(&obj_request->kref);
1851
37206ee5
AE
1852 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1853 offset, length, (int)type, obj_request);
1854
bf0d5f50
AE
1855 return obj_request;
1856}
1857
1858static void rbd_obj_request_destroy(struct kref *kref)
1859{
1860 struct rbd_obj_request *obj_request;
1861
1862 obj_request = container_of(kref, struct rbd_obj_request, kref);
1863
37206ee5
AE
1864 dout("%s: obj %p\n", __func__, obj_request);
1865
bf0d5f50
AE
1866 rbd_assert(obj_request->img_request == NULL);
1867 rbd_assert(obj_request->which == BAD_WHICH);
1868
1869 if (obj_request->osd_req)
1870 rbd_osd_req_destroy(obj_request->osd_req);
1871
1872 rbd_assert(obj_request_type_valid(obj_request->type));
1873 switch (obj_request->type) {
9969ebc5
AE
1874 case OBJ_REQUEST_NODATA:
1875 break; /* Nothing to do */
bf0d5f50
AE
1876 case OBJ_REQUEST_BIO:
1877 if (obj_request->bio_list)
1878 bio_chain_put(obj_request->bio_list);
1879 break;
788e2df3
AE
1880 case OBJ_REQUEST_PAGES:
1881 if (obj_request->pages)
1882 ceph_release_page_vector(obj_request->pages,
1883 obj_request->page_count);
1884 break;
bf0d5f50
AE
1885 }
1886
f907ad55 1887 kfree(obj_request->object_name);
868311b1
AE
1888 obj_request->object_name = NULL;
1889 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
1890}
1891
fb65d228
AE
1892/* It's OK to call this for a device with no parent */
1893
1894static void rbd_spec_put(struct rbd_spec *spec);
1895static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1896{
1897 rbd_dev_remove_parent(rbd_dev);
1898 rbd_spec_put(rbd_dev->parent_spec);
1899 rbd_dev->parent_spec = NULL;
1900 rbd_dev->parent_overlap = 0;
1901}
1902
a2acd00e
AE
1903/*
1904 * Parent image reference counting is used to determine when an
1905 * image's parent fields can be safely torn down--after there are no
1906 * more in-flight requests to the parent image. When the last
1907 * reference is dropped, cleaning them up is safe.
1908 */
1909static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1910{
1911 int counter;
1912
1913 if (!rbd_dev->parent_spec)
1914 return;
1915
1916 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
1917 if (counter > 0)
1918 return;
1919
1920 /* Last reference; clean up parent data structures */
1921
1922 if (!counter)
1923 rbd_dev_unparent(rbd_dev);
1924 else
1925 rbd_warn(rbd_dev, "parent reference underflow\n");
1926}
1927
1928/*
1929 * If an image has a non-zero parent overlap, get a reference to its
1930 * parent.
1931 *
1932 * Returns true if the rbd device has a parent with a non-zero
1933 * overlap and a reference for it was successfully taken, or
1934 * false otherwise.
1935 */
1936static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
1937{
1938 int counter;
1939
1940 if (!rbd_dev->parent_spec)
1941 return false;
1942
1943 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
1944 if (counter > 0 && rbd_dev->parent_overlap)
1945 return true;
1946
1947 /* Image was flattened, but parent is not yet torn down */
1948
1949 if (counter < 0)
1950 rbd_warn(rbd_dev, "parent reference overflow\n");
1951
1952 return false;
1953}
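/*
 * Typical pairing, mirroring rbd_img_request_create() and
 * rbd_img_request_destroy() below:
 *
 *	if (rbd_dev_parent_get(rbd_dev))
 *		img_request_layered_set(img_request);
 *	...
 *	if (img_request_layered_test(img_request)) {
 *		img_request_layered_clear(img_request);
 *		rbd_dev_parent_put(img_request->rbd_dev);
 *	}
 */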
1954
bf0d5f50
AE
1955/*
1956 * Caller is responsible for filling in the list of object requests
1957 * that comprises the image request, and the Linux request pointer
1958 * (if there is one).
1959 */
cc344fa1
AE
1960static struct rbd_img_request *rbd_img_request_create(
1961 struct rbd_device *rbd_dev,
bf0d5f50 1962 u64 offset, u64 length,
e93f3152 1963 bool write_request)
bf0d5f50
AE
1964{
1965 struct rbd_img_request *img_request;
bf0d5f50 1966
1c2a9dfe 1967 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
bf0d5f50
AE
1968 if (!img_request)
1969 return NULL;
1970
1971 if (write_request) {
1972 down_read(&rbd_dev->header_rwsem);
812164f8 1973 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1974 up_read(&rbd_dev->header_rwsem);
bf0d5f50
AE
1975 }
1976
1977 img_request->rq = NULL;
1978 img_request->rbd_dev = rbd_dev;
1979 img_request->offset = offset;
1980 img_request->length = length;
0c425248
AE
1981 img_request->flags = 0;
1982 if (write_request) {
1983 img_request_write_set(img_request);
468521c1 1984 img_request->snapc = rbd_dev->header.snapc;
0c425248 1985 } else {
bf0d5f50 1986 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1987 }
a2acd00e 1988 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 1989 img_request_layered_set(img_request);
bf0d5f50
AE
1990 spin_lock_init(&img_request->completion_lock);
1991 img_request->next_completion = 0;
1992 img_request->callback = NULL;
a5a337d4 1993 img_request->result = 0;
bf0d5f50
AE
1994 img_request->obj_request_count = 0;
1995 INIT_LIST_HEAD(&img_request->obj_requests);
1996 kref_init(&img_request->kref);
1997
37206ee5
AE
1998 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1999 write_request ? "write" : "read", offset, length,
2000 img_request);
2001
bf0d5f50
AE
2002 return img_request;
2003}
2004
2005static void rbd_img_request_destroy(struct kref *kref)
2006{
2007 struct rbd_img_request *img_request;
2008 struct rbd_obj_request *obj_request;
2009 struct rbd_obj_request *next_obj_request;
2010
2011 img_request = container_of(kref, struct rbd_img_request, kref);
2012
37206ee5
AE
2013 dout("%s: img %p\n", __func__, img_request);
2014
bf0d5f50
AE
2015 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2016 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2017 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2018
a2acd00e
AE
2019 if (img_request_layered_test(img_request)) {
2020 img_request_layered_clear(img_request);
2021 rbd_dev_parent_put(img_request->rbd_dev);
2022 }
2023
0c425248 2024 if (img_request_write_test(img_request))
812164f8 2025 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2026
1c2a9dfe 2027 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2028}
2029
e93f3152
AE
2030static struct rbd_img_request *rbd_parent_request_create(
2031 struct rbd_obj_request *obj_request,
2032 u64 img_offset, u64 length)
2033{
2034 struct rbd_img_request *parent_request;
2035 struct rbd_device *rbd_dev;
2036
2037 rbd_assert(obj_request->img_request);
2038 rbd_dev = obj_request->img_request->rbd_dev;
2039
2040 parent_request = rbd_img_request_create(rbd_dev->parent,
2041 img_offset, length, false);
2042 if (!parent_request)
2043 return NULL;
2044
2045 img_request_child_set(parent_request);
2046 rbd_obj_request_get(obj_request);
2047 parent_request->obj_request = obj_request;
2048
2049 return parent_request;
2050}
2051
2052static void rbd_parent_request_destroy(struct kref *kref)
2053{
2054 struct rbd_img_request *parent_request;
2055 struct rbd_obj_request *orig_request;
2056
2057 parent_request = container_of(kref, struct rbd_img_request, kref);
2058 orig_request = parent_request->obj_request;
2059
2060 parent_request->obj_request = NULL;
2061 rbd_obj_request_put(orig_request);
2062 img_request_child_clear(parent_request);
2063
2064 rbd_img_request_destroy(kref);
2065}
2066
1217857f
AE
2067static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2068{
6365d33a 2069 struct rbd_img_request *img_request;
1217857f
AE
2070 unsigned int xferred;
2071 int result;
8b3e1a56 2072 bool more;
1217857f 2073
6365d33a
AE
2074 rbd_assert(obj_request_img_data_test(obj_request));
2075 img_request = obj_request->img_request;
2076
1217857f
AE
2077 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2078 xferred = (unsigned int)obj_request->xferred;
2079 result = obj_request->result;
2080 if (result) {
2081 struct rbd_device *rbd_dev = img_request->rbd_dev;
2082
2083 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2084 img_request_write_test(img_request) ? "write" : "read",
2085 obj_request->length, obj_request->img_offset,
2086 obj_request->offset);
2087 rbd_warn(rbd_dev, " result %d xferred %x\n",
2088 result, xferred);
2089 if (!img_request->result)
2090 img_request->result = result;
2091 }
2092
f1a4739f
AE
2093 /* Image object requests don't own their page array */
2094
2095 if (obj_request->type == OBJ_REQUEST_PAGES) {
2096 obj_request->pages = NULL;
2097 obj_request->page_count = 0;
2098 }
2099
8b3e1a56
AE
2100 if (img_request_child_test(img_request)) {
2101 rbd_assert(img_request->obj_request != NULL);
2102 more = obj_request->which < img_request->obj_request_count - 1;
2103 } else {
2104 rbd_assert(img_request->rq != NULL);
2105 more = blk_end_request(img_request->rq, result, xferred);
2106 }
2107
2108 return more;
1217857f
AE
2109}
2110
2169238d
AE
2111static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2112{
2113 struct rbd_img_request *img_request;
2114 u32 which = obj_request->which;
2115 bool more = true;
2116
6365d33a 2117 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2118 img_request = obj_request->img_request;
2119
2120 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2121 rbd_assert(img_request != NULL);
2169238d
AE
2122 rbd_assert(img_request->obj_request_count > 0);
2123 rbd_assert(which != BAD_WHICH);
2124 rbd_assert(which < img_request->obj_request_count);
2125 rbd_assert(which >= img_request->next_completion);
2126
2127 spin_lock_irq(&img_request->completion_lock);
2128 if (which != img_request->next_completion)
2129 goto out;
2130
2131 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2132 rbd_assert(more);
2133 rbd_assert(which < img_request->obj_request_count);
2134
2135 if (!obj_request_done_test(obj_request))
2136 break;
1217857f 2137 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2138 which++;
2139 }
2140
2141 rbd_assert(more ^ (which == img_request->obj_request_count));
2142 img_request->next_completion = which;
2143out:
2144 spin_unlock_irq(&img_request->completion_lock);
2145
2146 if (!more)
2147 rbd_img_request_complete(img_request);
2148}
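/*
 * Ordering example for the walk above (hypothetical completions):
 * with object requests 0..3 in flight, suppose 2 completes first.
 * next_completion is still 0, so nothing is ended yet. When 0 then
 * completes, it is ended and the walk stops at 1 (still pending).
 * When 1 completes, both 1 and the already-done 2 are ended, and
 * next_completion advances to 3.
 */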
2149
f1a4739f
AE
2150/*
2151 * Split up an image request into one or more object requests, each
2152 * to a different object. The "type" parameter indicates whether
2153 * "data_desc" is the pointer to the head of a list of bio
2154 * structures, or the base of a page array. In either case this
2155 * function assumes data_desc describes memory sufficient to hold
2156 * all data described by the image request.
2157 */
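/*
 * Worked example (hypothetical sizes): with the default object
 * order of 22 (4 MiB objects), a 6 MiB request starting at image
 * offset 3 MiB becomes three object requests:
 *
 *	object n:     offset 3 MiB, length 1 MiB  (to the boundary)
 *	object n + 1: offset 0,     length 4 MiB  (a whole object)
 *	object n + 2: offset 0,     length 1 MiB  (the remainder)
 */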
2158static int rbd_img_request_fill(struct rbd_img_request *img_request,
2159 enum obj_request_type type,
2160 void *data_desc)
bf0d5f50
AE
2161{
2162 struct rbd_device *rbd_dev = img_request->rbd_dev;
2163 struct rbd_obj_request *obj_request = NULL;
2164 struct rbd_obj_request *next_obj_request;
0c425248 2165 bool write_request = img_request_write_test(img_request);
f1a4739f
AE
2166 struct bio *bio_list;
2167 unsigned int bio_offset = 0;
2168 struct page **pages;
7da22d29 2169 u64 img_offset;
bf0d5f50
AE
2170 u64 resid;
2171 u16 opcode;
2172
f1a4739f
AE
2173 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2174 (int)type, data_desc);
37206ee5 2175
430c28c3 2176 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2177 img_offset = img_request->offset;
bf0d5f50 2178 resid = img_request->length;
4dda41d3 2179 rbd_assert(resid > 0);
f1a4739f
AE
2180
2181 if (type == OBJ_REQUEST_BIO) {
2182 bio_list = data_desc;
2183 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2184 } else {
2185 rbd_assert(type == OBJ_REQUEST_PAGES);
2186 pages = data_desc;
2187 }
2188
bf0d5f50 2189 while (resid) {
2fa12320 2190 struct ceph_osd_request *osd_req;
bf0d5f50 2191 const char *object_name;
bf0d5f50
AE
2192 u64 offset;
2193 u64 length;
2194
7da22d29 2195 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
2196 if (!object_name)
2197 goto out_unwind;
7da22d29
AE
2198 offset = rbd_segment_offset(rbd_dev, img_offset);
2199 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2200 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2201 offset, length, type);
78c2a44a
AE
2202 /* object request has its own copy of the object name */
2203 rbd_segment_name_free(object_name);
bf0d5f50
AE
2204 if (!obj_request)
2205 goto out_unwind;
2206
f1a4739f
AE
2207 if (type == OBJ_REQUEST_BIO) {
2208 unsigned int clone_size;
2209
2210 rbd_assert(length <= (u64)UINT_MAX);
2211 clone_size = (unsigned int)length;
2212 obj_request->bio_list =
2213 bio_chain_clone_range(&bio_list,
2214 &bio_offset,
2215 clone_size,
2216 GFP_ATOMIC);
2217 if (!obj_request->bio_list)
2218 goto out_partial;
2219 } else {
2220 unsigned int page_count;
2221
2222 obj_request->pages = pages;
2223 page_count = (u32)calc_pages_for(offset, length);
2224 obj_request->page_count = page_count;
2225 if ((offset + length) & ~PAGE_MASK)
2226 page_count--; /* more on last page */
2227 pages += page_count;
2228 }
bf0d5f50 2229
2fa12320
AE
2230 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2231 obj_request);
2232 if (!osd_req)
bf0d5f50 2233 goto out_partial;
2fa12320 2234 obj_request->osd_req = osd_req;
2169238d 2235 obj_request->callback = rbd_img_obj_callback;
430c28c3 2236
2fa12320
AE
2237 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2238 0, 0);
f1a4739f
AE
2239 if (type == OBJ_REQUEST_BIO)
2240 osd_req_op_extent_osd_data_bio(osd_req, 0,
2241 obj_request->bio_list, length);
2242 else
2243 osd_req_op_extent_osd_data_pages(osd_req, 0,
2244 obj_request->pages, length,
2245 offset & ~PAGE_MASK, false, false);
9d4df01f
AE
2246
2247 if (write_request)
2248 rbd_osd_req_format_write(obj_request);
2249 else
2250 rbd_osd_req_format_read(obj_request);
430c28c3 2251
7da22d29 2252 obj_request->img_offset = img_offset;
bf0d5f50
AE
2253 rbd_img_obj_request_add(img_request, obj_request);
2254
7da22d29 2255 img_offset += length;
bf0d5f50
AE
2256 resid -= length;
2257 }
2258
2259 return 0;
2260
2261out_partial:
2262 rbd_obj_request_put(obj_request);
2263out_unwind:
2264 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2265 rbd_obj_request_put(obj_request);
2266
2267 return -ENOMEM;
2268}
2269
0eefd470
AE
2270static void
2271rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2272{
2273 struct rbd_img_request *img_request;
2274 struct rbd_device *rbd_dev;
ebda6408 2275 struct page **pages;
0eefd470
AE
2276 u32 page_count;
2277
2278 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2279 rbd_assert(obj_request_img_data_test(obj_request));
2280 img_request = obj_request->img_request;
2281 rbd_assert(img_request);
2282
2283 rbd_dev = img_request->rbd_dev;
2284 rbd_assert(rbd_dev);
0eefd470 2285
ebda6408
AE
2286 pages = obj_request->copyup_pages;
2287 rbd_assert(pages != NULL);
0eefd470 2288 obj_request->copyup_pages = NULL;
ebda6408
AE
2289 page_count = obj_request->copyup_page_count;
2290 rbd_assert(page_count);
2291 obj_request->copyup_page_count = 0;
2292 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2293
2294 /*
2295 * We want the transfer count to reflect the size of the
2296 * original write request. There is no such thing as a
2297 * successful short write, so if the request was successful
2298 * we can just set it to the originally-requested length.
2299 */
2300 if (!obj_request->result)
2301 obj_request->xferred = obj_request->length;
2302
2303 /* Finish up with the normal image object callback */
2304
2305 rbd_img_obj_callback(obj_request);
2306}
2307
3d7efd18
AE
2308static void
2309rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2310{
2311 struct rbd_obj_request *orig_request;
0eefd470
AE
2312 struct ceph_osd_request *osd_req;
2313 struct ceph_osd_client *osdc;
2314 struct rbd_device *rbd_dev;
3d7efd18 2315 struct page **pages;
ebda6408 2316 u32 page_count;
3d7efd18 2317 int result;
ebda6408 2318 u64 parent_length;
b91f09f1
AE
2319 u64 offset;
2320 u64 length;
3d7efd18
AE
2321
2322 rbd_assert(img_request_child_test(img_request));
2323
2324 /* First get what we need from the image request */
2325
2326 pages = img_request->copyup_pages;
2327 rbd_assert(pages != NULL);
2328 img_request->copyup_pages = NULL;
ebda6408
AE
2329 page_count = img_request->copyup_page_count;
2330 rbd_assert(page_count);
2331 img_request->copyup_page_count = 0;
3d7efd18
AE
2332
2333 orig_request = img_request->obj_request;
2334 rbd_assert(orig_request != NULL);
b91f09f1 2335 rbd_assert(obj_request_type_valid(orig_request->type));
3d7efd18 2336 result = img_request->result;
ebda6408
AE
2337 parent_length = img_request->length;
2338 rbd_assert(parent_length == img_request->xferred);
91c6febb 2339 rbd_img_request_put(img_request);
3d7efd18 2340
91c6febb
AE
2341 rbd_assert(orig_request->img_request);
2342 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2343 rbd_assert(rbd_dev);
0eefd470 2344
0eefd470
AE
2345 if (result)
2346 goto out_err;
2347
8785b1d4
AE
2348 /*
2349 * The original osd request is of no use to us any more.
2350 * We need a new one that can hold the two ops in a copyup
2351 * request. Allocate the new copyup osd request for the
2352 * original request, and release the old one.
2353 */
0eefd470 2354 result = -ENOMEM;
0eefd470
AE
2355 osd_req = rbd_osd_req_create_copyup(orig_request);
2356 if (!osd_req)
2357 goto out_err;
8785b1d4 2358 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2359 orig_request->osd_req = osd_req;
2360 orig_request->copyup_pages = pages;
ebda6408 2361 orig_request->copyup_page_count = page_count;
3d7efd18 2362
0eefd470 2363 /* Initialize the copyup op */
3d7efd18 2364
0eefd470 2365 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2366 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2367 false, false);
3d7efd18 2368
0eefd470
AE
2369 /* Then the original write request op */
2370
b91f09f1
AE
2371 offset = orig_request->offset;
2372 length = orig_request->length;
0eefd470 2373 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
b91f09f1
AE
2374 offset, length, 0, 0);
2375 if (orig_request->type == OBJ_REQUEST_BIO)
2376 osd_req_op_extent_osd_data_bio(osd_req, 1,
2377 orig_request->bio_list, length);
2378 else
2379 osd_req_op_extent_osd_data_pages(osd_req, 1,
2380 orig_request->pages, length,
2381 offset & ~PAGE_MASK, false, false);
0eefd470
AE
2382
2383 rbd_osd_req_format_write(orig_request);
2384
2385 /* All set, send it off. */
2386
2387 orig_request->callback = rbd_img_obj_copyup_callback;
2388 osdc = &rbd_dev->rbd_client->client->osdc;
2389 result = rbd_obj_request_submit(osdc, orig_request);
2390 if (!result)
2391 return;
2392out_err:
2393 /* Record the error code and complete the request */
2394
2395 orig_request->result = result;
2396 orig_request->xferred = 0;
2397 obj_request_done_set(orig_request);
2398 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2399}
2400
2401/*
2402 * Read from the parent image the range of data that covers the
2403 * entire target of the given object request. This is used for
2404 * satisfying a layered image write request when the target of an
2405 * object request from the image request does not exist.
2406 *
2407 * A page array big enough to hold the returned data is allocated
2408 * and supplied to rbd_img_request_fill() as the "data descriptor."
2409 * When the read completes, this page array will be transferred to
2410 * the original object request for the copyup operation.
2411 *
2412 * If an error occurs, record it as the result of the original
2413 * object request and mark it done so it gets completed.
2414 */
2415static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2416{
2417 struct rbd_img_request *img_request = NULL;
2418 struct rbd_img_request *parent_request = NULL;
2419 struct rbd_device *rbd_dev;
2420 u64 img_offset;
2421 u64 length;
2422 struct page **pages = NULL;
2423 u32 page_count;
2424 int result;
2425
2426 rbd_assert(obj_request_img_data_test(obj_request));
b91f09f1 2427 rbd_assert(obj_request_type_valid(obj_request->type));
3d7efd18
AE
2428
2429 img_request = obj_request->img_request;
2430 rbd_assert(img_request != NULL);
2431 rbd_dev = img_request->rbd_dev;
2432 rbd_assert(rbd_dev->parent != NULL);
2433
2434 /*
2435 * Determine the byte range covered by the object in the
2436 * child image to which the original request was to be sent.
2437 */
2438 img_offset = obj_request->img_offset - obj_request->offset;
2439 length = (u64)1 << rbd_dev->header.obj_order;
2440
a9e8ba2c
AE
2441 /*
2442 * There is no defined parent data beyond the parent
2443 * overlap, so limit what we read at that boundary if
2444 * necessary.
2445 */
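/*
 * Example (hypothetical sizes): with 4 MiB objects and a parent
 * overlap of 6 MiB, the object covering image range [4 MiB, 8 MiB)
 * reads only [4 MiB, 6 MiB) from the parent.
 */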
2446 if (img_offset + length > rbd_dev->parent_overlap) {
2447 rbd_assert(img_offset < rbd_dev->parent_overlap);
2448 length = rbd_dev->parent_overlap - img_offset;
2449 }
2450
3d7efd18
AE
2451 /*
2452 * Allocate a page array big enough to receive the data read
2453 * from the parent.
2454 */
2455 page_count = (u32)calc_pages_for(0, length);
2456 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2457 if (IS_ERR(pages)) {
2458 result = PTR_ERR(pages);
2459 pages = NULL;
2460 goto out_err;
2461 }
2462
2463 result = -ENOMEM;
e93f3152
AE
2464 parent_request = rbd_parent_request_create(obj_request,
2465 img_offset, length);
3d7efd18
AE
2466 if (!parent_request)
2467 goto out_err;
3d7efd18
AE
2468
2469 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2470 if (result)
2471 goto out_err;
2472 parent_request->copyup_pages = pages;
ebda6408 2473 parent_request->copyup_page_count = page_count;
3d7efd18
AE
2474
2475 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2476 result = rbd_img_request_submit(parent_request);
2477 if (!result)
2478 return 0;
2479
2480 parent_request->copyup_pages = NULL;
ebda6408 2481 parent_request->copyup_page_count = 0;
3d7efd18
AE
2482 parent_request->obj_request = NULL;
2483 rbd_obj_request_put(obj_request);
2484out_err:
2485 if (pages)
2486 ceph_release_page_vector(pages, page_count);
2487 if (parent_request)
2488 rbd_img_request_put(parent_request);
2489 obj_request->result = result;
2490 obj_request->xferred = 0;
2491 obj_request_done_set(obj_request);
2492
2493 return result;
2494}
2495
c5b5ef6c
AE
2496static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2497{
c5b5ef6c
AE
2498 struct rbd_obj_request *orig_request;
2499 int result;
2500
2501 rbd_assert(!obj_request_img_data_test(obj_request));
2502
2503 /*
2504 * All we need from the object request is the original
2505 * request and the result of the STAT op. Grab those, then
2506 * we're done with the request.
2507 */
2508 orig_request = obj_request->obj_request;
2509 obj_request->obj_request = NULL;
2510 rbd_assert(orig_request);
2511 rbd_assert(orig_request->img_request);
2512
2513 result = obj_request->result;
2514 obj_request->result = 0;
2515
2516 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2517 obj_request, orig_request, result,
2518 obj_request->xferred, obj_request->length);
2519 rbd_obj_request_put(obj_request);
2520
c5b5ef6c
AE
2523
2524 /*
2525 * Our only purpose here is to determine whether the object
2526 * exists, and we don't want to treat the non-existence as
2527 * an error. If something else comes back, transfer the
2528 * error to the original request and complete it now.
2529 */
2530 if (!result) {
2531 obj_request_existence_set(orig_request, true);
2532 } else if (result == -ENOENT) {
2533 obj_request_existence_set(orig_request, false);
2534 } else {
2535 orig_request->result = result;
3d7efd18 2536 goto out;
c5b5ef6c
AE
2537 }
2538
2539 /*
2540 * Resubmit the original request now that we have recorded
2541 * whether the target object exists.
2542 */
b454e36d 2543 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2544out:
c5b5ef6c
AE
2545 if (orig_request->result)
2546 rbd_obj_request_complete(orig_request);
2547 rbd_obj_request_put(orig_request);
2548}
2549
2550static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2551{
2552 struct rbd_obj_request *stat_request;
2553 struct rbd_device *rbd_dev;
2554 struct ceph_osd_client *osdc;
2555 struct page **pages = NULL;
2556 u32 page_count;
2557 size_t size;
2558 int ret;
2559
2560 /*
2561 * The response data for a STAT call consists of:
2562 * le64 length;
2563 * struct {
2564 * le32 tv_sec;
2565 * le32 tv_nsec;
2566 * } mtime;
2567 */
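/*
 * That is 16 bytes in all, so the page vector allocated below is
 * always a single page (calc_pages_for(0, 16) == 1).
 */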
2568 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2569 page_count = (u32)calc_pages_for(0, size);
2570 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2571 if (IS_ERR(pages))
2572 return PTR_ERR(pages);
2573
2574 ret = -ENOMEM;
2575 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2576 OBJ_REQUEST_PAGES);
2577 if (!stat_request)
2578 goto out;
2579
2580 rbd_obj_request_get(obj_request);
2581 stat_request->obj_request = obj_request;
2582 stat_request->pages = pages;
2583 stat_request->page_count = page_count;
2584
2585 rbd_assert(obj_request->img_request);
2586 rbd_dev = obj_request->img_request->rbd_dev;
2587 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2588 stat_request);
2589 if (!stat_request->osd_req)
2590 goto out;
2591 stat_request->callback = rbd_img_obj_exists_callback;
2592
2593 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2594 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2595 false, false);
9d4df01f 2596 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2597
2598 osdc = &rbd_dev->rbd_client->client->osdc;
2599 ret = rbd_obj_request_submit(osdc, stat_request);
2600out:
2601 if (ret)
2602 rbd_obj_request_put(obj_request);
2603
2604 return ret;
2605}
2606
b454e36d
AE
2607static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2608{
2609 struct rbd_img_request *img_request;
a9e8ba2c 2610 struct rbd_device *rbd_dev;
3d7efd18 2611 bool known;
b454e36d
AE
2612
2613 rbd_assert(obj_request_img_data_test(obj_request));
2614
2615 img_request = obj_request->img_request;
2616 rbd_assert(img_request);
a9e8ba2c 2617 rbd_dev = img_request->rbd_dev;
b454e36d 2618
b454e36d 2619 /*
a9e8ba2c
AE
2620 * Only writes to layered images need special handling.
2621 * Reads and non-layered writes are simple object requests.
2622 * Layered writes that start beyond the end of the overlap
2623 * with the parent have no parent data, so they too are
2624 * simple object requests. Finally, if the target object is
2625 * known to already exist, its parent data has already been
2626 * copied, so a write to the object can also be handled as a
2627 * simple object request.
b454e36d
AE
2628 */
2629 if (!img_request_write_test(img_request) ||
2630 !img_request_layered_test(img_request) ||
a9e8ba2c 2631 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2632 ((known = obj_request_known_test(obj_request)) &&
2633 obj_request_exists_test(obj_request))) {
b454e36d
AE
2634
2635 struct rbd_device *rbd_dev;
2636 struct ceph_osd_client *osdc;
2637
2638 rbd_dev = obj_request->img_request->rbd_dev;
2639 osdc = &rbd_dev->rbd_client->client->osdc;
2640
2641 return rbd_obj_request_submit(osdc, obj_request);
2642 }
2643
2644 /*
3d7efd18
AE
2645 * It's a layered write. The target object might exist but
2646 * we may not know that yet. If we know it doesn't exist,
2647 * start by reading the data for the full target object from
2648 * the parent so we can use it for a copyup to the target.
b454e36d 2649 */
3d7efd18
AE
2650 if (known)
2651 return rbd_img_obj_parent_read_full(obj_request);
2652
2653 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2654
2655 return rbd_img_obj_exists_submit(obj_request);
2656}
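/*
 * Dispatch summary for the routine above:
 *
 *	read, or non-layered write            -> submit directly
 *	layered write beyond parent overlap   -> submit directly
 *	layered write, target known to exist  -> submit directly
 *	layered write, target known missing   -> parent read + copyup
 *	layered write, existence unknown      -> STAT, then resubmit
 */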
2657
bf0d5f50
AE
2658static int rbd_img_request_submit(struct rbd_img_request *img_request)
2659{
bf0d5f50 2660 struct rbd_obj_request *obj_request;
46faeed4 2661 struct rbd_obj_request *next_obj_request;
bf0d5f50 2662
37206ee5 2663 dout("%s: img %p\n", __func__, img_request);
46faeed4 2664 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2665 int ret;
2666
b454e36d 2667 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2668 if (ret)
2669 return ret;
bf0d5f50
AE
2670 }
2671
2672 return 0;
2673}
8b3e1a56
AE
2674
2675static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2676{
2677 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2678 struct rbd_device *rbd_dev;
2679 u64 obj_end;
8b3e1a56
AE
2680
2681 rbd_assert(img_request_child_test(img_request));
2682
2683 obj_request = img_request->obj_request;
a9e8ba2c
AE
2684 rbd_assert(obj_request);
2685 rbd_assert(obj_request->img_request);
2686
8b3e1a56 2687 obj_request->result = img_request->result;
a9e8ba2c
AE
2688 if (obj_request->result)
2689 goto out;
2690
2691 /*
2692 * We need to zero anything beyond the parent overlap
2693 * boundary. Since rbd_img_obj_request_read_callback()
2694 * will zero anything beyond the end of a short read, an
2695 * easy way to do this is to pretend the data from the
2696 * parent came up short--ending at the overlap boundary.
2697 */
2698 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2699 obj_end = obj_request->img_offset + obj_request->length;
2700 rbd_dev = obj_request->img_request->rbd_dev;
2701 if (obj_end > rbd_dev->parent_overlap) {
2702 u64 xferred = 0;
2703
2704 if (obj_request->img_offset < rbd_dev->parent_overlap)
2705 xferred = rbd_dev->parent_overlap -
2706 obj_request->img_offset;
8b3e1a56 2707
a9e8ba2c
AE
2708 obj_request->xferred = min(img_request->xferred, xferred);
2709 } else {
2710 obj_request->xferred = img_request->xferred;
2711 }
2712out:
b5b09be3 2713 rbd_img_request_put(img_request);
8b3e1a56
AE
2714 rbd_img_obj_request_read_callback(obj_request);
2715 rbd_obj_request_complete(obj_request);
2716}
2717
2718static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2719{
8b3e1a56
AE
2720 struct rbd_img_request *img_request;
2721 int result;
2722
2723 rbd_assert(obj_request_img_data_test(obj_request));
2724 rbd_assert(obj_request->img_request != NULL);
2725 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2726 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 2727
8b3e1a56 2728 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 2729 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 2730 obj_request->img_offset,
e93f3152 2731 obj_request->length);
8b3e1a56
AE
2732 result = -ENOMEM;
2733 if (!img_request)
2734 goto out_err;
2735
5b2ab72d
AE
2736 if (obj_request->type == OBJ_REQUEST_BIO)
2737 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2738 obj_request->bio_list);
2739 else
2740 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2741 obj_request->pages);
8b3e1a56
AE
2742 if (result)
2743 goto out_err;
2744
2745 img_request->callback = rbd_img_parent_read_callback;
2746 result = rbd_img_request_submit(img_request);
2747 if (result)
2748 goto out_err;
2749
2750 return;
2751out_err:
2752 if (img_request)
2753 rbd_img_request_put(img_request);
2754 obj_request->result = result;
2755 obj_request->xferred = 0;
2756 obj_request_done_set(obj_request);
2757}
bf0d5f50 2758
cc4a38bd 2759static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
b8d70035
AE
2760{
2761 struct rbd_obj_request *obj_request;
2169238d 2762 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2763 int ret;
2764
2765 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2766 OBJ_REQUEST_NODATA);
2767 if (!obj_request)
2768 return -ENOMEM;
2769
2770 ret = -ENOMEM;
430c28c3 2771 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2772 if (!obj_request->osd_req)
2773 goto out;
2169238d 2774 obj_request->callback = rbd_obj_request_put;
b8d70035 2775
c99d2d4a 2776 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2777 notify_id, 0, 0);
9d4df01f 2778 rbd_osd_req_format_read(obj_request);
430c28c3 2779
b8d70035 2780 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2781out:
cf81b60e
AE
2782 if (ret)
2783 rbd_obj_request_put(obj_request);
b8d70035
AE
2784
2785 return ret;
2786}
2787
2788static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2789{
2790 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2791 int ret;
b8d70035
AE
2792
2793 if (!rbd_dev)
2794 return;
2795
37206ee5 2796 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
cc4a38bd
AE
2797 rbd_dev->header_name, (unsigned long long)notify_id,
2798 (unsigned int)opcode);
e627db08
AE
2799 ret = rbd_dev_refresh(rbd_dev);
2800 if (ret)
2801 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2802
cc4a38bd 2803 rbd_obj_notify_ack(rbd_dev, notify_id);
b8d70035
AE
2804}
2805
9969ebc5
AE
2806/*
2807 * Request sync osd watch/unwatch. The value of "start" determines
2808 * whether a watch request is being initiated or torn down.
2809 */
1f3ef788 2810static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
9969ebc5
AE
2811{
2812 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2813 struct rbd_obj_request *obj_request;
9969ebc5
AE
2814 int ret;
2815
2816 rbd_assert(start ^ !!rbd_dev->watch_event);
2817 rbd_assert(start ^ !!rbd_dev->watch_request);
2818
2819 if (start) {
3c663bbd 2820 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2821 &rbd_dev->watch_event);
2822 if (ret < 0)
2823 return ret;
8eb87565 2824 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2825 }
2826
2827 ret = -ENOMEM;
2828 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2829 OBJ_REQUEST_NODATA);
2830 if (!obj_request)
2831 goto out_cancel;
2832
430c28c3
AE
2833 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2834 if (!obj_request->osd_req)
2835 goto out_cancel;
2836
8eb87565 2837 if (start)
975241af 2838 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2839 else
6977c3f9 2840 ceph_osdc_unregister_linger_request(osdc,
975241af 2841 rbd_dev->watch_request->osd_req);
2169238d
AE
2842
2843 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
1f3ef788 2844 rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
9d4df01f 2845 rbd_osd_req_format_write(obj_request);
2169238d 2846
9969ebc5
AE
2847 ret = rbd_obj_request_submit(osdc, obj_request);
2848 if (ret)
2849 goto out_cancel;
2850 ret = rbd_obj_request_wait(obj_request);
2851 if (ret)
2852 goto out_cancel;
9969ebc5
AE
2853 ret = obj_request->result;
2854 if (ret)
2855 goto out_cancel;
2856
8eb87565
AE
2857 /*
2858 * A watch request is set to linger, so the underlying osd
2859 * request won't go away until we unregister it. We retain
2860 * a pointer to the object request during that time (in
2861 * rbd_dev->watch_request), so we'll keep a reference to
2862 * it. We'll drop that reference (below) after we've
2863 * unregistered it.
2864 */
2865 if (start) {
2866 rbd_dev->watch_request = obj_request;
2867
2868 return 0;
2869 }
2870
2871 /* We have successfully torn down the watch request */
2872
2873 rbd_obj_request_put(rbd_dev->watch_request);
2874 rbd_dev->watch_request = NULL;
9969ebc5
AE
2875out_cancel:
2876 /* Cancel the event if we're tearing down, or on error */
2877 ceph_osdc_cancel_event(rbd_dev->watch_event);
2878 rbd_dev->watch_event = NULL;
9969ebc5
AE
2879 if (obj_request)
2880 rbd_obj_request_put(obj_request);
2881
2882 return ret;
2883}
2884
36be9a76 2885/*
f40eb349
AE
2886 * Synchronous osd object method call. Returns the number of bytes
2887 * returned in the outbound buffer, or a negative error code.
36be9a76
AE
2888 */
2889static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2890 const char *object_name,
2891 const char *class_name,
2892 const char *method_name,
4157976b 2893 const void *outbound,
36be9a76 2894 size_t outbound_size,
4157976b 2895 void *inbound,
e2a58ee5 2896 size_t inbound_size)
36be9a76 2897{
2169238d 2898 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2899 struct rbd_obj_request *obj_request;
36be9a76
AE
2900 struct page **pages;
2901 u32 page_count;
2902 int ret;
2903
2904 /*
6010a451
AE
2905 * Method calls are ultimately read operations. The result
2906 * should be placed into the inbound buffer provided. They
2907 * also supply outbound data--parameters for the object
2908 * method. Currently if this is present it will be a
2909 * snapshot id.
36be9a76 2910 */
57385b51 2911 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
2912 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2913 if (IS_ERR(pages))
2914 return PTR_ERR(pages);
2915
2916 ret = -ENOMEM;
6010a451 2917 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2918 OBJ_REQUEST_PAGES);
2919 if (!obj_request)
2920 goto out;
2921
2922 obj_request->pages = pages;
2923 obj_request->page_count = page_count;
2924
430c28c3 2925 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
36be9a76
AE
2926 if (!obj_request->osd_req)
2927 goto out;
2928
c99d2d4a 2929 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
2930 class_name, method_name);
2931 if (outbound_size) {
2932 struct ceph_pagelist *pagelist;
2933
2934 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2935 if (!pagelist)
2936 goto out;
2937
2938 ceph_pagelist_init(pagelist);
2939 ceph_pagelist_append(pagelist, outbound, outbound_size);
2940 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2941 pagelist);
2942 }
a4ce40a9
AE
2943 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2944 obj_request->pages, inbound_size,
44cd188d 2945 0, false, false);
9d4df01f 2946 rbd_osd_req_format_read(obj_request);
430c28c3 2947
36be9a76
AE
2948 ret = rbd_obj_request_submit(osdc, obj_request);
2949 if (ret)
2950 goto out;
2951 ret = rbd_obj_request_wait(obj_request);
2952 if (ret)
2953 goto out;
2954
2955 ret = obj_request->result;
2956 if (ret < 0)
2957 goto out;
57385b51
AE
2958
2959 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2960 ret = (int)obj_request->xferred;
903bb32e 2961 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
2962out:
2963 if (obj_request)
2964 rbd_obj_request_put(obj_request);
2965 else
2966 ceph_release_page_vector(pages, page_count);
2967
2968 return ret;
2969}
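/*
 * Hypothetical caller sketch (the method name, reply layout, and
 * variables are illustrative, not taken from this file): invoke a
 * class method "get_size" of the "rbd" object class and decode a
 * little-endian u64 reply.
 *
 *	__le64 size_buf = 0;
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf));
 *	if (ret < 0)
 *		return ret;
 *	if (ret < sizeof (size_buf))
 *		return -ERANGE;
 *	size = le64_to_cpu(size_buf);
 */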
2970
bf0d5f50 2971static void rbd_request_fn(struct request_queue *q)
cc344fa1 2972 __releases(q->queue_lock) __acquires(q->queue_lock)
bf0d5f50
AE
2973{
2974 struct rbd_device *rbd_dev = q->queuedata;
2975 bool read_only = rbd_dev->mapping.read_only;
2976 struct request *rq;
2977 int result;
2978
2979 while ((rq = blk_fetch_request(q))) {
2980 bool write_request = rq_data_dir(rq) == WRITE;
2981 struct rbd_img_request *img_request;
2982 u64 offset;
2983 u64 length;
2984
2985 /* Ignore any non-FS requests that filter through. */
2986
2987 if (rq->cmd_type != REQ_TYPE_FS) {
4dda41d3
AE
2988 dout("%s: non-fs request type %d\n", __func__,
2989 (int) rq->cmd_type);
2990 __blk_end_request_all(rq, 0);
2991 continue;
2992 }
2993
2994 /* Ignore/skip any zero-length requests */
2995
2996 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2997 length = (u64) blk_rq_bytes(rq);
2998
2999 if (!length) {
3000 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
3001 __blk_end_request_all(rq, 0);
3002 continue;
3003 }
3004
3005 spin_unlock_irq(q->queue_lock);
3006
3007 /* Disallow writes to a read-only device */
3008
3009 if (write_request) {
3010 result = -EROFS;
3011 if (read_only)
3012 goto end_request;
3013 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3014 }
3015
6d292906
AE
3016 /*
3017 * Quit early if the mapped snapshot no longer
3018 * exists. It's still possible the snapshot will
3019 * have disappeared by the time our request arrives
3020 * at the osd, but there's no sense in sending it if
3021 * we already know.
3022 */
3023 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
3024 dout("request for non-existent snapshot");
3025 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3026 result = -ENXIO;
3027 goto end_request;
3028 }
3029
bf0d5f50 3030 result = -EINVAL;
c0cd10db
AE
3031 if (offset && length > U64_MAX - offset + 1) {
3032 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3033 offset, length);
bf0d5f50 3034 goto end_request; /* Shouldn't happen */
c0cd10db 3035 }
bf0d5f50 3036
00a653e2
AE
3037 result = -EIO;
3038 if (offset + length > rbd_dev->mapping.size) {
3039 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3040 offset, length, rbd_dev->mapping.size);
3041 goto end_request;
3042 }
3043
bf0d5f50
AE
3044 result = -ENOMEM;
3045 img_request = rbd_img_request_create(rbd_dev, offset, length,
e93f3152 3046 write_request);
bf0d5f50
AE
3047 if (!img_request)
3048 goto end_request;
3049
3050 img_request->rq = rq;
3051
f1a4739f
AE
3052 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3053 rq->bio);
bf0d5f50
AE
3054 if (!result)
3055 result = rbd_img_request_submit(img_request);
3056 if (result)
3057 rbd_img_request_put(img_request);
3058end_request:
3059 spin_lock_irq(q->queue_lock);
3060 if (result < 0) {
7da22d29
AE
3061 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3062 write_request ? "write" : "read",
3063 length, offset, result);
3064
bf0d5f50
AE
3065 __blk_end_request_all(rq, result);
3066 }
3067 }
3068}
3069
602adf40
YS
3070/*
3071 * A queue callback. Makes sure that we don't create a bio that spans
3072 * multiple osd objects. One exception would be with single-page bios,
f7760dad 3073 * which we handle later in bio_chain_clone_range().
602adf40
YS
3074 */
3075static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3076 struct bio_vec *bvec)
3077{
3078 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
3079 sector_t sector_offset;
3080 sector_t sectors_per_obj;
3081 sector_t obj_sector_offset;
3082 int ret;
3083
3084 /*
3085 * Find how far into its rbd object the bio's start sector
3086 * falls, after converting the partition-relative start sector
3087 * to one relative to the enclosing device.
3088 */
3089 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3090 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3091 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3092
3093 /*
3094 * Compute the number of bytes from that offset to the end
3095 * of the object. Account for what's already used by the bio.
3096 */
3097 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3098 if (ret > bmd->bi_size)
3099 ret -= bmd->bi_size;
3100 else
3101 ret = 0;
3102
3103 /*
3104 * Don't send back more than was asked for. And if the bio
3105 * was empty, let the whole thing through because: "Note
3106 * that a block device *must* allow a single page to be
3107 * added to an empty bio."
3108 */
3109 rbd_assert(bvec->bv_len <= PAGE_SIZE);
3110 if (ret > (int) bvec->bv_len || !bmd->bi_size)
3111 ret = (int) bvec->bv_len;
3112
3113 return ret;
602adf40
YS
3114}
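/*
 * Worked example (hypothetical values): with obj_order 22 an object
 * spans 8192 sectors. For a bio that starts 8190 sectors into its
 * object and already holds 512 bytes, the space left in the object
 * is (8192 - 8190) * 512 - 512 = 512 bytes, so a 4096-byte bvec is
 * only allowed to add 512 bytes.
 */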
3115
3116static void rbd_free_disk(struct rbd_device *rbd_dev)
3117{
3118 struct gendisk *disk = rbd_dev->disk;
3119
3120 if (!disk)
3121 return;
3122
a0cab924
AE
3123 rbd_dev->disk = NULL;
3124 if (disk->flags & GENHD_FL_UP) {
602adf40 3125 del_gendisk(disk);
a0cab924
AE
3126 if (disk->queue)
3127 blk_cleanup_queue(disk->queue);
3128 }
602adf40
YS
3129 put_disk(disk);
3130}
3131
788e2df3
AE
3132static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3133 const char *object_name,
7097f8df 3134 u64 offset, u64 length, void *buf)
788e2df3
AE
3135
3136{
2169238d 3137 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 3138 struct rbd_obj_request *obj_request;
788e2df3
AE
3139 struct page **pages = NULL;
3140 u32 page_count;
1ceae7ef 3141 size_t size;
788e2df3
AE
3142 int ret;
3143
3144 page_count = (u32) calc_pages_for(offset, length);
3145 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3146 if (IS_ERR(pages))
3147 return PTR_ERR(pages);
3148
3149 ret = -ENOMEM;
3150 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 3151 OBJ_REQUEST_PAGES);
788e2df3
AE
3152 if (!obj_request)
3153 goto out;
3154
3155 obj_request->pages = pages;
3156 obj_request->page_count = page_count;
3157
430c28c3 3158 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
3159 if (!obj_request->osd_req)
3160 goto out;
3161
c99d2d4a
AE
3162 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3163 offset, length, 0, 0);
406e2c9f 3164 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 3165 obj_request->pages,
44cd188d
AE
3166 obj_request->length,
3167 obj_request->offset & ~PAGE_MASK,
3168 false, false);
9d4df01f 3169 rbd_osd_req_format_read(obj_request);
430c28c3 3170
788e2df3
AE
3171 ret = rbd_obj_request_submit(osdc, obj_request);
3172 if (ret)
3173 goto out;
3174 ret = rbd_obj_request_wait(obj_request);
3175 if (ret)
3176 goto out;
3177
3178 ret = obj_request->result;
3179 if (ret < 0)
3180 goto out;
1ceae7ef
AE
3181
3182 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3183 size = (size_t) obj_request->xferred;
903bb32e 3184 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
3185 rbd_assert(size <= (size_t)INT_MAX);
3186 ret = (int)size;
788e2df3
AE
3187out:
3188 if (obj_request)
3189 rbd_obj_request_put(obj_request);
3190 else
3191 ceph_release_page_vector(pages, page_count);
3192
3193 return ret;
3194}
3195
602adf40 3196/*
662518b1
AE
3197 * Read the complete header for the given rbd device. On successful
3198 * return, the rbd_dev->header field will contain up-to-date
3199 * information about the image.
602adf40 3200 */
99a41ebc 3201static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 3202{
4156d998 3203 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3204 u32 snap_count = 0;
4156d998
AE
3205 u64 names_size = 0;
3206 u32 want_count;
3207 int ret;
602adf40 3208
00f1f36f 3209 /*
4156d998
AE
3210 * The complete header will include an array of its 64-bit
3211 * snapshot ids, followed by the names of those snapshots as
3212 * a contiguous block of NUL-terminated strings. Note that
3213 * the number of snapshots could change by the time we read
3214 * it in, in which case we re-read it.
00f1f36f 3215 */
4156d998
AE
3216 do {
3217 size_t size;
3218
3219 kfree(ondisk);
3220
3221 size = sizeof (*ondisk);
3222 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3223 size += names_size;
3224 ondisk = kmalloc(size, GFP_KERNEL);
3225 if (!ondisk)
662518b1 3226 return -ENOMEM;
4156d998 3227
788e2df3 3228 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3229 0, size, ondisk);
4156d998 3230 if (ret < 0)
662518b1 3231 goto out;
c0cd10db 3232 if ((size_t)ret < size) {
4156d998 3233 ret = -ENXIO;
06ecc6cb
AE
3234 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3235 size, ret);
662518b1 3236 goto out;
4156d998
AE
3237 }
3238 if (!rbd_dev_ondisk_valid(ondisk)) {
3239 ret = -ENXIO;
06ecc6cb 3240 rbd_warn(rbd_dev, "invalid header");
662518b1 3241 goto out;
81e759fb 3242 }
602adf40 3243
4156d998
AE
3244 names_size = le64_to_cpu(ondisk->snap_names_len);
3245 want_count = snap_count;
3246 snap_count = le32_to_cpu(ondisk->snap_count);
3247 } while (snap_count != want_count);
00f1f36f 3248
662518b1
AE
3249 ret = rbd_header_from_disk(rbd_dev, ondisk);
3250out:
4156d998
AE
3251 kfree(ondisk);
3252
3253 return ret;
602adf40
YS
3254}
3255
15228ede
AE
3256/*
3257 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3258 * has disappeared from the (just updated) snapshot context.
3259 */
3260static void rbd_exists_validate(struct rbd_device *rbd_dev)
3261{
3262 u64 snap_id;
3263
3264 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3265 return;
3266
3267 snap_id = rbd_dev->spec->snap_id;
3268 if (snap_id == CEPH_NOSNAP)
3269 return;
3270
3271 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3272 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3273}
3274
cc4a38bd 3275static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3276{
e627db08 3277 u64 mapping_size;
1fe5e993
AE
3278 int ret;
3279
117973fb 3280 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3281 mapping_size = rbd_dev->mapping.size;
1fe5e993 3282 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3283 if (rbd_dev->image_format == 1)
99a41ebc 3284 ret = rbd_dev_v1_header_info(rbd_dev);
117973fb 3285 else
2df3fac7 3286 ret = rbd_dev_v2_header_info(rbd_dev);
15228ede
AE
3287
3288 /* If it's a mapped snapshot, validate its EXISTS flag */
3289
3290 rbd_exists_validate(rbd_dev);
1fe5e993 3291 mutex_unlock(&ctl_mutex);
00a653e2
AE
3292 if (mapping_size != rbd_dev->mapping.size) {
3293 sector_t size;
3294
3295 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3296 dout("setting size to %llu sectors", (unsigned long long)size);
3297 set_capacity(rbd_dev->disk, size);
a3fbe5d4 3298 revalidate_disk(rbd_dev->disk);
00a653e2 3299 }
1fe5e993
AE
3300
3301 return ret;
3302}
3303
602adf40
YS
3304static int rbd_init_disk(struct rbd_device *rbd_dev)
3305{
3306 struct gendisk *disk;
3307 struct request_queue *q;
593a9e7b 3308 u64 segment_size;
602adf40 3309
602adf40 3310 /* create gendisk info */
602adf40
YS
3311 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3312 if (!disk)
1fcdb8aa 3313 return -ENOMEM;
602adf40 3314
f0f8cef5 3315 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3316 rbd_dev->dev_id);
602adf40
YS
3317 disk->major = rbd_dev->major;
3318 disk->first_minor = 0;
3319 disk->fops = &rbd_bd_ops;
3320 disk->private_data = rbd_dev;
3321
bf0d5f50 3322 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3323 if (!q)
3324 goto out_disk;
029bcbd8 3325
593a9e7b
AE
3326 /* We use the default size, but let's be explicit about it. */
3327 blk_queue_physical_block_size(q, SECTOR_SIZE);
3328
029bcbd8 3329 /* set io sizes to object size */
593a9e7b
AE
3330 segment_size = rbd_obj_bytes(&rbd_dev->header);
3331 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3332 blk_queue_max_segment_size(q, segment_size);
3333 blk_queue_io_min(q, segment_size);
3334 blk_queue_io_opt(q, segment_size);
029bcbd8 3335
602adf40
YS
3336 blk_queue_merge_bvec(q, rbd_merge_bvec);
3337 disk->queue = q;
3338
3339 q->queuedata = rbd_dev;
3340
3341 rbd_dev->disk = disk;
602adf40 3342
602adf40 3343 return 0;
602adf40
YS
3344out_disk:
3345 put_disk(disk);
1fcdb8aa
AE
3346
3347 return -ENOMEM;
602adf40
YS
3348}
3349
dfc5606d
YS
3350/*
3351 sysfs
3352*/
3353
593a9e7b
AE
3354static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3355{
3356 return container_of(dev, struct rbd_device, dev);
3357}
3358
dfc5606d
YS
3359static ssize_t rbd_size_show(struct device *dev,
3360 struct device_attribute *attr, char *buf)
3361{
593a9e7b 3362 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3363
fc71d833
AE
3364 return sprintf(buf, "%llu\n",
3365 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3366}
3367
34b13184
AE
3368/*
3369 * Note this shows the features for whatever's mapped, which is not
3370 * necessarily the base image.
3371 */
3372static ssize_t rbd_features_show(struct device *dev,
3373 struct device_attribute *attr, char *buf)
3374{
3375 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3376
3377 return sprintf(buf, "0x%016llx\n",
fc71d833 3378 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3379}
3380
dfc5606d
YS
3381static ssize_t rbd_major_show(struct device *dev,
3382 struct device_attribute *attr, char *buf)
3383{
593a9e7b 3384 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3385
fc71d833
AE
3386 if (rbd_dev->major)
3387 return sprintf(buf, "%d\n", rbd_dev->major);
3388
3389 return sprintf(buf, "(none)\n");
3390
dfc5606d
YS
3391}
3392
3393static ssize_t rbd_client_id_show(struct device *dev,
3394 struct device_attribute *attr, char *buf)
602adf40 3395{
593a9e7b 3396 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3397
1dbb4399
AE
3398 return sprintf(buf, "client%lld\n",
3399 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3400}
3401
dfc5606d
YS
3402static ssize_t rbd_pool_show(struct device *dev,
3403 struct device_attribute *attr, char *buf)
602adf40 3404{
593a9e7b 3405 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3406
0d7dbfce 3407 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3408}
3409
9bb2f334
AE
3410static ssize_t rbd_pool_id_show(struct device *dev,
3411 struct device_attribute *attr, char *buf)
3412{
3413 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3414
0d7dbfce 3415 return sprintf(buf, "%llu\n",
fc71d833 3416 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3417}
3418
dfc5606d
YS
3419static ssize_t rbd_name_show(struct device *dev,
3420 struct device_attribute *attr, char *buf)
3421{
593a9e7b 3422 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3423
a92ffdf8
AE
3424 if (rbd_dev->spec->image_name)
3425 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3426
3427 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3428}
3429
589d30e0
AE
3430static ssize_t rbd_image_id_show(struct device *dev,
3431 struct device_attribute *attr, char *buf)
3432{
3433 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3434
0d7dbfce 3435 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3436}
3437
34b13184
AE
3438/*
3439 * Shows the name of the currently-mapped snapshot (or
3440 * RBD_SNAP_HEAD_NAME for the base image).
3441 */
dfc5606d
YS
3442static ssize_t rbd_snap_show(struct device *dev,
3443 struct device_attribute *attr,
3444 char *buf)
3445{
593a9e7b 3446 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3447
0d7dbfce 3448 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3449}
3450
86b00e0d
AE
3451/*
3452 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3453 * for the parent image. If there is no parent, simply shows
3454 * "(no parent image)".
3455 */
3456static ssize_t rbd_parent_show(struct device *dev,
3457 struct device_attribute *attr,
3458 char *buf)
3459{
3460 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3461 struct rbd_spec *spec = rbd_dev->parent_spec;
3462 int count;
3463 char *bufp = buf;
3464
3465 if (!spec)
3466 return sprintf(buf, "(no parent image)\n");
3467
3468 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3469 (unsigned long long) spec->pool_id, spec->pool_name);
3470 if (count < 0)
3471 return count;
3472 bufp += count;
3473
3474 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3475 spec->image_name ? spec->image_name : "(unknown)");
3476 if (count < 0)
3477 return count;
3478 bufp += count;
3479
3480 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3481 (unsigned long long) spec->snap_id, spec->snap_name);
3482 if (count < 0)
3483 return count;
3484 bufp += count;
3485
3486 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3487 if (count < 0)
3488 return count;
3489 bufp += count;
3490
3491 return (ssize_t) (bufp - buf);
3492}
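/*
 * Illustrative sketch, not part of the driver: a read of the "parent"
 * attribute for a cloned image produces one line per field written
 * above, e.g. (all values hypothetical):
 *
 *	pool_id 2
 *	pool_name rbd
 *	image_id 1014b2ae8944a
 *	image_name base-image
 *	snap_id 4
 *	snap_name base-snap
 *	overlap 10737418240
 */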
3493
dfc5606d
YS
3494static ssize_t rbd_image_refresh(struct device *dev,
3495 struct device_attribute *attr,
3496 const char *buf,
3497 size_t size)
3498{
593a9e7b 3499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3500 int ret;
602adf40 3501
cc4a38bd 3502 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3503 if (ret)
3504 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
b813623a
AE
3505
3506 return ret < 0 ? ret : size;
dfc5606d 3507}
602adf40 3508
dfc5606d 3509static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3510static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3511static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3512static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3513static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3514static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3515static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3516static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3517static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3518static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3519static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3520
3521static struct attribute *rbd_attrs[] = {
3522 &dev_attr_size.attr,
34b13184 3523 &dev_attr_features.attr,
dfc5606d
YS
3524 &dev_attr_major.attr,
3525 &dev_attr_client_id.attr,
3526 &dev_attr_pool.attr,
9bb2f334 3527 &dev_attr_pool_id.attr,
dfc5606d 3528 &dev_attr_name.attr,
589d30e0 3529 &dev_attr_image_id.attr,
dfc5606d 3530 &dev_attr_current_snap.attr,
86b00e0d 3531 &dev_attr_parent.attr,
dfc5606d 3532 &dev_attr_refresh.attr,
dfc5606d
YS
3533 NULL
3534};
3535
3536static struct attribute_group rbd_attr_group = {
3537 .attrs = rbd_attrs,
3538};
3539
3540static const struct attribute_group *rbd_attr_groups[] = {
3541 &rbd_attr_group,
3542 NULL
3543};
3544
3545static void rbd_sysfs_dev_release(struct device *dev)
3546{
3547}
3548
3549static struct device_type rbd_device_type = {
3550 .name = "rbd",
3551 .groups = rbd_attr_groups,
3552 .release = rbd_sysfs_dev_release,
3553};
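/*
 * Usage sketch, not part of the driver: with the attribute groups
 * registered above, each mapped device exposes its state under
 * /sys/bus/rbd/devices/<dev_id>/. For example (device id 0 is
 * hypothetical):
 *
 *	$ cat /sys/bus/rbd/devices/0/size
 *	$ cat /sys/bus/rbd/devices/0/pool
 *	$ echo 1 > /sys/bus/rbd/devices/0/refresh
 */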
3554
8b8fb99c
AE
3555static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3556{
3557 kref_get(&spec->kref);
3558
3559 return spec;
3560}
3561
3562static void rbd_spec_free(struct kref *kref);
3563static void rbd_spec_put(struct rbd_spec *spec)
3564{
3565 if (spec)
3566 kref_put(&spec->kref, rbd_spec_free);
3567}
3568
3569static struct rbd_spec *rbd_spec_alloc(void)
3570{
3571 struct rbd_spec *spec;
3572
3573 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3574 if (!spec)
3575 return NULL;
3576 kref_init(&spec->kref);
3577
8b8fb99c
AE
3578 return spec;
3579}
3580
3581static void rbd_spec_free(struct kref *kref)
3582{
3583 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3584
3585 kfree(spec->pool_name);
3586 kfree(spec->image_id);
3587 kfree(spec->image_name);
3588 kfree(spec->snap_name);
3589 kfree(spec);
3590}
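/*
 * Reference-counting sketch for rbd_spec, assumed from the helpers
 * above rather than copied from a real call site: every structure
 * that stores a spec pointer holds its own reference.
 *
 *	struct rbd_spec *spec = rbd_spec_alloc();	// kref == 1
 *	holder->spec = rbd_spec_get(spec);		// kref == 2
 *	...
 *	rbd_spec_put(holder->spec);			// drops holder's ref
 *	rbd_spec_put(spec);				// kref == 0, freed
 */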
3591
cc344fa1 3592static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3593 struct rbd_spec *spec)
3594{
3595 struct rbd_device *rbd_dev;
3596
3597 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3598 if (!rbd_dev)
3599 return NULL;
3600
3601 spin_lock_init(&rbd_dev->lock);
6d292906 3602 rbd_dev->flags = 0;
a2acd00e 3603 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3604 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3605 init_rwsem(&rbd_dev->header_rwsem);
3606
3607 rbd_dev->spec = spec;
3608 rbd_dev->rbd_client = rbdc;
3609
0903e875
AE
3610 /* Initialize the layout used for all rbd requests */
3611
3612 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3613 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3614 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3615 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3616
c53d5893
AE
3617 return rbd_dev;
3618}
3619
3620static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3621{
c53d5893
AE
3622 rbd_put_client(rbd_dev->rbd_client);
3623 rbd_spec_put(rbd_dev->spec);
3624 kfree(rbd_dev);
3625}
3626
9d475de5
AE
3627/*
3628 * Get the size and object order for an image snapshot, or if
3629 * snap_id is CEPH_NOSNAP, get this information for the base
3630 * image.
3631 */
3632static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3633 u8 *order, u64 *snap_size)
3634{
3635 __le64 snapid = cpu_to_le64(snap_id);
3636 int ret;
3637 struct {
3638 u8 order;
3639 __le64 size;
3640 } __attribute__ ((packed)) size_buf = { 0 };
3641
36be9a76 3642 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3643 "rbd", "get_size",
4157976b 3644 &snapid, sizeof (snapid),
e2a58ee5 3645 &size_buf, sizeof (size_buf));
36be9a76 3646 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3647 if (ret < 0)
3648 return ret;
57385b51
AE
3649 if (ret < sizeof (size_buf))
3650 return -ERANGE;
9d475de5 3651
c86f86e9
AE
3652 if (order)
3653 *order = size_buf.order;
9d475de5
AE
3654 *snap_size = le64_to_cpu(size_buf.size);
3655
3656 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3657 (unsigned long long)snap_id, (unsigned int)*order,
3658 (unsigned long long)*snap_size);
9d475de5
AE
3659
3660 return 0;
3661}
3662
3663static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3664{
3665 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3666 &rbd_dev->header.obj_order,
3667 &rbd_dev->header.image_size);
3668}
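/*
 * Worked example with hypothetical values: an image with obj_order 22
 * is carved into objects of 1ULL << 22 = 4194304 bytes (4 MiB), so a
 * 1 GiB image spans 256 data objects.
 */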
3669
1e130199
AE
3670static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3671{
3672 void *reply_buf;
3673 int ret;
3674 void *p;
3675
3676 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3677 if (!reply_buf)
3678 return -ENOMEM;
3679
36be9a76 3680 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3681 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3682 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3683 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3684 if (ret < 0)
3685 goto out;
3686
3687 p = reply_buf;
3688 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3689 p + ret, NULL, GFP_NOIO);
3690 ret = 0;
1e130199
AE
3691
3692 if (IS_ERR(rbd_dev->header.object_prefix)) {
3693 ret = PTR_ERR(rbd_dev->header.object_prefix);
3694 rbd_dev->header.object_prefix = NULL;
3695 } else {
3696 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3697 }
1e130199
AE
3698out:
3699 kfree(reply_buf);
3700
3701 return ret;
3702}
3703
b1b5402a
AE
3704static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3705 u64 *snap_features)
3706{
3707 __le64 snapid = cpu_to_le64(snap_id);
3708 struct {
3709 __le64 features;
3710 __le64 incompat;
4157976b 3711 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3712 u64 incompat;
b1b5402a
AE
3713 int ret;
3714
36be9a76 3715 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3716 "rbd", "get_features",
4157976b 3717 &snapid, sizeof (snapid),
e2a58ee5 3718 &features_buf, sizeof (features_buf));
36be9a76 3719 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3720 if (ret < 0)
3721 return ret;
57385b51
AE
3722 if (ret < sizeof (features_buf))
3723 return -ERANGE;
d889140c
AE
3724
3725 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3726 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3727 return -ENXIO;
d889140c 3728
b1b5402a
AE
3729 *snap_features = le64_to_cpu(features_buf.features);
3730
3731 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3732 (unsigned long long)snap_id,
3733 (unsigned long long)*snap_features,
3734 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3735
3736 return 0;
3737}
3738
3739static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3740{
3741 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3742 &rbd_dev->header.features);
3743}
3744
86b00e0d
AE
3745static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3746{
3747 struct rbd_spec *parent_spec;
3748 size_t size;
3749 void *reply_buf = NULL;
3750 __le64 snapid;
3751 void *p;
3752 void *end;
642a2537 3753 u64 pool_id;
86b00e0d
AE
3754 char *image_id;
3755 u64 overlap;
86b00e0d
AE
3756 int ret;
3757
3758 parent_spec = rbd_spec_alloc();
3759 if (!parent_spec)
3760 return -ENOMEM;
3761
3762 size = sizeof (__le64) + /* pool_id */
3763 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3764 sizeof (__le64) + /* snap_id */
3765 sizeof (__le64); /* overlap */
3766 reply_buf = kmalloc(size, GFP_KERNEL);
3767 if (!reply_buf) {
3768 ret = -ENOMEM;
3769 goto out_err;
3770 }
3771
3772 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3773 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3774 "rbd", "get_parent",
4157976b 3775 &snapid, sizeof (snapid),
e2a58ee5 3776 reply_buf, size);
36be9a76 3777 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3778 if (ret < 0)
3779 goto out_err;
3780
86b00e0d 3781 p = reply_buf;
57385b51
AE
3782 end = reply_buf + ret;
3783 ret = -ERANGE;
642a2537
AE
3784 ceph_decode_64_safe(&p, end, pool_id, out_err);
3785 if (pool_id == CEPH_NOPOOL)
86b00e0d
AE
3786 goto out; /* No parent? No problem. */
3787
0903e875
AE
3788 /* The ceph file layout needs to fit pool id in 32 bits */
3789
3790 ret = -EIO;
642a2537 3791 if (pool_id > (u64)U32_MAX) {
c0cd10db 3792 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3793 (unsigned long long)pool_id, U32_MAX);
57385b51 3794 goto out_err;
c0cd10db 3795 }
642a2537 3796 parent_spec->pool_id = pool_id;
0903e875 3797
979ed480 3798 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3799 if (IS_ERR(image_id)) {
3800 ret = PTR_ERR(image_id);
3801 goto out_err;
3802 }
3803 parent_spec->image_id = image_id;
3804 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3805 ceph_decode_64_safe(&p, end, overlap, out_err);
3806
70cf49cf 3807 if (overlap) {
642a2537 3808 rbd_spec_put(rbd_dev->parent_spec);
70cf49cf
AE
3809 rbd_dev->parent_spec = parent_spec;
3810 parent_spec = NULL; /* rbd_dev now owns this */
3811 rbd_dev->parent_overlap = overlap;
3812 } else {
3813 rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
3814 }
86b00e0d
AE
3815out:
3816 ret = 0;
3817out_err:
3818 kfree(reply_buf);
3819 rbd_spec_put(parent_spec);
3820
3821 return ret;
3822}
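/*
 * For reference, the "get_parent" reply decoded above is laid out as
 * follows (matching the buffer sizing and ceph_decode_* calls):
 *
 *	__le64	pool_id		CEPH_NOPOOL means the image has no parent
 *	__le32	image_id_len	followed by that many image id bytes
 *	__le64	snap_id
 *	__le64	overlap		bytes backed by the parent; 0 is ignored
 */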
3823
cc070d59
AE
3824static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3825{
3826 struct {
3827 __le64 stripe_unit;
3828 __le64 stripe_count;
3829 } __attribute__ ((packed)) striping_info_buf = { 0 };
3830 size_t size = sizeof (striping_info_buf);
3831 void *p;
3832 u64 obj_size;
3833 u64 stripe_unit;
3834 u64 stripe_count;
3835 int ret;
3836
3837 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3838 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3839 (char *)&striping_info_buf, size);
cc070d59
AE
3840 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3841 if (ret < 0)
3842 return ret;
3843 if (ret < size)
3844 return -ERANGE;
3845
3846 /*
3847 * We don't actually support the "fancy striping" feature
3848 * (STRIPINGV2) yet, but if the striping sizes are the
3849 * defaults the behavior is the same as before. So find
3850 * out, and only fail if the image has non-default values.
3851 */
3852 ret = -EINVAL;
3853 obj_size = (u64)1 << rbd_dev->header.obj_order;
3854 p = &striping_info_buf;
3855 stripe_unit = ceph_decode_64(&p);
3856 if (stripe_unit != obj_size) {
3857 rbd_warn(rbd_dev, "unsupported stripe unit "
3858 "(got %llu want %llu)",
3859 stripe_unit, obj_size);
3860 return -EINVAL;
3861 }
3862 stripe_count = ceph_decode_64(&p);
3863 if (stripe_count != 1) {
3864 rbd_warn(rbd_dev, "unsupported stripe count "
3865 "(got %llu want 1)", stripe_count);
3866 return -EINVAL;
3867 }
500d0c0f
AE
3868 rbd_dev->header.stripe_unit = stripe_unit;
3869 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3870
3871 return 0;
3872}
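/*
 * Example with hypothetical values: for an image with obj_order 22
 * (4 MiB objects), the only parameters this function accepts are
 * stripe_unit == 4194304 and stripe_count == 1, i.e. the layout that
 * is identical to having no fancy striping at all.
 */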
3873
9e15b77d
AE
3874static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3875{
3876 size_t image_id_size;
3877 char *image_id;
3878 void *p;
3879 void *end;
3880 size_t size;
3881 void *reply_buf = NULL;
3882 size_t len = 0;
3883 char *image_name = NULL;
3884 int ret;
3885
3886 rbd_assert(!rbd_dev->spec->image_name);
3887
69e7a02f
AE
3888 len = strlen(rbd_dev->spec->image_id);
3889 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3890 image_id = kmalloc(image_id_size, GFP_KERNEL);
3891 if (!image_id)
3892 return NULL;
3893
3894 p = image_id;
4157976b 3895 end = image_id + image_id_size;
57385b51 3896 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3897
3898 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3899 reply_buf = kmalloc(size, GFP_KERNEL);
3900 if (!reply_buf)
3901 goto out;
3902
36be9a76 3903 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3904 "rbd", "dir_get_name",
3905 image_id, image_id_size,
e2a58ee5 3906 reply_buf, size);
9e15b77d
AE
3907 if (ret < 0)
3908 goto out;
3909 p = reply_buf;
f40eb349
AE
3910 end = reply_buf + ret;
3911
9e15b77d
AE
3912 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3913 if (IS_ERR(image_name))
3914 image_name = NULL;
3915 else
3916 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3917out:
3918 kfree(reply_buf);
3919 kfree(image_id);
3920
3921 return image_name;
3922}
3923
2ad3d716
AE
3924static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3925{
3926 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3927 const char *snap_name;
3928 u32 which = 0;
3929
3930 /* Skip over names until we find the one we are looking for */
3931
3932 snap_name = rbd_dev->header.snap_names;
3933 while (which < snapc->num_snaps) {
3934 if (!strcmp(name, snap_name))
3935 return snapc->snaps[which];
3936 snap_name += strlen(snap_name) + 1;
3937 which++;
3938 }
3939 return CEPH_NOSNAP;
3940}
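/*
 * The v1 lookup above relies on header.snap_names being a packed run
 * of NUL-terminated names, one per entry of snapc->snaps. For two
 * hypothetical snapshots "a" and "backup":
 *
 *	snap_names:	'a' '\0' 'b' 'a' 'c' 'k' 'u' 'p' '\0'
 *	snaps[0] pairs with "a", snaps[1] pairs with "backup".
 */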
3941
3942static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3943{
3944 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3945 u32 which;
3946 bool found = false;
3947 u64 snap_id;
3948
3949 for (which = 0; !found && which < snapc->num_snaps; which++) {
3950 const char *snap_name;
3951
3952 snap_id = snapc->snaps[which];
3953 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3954 if (IS_ERR(snap_name))
3955 break;
3956 found = !strcmp(name, snap_name);
3957 kfree(snap_name);
3958 }
3959 return found ? snap_id : CEPH_NOSNAP;
3960}
3961
3962/*
3963 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3964 * no snapshot by that name is found, or if an error occurs.
3965 */
3966static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3967{
3968 if (rbd_dev->image_format == 1)
3969 return rbd_v1_snap_id_by_name(rbd_dev, name);
3970
3971 return rbd_v2_snap_id_by_name(rbd_dev, name);
3972}
3973
9e15b77d 3974/*
2e9f7f1c
AE
3975 * When an rbd image has a parent image, it is identified by the
3976 * pool, image, and snapshot ids (not names). This function fills
3977 * in the names for those ids. (It's OK if we can't figure out the
3978 * name for an image id, but the pool and snapshot ids should always
3979 * exist and have names.) All names in an rbd spec are dynamically
3980 * allocated.
e1d4213f
AE
3981 *
3982 * When an image being mapped (not a parent) is probed, we have the
3983 * pool name and pool id, image name and image id, and the snapshot
3984 * name. The only thing we're missing is the snapshot id.
9e15b77d 3985 */
2e9f7f1c 3986static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3987{
2e9f7f1c
AE
3988 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3989 struct rbd_spec *spec = rbd_dev->spec;
3990 const char *pool_name;
3991 const char *image_name;
3992 const char *snap_name;
9e15b77d
AE
3993 int ret;
3994
e1d4213f
AE
3995 /*
3996 * An image being mapped will have the pool name (etc.), but
3997 * we need to look up the snapshot id.
3998 */
2e9f7f1c
AE
3999 if (spec->pool_name) {
4000 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4001 u64 snap_id;
e1d4213f 4002
2ad3d716
AE
4003 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4004 if (snap_id == CEPH_NOSNAP)
e1d4213f 4005 return -ENOENT;
2ad3d716 4006 spec->snap_id = snap_id;
e1d4213f 4007 } else {
2e9f7f1c 4008 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4009 }
4010
4011 return 0;
4012 }
9e15b77d 4013
2e9f7f1c 4014 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4015
2e9f7f1c
AE
4016 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4017 if (!pool_name) {
4018 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4019 return -EIO;
4020 }
2e9f7f1c
AE
4021 pool_name = kstrdup(pool_name, GFP_KERNEL);
4022 if (!pool_name)
9e15b77d
AE
4023 return -ENOMEM;
4024
4025 /* Fetch the image name; tolerate failure here */
4026
2e9f7f1c
AE
4027 image_name = rbd_dev_image_name(rbd_dev);
4028 if (!image_name)
06ecc6cb 4029 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4030
2e9f7f1c 4031 /* Look up the snapshot name, and make a copy */
9e15b77d 4032
2e9f7f1c 4033 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
4034 if (!snap_name) {
4035 ret = -ENOMEM;
9e15b77d 4036 goto out_err;
2e9f7f1c
AE
4037 }
4038
4039 spec->pool_name = pool_name;
4040 spec->image_name = image_name;
4041 spec->snap_name = snap_name;
9e15b77d
AE
4042
4043 return 0;
4044out_err:
2e9f7f1c
AE
4045 kfree(image_name);
4046 kfree(pool_name);
9e15b77d
AE
4047
4048 return ret;
4049}
4050
cc4a38bd 4051static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4052{
4053 size_t size;
4054 int ret;
4055 void *reply_buf;
4056 void *p;
4057 void *end;
4058 u64 seq;
4059 u32 snap_count;
4060 struct ceph_snap_context *snapc;
4061 u32 i;
4062
4063 /*
4064 * We'll need room for the seq value (maximum snapshot id),
4065 * snapshot count, and array of that many snapshot ids.
4066 * For now we have a fixed upper limit on the number we're
4067 * prepared to receive.
4068 */
4069 size = sizeof (__le64) + sizeof (__le32) +
4070 RBD_MAX_SNAP_COUNT * sizeof (__le64);
4071 reply_buf = kzalloc(size, GFP_KERNEL);
4072 if (!reply_buf)
4073 return -ENOMEM;
4074
36be9a76 4075 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4076 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4077 reply_buf, size);
36be9a76 4078 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4079 if (ret < 0)
4080 goto out;
4081
35d489f9 4082 p = reply_buf;
57385b51
AE
4083 end = reply_buf + ret;
4084 ret = -ERANGE;
35d489f9
AE
4085 ceph_decode_64_safe(&p, end, seq, out);
4086 ceph_decode_32_safe(&p, end, snap_count, out);
4087
4088 /*
4089 * Make sure the reported number of snapshot ids wouldn't go
4090 * beyond the end of our buffer. But before checking that,
4091 * make sure the computed size of the snapshot context we
4092 * allocate is representable in a size_t.
4093 */
4094 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4095 / sizeof (u64)) {
4096 ret = -EINVAL;
4097 goto out;
4098 }
4099 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4100 goto out;
468521c1 4101 ret = 0;
35d489f9 4102
812164f8 4103 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4104 if (!snapc) {
4105 ret = -ENOMEM;
4106 goto out;
4107 }
35d489f9 4108 snapc->seq = seq;
35d489f9
AE
4109 for (i = 0; i < snap_count; i++)
4110 snapc->snaps[i] = ceph_decode_64(&p);
4111
49ece554 4112 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4113 rbd_dev->header.snapc = snapc;
4114
4115 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4116 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4117out:
4118 kfree(reply_buf);
4119
57385b51 4120 return ret;
35d489f9
AE
4121}
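/*
 * Shape of the "get_snapcontext" reply decoded above, matching the
 * buffer sizing at the top of the function:
 *
 *	__le64	seq			maximum snapshot id
 *	__le32	snap_count
 *	__le64	snaps[snap_count]	one id per snapshot
 */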
4122
54cac61f
AE
4123static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4124 u64 snap_id)
b8b1e2db
AE
4125{
4126 size_t size;
4127 void *reply_buf;
54cac61f 4128 __le64 snapid;
b8b1e2db
AE
4129 int ret;
4130 void *p;
4131 void *end;
b8b1e2db
AE
4132 char *snap_name;
4133
4134 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4135 reply_buf = kmalloc(size, GFP_KERNEL);
4136 if (!reply_buf)
4137 return ERR_PTR(-ENOMEM);
4138
54cac61f 4139 snapid = cpu_to_le64(snap_id);
36be9a76 4140 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4141 "rbd", "get_snapshot_name",
54cac61f 4142 &snapid, sizeof (snapid),
e2a58ee5 4143 reply_buf, size);
36be9a76 4144 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4145 if (ret < 0) {
4146 snap_name = ERR_PTR(ret);
b8b1e2db 4147 goto out;
f40eb349 4148 }
b8b1e2db
AE
4149
4150 p = reply_buf;
f40eb349 4151 end = reply_buf + ret;
e5c35534 4152 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4153 if (IS_ERR(snap_name))
b8b1e2db 4154 goto out;
b8b1e2db 4155
f40eb349 4156 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4157 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4158out:
4159 kfree(reply_buf);
4160
f40eb349 4161 return snap_name;
b8b1e2db
AE
4162}
4163
2df3fac7 4164static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4165{
2df3fac7 4166 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4167 int ret;
117973fb
AE
4168
4169 down_write(&rbd_dev->header_rwsem);
4170
2df3fac7
AE
4171 if (first_time) {
4172 ret = rbd_dev_v2_header_onetime(rbd_dev);
4173 if (ret)
4174 goto out;
4175 }
4176
642a2537
AE
4177 /*
4178 * If the image supports layering, get the parent info. We
4179 * need to probe the first time regardless. Thereafter we
4180 * only need to do so if there's a parent, to see if it has
4181 * disappeared due to the mapped image getting flattened.
4182 */
4183 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4184 (first_time || rbd_dev->parent_spec)) {
4185 bool warn;
4186
4187 ret = rbd_dev_v2_parent_info(rbd_dev);
4188 if (ret)
4189 goto out;
4190
4191 /*
4192 * Print a warning if this is the initial probe and
4193 * the image has a parent. Don't print it if the
4194 * image now being probed is itself a parent. We
4195 * can tell at this point because we won't know its
4196 * pool name yet (just its pool id).
4197 */
4198 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4199 if (first_time && warn)
4200 rbd_warn(rbd_dev, "WARNING: kernel layering "
4201 "is EXPERIMENTAL!");
4202 }
4203
117973fb
AE
4204 ret = rbd_dev_v2_image_size(rbd_dev);
4205 if (ret)
4206 goto out;
642a2537 4207
29334ba4
AE
4208 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4209 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4210 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4211
cc4a38bd 4212 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4213 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4214out:
4215 up_write(&rbd_dev->header_rwsem);
4216
4217 return ret;
4218}
4219
dfc5606d
YS
4220static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4221{
dfc5606d 4222 struct device *dev;
cd789ab9 4223 int ret;
dfc5606d
YS
4224
4225 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4226
cd789ab9 4227 dev = &rbd_dev->dev;
dfc5606d
YS
4228 dev->bus = &rbd_bus_type;
4229 dev->type = &rbd_device_type;
4230 dev->parent = &rbd_root_dev;
200a6a8b 4231 dev->release = rbd_dev_device_release;
de71a297 4232 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4233 ret = device_register(dev);
dfc5606d 4234
dfc5606d 4235 mutex_unlock(&ctl_mutex);
cd789ab9 4236
dfc5606d 4237 return ret;
602adf40
YS
4238}
4239
dfc5606d
YS
4240static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4241{
4242 device_unregister(&rbd_dev->dev);
4243}
4244
e2839308 4245static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4246
4247/*
499afd5b
AE
4248 * Get a unique rbd identifier for the given new rbd_dev, and add
4249 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4250 */
e2839308 4251static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4252{
e2839308 4253 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4254
4255 spin_lock(&rbd_dev_list_lock);
4256 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4257 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4258 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4259 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4260}
b7f23c36 4261
1ddbe94e 4262/*
499afd5b
AE
4263 * Remove an rbd_dev from the global list, and record that its
4264 * identifier is no longer in use.
1ddbe94e 4265 */
e2839308 4266static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4267{
d184f6bf 4268 struct list_head *tmp;
de71a297 4269 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4270 int max_id;
4271
aafb230e 4272 rbd_assert(rbd_id > 0);
499afd5b 4273
e2839308
AE
4274 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4275 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4276 spin_lock(&rbd_dev_list_lock);
4277 list_del_init(&rbd_dev->node);
d184f6bf
AE
4278
4279 /*
4280 * If the id being "put" is not the current maximum, there
4281 * is nothing special we need to do.
4282 */
e2839308 4283 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4284 spin_unlock(&rbd_dev_list_lock);
4285 return;
4286 }
4287
4288 /*
4289 * We need to update the current maximum id. Search the
4290 * list to find out what it is. We're more likely to find
4291 * the maximum at the end, so search the list backward.
4292 */
4293 max_id = 0;
4294 list_for_each_prev(tmp, &rbd_dev_list) {
4295 struct rbd_device *rbd_dev;
4296
4297 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4298 if (rbd_dev->dev_id > max_id)
4299 max_id = rbd_dev->dev_id;
d184f6bf 4300 }
499afd5b 4301 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4302
1ddbe94e 4303 /*
e2839308 4304 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4305 * which case it now accurately reflects the new maximum.
4306 * Be careful not to overwrite the maximum value in that
4307 * case.
1ddbe94e 4308 */
e2839308
AE
4309 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4310 dout(" max dev id has been reset\n");
b7f23c36
AE
4311}
4312
e28fff26
AE
4313/*
4314 * Skips over white space at *buf, and updates *buf to point to the
4315 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4316 * the token (string of non-white space characters) found. Note
4317 * that *buf must be terminated with '\0'.
e28fff26
AE
4318 */
4319static inline size_t next_token(const char **buf)
4320{
4321 /*
4322 * These are the characters that produce nonzero for
4323 * isspace() in the "C" and "POSIX" locales.
4324 */
4325 const char *spaces = " \f\n\r\t\v";
4326
4327 *buf += strspn(*buf, spaces); /* Find start of token */
4328
4329 return strcspn(*buf, spaces); /* Return token length */
4330}
4331
4332/*
4333 * Finds the next token in *buf, and if the provided token buffer is
4334 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4335 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4336 * must be terminated with '\0' on entry.
e28fff26
AE
4337 *
4338 * Returns the length of the token found (not including the '\0').
4339 * Return value will be 0 if no token is found, and it will be >=
4340 * token_size if the token would not fit.
4341 *
593a9e7b 4342 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4343 * found token. Note that this occurs even if the token buffer is
4344 * too small to hold it.
4345 */
4346static inline size_t copy_token(const char **buf,
4347 char *token,
4348 size_t token_size)
4349{
4350 size_t len;
4351
4352 len = next_token(buf);
4353 if (len < token_size) {
4354 memcpy(token, *buf, len);
4355 *(token + len) = '\0';
4356 }
4357 *buf += len;
4358
4359 return len;
4360}
4361
ea3352f4
AE
4362/*
4363 * Finds the next token in *buf, dynamically allocates a buffer big
4364 * enough to hold a copy of it, and copies the token into the new
4365 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4366 * that a duplicate buffer is created even for a zero-length token.
4367 *
4368 * Returns a pointer to the newly-allocated duplicate, or a null
4369 * pointer if memory for the duplicate was not available. If
4370 * the lenp argument is a non-null pointer, the length of the token
4371 * (not including the '\0') is returned in *lenp.
4372 *
4373 * If successful, the *buf pointer will be updated to point beyond
4374 * the end of the found token.
4375 *
4376 * Note: uses GFP_KERNEL for allocation.
4377 */
4378static inline char *dup_token(const char **buf, size_t *lenp)
4379{
4380 char *dup;
4381 size_t len;
4382
4383 len = next_token(buf);
4caf35f9 4384 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4385 if (!dup)
4386 return NULL;
ea3352f4
AE
4387 *(dup + len) = '\0';
4388 *buf += len;
4389
4390 if (lenp)
4391 *lenp = len;
4392
4393 return dup;
4394}
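/*
 * Tokenization sketch for the helpers above, using a hypothetical
 * "rbd add" buffer:
 *
 *	buf = "  1.2.3.4:6789 name=admin rbd myimage -"
 *
 * Successive dup_token() calls yield "1.2.3.4:6789", "name=admin",
 * "rbd", "myimage" and "-", each time skipping leading whitespace
 * and advancing *buf just past the token found.
 */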
4395
a725f65e 4396/*
859c31df
AE
4397 * Parse the options provided for an "rbd add" (i.e., rbd image
4398 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4399 * and the data written is passed here via a NUL-terminated buffer.
4400 * Returns 0 if successful or an error code otherwise.
d22f76e7 4401 *
859c31df
AE
4402 * The information extracted from these options is recorded in
4403 * the other parameters which return dynamically-allocated
4404 * structures:
4405 * ceph_opts
4406 * The address of a pointer that will refer to a ceph options
4407 * structure. Caller must release the returned pointer using
4408 * ceph_destroy_options() when it is no longer needed.
4409 * rbd_opts
4410 * Address of an rbd options pointer. Fully initialized by
4411 * this function; caller must release with kfree().
4412 * spec
4413 * Address of an rbd image specification pointer. Fully
4414 * initialized by this function based on parsed options.
4415 * Caller must release with rbd_spec_put().
4416 *
4417 * The options passed take this form:
4418 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4419 * where:
4420 * <mon_addrs>
4421 * A comma-separated list of one or more monitor addresses.
4422 * A monitor address is an ip address, optionally followed
4423 * by a port number (separated by a colon).
4424 * I.e.: ip1[:port1][,ip2[:port2]...]
4425 * <options>
4426 * A comma-separated list of ceph and/or rbd options.
4427 * <pool_name>
4428 * The name of the rados pool containing the rbd image.
4429 * <image_name>
4430 * The name of the image in that pool to map.
4431 * <snap_name>
4432 * An optional snapshot name. If provided, the mapping will
4433 * present data from the image at the time that snapshot was
4434 * created. The image head is used if no snapshot name is
4435 * provided. Snapshot mappings are always read-only.
a725f65e 4436 */
859c31df 4437static int rbd_add_parse_args(const char *buf,
dc79b113 4438 struct ceph_options **ceph_opts,
859c31df
AE
4439 struct rbd_options **opts,
4440 struct rbd_spec **rbd_spec)
e28fff26 4441{
d22f76e7 4442 size_t len;
859c31df 4443 char *options;
0ddebc0c 4444 const char *mon_addrs;
ecb4dc22 4445 char *snap_name;
0ddebc0c 4446 size_t mon_addrs_size;
859c31df 4447 struct rbd_spec *spec = NULL;
4e9afeba 4448 struct rbd_options *rbd_opts = NULL;
859c31df 4449 struct ceph_options *copts;
dc79b113 4450 int ret;
e28fff26
AE
4451
4452 /* The first four tokens are required */
4453
7ef3214a 4454 len = next_token(&buf);
4fb5d671
AE
4455 if (!len) {
4456 rbd_warn(NULL, "no monitor address(es) provided");
4457 return -EINVAL;
4458 }
0ddebc0c 4459 mon_addrs = buf;
f28e565a 4460 mon_addrs_size = len + 1;
7ef3214a 4461 buf += len;
a725f65e 4462
dc79b113 4463 ret = -EINVAL;
f28e565a
AE
4464 options = dup_token(&buf, NULL);
4465 if (!options)
dc79b113 4466 return -ENOMEM;
4fb5d671
AE
4467 if (!*options) {
4468 rbd_warn(NULL, "no options provided");
4469 goto out_err;
4470 }
e28fff26 4471
859c31df
AE
4472 spec = rbd_spec_alloc();
4473 if (!spec)
f28e565a 4474 goto out_mem;
859c31df
AE
4475
4476 spec->pool_name = dup_token(&buf, NULL);
4477 if (!spec->pool_name)
4478 goto out_mem;
4fb5d671
AE
4479 if (!*spec->pool_name) {
4480 rbd_warn(NULL, "no pool name provided");
4481 goto out_err;
4482 }
e28fff26 4483
69e7a02f 4484 spec->image_name = dup_token(&buf, NULL);
859c31df 4485 if (!spec->image_name)
f28e565a 4486 goto out_mem;
4fb5d671
AE
4487 if (!*spec->image_name) {
4488 rbd_warn(NULL, "no image name provided");
4489 goto out_err;
4490 }
d4b125e9 4491
f28e565a
AE
4492 /*
4493 * Snapshot name is optional; default is to use "-"
4494 * (indicating the head/no snapshot).
4495 */
3feeb894 4496 len = next_token(&buf);
820a5f3e 4497 if (!len) {
3feeb894
AE
4498 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4499 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4500 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4501 ret = -ENAMETOOLONG;
f28e565a 4502 goto out_err;
849b4260 4503 }
ecb4dc22
AE
4504 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4505 if (!snap_name)
f28e565a 4506 goto out_mem;
ecb4dc22
AE
4507 *(snap_name + len) = '\0';
4508 spec->snap_name = snap_name;
e5c35534 4509
0ddebc0c 4510 /* Initialize all rbd options to the defaults */
e28fff26 4511
4e9afeba
AE
4512 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4513 if (!rbd_opts)
4514 goto out_mem;
4515
4516 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4517
859c31df 4518 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4519 mon_addrs + mon_addrs_size - 1,
4e9afeba 4520 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4521 if (IS_ERR(copts)) {
4522 ret = PTR_ERR(copts);
dc79b113
AE
4523 goto out_err;
4524 }
859c31df
AE
4525 kfree(options);
4526
4527 *ceph_opts = copts;
4e9afeba 4528 *opts = rbd_opts;
859c31df 4529 *rbd_spec = spec;
0ddebc0c 4530
dc79b113 4531 return 0;
f28e565a 4532out_mem:
dc79b113 4533 ret = -ENOMEM;
d22f76e7 4534out_err:
859c31df
AE
4535 kfree(rbd_opts);
4536 rbd_spec_put(spec);
f28e565a 4537 kfree(options);
d22f76e7 4538
dc79b113 4539 return ret;
a725f65e
AE
4540}
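/*
 * Example "rbd add" write matching the grammar documented above
 * (monitor address, credentials, and names are all hypothetical):
 *
 *	$ echo "1.2.3.4:6789 name=admin rbd myimage -" > /sys/bus/rbd/add
 *
 * The trailing "-" explicitly selects the image head; leaving the
 * snapshot name out entirely has the same effect.
 */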
4541
589d30e0
AE
4542/*
4543 * An rbd format 2 image has a unique identifier, distinct from the
4544 * name given to it by the user. Internally, that identifier is
4545 * what's used to specify the names of objects related to the image.
4546 *
4547 * A special "rbd id" object is used to map an rbd image name to its
4548 * id. If that object doesn't exist, then there is no v2 rbd image
4549 * with the supplied name.
4550 *
4551 * This function will record the given rbd_dev's image_id field if
4552 * it can be determined, and in that case will return 0. If any
4553 * errors occur a negative errno will be returned and the rbd_dev's
4554 * image_id field will be unchanged (and should be NULL).
4555 */
4556static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4557{
4558 int ret;
4559 size_t size;
4560 char *object_name;
4561 void *response;
c0fba368 4562 char *image_id;
2f82ee54 4563
2c0d0a10
AE
4564 /*
4565 * When probing a parent image, the image id is already
4566 * known (and the image name likely is not). There's no
c0fba368
AE
4567 * need to fetch the image id again in this case. We
4568 * do still need to set the image format though.
2c0d0a10 4569 */
c0fba368
AE
4570 if (rbd_dev->spec->image_id) {
4571 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4572
2c0d0a10 4573 return 0;
c0fba368 4574 }
2c0d0a10 4575
589d30e0
AE
4576 /*
4577 * First, see if the format 2 image id file exists, and if
4578 * so, get the image's persistent id from it.
4579 */
69e7a02f 4580 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4581 object_name = kmalloc(size, GFP_NOIO);
4582 if (!object_name)
4583 return -ENOMEM;
0d7dbfce 4584 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4585 dout("rbd id object name is %s\n", object_name);
4586
4587 /* Response will be an encoded string, which includes a length */
4588
4589 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4590 response = kzalloc(size, GFP_NOIO);
4591 if (!response) {
4592 ret = -ENOMEM;
4593 goto out;
4594 }
4595
c0fba368
AE
4596 /* If it doesn't exist we'll assume it's a format 1 image */
4597
36be9a76 4598 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4599 "rbd", "get_id", NULL, 0,
e2a58ee5 4600 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4601 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4602 if (ret == -ENOENT) {
4603 image_id = kstrdup("", GFP_KERNEL);
4604 ret = image_id ? 0 : -ENOMEM;
4605 if (!ret)
4606 rbd_dev->image_format = 1;
4607 } else if (ret > sizeof (__le32)) {
4608 void *p = response;
4609
4610 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4611 NULL, GFP_NOIO);
c0fba368
AE
4612 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4613 if (!ret)
4614 rbd_dev->image_format = 2;
589d30e0 4615 } else {
c0fba368
AE
4616 ret = -EINVAL;
4617 }
4618
4619 if (!ret) {
4620 rbd_dev->spec->image_id = image_id;
4621 dout("image_id is %s\n", image_id);
589d30e0
AE
4622 }
4623out:
4624 kfree(response);
4625 kfree(object_name);
4626
4627 return ret;
4628}
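/*
 * Concrete example with a hypothetical image: for a format 2 image
 * named "myimage" the id is read from the object "rbd_id.myimage",
 * whose reply is a length-prefixed string such as "1014b2ae8944a".
 * If that object does not exist the method returns -ENOENT and the
 * image is treated as format 1, with an empty image id.
 */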
4629
6fd48b3b
AE
4630/* Undo whatever state changes are made by v1 or v2 image probe */
4631
4632static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4633{
4634 struct rbd_image_header *header;
4635
a2acd00e 4636 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4637
4638 /* Free dynamic fields from the header, then zero it out */
4639
4640 header = &rbd_dev->header;
812164f8 4641 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4642 kfree(header->snap_sizes);
4643 kfree(header->snap_names);
4644 kfree(header->object_prefix);
4645 memset(header, 0, sizeof (*header));
4646}
4647
2df3fac7 4648static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9 4649{
9d475de5 4650 int ret;
a30b71b9 4651
1e130199 4652 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4653 if (ret)
b1b5402a
AE
4654 goto out_err;
4655
2df3fac7
AE
4656 /*
4657 * Get and check the features for the image. Currently the
4658 * features are assumed to never change.
4659 */
b1b5402a 4660 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4661 if (ret)
9d475de5 4662 goto out_err;
35d489f9 4663
cc070d59
AE
4664 /* If the image supports fancy striping, get its parameters */
4665
4666 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4667 ret = rbd_dev_v2_striping_info(rbd_dev);
4668 if (ret < 0)
4669 goto out_err;
4670 }
2df3fac7 4671 /* No support for crypto and compression type format 2 images */
6e14b1a6 4672
35152979 4673 return 0;
9d475de5 4674out_err:
642a2537 4675 rbd_dev->header.features = 0;
1e130199
AE
4676 kfree(rbd_dev->header.object_prefix);
4677 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4678
4679 return ret;
a30b71b9
AE
4680}
4681
124afba2 4682static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4683{
2f82ee54 4684 struct rbd_device *parent = NULL;
124afba2
AE
4685 struct rbd_spec *parent_spec;
4686 struct rbd_client *rbdc;
4687 int ret;
4688
4689 if (!rbd_dev->parent_spec)
4690 return 0;
4691 /*
4692 * We need to pass a reference to the client and the parent
4693 * spec when creating the parent rbd_dev. Images related by
4694 * parent/child relationships always share both.
4695 */
4696 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4697 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4698
4699 ret = -ENOMEM;
4700 parent = rbd_dev_create(rbdc, parent_spec);
4701 if (!parent)
4702 goto out_err;
4703
1f3ef788 4704 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4705 if (ret < 0)
4706 goto out_err;
4707 rbd_dev->parent = parent;
a2acd00e 4708 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4709
4710 return 0;
4711out_err:
4712 if (parent) {
fb65d228 4713 rbd_dev_unparent(rbd_dev);
124afba2
AE
4714 kfree(rbd_dev->header_name);
4715 rbd_dev_destroy(parent);
4716 } else {
4717 rbd_put_client(rbdc);
4718 rbd_spec_put(parent_spec);
4719 }
4720
4721 return ret;
4722}
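/*
 * Note that probing a parent recurses: rbd_dev_image_probe() on the
 * parent ends up back in rbd_dev_probe_parent(), so a chain of clones
 * becomes a list linked through ->parent, e.g. (hypothetical):
 *
 *	mapped clone -> parent image -> grandparent image
 */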
4723
200a6a8b 4724static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4725{
83a06263 4726 int ret;
d1cf5788 4727
83a06263
AE
4728 /* generate unique id: find highest unique id, add one */
4729 rbd_dev_id_get(rbd_dev);
4730
4731 /* Fill in the device name, now that we have its id. */
4732 BUILD_BUG_ON(DEV_NAME_LEN
4733 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4734 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4735
4736 /* Get our block major device number. */
4737
4738 ret = register_blkdev(0, rbd_dev->name);
4739 if (ret < 0)
4740 goto err_out_id;
4741 rbd_dev->major = ret;
4742
4743 /* Set up the blkdev mapping. */
4744
4745 ret = rbd_init_disk(rbd_dev);
4746 if (ret)
4747 goto err_out_blkdev;
4748
f35a4dee 4749 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4750 if (ret)
4751 goto err_out_disk;
f35a4dee
AE
4752 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4753
4754 ret = rbd_bus_add_dev(rbd_dev);
4755 if (ret)
4756 goto err_out_mapping;
83a06263 4757
83a06263
AE
4758 /* Everything's ready. Announce the disk to the world. */
4759
129b79d4 4760 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4761 add_disk(rbd_dev->disk);
4762
4763 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4764 (unsigned long long) rbd_dev->mapping.size);
4765
4766 return ret;
2f82ee54 4767
f35a4dee
AE
4768err_out_mapping:
4769 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4770err_out_disk:
4771 rbd_free_disk(rbd_dev);
4772err_out_blkdev:
4773 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4774err_out_id:
4775 rbd_dev_id_put(rbd_dev);
d1cf5788 4776 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4777
4778 return ret;
4779}
4780
332bb12d
AE
4781static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4782{
4783 struct rbd_spec *spec = rbd_dev->spec;
4784 size_t size;
4785
4786 /* Record the header object name for this rbd image. */
4787
4788 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4789
4790 if (rbd_dev->image_format == 1)
4791 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4792 else
4793 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4794
4795 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4796 if (!rbd_dev->header_name)
4797 return -ENOMEM;
4798
4799 if (rbd_dev->image_format == 1)
4800 sprintf(rbd_dev->header_name, "%s%s",
4801 spec->image_name, RBD_SUFFIX);
4802 else
4803 sprintf(rbd_dev->header_name, "%s%s",
4804 RBD_HEADER_PREFIX, spec->image_id);
4805 return 0;
4806}
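/*
 * Examples of the header object names built above, for a hypothetical
 * image:
 *
 *	format 1, image name "myimage":		"myimage.rbd"
 *	format 2, image id "1014b2ae8944a":	"rbd_header.1014b2ae8944a"
 */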
4807
200a6a8b
AE
4808static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4809{
6fd48b3b 4810 rbd_dev_unprobe(rbd_dev);
200a6a8b 4811 kfree(rbd_dev->header_name);
6fd48b3b
AE
4812 rbd_dev->header_name = NULL;
4813 rbd_dev->image_format = 0;
4814 kfree(rbd_dev->spec->image_id);
4815 rbd_dev->spec->image_id = NULL;
4816
200a6a8b
AE
4817 rbd_dev_destroy(rbd_dev);
4818}
4819
a30b71b9
AE
4820/*
4821 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4822 * device. If this image is the one being mapped (i.e., not a
4823 * parent), initiate a watch on its header object before using that
4824 * object to get detailed information about the rbd image.
a30b71b9 4825 */
1f3ef788 4826static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4827{
4828 int ret;
b644de2b 4829 int tmp;
a30b71b9
AE
4830
4831 /*
4832 * Get the id from the image id object. If it's not a
4833 * format 2 image, we'll get ENOENT back, and we'll assume
4834 * it's a format 1 image.
4835 */
4836 ret = rbd_dev_image_id(rbd_dev);
4837 if (ret)
c0fba368
AE
4838 return ret;
4839 rbd_assert(rbd_dev->spec->image_id);
4840 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4841
332bb12d
AE
4842 ret = rbd_dev_header_name(rbd_dev);
4843 if (ret)
4844 goto err_out_format;
4845
1f3ef788
AE
4846 if (mapping) {
4847 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4848 if (ret)
4849 goto out_header_name;
4850 }
b644de2b 4851
c0fba368 4852 if (rbd_dev->image_format == 1)
99a41ebc 4853 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4854 else
2df3fac7 4855 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4856 if (ret)
b644de2b 4857 goto err_out_watch;
83a06263 4858
9bb81c9b
AE
4859 ret = rbd_dev_spec_update(rbd_dev);
4860 if (ret)
33dca39f 4861 goto err_out_probe;
9bb81c9b
AE
4862
4863 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4864 if (ret)
4865 goto err_out_probe;
4866
4867 dout("discovered format %u image, header name is %s\n",
4868 rbd_dev->image_format, rbd_dev->header_name);
83a06263 4869
30d60ba2 4870 return 0;
6fd48b3b
AE
4871err_out_probe:
4872 rbd_dev_unprobe(rbd_dev);
b644de2b 4873err_out_watch:
1f3ef788
AE
4874 if (mapping) {
4875 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
4876 if (tmp)
4877 rbd_warn(rbd_dev, "unable to tear down "
4878 "watch request (%d)\n", tmp);
4879 }
332bb12d
AE
4880out_header_name:
4881 kfree(rbd_dev->header_name);
4882 rbd_dev->header_name = NULL;
4883err_out_format:
4884 rbd_dev->image_format = 0;
5655c4d9
AE
4885 kfree(rbd_dev->spec->image_id);
4886 rbd_dev->spec->image_id = NULL;
4887
4888 dout("probe failed, returning %d\n", ret);
4889
a30b71b9
AE
4890 return ret;
4891}
4892
59c2be1e
YS
4893static ssize_t rbd_add(struct bus_type *bus,
4894 const char *buf,
4895 size_t count)
602adf40 4896{
cb8627c7 4897 struct rbd_device *rbd_dev = NULL;
dc79b113 4898 struct ceph_options *ceph_opts = NULL;
4e9afeba 4899 struct rbd_options *rbd_opts = NULL;
859c31df 4900 struct rbd_spec *spec = NULL;
9d3997fd 4901 struct rbd_client *rbdc;
27cc2594 4902 struct ceph_osd_client *osdc;
51344a38 4903 bool read_only;
27cc2594 4904 int rc = -ENOMEM;
602adf40
YS
4905
4906 if (!try_module_get(THIS_MODULE))
4907 return -ENODEV;
4908
602adf40 4909 /* parse add command */
859c31df 4910 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4911 if (rc < 0)
bd4ba655 4912 goto err_out_module;
51344a38
AE
4913 read_only = rbd_opts->read_only;
4914 kfree(rbd_opts);
4915 rbd_opts = NULL; /* done with this */
78cea76e 4916
9d3997fd
AE
4917 rbdc = rbd_get_client(ceph_opts);
4918 if (IS_ERR(rbdc)) {
4919 rc = PTR_ERR(rbdc);
0ddebc0c 4920 goto err_out_args;
9d3997fd 4921 }
c53d5893 4922 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4923
602adf40 4924 /* pick the pool */
9d3997fd 4925 osdc = &rbdc->client->osdc;
859c31df 4926 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4927 if (rc < 0)
4928 goto err_out_client;
c0cd10db 4929 spec->pool_id = (u64)rc;
859c31df 4930
0903e875
AE
4931 /* The ceph file layout needs to fit pool id in 32 bits */
4932
c0cd10db
AE
4933 if (spec->pool_id > (u64)U32_MAX) {
4934 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4935 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4936 rc = -EIO;
4937 goto err_out_client;
4938 }
4939
c53d5893 4940 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4941 if (!rbd_dev)
4942 goto err_out_client;
c53d5893
AE
4943 rbdc = NULL; /* rbd_dev now owns this */
4944 spec = NULL; /* rbd_dev now owns this */
602adf40 4945
1f3ef788 4946 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 4947 if (rc < 0)
c53d5893 4948 goto err_out_rbd_dev;
05fd6f6f 4949
7ce4eef7
AE
4950 /* If we are mapping a snapshot it must be marked read-only */
4951
4952 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4953 read_only = true;
4954 rbd_dev->mapping.read_only = read_only;
4955
b536f69a
AE
4956 rc = rbd_dev_device_setup(rbd_dev);
4957 if (!rc)
4958 return count;
4959
4960 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4961err_out_rbd_dev:
4962 rbd_dev_destroy(rbd_dev);
bd4ba655 4963err_out_client:
9d3997fd 4964 rbd_put_client(rbdc);
0ddebc0c 4965err_out_args:
78cea76e
AE
4966 if (ceph_opts)
4967 ceph_destroy_options(ceph_opts);
4e9afeba 4968 kfree(rbd_opts);
859c31df 4969 rbd_spec_put(spec);
bd4ba655
AE
4970err_out_module:
4971 module_put(THIS_MODULE);
27cc2594 4972
602adf40 4973 dout("Error adding device %s\n", buf);
27cc2594 4974
c0cd10db 4975 return (ssize_t)rc;
602adf40
YS
4976}
4977
de71a297 4978static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4979{
4980 struct list_head *tmp;
4981 struct rbd_device *rbd_dev;
4982
e124a82f 4983 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4984 list_for_each(tmp, &rbd_dev_list) {
4985 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4986 if (rbd_dev->dev_id == dev_id) {
e124a82f 4987 spin_unlock(&rbd_dev_list_lock);
602adf40 4988 return rbd_dev;
e124a82f 4989 }
602adf40 4990 }
e124a82f 4991 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4992 return NULL;
4993}
4994
200a6a8b 4995static void rbd_dev_device_release(struct device *dev)
602adf40 4996{
593a9e7b 4997 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4998
602adf40 4999 rbd_free_disk(rbd_dev);
200a6a8b 5000 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5001 rbd_dev_mapping_clear(rbd_dev);
602adf40 5002 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5003 rbd_dev->major = 0;
e2839308 5004 rbd_dev_id_put(rbd_dev);
602adf40
YS
5006}
5007
05a46afd
AE
5008static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5009{
ad945fc1 5010 while (rbd_dev->parent) {
05a46afd
AE
5011 struct rbd_device *first = rbd_dev;
5012 struct rbd_device *second = first->parent;
5013 struct rbd_device *third;
5014
5015 /*
5016 * Follow to the parent with no grandparent and
5017 * remove it.
5018 */
5019 while (second && (third = second->parent)) {
5020 first = second;
5021 second = third;
5022 }
ad945fc1 5023 rbd_assert(second);
8ad42cd0 5024 rbd_dev_image_release(second);
ad945fc1
AE
5025 first->parent = NULL;
5026 first->parent_overlap = 0;
5027
5028 rbd_assert(first->parent_spec);
05a46afd
AE
5029 rbd_spec_put(first->parent_spec);
5030 first->parent_spec = NULL;
05a46afd
AE
5031 }
5032}
5033
dfc5606d
YS
5034static ssize_t rbd_remove(struct bus_type *bus,
5035 const char *buf,
5036 size_t count)
602adf40
YS
5037{
5038 struct rbd_device *rbd_dev = NULL;
0d8189e1 5039 int target_id;
602adf40 5040 unsigned long ul;
0d8189e1 5041 int ret;
602adf40 5042
0d8189e1
AE
5043 ret = strict_strtoul(buf, 10, &ul);
5044 if (ret)
5045 return ret;
602adf40
YS
5046
5047 /* convert to int; abort if we lost anything in the conversion */
5048 target_id = (int) ul;
5049 if (target_id != ul)
5050 return -EINVAL;
5051
5052 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5053
5054 rbd_dev = __rbd_get_dev(target_id);
5055 if (!rbd_dev) {
5056 ret = -ENOENT;
5057 goto done;
42382b70
AE
5058 }
5059
a14ea269 5060 spin_lock_irq(&rbd_dev->lock);
b82d167b 5061 if (rbd_dev->open_count)
42382b70 5062 ret = -EBUSY;
b82d167b
AE
5063 else
5064 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 5065 spin_unlock_irq(&rbd_dev->lock);
b82d167b 5066 if (ret < 0)
42382b70 5067 goto done;
b480815a 5068 rbd_bus_del_dev(rbd_dev);
1f3ef788
AE
5069 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5070 if (ret)
5071 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
8ad42cd0 5072 rbd_dev_image_release(rbd_dev);
79ab7558 5073 module_put(THIS_MODULE);
1f3ef788 5074 ret = count;
602adf40
YS
5075done:
5076 mutex_unlock(&ctl_mutex);
aafb230e 5077
602adf40
YS
5078 return ret;
5079}
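/*
 * Example removal, mirroring the add example earlier (device id 0 is
 * hypothetical); the id written is the one assigned at add time:
 *
 *	$ echo 0 > /sys/bus/rbd/remove
 */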
5080
602adf40
YS
5081/*
5082 * create control files in sysfs
dfc5606d 5083 * /sys/bus/rbd/...
602adf40
YS
5084 */
5085static int rbd_sysfs_init(void)
5086{
dfc5606d 5087 int ret;
602adf40 5088
fed4c143 5089 ret = device_register(&rbd_root_dev);
21079786 5090 if (ret < 0)
dfc5606d 5091 return ret;
602adf40 5092
fed4c143
AE
5093 ret = bus_register(&rbd_bus_type);
5094 if (ret < 0)
5095 device_unregister(&rbd_root_dev);
602adf40 5096
602adf40
YS
5097 return ret;
5098}
5099
5100static void rbd_sysfs_cleanup(void)
5101{
dfc5606d 5102 bus_unregister(&rbd_bus_type);
fed4c143 5103 device_unregister(&rbd_root_dev);
602adf40
YS
5104}
5105
1c2a9dfe
AE
5106static int rbd_slab_init(void)
5107{
5108 rbd_assert(!rbd_img_request_cache);
5109 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5110 sizeof (struct rbd_img_request),
5111 __alignof__(struct rbd_img_request),
5112 0, NULL);
868311b1
AE
5113 if (!rbd_img_request_cache)
5114 return -ENOMEM;
5115
5116 rbd_assert(!rbd_obj_request_cache);
5117 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5118 sizeof (struct rbd_obj_request),
5119 __alignof__(struct rbd_obj_request),
5120 0, NULL);
78c2a44a
AE
5121 if (!rbd_obj_request_cache)
5122 goto out_err;
5123
5124 rbd_assert(!rbd_segment_name_cache);
5125 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5126 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5127 if (rbd_segment_name_cache)
1c2a9dfe 5128 return 0;
78c2a44a
AE
5129out_err:
5130 if (rbd_obj_request_cache) {
5131 kmem_cache_destroy(rbd_obj_request_cache);
5132 rbd_obj_request_cache = NULL;
5133 }
1c2a9dfe 5134
868311b1
AE
5135 kmem_cache_destroy(rbd_img_request_cache);
5136 rbd_img_request_cache = NULL;
5137
1c2a9dfe
AE
5138 return -ENOMEM;
5139}
5140
5141static void rbd_slab_exit(void)
5142{
78c2a44a
AE
5143 rbd_assert(rbd_segment_name_cache);
5144 kmem_cache_destroy(rbd_segment_name_cache);
5145 rbd_segment_name_cache = NULL;
5146
868311b1
AE
5147 rbd_assert(rbd_obj_request_cache);
5148 kmem_cache_destroy(rbd_obj_request_cache);
5149 rbd_obj_request_cache = NULL;
5150
1c2a9dfe
AE
5151 rbd_assert(rbd_img_request_cache);
5152 kmem_cache_destroy(rbd_img_request_cache);
5153 rbd_img_request_cache = NULL;
5154}
5155
cc344fa1 5156static int __init rbd_init(void)
602adf40
YS
5157{
5158 int rc;
5159
1e32d34c
AE
5160 if (!libceph_compatible(NULL)) {
5161 rbd_warn(NULL, "libceph incompatibility (quitting)");
5162
5163 return -EINVAL;
5164 }
1c2a9dfe 5165 rc = rbd_slab_init();
602adf40
YS
5166 if (rc)
5167 return rc;
1c2a9dfe
AE
5168 rc = rbd_sysfs_init();
5169 if (rc)
5170 rbd_slab_exit();
5171 else
5172 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5173
5174 return rc;
602adf40
YS
5175}
5176
cc344fa1 5177static void __exit rbd_exit(void)
602adf40
YS
5178{
5179 rbd_sysfs_cleanup();
1c2a9dfe 5180 rbd_slab_exit();
602adf40
YS
5181}
5182
5183module_init(rbd_init);
5184module_exit(rbd_exit);
5185
5186MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5187MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5188MODULE_DESCRIPTION("rados block device");
5189
5190/* following authorship retained from original osdblk.c */
5191MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5192
5193MODULE_LICENSE("GPL");