/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
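
/*
 * Worked example of the bound above (illustrative, not part of the
 * driver): each byte of an int contributes at most ~2.5 decimal
 * digits, since 256 < 10^2.5, so (5 * sizeof (int)) / 2 covers the
 * digits and the + 1 leaves room for a sign.  For a 4-byte int:
 *
 *	(5 * 4) / 2 + 1 == 11	   e.g. "-2147483648" is 11 characters
 *
 * so DEV_NAME_LEN (32) easily holds "rbd" plus any formatted id.
 */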

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These five fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 stripe_unit;
	u64 stripe_count;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64 pool_id;
	const char *pool_name;

	const char *image_id;
	const char *image_name;

	u64 snap_id;
	const char *snap_name;

	struct kref kref;
};
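
/*
 * Illustrative example (all values assumed, not taken from a real
 * cluster): a mapping of format 2 image "foo" at its head in pool
 * "rbd" might carry
 *
 *	pool_id  = 2			pool_name  = "rbd"
 *	image_id = "10026b8b4567"	image_name = "foo"
 *	snap_id  = CEPH_NOSNAP		snap_name  = "-"
 *
 * A snapshot mapping differs only in its snap_id and snap_name.
 */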

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct kref kref;
	struct list_head node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char *object_name;
	u64 offset;		/* object start byte */
	u64 length;		/* bytes from offset */
	unsigned long flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request *obj_request;	/* STAT op */
		struct {
			struct rbd_img_request *img_request;
			u64 img_offset;
			/* links for img_request->obj_requests list */
			struct list_head links;
		};
	};
	u32 which;		/* position in image request list */

	enum obj_request_type type;
	union {
		struct bio *bio_list;
		struct {
			struct page **pages;
			u32 page_count;
		};
	};
	struct page **copyup_pages;

	struct ceph_osd_request *osd_req;

	u64 xferred;		/* bytes transferred */
	u64 version;
	int result;

	rbd_obj_callback_t callback;
	struct completion completion;

	struct kref kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device *rbd_dev;
	u64 offset;	/* starting image byte offset */
	u64 length;	/* byte count from offset */
	unsigned long flags;
	union {
		u64 snap_id;			/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request *rq;		/* block request */
		struct rbd_obj_request *obj_request;	/* obj req initiator */
	};
	struct page **copyup_pages;
	spinlock_t completion_lock;	/* protects next_completion */
	u32 next_completion;
	rbd_img_callback_t callback;
	u64 xferred;	/* aggregate bytes transferred */
	int result;	/* first nonzero obj_request result */

	u32 obj_request_count;
	struct list_head obj_requests;	/* rbd_obj_request structs */

	struct kref kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
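
/*
 * Usage sketch for the iterators above (illustrative, not part of the
 * driver):
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		dout("obj %p which %u\n", obj_request,
 *			obj_request->which);
 *
 * The _safe variant walks the list in reverse and tolerates deletion
 * of the current entry, which is why rbd_img_request_destroy() uses
 * it to tear the list down.
 */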

struct rbd_snap {
	const char *name;
	u64 size;
	struct list_head node;
	u64 id;
	u64 features;
};

struct rbd_mapping {
	u64 size;
	u64 features;
	bool read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int dev_id;		/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */

	u32 image_format;	/* Either 1 or 2 */
	struct rbd_client *rbd_client;

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue, flags, open_count */

	struct rbd_image_header header;
	unsigned long flags;	/* possibly lock protected */
	struct rbd_spec *spec;

	char *header_name;

	struct ceph_file_layout layout;

	struct ceph_osd_event *watch_event;
	struct rbd_obj_request *watch_request;

	struct rbd_spec *parent_spec;
	u64 parent_overlap;
	struct rbd_device *parent;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;

	struct rbd_mapping mapping;

	struct list_head node;

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
	unsigned long open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
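
/*
 * Illustrative example (addresses and key name assumed): the options
 * parsed above arrive as the comma-separated field of the string a
 * user writes when mapping an image, e.g.
 *
 *	# echo "1.2.3.4:6789 name=admin,ro rbd foo -" > /sys/bus/rbd/add
 *
 * libceph consumes the options it recognizes and hands the rest
 * ("ro" here) to parse_rbd_opts_token() via the private pointer.
 */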

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Removes the client from rbd_client_list; takes rbd_client_list_lock
 * to do so.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
					const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (!strcmp(snap_name, snap->name))
			return snap;

	return NULL;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
	} else {
		struct rbd_snap *snap;

		snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (!snap)
			return -ENOENT;
		rbd_dev->mapping.size = snap->size;
		rbd_dev->mapping.features = snap->features;
		rbd_dev->mapping.read_only = true;
	}

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

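/*
 * Worked example (illustrative only): with the default obj_order of
 * 22 (4 MB segments) and object_prefix "rb.0.1234.5678", an image
 * byte range starting at offset 0x3ff000 for 0x2000 bytes maps to
 *
 *	segment        = 0x3ff000 >> 22             == 0
 *	segment name   = "rb.0.1234.5678.000000000000"
 *	segment offset = 0x3ff000 & (0x400000 - 1)  == 0x3ff000
 *	segment length = min(0x2000, 0x400000 - 0x3ff000) == 0x1000
 *
 * so the remaining 0x1000 bytes fall into segment 1.
 */
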
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

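/*
 * Usage sketch (illustrative, not part of the driver): carving
 * object-sized pieces off a block request's bio chain.  The caller
 * keeps passing the same cursor pair back in:
 *
 *	struct bio *bio_list = rq->bio;
 *	unsigned int offset = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio_list, &offset,
 *					length, GFP_ATOMIC);
 *
 * On success bio_list/offset identify the first byte not yet cloned,
 * ready for the next call.
 */
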
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, so that
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the stale "doesn't exist"
 * response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
					bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

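/*
 * Reader pattern for the two flags above (illustrative sketch):
 * EXISTS is only meaningful once KNOWN reads true, an ordering the
 * memory barriers in the helpers provide:
 *
 *	if (obj_request_known_test(obj_request)) {
 *		if (obj_request_exists_test(obj_request))
 *			;	// target object present; write directly
 *		else
 *			;	// absent; parent data (copyup) needed
 *	}
 */
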
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it is not clear offhand which way is better.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

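/*
 * Usage sketch (illustrative only): the synchronous pattern the two
 * helpers above support.  A standalone request with no callback set
 * completes via complete_all() in rbd_obj_request_complete(), so a
 * caller can simply submit and wait:
 *
 *	ret = rbd_obj_request_submit(osdc, obj_request);
 *	if (!ret)
 *		ret = rbd_obj_request_wait(obj_request);
 *	if (!ret)
 *		ret = obj_request->result;
 */
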
/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

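/*
 * Aside (restating the code above, not adding to it): the asymmetry
 * between the two formatters mirrors the osd interface.  A read is
 * made against a single snapshot id (CEPH_NOSNAP for the head), while
 * a write carries the full snapshot context plus a modification time:
 *
 *	read:   ceph_osdc_build_request(req, off, NULL, snap_id, NULL);
 *	write:  ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP,
 *					&mtime);
 */
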
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}


static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

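/*
 * Allocation note with a small sketch (illustrative only): the object
 * request and a copy of its name share one kzalloc() above, so the
 * single kfree() in rbd_obj_request_destroy() releases both.  A
 * metadata-only request, e.g. for a watch or stat, would be built as
 *
 *	obj_request = rbd_obj_request_create(rbd_dev->header_name,
 *						0, 0, OBJ_REQUEST_NODATA);
 *	if (!obj_request)
 *		return -ENOMEM;
 */
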
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, " result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
1823 } else {
1824 rbd_assert(img_request->rq != NULL);
1825 more = blk_end_request(img_request->rq, result, xferred);
1826 }
1827
1828 return more;
1217857f
AE
1829}
1830
2169238d
AE
1831static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1832{
1833 struct rbd_img_request *img_request;
1834 u32 which = obj_request->which;
1835 bool more = true;
1836
6365d33a 1837 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
1838 img_request = obj_request->img_request;
1839
1840 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1841 rbd_assert(img_request != NULL);
2169238d
AE
1842 rbd_assert(img_request->obj_request_count > 0);
1843 rbd_assert(which != BAD_WHICH);
1844 rbd_assert(which < img_request->obj_request_count);
1845 rbd_assert(which >= img_request->next_completion);
1846
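	/*
	 * Object requests complete in index order.  If this isn't
	 * the request we're expecting next, its done flag is already
	 * set (in the osd completion path) and the request whose
	 * turn it is will sweep it up in the loop below.
	 */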
1847 spin_lock_irq(&img_request->completion_lock);
1848 if (which != img_request->next_completion)
1849 goto out;
1850
1851 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
1852 rbd_assert(more);
1853 rbd_assert(which < img_request->obj_request_count);
1854
1855 if (!obj_request_done_test(obj_request))
1856 break;
1217857f 1857 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
1858 which++;
1859 }
1860
1861 rbd_assert(more ^ (which == img_request->obj_request_count));
1862 img_request->next_completion = which;
1863out:
1864 spin_unlock_irq(&img_request->completion_lock);
1865
1866 if (!more)
1867 rbd_img_request_complete(img_request);
1868}
1869
f1a4739f
AE
1870/*
1871 * Split up an image request into one or more object requests, each
1872 * to a different object. The "type" parameter indicates whether
1873 * "data_desc" is the pointer to the head of a list of bio
1874 * structures, or the base of a page array. In either case this
1875 * function assumes data_desc describes memory sufficient to hold
1876 * all data described by the image request.
1877 */
1878static int rbd_img_request_fill(struct rbd_img_request *img_request,
1879 enum obj_request_type type,
1880 void *data_desc)
bf0d5f50
AE
1881{
1882 struct rbd_device *rbd_dev = img_request->rbd_dev;
1883 struct rbd_obj_request *obj_request = NULL;
1884 struct rbd_obj_request *next_obj_request;
0c425248 1885 bool write_request = img_request_write_test(img_request);
f1a4739f
AE
1886 struct bio *bio_list;
1887 unsigned int bio_offset = 0;
1888 struct page **pages;
7da22d29 1889 u64 img_offset;
bf0d5f50
AE
1890 u64 resid;
1891 u16 opcode;
1892
f1a4739f
AE
1893 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1894 (int)type, data_desc);
37206ee5 1895
430c28c3 1896 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 1897 img_offset = img_request->offset;
bf0d5f50 1898 resid = img_request->length;
4dda41d3 1899 rbd_assert(resid > 0);
f1a4739f
AE
1900
1901 if (type == OBJ_REQUEST_BIO) {
1902 bio_list = data_desc;
1903 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1904 } else {
1905 rbd_assert(type == OBJ_REQUEST_PAGES);
1906 pages = data_desc;
1907 }
1908
bf0d5f50 1909 while (resid) {
2fa12320 1910 struct ceph_osd_request *osd_req;
bf0d5f50 1911 const char *object_name;
bf0d5f50
AE
1912 u64 offset;
1913 u64 length;
1914
7da22d29 1915 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
1916 if (!object_name)
1917 goto out_unwind;
7da22d29
AE
1918 offset = rbd_segment_offset(rbd_dev, img_offset);
1919 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 1920 obj_request = rbd_obj_request_create(object_name,
f1a4739f 1921 offset, length, type);
bf0d5f50
AE
1922 kfree(object_name); /* object request has its own copy */
1923 if (!obj_request)
1924 goto out_unwind;
1925
f1a4739f
AE
1926 if (type == OBJ_REQUEST_BIO) {
1927 unsigned int clone_size;
1928
1929 rbd_assert(length <= (u64)UINT_MAX);
1930 clone_size = (unsigned int)length;
1931 obj_request->bio_list =
1932 bio_chain_clone_range(&bio_list,
1933 &bio_offset,
1934 clone_size,
1935 GFP_ATOMIC);
1936 if (!obj_request->bio_list)
1937 goto out_partial;
1938 } else {
1939 unsigned int page_count;
1940
1941 obj_request->pages = pages;
1942 page_count = (u32)calc_pages_for(offset, length);
1943 obj_request->page_count = page_count;
1944 if ((offset + length) & ~PAGE_MASK)
1945 page_count--; /* more on last page */
1946 pages += page_count;
1947 }
bf0d5f50 1948
2fa12320
AE
1949 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1950 obj_request);
1951 if (!osd_req)
bf0d5f50 1952 goto out_partial;
2fa12320 1953 obj_request->osd_req = osd_req;
2169238d 1954 obj_request->callback = rbd_img_obj_callback;
430c28c3 1955
2fa12320
AE
1956 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1957 0, 0);
f1a4739f
AE
1958 if (type == OBJ_REQUEST_BIO)
1959 osd_req_op_extent_osd_data_bio(osd_req, 0,
1960 obj_request->bio_list, length);
1961 else
1962 osd_req_op_extent_osd_data_pages(osd_req, 0,
1963 obj_request->pages, length,
1964 offset & ~PAGE_MASK, false, false);
9d4df01f
AE
1965
1966 if (write_request)
1967 rbd_osd_req_format_write(obj_request);
1968 else
1969 rbd_osd_req_format_read(obj_request);
430c28c3 1970
7da22d29 1971 obj_request->img_offset = img_offset;
bf0d5f50
AE
1972 rbd_img_obj_request_add(img_request, obj_request);
1973
7da22d29 1974 img_offset += length;
bf0d5f50
AE
1975 resid -= length;
1976 }
1977
1978 return 0;
1979
1980out_partial:
1981 rbd_obj_request_put(obj_request);
1982out_unwind:
1983 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1984 rbd_obj_request_put(obj_request);
1985
1986 return -ENOMEM;
1987}
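/*
 * Worked example (illustrative): with the default object order of
 * 22 (4 MiB objects), an image request at offset 6 MiB with length
 * 4 MiB is split by the loop above into two object requests:
 *
 *	object covering [4 MiB, 8 MiB):   offset 2 MiB, length 2 MiB
 *	object covering [8 MiB, 12 MiB):  offset 0,     length 2 MiB
 *
 * rbd_segment_length() never crosses an object boundary, so each
 * iteration consumes at most one object's worth of the residual.
 */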
1988
0eefd470
AE
1989static void
1990rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1991{
1992 struct rbd_img_request *img_request;
1993 struct rbd_device *rbd_dev;
1994 u64 length;
1995 u32 page_count;
1996
1997 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1998 rbd_assert(obj_request_img_data_test(obj_request));
1999 img_request = obj_request->img_request;
2000 rbd_assert(img_request);
2001
2002 rbd_dev = img_request->rbd_dev;
2003 rbd_assert(rbd_dev);
2004 length = (u64)1 << rbd_dev->header.obj_order;
2005 page_count = (u32)calc_pages_for(0, length);
2006
2007 rbd_assert(obj_request->copyup_pages);
2008 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2009 obj_request->copyup_pages = NULL;
2010
2011 /*
2012 * We want the transfer count to reflect the size of the
2013 * original write request. There is no such thing as a
2014 * successful short write, so if the request was successful
2015 * we can just set it to the originally-requested length.
2016 */
2017 if (!obj_request->result)
2018 obj_request->xferred = obj_request->length;
2019
2020 /* Finish up with the normal image object callback */
2021
2022 rbd_img_obj_callback(obj_request);
2023}
2024
3d7efd18
AE
2025static void
2026rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2027{
2028 struct rbd_obj_request *orig_request;
0eefd470
AE
2029 struct ceph_osd_request *osd_req;
2030 struct ceph_osd_client *osdc;
2031 struct rbd_device *rbd_dev;
3d7efd18 2032 struct page **pages;
3d7efd18
AE
2033 int result;
2034 u64 obj_size;
2035 u64 xferred;
2036
2037 rbd_assert(img_request_child_test(img_request));
2038
2039 /* First get what we need from the image request */
2040
2041 pages = img_request->copyup_pages;
2042 rbd_assert(pages != NULL);
2043 img_request->copyup_pages = NULL;
2044
2045 orig_request = img_request->obj_request;
2046 rbd_assert(orig_request != NULL);
0eefd470 2047 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
3d7efd18
AE
2048 result = img_request->result;
2049 obj_size = img_request->length;
2050 xferred = img_request->xferred;
2051
0eefd470
AE
2052 rbd_dev = img_request->rbd_dev;
2053 rbd_assert(rbd_dev);
2054 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2055
3d7efd18
AE
2056 rbd_img_request_put(img_request);
2057
0eefd470
AE
2058 if (result)
2059 goto out_err;
2060
2061 /* Allocate the new copyup osd request for the original request */
2062
2063 result = -ENOMEM;
2064 rbd_assert(!orig_request->osd_req);
2065 osd_req = rbd_osd_req_create_copyup(orig_request);
2066 if (!osd_req)
2067 goto out_err;
2068 orig_request->osd_req = osd_req;
2069 orig_request->copyup_pages = pages;
3d7efd18 2070
0eefd470 2071 /* Initialize the copyup op */
3d7efd18 2072
0eefd470
AE
2073 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2074 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2075 false, false);
3d7efd18 2076
0eefd470
AE
2077 /* Then the original write request op */
2078
2079 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2080 orig_request->offset,
2081 orig_request->length, 0, 0);
2082 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2083 orig_request->length);
2084
2085 rbd_osd_req_format_write(orig_request);
2086
2087 /* All set, send it off. */
2088
2089 orig_request->callback = rbd_img_obj_copyup_callback;
2090 osdc = &rbd_dev->rbd_client->client->osdc;
2091 result = rbd_obj_request_submit(osdc, orig_request);
2092 if (!result)
2093 return;
2094out_err:
2095 /* Record the error code and complete the request */
2096
2097 orig_request->result = result;
2098 orig_request->xferred = 0;
2099 obj_request_done_set(orig_request);
2100 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2101}
2102
2103/*
2104 * Read from the parent image the range of data that covers the
2105 * entire target of the given object request. This is used for
2106 * satisfying a layered image write request when the target of an
2107 * object request from the image request does not exist.
2108 *
2109 * A page array big enough to hold the returned data is allocated
2110 * and supplied to rbd_img_request_fill() as the "data descriptor."
2111 * When the read completes, this page array will be transferred to
2112 * the original object request for the copyup operation.
2113 *
2114 * If an error occurs, record it as the result of the original
2115 * object request and mark it done so it gets completed.
2116 */
2117static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2118{
2119 struct rbd_img_request *img_request = NULL;
2120 struct rbd_img_request *parent_request = NULL;
2121 struct rbd_device *rbd_dev;
2122 u64 img_offset;
2123 u64 length;
2124 struct page **pages = NULL;
2125 u32 page_count;
2126 int result;
2127
2128 rbd_assert(obj_request_img_data_test(obj_request));
2129 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2130
2131 img_request = obj_request->img_request;
2132 rbd_assert(img_request != NULL);
2133 rbd_dev = img_request->rbd_dev;
2134 rbd_assert(rbd_dev->parent != NULL);
2135
0eefd470
AE
2136 /*
2137 * First things first. The original osd request is of no
2138 * use to us any more; we'll need a new one that can hold
2139 * the two ops in a copyup request. We'll get that later,
2140 * but for now we can release the old one.
2141 */
2142 rbd_osd_req_destroy(obj_request->osd_req);
2143 obj_request->osd_req = NULL;
2144
3d7efd18
AE
2145 /*
2146 * Determine the byte range covered by the object in the
2147 * child image to which the original request was to be sent.
2148 */
2149 img_offset = obj_request->img_offset - obj_request->offset;
2150 length = (u64)1 << rbd_dev->header.obj_order;
2151
a9e8ba2c
AE
2152 /*
2153 * There is no defined parent data beyond the parent
2154 * overlap, so limit what we read at that boundary if
2155 * necessary.
2156 */
2157 if (img_offset + length > rbd_dev->parent_overlap) {
2158 rbd_assert(img_offset < rbd_dev->parent_overlap);
2159 length = rbd_dev->parent_overlap - img_offset;
2160 }
2161
3d7efd18
AE
2162 /*
2163 * Allocate a page array big enough to receive the data read
2164 * from the parent.
2165 */
2166 page_count = (u32)calc_pages_for(0, length);
2167 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2168 if (IS_ERR(pages)) {
2169 result = PTR_ERR(pages);
2170 pages = NULL;
2171 goto out_err;
2172 }
2173
2174 result = -ENOMEM;
2175 parent_request = rbd_img_request_create(rbd_dev->parent,
2176 img_offset, length,
2177 false, true);
2178 if (!parent_request)
2179 goto out_err;
2180 rbd_obj_request_get(obj_request);
2181 parent_request->obj_request = obj_request;
2182
2183 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2184 if (result)
2185 goto out_err;
2186 parent_request->copyup_pages = pages;
2187
2188 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2189 result = rbd_img_request_submit(parent_request);
2190 if (!result)
2191 return 0;
2192
2193 parent_request->copyup_pages = NULL;
2194 parent_request->obj_request = NULL;
2195 rbd_obj_request_put(obj_request);
2196out_err:
2197 if (pages)
2198 ceph_release_page_vector(pages, page_count);
2199 if (parent_request)
2200 rbd_img_request_put(parent_request);
2201 obj_request->result = result;
2202 obj_request->xferred = 0;
2203 obj_request_done_set(obj_request);
2204
2205 return result;
2206}
2207
c5b5ef6c
AE
2208static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2209{
c5b5ef6c
AE
2210 struct rbd_obj_request *orig_request;
2211 int result;
2212
2213 rbd_assert(!obj_request_img_data_test(obj_request));
2214
2215 /*
2216 * All we need from the object request is the original
2217 * request and the result of the STAT op. Grab those, then
2218 * we're done with the request.
2219 */
2220 orig_request = obj_request->obj_request;
2221 obj_request->obj_request = NULL;
2222 rbd_assert(orig_request);
2223 rbd_assert(orig_request->img_request);
2224
2225 result = obj_request->result;
2226 obj_request->result = 0;
2227
2228 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2229 obj_request, orig_request, result,
2230 obj_request->xferred, obj_request->length);
2231 rbd_obj_request_put(obj_request);
2232
2233 rbd_assert(orig_request);
2234 rbd_assert(orig_request->img_request);
c5b5ef6c
AE
2235
2236 /*
2237 * Our only purpose here is to determine whether the object
2238 * exists, and we don't want to treat the non-existence as
2239 * an error. If something else comes back, transfer the
2240 * error to the original request and complete it now.
2241 */
2242 if (!result) {
2243 obj_request_existence_set(orig_request, true);
2244 } else if (result == -ENOENT) {
2245 obj_request_existence_set(orig_request, false);
2246 } else if (result) {
2247 orig_request->result = result;
3d7efd18 2248 goto out;
c5b5ef6c
AE
2249 }
2250
2251 /*
2252 * Resubmit the original request now that we have recorded
2253 * whether the target object exists.
2254 */
b454e36d 2255 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2256out:
c5b5ef6c
AE
2257 if (orig_request->result)
2258 rbd_obj_request_complete(orig_request);
2259 rbd_obj_request_put(orig_request);
2260}
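/*
 * On that resubmission the existence flag recorded above makes
 * rbd_img_obj_request_submit() take a definite path: a target
 * known to exist is written directly, while a known-absent one
 * goes through the parent read and copyup sequence.
 */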
2261
2262static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2263{
2264 struct rbd_obj_request *stat_request;
2265 struct rbd_device *rbd_dev;
2266 struct ceph_osd_client *osdc;
2267 struct page **pages = NULL;
2268 u32 page_count;
2269 size_t size;
2270 int ret;
2271
2272 /*
2273 * The response data for a STAT call consists of:
2274 * le64 length;
2275 * struct {
2276 * le32 tv_sec;
2277 * le32 tv_nsec;
2278 * } mtime;
2279 */
2280 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2281 page_count = (u32)calc_pages_for(0, size);
2282 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2283 if (IS_ERR(pages))
2284 return PTR_ERR(pages);
2285
2286 ret = -ENOMEM;
2287 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2288 OBJ_REQUEST_PAGES);
2289 if (!stat_request)
2290 goto out;
2291
2292 rbd_obj_request_get(obj_request);
2293 stat_request->obj_request = obj_request;
2294 stat_request->pages = pages;
2295 stat_request->page_count = page_count;
2296
2297 rbd_assert(obj_request->img_request);
2298 rbd_dev = obj_request->img_request->rbd_dev;
2299 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2300 stat_request);
2301 if (!stat_request->osd_req)
2302 goto out;
2303 stat_request->callback = rbd_img_obj_exists_callback;
2304
2305 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2306 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2307 false, false);
9d4df01f 2308 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2309
2310 osdc = &rbd_dev->rbd_client->client->osdc;
2311 ret = rbd_obj_request_submit(osdc, stat_request);
2312out:
2313 if (ret)
2314 rbd_obj_request_put(obj_request);
2315
2316 return ret;
2317}
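/*
 * The STAT reply described above is 16 bytes (an le64 length plus
 * two le32 mtime words), so page_count here is always 1.  Because
 * the stat request is not an image data request, that page is
 * released by rbd_obj_request_destroy() when the request is put.
 */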
2318
b454e36d
AE
2319static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2320{
2321 struct rbd_img_request *img_request;
a9e8ba2c 2322 struct rbd_device *rbd_dev;
3d7efd18 2323 bool known;
b454e36d
AE
2324
2325 rbd_assert(obj_request_img_data_test(obj_request));
2326
2327 img_request = obj_request->img_request;
2328 rbd_assert(img_request);
a9e8ba2c 2329 rbd_dev = img_request->rbd_dev;
b454e36d 2330
b454e36d 2331 /*
a9e8ba2c
AE
2332 * Only writes to layered images need special handling.
2333 * Reads and non-layered writes are simple object requests.
2334 * Layered writes that start beyond the end of the overlap
2335 * with the parent have no parent data, so they too are
2336 * simple object requests. Finally, if the target object is
2337 * known to already exist, its parent data has already been
2338 * copied, so a write to the object can also be handled as a
2339 * simple object request.
b454e36d
AE
2340 */
2341 if (!img_request_write_test(img_request) ||
2342 !img_request_layered_test(img_request) ||
a9e8ba2c 2343 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2344 ((known = obj_request_known_test(obj_request)) &&
2345 obj_request_exists_test(obj_request))) {
b454e36d
AE
2346
2347 struct rbd_device *rbd_dev;
2348 struct ceph_osd_client *osdc;
2349
2350 rbd_dev = obj_request->img_request->rbd_dev;
2351 osdc = &rbd_dev->rbd_client->client->osdc;
2352
2353 return rbd_obj_request_submit(osdc, obj_request);
2354 }
2355
2356 /*
3d7efd18
AE
2357 * It's a layered write. The target object might exist but
2358 * we may not know that yet. If we know it doesn't exist,
2359 * start by reading the data for the full target object from
2360 * the parent so we can use it for a copyup to the target.
b454e36d 2361 */
3d7efd18
AE
2362 if (known)
2363 return rbd_img_obj_parent_read_full(obj_request);
2364
2365 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2366
2367 return rbd_img_obj_exists_submit(obj_request);
2368}
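/*
 * Summary of the dispatch above (illustrative):
 *
 *	read, or write to a non-layered image    -> submit directly
 *	layered write beyond the parent overlap  -> submit directly
 *	layered write, target known to exist     -> submit directly
 *	layered write, target known absent       -> parent read + copyup
 *	layered write, existence unknown         -> STAT, then resubmit
 */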
2369
bf0d5f50
AE
2370static int rbd_img_request_submit(struct rbd_img_request *img_request)
2371{
bf0d5f50 2372 struct rbd_obj_request *obj_request;
46faeed4 2373 struct rbd_obj_request *next_obj_request;
bf0d5f50 2374
37206ee5 2375 dout("%s: img %p\n", __func__, img_request);
46faeed4 2376 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2377 int ret;
2378
b454e36d 2379 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2380 if (ret)
2381 return ret;
bf0d5f50
AE
2382 }
2383
2384 return 0;
2385}
8b3e1a56
AE
2386
2387static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2388{
2389 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2390 struct rbd_device *rbd_dev;
2391 u64 obj_end;
8b3e1a56
AE
2392
2393 rbd_assert(img_request_child_test(img_request));
2394
2395 obj_request = img_request->obj_request;
a9e8ba2c
AE
2396 rbd_assert(obj_request);
2397 rbd_assert(obj_request->img_request);
2398
8b3e1a56 2399 obj_request->result = img_request->result;
a9e8ba2c
AE
2400 if (obj_request->result)
2401 goto out;
2402
2403 /*
2404 * We need to zero anything beyond the parent overlap
2405 * boundary. Since rbd_img_obj_request_read_callback()
2406 * will zero anything beyond the end of a short read, an
2407 * easy way to do this is to pretend the data from the
2408 * parent came up short--ending at the overlap boundary.
2409 */
2410 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2411 obj_end = obj_request->img_offset + obj_request->length;
2412 rbd_dev = obj_request->img_request->rbd_dev;
2413 if (obj_end > rbd_dev->parent_overlap) {
2414 u64 xferred = 0;
2415
2416 if (obj_request->img_offset < rbd_dev->parent_overlap)
2417 xferred = rbd_dev->parent_overlap -
2418 obj_request->img_offset;
8b3e1a56 2419
a9e8ba2c
AE
2420 obj_request->xferred = min(img_request->xferred, xferred);
2421 } else {
2422 obj_request->xferred = img_request->xferred;
2423 }
2424out:
8b3e1a56
AE
2425 rbd_img_obj_request_read_callback(obj_request);
2426 rbd_obj_request_complete(obj_request);
2427}
2428
2429static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2430{
2431 struct rbd_device *rbd_dev;
2432 struct rbd_img_request *img_request;
2433 int result;
2434
2435 rbd_assert(obj_request_img_data_test(obj_request));
2436 rbd_assert(obj_request->img_request != NULL);
2437 rbd_assert(obj_request->result == (s32) -ENOENT);
2438 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2439
2440 rbd_dev = obj_request->img_request->rbd_dev;
2441 rbd_assert(rbd_dev->parent != NULL);
2442 /* rbd_read_finish(obj_request, obj_request->length); */
2443 img_request = rbd_img_request_create(rbd_dev->parent,
2444 obj_request->img_offset,
2445 obj_request->length,
2446 false, true);
2447 result = -ENOMEM;
2448 if (!img_request)
2449 goto out_err;
2450
2451 rbd_obj_request_get(obj_request);
2452 img_request->obj_request = obj_request;
2453
f1a4739f
AE
2454 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2455 obj_request->bio_list);
8b3e1a56
AE
2456 if (result)
2457 goto out_err;
2458
2459 img_request->callback = rbd_img_parent_read_callback;
2460 result = rbd_img_request_submit(img_request);
2461 if (result)
2462 goto out_err;
2463
2464 return;
2465out_err:
2466 if (img_request)
2467 rbd_img_request_put(img_request);
2468 obj_request->result = result;
2469 obj_request->xferred = 0;
2470 obj_request_done_set(obj_request);
2471}
bf0d5f50 2472
cf81b60e 2473static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
b8d70035
AE
2474 u64 ver, u64 notify_id)
2475{
2476 struct rbd_obj_request *obj_request;
2169238d 2477 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2478 int ret;
2479
2480 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2481 OBJ_REQUEST_NODATA);
2482 if (!obj_request)
2483 return -ENOMEM;
2484
2485 ret = -ENOMEM;
430c28c3 2486 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2487 if (!obj_request->osd_req)
2488 goto out;
2169238d 2489 obj_request->callback = rbd_obj_request_put;
b8d70035 2490
c99d2d4a
AE
2491 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2492 notify_id, ver, 0);
9d4df01f 2493 rbd_osd_req_format_read(obj_request);
430c28c3 2494
b8d70035 2495 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2496out:
cf81b60e
AE
2497 if (ret)
2498 rbd_obj_request_put(obj_request);
b8d70035
AE
2499
2500 return ret;
2501}
2502
2503static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2504{
2505 struct rbd_device *rbd_dev = (struct rbd_device *)data;
2506 u64 hver;
b8d70035
AE
2507
2508 if (!rbd_dev)
2509 return;
2510
37206ee5 2511 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
b8d70035
AE
2512 rbd_dev->header_name, (unsigned long long) notify_id,
2513 (unsigned int) opcode);
522a0cc0 2514 (void)rbd_dev_refresh(rbd_dev, &hver);
b8d70035 2515
cf81b60e 2516 rbd_obj_notify_ack(rbd_dev, hver, notify_id);
b8d70035
AE
2517}
2518
9969ebc5
AE
2519/*
2520 * Request sync osd watch/unwatch. The value of "start" determines
2521 * whether a watch request is being initiated or torn down.
2522 */
2523static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2524{
2525 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2526 struct rbd_obj_request *obj_request;
9969ebc5
AE
2527 int ret;
2528
2529 rbd_assert(start ^ !!rbd_dev->watch_event);
2530 rbd_assert(start ^ !!rbd_dev->watch_request);
2531
2532 if (start) {
3c663bbd 2533 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2534 &rbd_dev->watch_event);
2535 if (ret < 0)
2536 return ret;
8eb87565 2537 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2538 }
2539
2540 ret = -ENOMEM;
2541 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2542 OBJ_REQUEST_NODATA);
2543 if (!obj_request)
2544 goto out_cancel;
2545
430c28c3
AE
2546 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2547 if (!obj_request->osd_req)
2548 goto out_cancel;
2549
8eb87565 2550 if (start)
975241af 2551 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2552 else
6977c3f9 2553 ceph_osdc_unregister_linger_request(osdc,
975241af 2554 rbd_dev->watch_request->osd_req);
2169238d
AE
2555
2556 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2557 rbd_dev->watch_event->cookie,
2558 rbd_dev->header.obj_version, start);
9d4df01f 2559 rbd_osd_req_format_write(obj_request);
2169238d 2560
9969ebc5
AE
2561 ret = rbd_obj_request_submit(osdc, obj_request);
2562 if (ret)
2563 goto out_cancel;
2564 ret = rbd_obj_request_wait(obj_request);
2565 if (ret)
2566 goto out_cancel;
9969ebc5
AE
2567 ret = obj_request->result;
2568 if (ret)
2569 goto out_cancel;
2570
8eb87565
AE
2571 /*
2572 * A watch request is set to linger, so the underlying osd
2573 * request won't go away until we unregister it. We retain
2574 * a pointer to the object request during that time (in
2575 * rbd_dev->watch_request), so we'll keep a reference to
2576 * it. We'll drop that reference (below) after we've
2577 * unregistered it.
2578 */
2579 if (start) {
2580 rbd_dev->watch_request = obj_request;
2581
2582 return 0;
2583 }
2584
2585 /* We have successfully torn down the watch request */
2586
2587 rbd_obj_request_put(rbd_dev->watch_request);
2588 rbd_dev->watch_request = NULL;
9969ebc5
AE
2589out_cancel:
2590 /* Cancel the event if we're tearing down, or on error */
2591 ceph_osdc_cancel_event(rbd_dev->watch_event);
2592 rbd_dev->watch_event = NULL;
9969ebc5
AE
2593 if (obj_request)
2594 rbd_obj_request_put(obj_request);
2595
2596 return ret;
2597}
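/*
 * Minimal usage sketch (assumed callers, not shown in this
 * section): a watch is established once the header object is
 * known, and torn down before the device is released.
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);	start watching
 *	...
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 0);	tear it down
 */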
2598
36be9a76 2599/*
f40eb349
AE
2600 * Synchronous osd object method call. Returns the number of bytes
2601 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
2602 */
2603static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2604 const char *object_name,
2605 const char *class_name,
2606 const char *method_name,
4157976b 2607 const void *outbound,
36be9a76 2608 size_t outbound_size,
4157976b 2609 void *inbound,
36be9a76
AE
2610 size_t inbound_size,
2611 u64 *version)
2612{
2169238d 2613 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2614 struct rbd_obj_request *obj_request;
36be9a76
AE
2615 struct page **pages;
2616 u32 page_count;
2617 int ret;
2618
2619 /*
6010a451
AE
2620 * Method calls are ultimately read operations. The result
2621 * should be placed into the inbound buffer provided. They
2622 * also supply outbound data--parameters for the object
2623 * method. Currently if this is present it will be a
2624 * snapshot id.
36be9a76 2625 */
57385b51 2626 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
2627 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2628 if (IS_ERR(pages))
2629 return PTR_ERR(pages);
2630
2631 ret = -ENOMEM;
6010a451 2632 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2633 OBJ_REQUEST_PAGES);
2634 if (!obj_request)
2635 goto out;
2636
2637 obj_request->pages = pages;
2638 obj_request->page_count = page_count;
2639
430c28c3 2640 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
36be9a76
AE
2641 if (!obj_request->osd_req)
2642 goto out;
2643
c99d2d4a 2644 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
2645 class_name, method_name);
2646 if (outbound_size) {
2647 struct ceph_pagelist *pagelist;
2648
2649 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2650 if (!pagelist)
2651 goto out;
2652
2653 ceph_pagelist_init(pagelist);
2654 ceph_pagelist_append(pagelist, outbound, outbound_size);
2655 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2656 pagelist);
2657 }
a4ce40a9
AE
2658 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2659 obj_request->pages, inbound_size,
44cd188d 2660 0, false, false);
9d4df01f 2661 rbd_osd_req_format_read(obj_request);
430c28c3 2662
36be9a76
AE
2663 ret = rbd_obj_request_submit(osdc, obj_request);
2664 if (ret)
2665 goto out;
2666 ret = rbd_obj_request_wait(obj_request);
2667 if (ret)
2668 goto out;
2669
2670 ret = obj_request->result;
2671 if (ret < 0)
2672 goto out;
57385b51
AE
2673
2674 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2675 ret = (int)obj_request->xferred;
903bb32e 2676 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
2677 if (version)
2678 *version = obj_request->version;
2679out:
2680 if (obj_request)
2681 rbd_obj_request_put(obj_request);
2682 else
2683 ceph_release_page_vector(pages, page_count);
2684
2685 return ret;
2686}
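/*
 * Usage sketch, mirroring the "get_size" caller later in this
 * file: outbound carries the method parameters (here a snapshot
 * id) and a non-negative return value is the number of bytes
 * placed in the inbound buffer.
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf), NULL);
 */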
2687
bf0d5f50 2688static void rbd_request_fn(struct request_queue *q)
cc344fa1 2689 __releases(q->queue_lock) __acquires(q->queue_lock)
bf0d5f50
AE
2690{
2691 struct rbd_device *rbd_dev = q->queuedata;
2692 bool read_only = rbd_dev->mapping.read_only;
2693 struct request *rq;
2694 int result;
2695
2696 while ((rq = blk_fetch_request(q))) {
2697 bool write_request = rq_data_dir(rq) == WRITE;
2698 struct rbd_img_request *img_request;
2699 u64 offset;
2700 u64 length;
2701
2702 /* Ignore any non-FS requests that filter through. */
2703
2704 if (rq->cmd_type != REQ_TYPE_FS) {
4dda41d3
AE
2705 dout("%s: non-fs request type %d\n", __func__,
2706 (int) rq->cmd_type);
2707 __blk_end_request_all(rq, 0);
2708 continue;
2709 }
2710
2711 /* Ignore/skip any zero-length requests */
2712
2713 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2714 length = (u64) blk_rq_bytes(rq);
2715
2716 if (!length) {
2717 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
2718 __blk_end_request_all(rq, 0);
2719 continue;
2720 }
2721
2722 spin_unlock_irq(q->queue_lock);
2723
2724 /* Disallow writes to a read-only device */
2725
2726 if (write_request) {
2727 result = -EROFS;
2728 if (read_only)
2729 goto end_request;
2730 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2731 }
2732
6d292906
AE
2733 /*
2734 * Quit early if the mapped snapshot no longer
2735 * exists. It's still possible the snapshot will
2736 * have disappeared by the time our request arrives
2737 * at the osd, but there's no sense in sending it if
2738 * we already know.
2739 */
2740 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
2741 dout("request for non-existent snapshot");
2742 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2743 result = -ENXIO;
2744 goto end_request;
2745 }
2746
bf0d5f50 2747 result = -EINVAL;
c0cd10db
AE
2748 if (offset && length > U64_MAX - offset + 1) {
2749 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2750 offset, length);
bf0d5f50 2751 goto end_request; /* Shouldn't happen */
c0cd10db 2752 }
bf0d5f50
AE
2753
2754 result = -ENOMEM;
2755 img_request = rbd_img_request_create(rbd_dev, offset, length,
9849e986 2756 write_request, false);
bf0d5f50
AE
2757 if (!img_request)
2758 goto end_request;
2759
2760 img_request->rq = rq;
2761
f1a4739f
AE
2762 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2763 rq->bio);
bf0d5f50
AE
2764 if (!result)
2765 result = rbd_img_request_submit(img_request);
2766 if (result)
2767 rbd_img_request_put(img_request);
2768end_request:
2769 spin_lock_irq(q->queue_lock);
2770 if (result < 0) {
7da22d29
AE
2771 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2772 write_request ? "write" : "read",
2773 length, offset, result);
2774
bf0d5f50
AE
2775 __blk_end_request_all(rq, result);
2776 }
2777 }
2778}
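/*
 * Flow summary (illustrative): each block-layer request fetched
 * above becomes one image request; rbd_img_request_fill() splits
 * it into per-object requests backed by the request's bio chain,
 * and the block request is completed piecewise from
 * rbd_img_obj_end_request() via blk_end_request().
 */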
2779
602adf40
YS
2780/*
2781 * a queue callback. Makes sure that we don't create a bio that spans across
2782 * multiple osd objects. One exception would be with single-page bios,
f7760dad 2783 * which we handle later in bio_chain_clone_range()
602adf40
YS
2784 */
2785static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2786 struct bio_vec *bvec)
2787{
2788 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
2789 sector_t sector_offset;
2790 sector_t sectors_per_obj;
2791 sector_t obj_sector_offset;
2792 int ret;
2793
2794 /*
2795 * Find how far into its rbd object the bio's start sector
2796 * falls. The partition-relative start sector is first offset
2797 * to be relative to the enclosing device.
2798 */
2799 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2800 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2801 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2802
2803 /*
2804 * Compute the number of bytes from that offset to the end
2805 * of the object. Account for what's already used by the bio.
2806 */
2807 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2808 if (ret > bmd->bi_size)
2809 ret -= bmd->bi_size;
2810 else
2811 ret = 0;
2812
2813 /*
2814 * Don't send back more than was asked for. And if the bio
2815 * was empty, let the whole thing through because: "Note
2816 * that a block device *must* allow a single page to be
2817 * added to an empty bio."
2818 */
2819 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2820 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2821 ret = (int) bvec->bv_len;
2822
2823 return ret;
602adf40
YS
2824}
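/*
 * Worked example (illustrative): a 4 MiB object spans 8192
 * sectors.  For a bvec being added at obj_sector_offset 8190,
 * only 2 * 512 = 1024 bytes remain in the object, so at most that
 * much of the bvec is accepted (less any bytes the bio already
 * holds); an empty bio is always allowed one full page, as the
 * block layer requires.
 */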
2825
2826static void rbd_free_disk(struct rbd_device *rbd_dev)
2827{
2828 struct gendisk *disk = rbd_dev->disk;
2829
2830 if (!disk)
2831 return;
2832
a0cab924
AE
2833 rbd_dev->disk = NULL;
2834 if (disk->flags & GENHD_FL_UP) {
602adf40 2835 del_gendisk(disk);
a0cab924
AE
2836 if (disk->queue)
2837 blk_cleanup_queue(disk->queue);
2838 }
602adf40
YS
2839 put_disk(disk);
2840}
2841
788e2df3
AE
2842static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2843 const char *object_name,
2844 u64 offset, u64 length,
80ef15bf 2845 void *buf, u64 *version)
788e2df3
AE
2846
2847{
2169238d 2848 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2849 struct rbd_obj_request *obj_request;
788e2df3
AE
2850 struct page **pages = NULL;
2851 u32 page_count;
1ceae7ef 2852 size_t size;
788e2df3
AE
2853 int ret;
2854
2855 page_count = (u32) calc_pages_for(offset, length);
2856 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2857 if (IS_ERR(pages))
2858 return PTR_ERR(pages);
2859
2860 ret = -ENOMEM;
2861 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2862 OBJ_REQUEST_PAGES);
788e2df3
AE
2863 if (!obj_request)
2864 goto out;
2865
2866 obj_request->pages = pages;
2867 obj_request->page_count = page_count;
2868
430c28c3 2869 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
2870 if (!obj_request->osd_req)
2871 goto out;
2872
c99d2d4a
AE
2873 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2874 offset, length, 0, 0);
406e2c9f 2875 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 2876 obj_request->pages,
44cd188d
AE
2877 obj_request->length,
2878 obj_request->offset & ~PAGE_MASK,
2879 false, false);
9d4df01f 2880 rbd_osd_req_format_read(obj_request);
430c28c3 2881
788e2df3
AE
2882 ret = rbd_obj_request_submit(osdc, obj_request);
2883 if (ret)
2884 goto out;
2885 ret = rbd_obj_request_wait(obj_request);
2886 if (ret)
2887 goto out;
2888
2889 ret = obj_request->result;
2890 if (ret < 0)
2891 goto out;
1ceae7ef
AE
2892
2893 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2894 size = (size_t) obj_request->xferred;
903bb32e 2895 ceph_copy_from_page_vector(pages, buf, 0, size);
23ed6e13
AE
2896 rbd_assert(size <= (size_t) INT_MAX);
2897 ret = (int) size;
788e2df3
AE
2898 if (version)
2899 *version = obj_request->version;
2900out:
2901 if (obj_request)
2902 rbd_obj_request_put(obj_request);
2903 else
2904 ceph_release_page_vector(pages, page_count);
2905
2906 return ret;
2907}
2908
602adf40 2909/*
4156d998
AE
2910 * Read the complete header for the given rbd device.
2911 *
2912 * Returns a pointer to a dynamically-allocated buffer containing
2913 * the complete and validated header. Caller can pass the address
2914 * of a variable that will be filled in with the version of the
2915 * header object at the time it was read.
2916 *
2917 * Returns a pointer-coded errno if a failure occurs.
602adf40 2918 */
4156d998
AE
2919static struct rbd_image_header_ondisk *
2920rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
602adf40 2921{
4156d998 2922 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 2923 u32 snap_count = 0;
4156d998
AE
2924 u64 names_size = 0;
2925 u32 want_count;
2926 int ret;
602adf40 2927
00f1f36f 2928 /*
4156d998
AE
2929 * The complete header will include an array of its 64-bit
2930 * snapshot ids, followed by the names of those snapshots as
2931 * a contiguous block of NUL-terminated strings. Note that
2932 * the number of snapshots could change by the time we read
2933 * it in, in which case we re-read it.
00f1f36f 2934 */
4156d998
AE
2935 do {
2936 size_t size;
2937
2938 kfree(ondisk);
2939
2940 size = sizeof (*ondisk);
2941 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2942 size += names_size;
2943 ondisk = kmalloc(size, GFP_KERNEL);
2944 if (!ondisk)
2945 return ERR_PTR(-ENOMEM);
2946
788e2df3 2947 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
80ef15bf 2948 0, size, ondisk, version);
4156d998
AE
2949 if (ret < 0)
2950 goto out_err;
c0cd10db 2951 if ((size_t)ret < size) {
4156d998 2952 ret = -ENXIO;
06ecc6cb
AE
2953 rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2954 size, ret);
4156d998
AE
2955 goto out_err;
2956 }
2957 if (!rbd_dev_ondisk_valid(ondisk)) {
2958 ret = -ENXIO;
06ecc6cb 2959 rbd_warn(rbd_dev, "invalid header");
4156d998 2960 goto out_err;
81e759fb 2961 }
602adf40 2962
4156d998
AE
2963 names_size = le64_to_cpu(ondisk->snap_names_len);
2964 want_count = snap_count;
2965 snap_count = le32_to_cpu(ondisk->snap_count);
2966 } while (snap_count != want_count);
00f1f36f 2967
4156d998 2968 return ondisk;
00f1f36f 2969
4156d998
AE
2970out_err:
2971 kfree(ondisk);
2972
2973 return ERR_PTR(ret);
2974}
2975
2976/*
2977 * reload the ondisk header
2978 */
2979static int rbd_read_header(struct rbd_device *rbd_dev,
2980 struct rbd_image_header *header)
2981{
2982 struct rbd_image_header_ondisk *ondisk;
2983 u64 ver = 0;
2984 int ret;
602adf40 2985
4156d998
AE
2986 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2987 if (IS_ERR(ondisk))
2988 return PTR_ERR(ondisk);
2989 ret = rbd_header_from_disk(header, ondisk);
2990 if (ret >= 0)
2991 header->obj_version = ver;
2992 kfree(ondisk);
2993
2994 return ret;
602adf40
YS
2995}
2996
41f38c2b 2997static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
dfc5606d
YS
2998{
2999 struct rbd_snap *snap;
a0593290 3000 struct rbd_snap *next;
dfc5606d 3001
6087b51b
AE
3002 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3003 list_del(&snap->node);
3004 rbd_snap_destroy(snap);
3005 }
dfc5606d
YS
3006}
3007
9478554a
AE
3008static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3009{
0d7dbfce 3010 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
3011 return;
3012
e28626a0
AE
3013 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3014 sector_t size;
3015
3016 rbd_dev->mapping.size = rbd_dev->header.image_size;
3017 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3018 dout("setting size to %llu sectors", (unsigned long long)size);
3019 set_capacity(rbd_dev->disk, size);
3020 }
9478554a
AE
3021}
3022
602adf40
YS
3023/*
3024 * only read the first part of the ondisk header, without the snaps info
3025 */
117973fb 3026static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
602adf40
YS
3027{
3028 int ret;
3029 struct rbd_image_header h;
602adf40
YS
3030
3031 ret = rbd_read_header(rbd_dev, &h);
3032 if (ret < 0)
3033 return ret;
3034
a51aa0c0
JD
3035 down_write(&rbd_dev->header_rwsem);
3036
9478554a
AE
3037 /* Update image size, and check for resize of mapped image */
3038 rbd_dev->header.image_size = h.image_size;
3039 rbd_update_mapping_size(rbd_dev);
9db4b3e3 3040
849b4260 3041 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 3042 kfree(rbd_dev->header.snap_sizes);
849b4260 3043 kfree(rbd_dev->header.snap_names);
d1d25646 3044 /* osd requests may still refer to snapc */
812164f8 3045 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 3046
b813623a
AE
3047 if (hver)
3048 *hver = h.obj_version;
a71b891b 3049 rbd_dev->header.obj_version = h.obj_version;
93a24e08 3050 rbd_dev->header.image_size = h.image_size;
602adf40
YS
3051 rbd_dev->header.snapc = h.snapc;
3052 rbd_dev->header.snap_names = h.snap_names;
3053 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260 3054 /* Free the extra copy of the object prefix */
c0cd10db
AE
3055 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3056 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
849b4260
AE
3057 kfree(h.object_prefix);
3058
304f6808 3059 ret = rbd_dev_snaps_update(rbd_dev);
dfc5606d 3060
c666601a 3061 up_write(&rbd_dev->header_rwsem);
602adf40 3062
dfc5606d 3063 return ret;
602adf40
YS
3064}
3065
117973fb 3066static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1fe5e993
AE
3067{
3068 int ret;
3069
117973fb 3070 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1fe5e993 3071 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb
AE
3072 if (rbd_dev->image_format == 1)
3073 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3074 else
3075 ret = rbd_dev_v2_refresh(rbd_dev, hver);
1fe5e993 3076 mutex_unlock(&ctl_mutex);
d98df63e 3077 revalidate_disk(rbd_dev->disk);
522a0cc0
AE
3078 if (ret)
3079 rbd_warn(rbd_dev, "got notification but failed to "
3080 " update snaps: %d\n", ret);
1fe5e993
AE
3081
3082 return ret;
3083}
3084
602adf40
YS
3085static int rbd_init_disk(struct rbd_device *rbd_dev)
3086{
3087 struct gendisk *disk;
3088 struct request_queue *q;
593a9e7b 3089 u64 segment_size;
602adf40 3090
602adf40 3091 /* create gendisk info */
602adf40
YS
3092 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3093 if (!disk)
1fcdb8aa 3094 return -ENOMEM;
602adf40 3095
f0f8cef5 3096 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3097 rbd_dev->dev_id);
602adf40
YS
3098 disk->major = rbd_dev->major;
3099 disk->first_minor = 0;
3100 disk->fops = &rbd_bd_ops;
3101 disk->private_data = rbd_dev;
3102
bf0d5f50 3103 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3104 if (!q)
3105 goto out_disk;
029bcbd8 3106
593a9e7b
AE
3107 /* We use the default size, but let's be explicit about it. */
3108 blk_queue_physical_block_size(q, SECTOR_SIZE);
3109
029bcbd8 3110 /* set io sizes to object size */
593a9e7b
AE
3111 segment_size = rbd_obj_bytes(&rbd_dev->header);
3112 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3113 blk_queue_max_segment_size(q, segment_size);
3114 blk_queue_io_min(q, segment_size);
3115 blk_queue_io_opt(q, segment_size);
029bcbd8 3116
602adf40
YS
3117 blk_queue_merge_bvec(q, rbd_merge_bvec);
3118 disk->queue = q;
3119
3120 q->queuedata = rbd_dev;
3121
3122 rbd_dev->disk = disk;
602adf40 3123
602adf40 3124 return 0;
602adf40
YS
3125out_disk:
3126 put_disk(disk);
1fcdb8aa
AE
3127
3128 return -ENOMEM;
602adf40
YS
3129}
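/*
 * Worked example (illustrative): with a typical object order of
 * 22, segment_size is 4 MiB, so the queue above advertises 8192
 * max hardware sectors and 4 MiB segment, minimum and optimal I/O
 * sizes, encouraging requests sized to a single rbd object.
 */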
3130
dfc5606d
YS
3131/*
3132 sysfs
3133*/
3134
593a9e7b
AE
3135static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3136{
3137 return container_of(dev, struct rbd_device, dev);
3138}
3139
dfc5606d
YS
3140static ssize_t rbd_size_show(struct device *dev,
3141 struct device_attribute *attr, char *buf)
3142{
593a9e7b 3143 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3144
fc71d833
AE
3145 return sprintf(buf, "%llu\n",
3146 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3147}
3148
34b13184
AE
3149/*
3150 * Note this shows the features for whatever's mapped, which is not
3151 * necessarily the base image.
3152 */
3153static ssize_t rbd_features_show(struct device *dev,
3154 struct device_attribute *attr, char *buf)
3155{
3156 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3157
3158 return sprintf(buf, "0x%016llx\n",
fc71d833 3159 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3160}
3161
dfc5606d
YS
3162static ssize_t rbd_major_show(struct device *dev,
3163 struct device_attribute *attr, char *buf)
3164{
593a9e7b 3165 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3166
fc71d833
AE
3167 if (rbd_dev->major)
3168 return sprintf(buf, "%d\n", rbd_dev->major);
3169
3170 return sprintf(buf, "(none)\n");
3171
dfc5606d
YS
3172}
3173
3174static ssize_t rbd_client_id_show(struct device *dev,
3175 struct device_attribute *attr, char *buf)
602adf40 3176{
593a9e7b 3177 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3178
1dbb4399
AE
3179 return sprintf(buf, "client%lld\n",
3180 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3181}
3182
dfc5606d
YS
3183static ssize_t rbd_pool_show(struct device *dev,
3184 struct device_attribute *attr, char *buf)
602adf40 3185{
593a9e7b 3186 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3187
0d7dbfce 3188 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3189}
3190
9bb2f334
AE
3191static ssize_t rbd_pool_id_show(struct device *dev,
3192 struct device_attribute *attr, char *buf)
3193{
3194 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3195
0d7dbfce 3196 return sprintf(buf, "%llu\n",
fc71d833 3197 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3198}
3199
dfc5606d
YS
3200static ssize_t rbd_name_show(struct device *dev,
3201 struct device_attribute *attr, char *buf)
3202{
593a9e7b 3203 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3204
a92ffdf8
AE
3205 if (rbd_dev->spec->image_name)
3206 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3207
3208 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3209}
3210
589d30e0
AE
3211static ssize_t rbd_image_id_show(struct device *dev,
3212 struct device_attribute *attr, char *buf)
3213{
3214 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3215
0d7dbfce 3216 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3217}
3218
34b13184
AE
3219/*
3220 * Shows the name of the currently-mapped snapshot (or
3221 * RBD_SNAP_HEAD_NAME for the base image).
3222 */
dfc5606d
YS
3223static ssize_t rbd_snap_show(struct device *dev,
3224 struct device_attribute *attr,
3225 char *buf)
3226{
593a9e7b 3227 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3228
0d7dbfce 3229 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3230}
3231
86b00e0d
AE
3232/*
3233 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3234 * for the parent image. If there is no parent, simply shows
3235 * "(no parent image)".
3236 */
3237static ssize_t rbd_parent_show(struct device *dev,
3238 struct device_attribute *attr,
3239 char *buf)
3240{
3241 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3242 struct rbd_spec *spec = rbd_dev->parent_spec;
3243 int count;
3244 char *bufp = buf;
3245
3246 if (!spec)
3247 return sprintf(buf, "(no parent image)\n");
3248
3249 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3250 (unsigned long long) spec->pool_id, spec->pool_name);
3251 if (count < 0)
3252 return count;
3253 bufp += count;
3254
3255 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3256 spec->image_name ? spec->image_name : "(unknown)");
3257 if (count < 0)
3258 return count;
3259 bufp += count;
3260
3261 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3262 (unsigned long long) spec->snap_id, spec->snap_name);
3263 if (count < 0)
3264 return count;
3265 bufp += count;
3266
3267 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3268 if (count < 0)
3269 return count;
3270 bufp += count;
3271
3272 return (ssize_t) (bufp - buf);
3273}
3274
dfc5606d
YS
3275static ssize_t rbd_image_refresh(struct device *dev,
3276 struct device_attribute *attr,
3277 const char *buf,
3278 size_t size)
3279{
593a9e7b 3280 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3281 int ret;
602adf40 3282
117973fb 3283 ret = rbd_dev_refresh(rbd_dev, NULL);
b813623a
AE
3284
3285 return ret < 0 ? ret : size;
dfc5606d 3286}
602adf40 3287
dfc5606d 3288static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3289static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3290static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3291static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3292static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3293static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3294static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3295static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3296static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3297static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3298static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3299
3300static struct attribute *rbd_attrs[] = {
3301 &dev_attr_size.attr,
34b13184 3302 &dev_attr_features.attr,
dfc5606d
YS
3303 &dev_attr_major.attr,
3304 &dev_attr_client_id.attr,
3305 &dev_attr_pool.attr,
9bb2f334 3306 &dev_attr_pool_id.attr,
dfc5606d 3307 &dev_attr_name.attr,
589d30e0 3308 &dev_attr_image_id.attr,
dfc5606d 3309 &dev_attr_current_snap.attr,
86b00e0d 3310 &dev_attr_parent.attr,
dfc5606d 3311 &dev_attr_refresh.attr,
dfc5606d
YS
3312 NULL
3313};
3314
3315static struct attribute_group rbd_attr_group = {
3316 .attrs = rbd_attrs,
3317};
3318
3319static const struct attribute_group *rbd_attr_groups[] = {
3320 &rbd_attr_group,
3321 NULL
3322};
3323
3324static void rbd_sysfs_dev_release(struct device *dev)
3325{
3326}
3327
3328static struct device_type rbd_device_type = {
3329 .name = "rbd",
3330 .groups = rbd_attr_groups,
3331 .release = rbd_sysfs_dev_release,
3332};
3333
8b8fb99c
AE
3334static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3335{
3336 kref_get(&spec->kref);
3337
3338 return spec;
3339}
3340
3341static void rbd_spec_free(struct kref *kref);
3342static void rbd_spec_put(struct rbd_spec *spec)
3343{
3344 if (spec)
3345 kref_put(&spec->kref, rbd_spec_free);
3346}
3347
3348static struct rbd_spec *rbd_spec_alloc(void)
3349{
3350 struct rbd_spec *spec;
3351
3352 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3353 if (!spec)
3354 return NULL;
3355 kref_init(&spec->kref);
3356
8b8fb99c
AE
3357 return spec;
3358}
3359
3360static void rbd_spec_free(struct kref *kref)
3361{
3362 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3363
3364 kfree(spec->pool_name);
3365 kfree(spec->image_id);
3366 kfree(spec->image_name);
3367 kfree(spec->snap_name);
3368 kfree(spec);
3369}
3370
cc344fa1 3371static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3372 struct rbd_spec *spec)
3373{
3374 struct rbd_device *rbd_dev;
3375
3376 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3377 if (!rbd_dev)
3378 return NULL;
3379
3380 spin_lock_init(&rbd_dev->lock);
6d292906 3381 rbd_dev->flags = 0;
c53d5893
AE
3382 INIT_LIST_HEAD(&rbd_dev->node);
3383 INIT_LIST_HEAD(&rbd_dev->snaps);
3384 init_rwsem(&rbd_dev->header_rwsem);
3385
3386 rbd_dev->spec = spec;
3387 rbd_dev->rbd_client = rbdc;
3388
0903e875
AE
3389 /* Initialize the layout used for all rbd requests */
3390
3391 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3392 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3393 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3394 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3395
c53d5893
AE
3396 return rbd_dev;
3397}
3398
3399static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3400{
c53d5893
AE
3401 rbd_put_client(rbd_dev->rbd_client);
3402 rbd_spec_put(rbd_dev->spec);
3403 kfree(rbd_dev);
3404}
3405
6087b51b 3406static void rbd_snap_destroy(struct rbd_snap *snap)
dfc5606d 3407{
3e83b65b
AE
3408 kfree(snap->name);
3409 kfree(snap);
dfc5606d
YS
3410}
3411
6087b51b 3412static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
c8d18425 3413 const char *snap_name,
34b13184
AE
3414 u64 snap_id, u64 snap_size,
3415 u64 snap_features)
dfc5606d 3416{
4e891e0a 3417 struct rbd_snap *snap;
4e891e0a
AE
3418
3419 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 3420 if (!snap)
4e891e0a
AE
3421 return ERR_PTR(-ENOMEM);
3422
6e584f52 3423 snap->name = snap_name;
c8d18425
AE
3424 snap->id = snap_id;
3425 snap->size = snap_size;
34b13184 3426 snap->features = snap_features;
4e891e0a
AE
3427
3428 return snap;
dfc5606d
YS
3429}
3430
6e584f52
AE
3431/*
3432 * Returns a dynamically-allocated snapshot name if successful, or a
3433 * pointer-coded error otherwise.
3434 */
cd892126
AE
3435static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3436 u64 *snap_size, u64 *snap_features)
3437{
3438 char *snap_name;
6e584f52 3439 int i;
cd892126
AE
3440
3441 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3442
cd892126
AE
3443 /* Skip over names until we find the one we are looking for */
3444
3445 snap_name = rbd_dev->header.snap_names;
6e584f52 3446 for (i = 0; i < which; i++)
cd892126
AE
3447 snap_name += strlen(snap_name) + 1;
3448
6e584f52
AE
3449 snap_name = kstrdup(snap_name, GFP_KERNEL);
3450 if (!snap_name)
3451 return ERR_PTR(-ENOMEM);
3452
3453 *snap_size = rbd_dev->header.snap_sizes[which];
3454 *snap_features = 0; /* No features for v1 */
3455
cd892126
AE
3456 return snap_name;
3457}
3458
9d475de5
AE
3459/*
3460 * Get the size and object order for an image snapshot, or if
3461 * snap_id is CEPH_NOSNAP, gets this information for the base
3462 * image.
3463 */
3464static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3465 u8 *order, u64 *snap_size)
3466{
3467 __le64 snapid = cpu_to_le64(snap_id);
3468 int ret;
3469 struct {
3470 u8 order;
3471 __le64 size;
3472 } __attribute__ ((packed)) size_buf = { 0 };
3473
36be9a76 3474 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3475 "rbd", "get_size",
4157976b
AE
3476 &snapid, sizeof (snapid),
3477 &size_buf, sizeof (size_buf), NULL);
36be9a76 3478 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3479 if (ret < 0)
3480 return ret;
57385b51
AE
3481 if (ret < sizeof (size_buf))
3482 return -ERANGE;
9d475de5 3483
c86f86e9
AE
3484 if (order)
3485 *order = size_buf.order;
9d475de5
AE
3486 *snap_size = le64_to_cpu(size_buf.size);
3487
3488 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3489 (unsigned long long)snap_id, (unsigned int)*order,
3490 (unsigned long long)*snap_size);
9d475de5
AE
3491
3492 return 0;
3493}
3494
3495static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3496{
3497 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3498 &rbd_dev->header.obj_order,
3499 &rbd_dev->header.image_size);
3500}
3501
1e130199
AE
3502static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3503{
3504 void *reply_buf;
3505 int ret;
3506 void *p;
3507
3508 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3509 if (!reply_buf)
3510 return -ENOMEM;
3511
36be9a76 3512 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3513 "rbd", "get_object_prefix", NULL, 0,
07b2391f 3514 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
36be9a76 3515 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3516 if (ret < 0)
3517 goto out;
3518
3519 p = reply_buf;
3520 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3521 p + ret, NULL, GFP_NOIO);
3522 ret = 0;
1e130199
AE
3523
3524 if (IS_ERR(rbd_dev->header.object_prefix)) {
3525 ret = PTR_ERR(rbd_dev->header.object_prefix);
3526 rbd_dev->header.object_prefix = NULL;
3527 } else {
3528 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3529 }
1e130199
AE
3530out:
3531 kfree(reply_buf);
3532
3533 return ret;
3534}
3535
b1b5402a
AE
3536static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3537 u64 *snap_features)
3538{
3539 __le64 snapid = cpu_to_le64(snap_id);
3540 struct {
3541 __le64 features;
3542 __le64 incompat;
4157976b 3543 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3544 u64 incompat;
b1b5402a
AE
3545 int ret;
3546
36be9a76 3547 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3548 "rbd", "get_features",
4157976b
AE
3549 &snapid, sizeof (snapid),
3550 &features_buf, sizeof (features_buf), NULL);
36be9a76 3551 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3552 if (ret < 0)
3553 return ret;
57385b51
AE
3554 if (ret < sizeof (features_buf))
3555 return -ERANGE;
d889140c
AE
3556
3557 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3558 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3559 return -ENXIO;
d889140c 3560
b1b5402a
AE
3561 *snap_features = le64_to_cpu(features_buf.features);
3562
3563 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3564 (unsigned long long)snap_id,
3565 (unsigned long long)*snap_features,
3566 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3567
3568 return 0;
3569}
3570
3571static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3572{
3573 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3574 &rbd_dev->header.features);
3575}
3576
86b00e0d
AE
3577static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3578{
3579 struct rbd_spec *parent_spec;
3580 size_t size;
3581 void *reply_buf = NULL;
3582 __le64 snapid;
3583 void *p;
3584 void *end;
3585 char *image_id;
3586 u64 overlap;
86b00e0d
AE
3587 int ret;
3588
3589 parent_spec = rbd_spec_alloc();
3590 if (!parent_spec)
3591 return -ENOMEM;
3592
3593 size = sizeof (__le64) + /* pool_id */
3594 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3595 sizeof (__le64) + /* snap_id */
3596 sizeof (__le64); /* overlap */
3597 reply_buf = kmalloc(size, GFP_KERNEL);
3598 if (!reply_buf) {
3599 ret = -ENOMEM;
3600 goto out_err;
3601 }
3602
3603 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3604 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3605 "rbd", "get_parent",
4157976b
AE
3606 &snapid, sizeof (snapid),
3607 reply_buf, size, NULL);
36be9a76 3608 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3609 if (ret < 0)
3610 goto out_err;
3611
86b00e0d 3612 p = reply_buf;
57385b51
AE
3613 end = reply_buf + ret;
3614 ret = -ERANGE;
86b00e0d
AE
3615 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3616 if (parent_spec->pool_id == CEPH_NOPOOL)
3617 goto out; /* No parent? No problem. */
3618
0903e875
AE
3619 /* The ceph file layout needs to fit pool id in 32 bits */
3620
3621 ret = -EIO;
c0cd10db
AE
3622 if (parent_spec->pool_id > (u64)U32_MAX) {
3623 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3624 (unsigned long long)parent_spec->pool_id, U32_MAX);
57385b51 3625 goto out_err;
c0cd10db 3626 }
0903e875 3627
979ed480 3628 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3629 if (IS_ERR(image_id)) {
3630 ret = PTR_ERR(image_id);
3631 goto out_err;
3632 }
3633 parent_spec->image_id = image_id;
3634 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3635 ceph_decode_64_safe(&p, end, overlap, out_err);
3636
3637 rbd_dev->parent_overlap = overlap;
3638 rbd_dev->parent_spec = parent_spec;
3639 parent_spec = NULL; /* rbd_dev now owns this */
3640out:
3641 ret = 0;
3642out_err:
3643 kfree(reply_buf);
3644 rbd_spec_put(parent_spec);
3645
3646 return ret;
3647}
3648
cc070d59
AE
3649static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3650{
3651 struct {
3652 __le64 stripe_unit;
3653 __le64 stripe_count;
3654 } __attribute__ ((packed)) striping_info_buf = { 0 };
3655 size_t size = sizeof (striping_info_buf);
3656 void *p;
3657 u64 obj_size;
3658 u64 stripe_unit;
3659 u64 stripe_count;
3660 int ret;
3661
3662 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3663 "rbd", "get_stripe_unit_count", NULL, 0,
3664 (char *)&striping_info_buf, size, NULL);
3665 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3666 if (ret < 0)
3667 return ret;
3668 if (ret < size)
3669 return -ERANGE;
3670
3671 /*
3672 * We don't actually support the "fancy striping" feature
3673 * (STRIPINGV2) yet, but if the striping sizes are the
3674 * defaults the behavior is the same as before. So find
3675 * out, and only fail if the image has non-default values.
3676 */
3677 ret = -EINVAL;
3678 obj_size = (u64)1 << rbd_dev->header.obj_order;
3679 p = &striping_info_buf;
3680 stripe_unit = ceph_decode_64(&p);
3681 if (stripe_unit != obj_size) {
3682 rbd_warn(rbd_dev, "unsupported stripe unit "
3683 "(got %llu want %llu)",
3684 stripe_unit, obj_size);
3685 return -EINVAL;
3686 }
3687 stripe_count = ceph_decode_64(&p);
3688 if (stripe_count != 1) {
3689 rbd_warn(rbd_dev, "unsupported stripe count "
3690 "(got %llu want 1)", stripe_count);
3691 return -EINVAL;
3692 }
500d0c0f
AE
3693 rbd_dev->header.stripe_unit = stripe_unit;
3694 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3695
3696 return 0;
3697}
3698
9e15b77d
AE
3699static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3700{
3701 size_t image_id_size;
3702 char *image_id;
3703 void *p;
3704 void *end;
3705 size_t size;
3706 void *reply_buf = NULL;
3707 size_t len = 0;
3708 char *image_name = NULL;
3709 int ret;
3710
3711 rbd_assert(!rbd_dev->spec->image_name);
3712
69e7a02f
AE
3713 len = strlen(rbd_dev->spec->image_id);
3714 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3715 image_id = kmalloc(image_id_size, GFP_KERNEL);
3716 if (!image_id)
3717 return NULL;
3718
3719 p = image_id;
4157976b 3720 end = image_id + image_id_size;
57385b51 3721 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3722
3723 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3724 reply_buf = kmalloc(size, GFP_KERNEL);
3725 if (!reply_buf)
3726 goto out;
3727
36be9a76 3728 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3729 "rbd", "dir_get_name",
3730 image_id, image_id_size,
4157976b 3731 reply_buf, size, NULL);
9e15b77d
AE
3732 if (ret < 0)
3733 goto out;
3734 p = reply_buf;
f40eb349
AE
3735 end = reply_buf + ret;
3736
9e15b77d
AE
3737 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3738 if (IS_ERR(image_name))
3739 image_name = NULL;
3740 else
3741 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3742out:
3743 kfree(reply_buf);
3744 kfree(image_id);
3745
3746 return image_name;
3747}
3748
3749/*
2e9f7f1c
AE
3750 * When an rbd image has a parent image, it is identified by the
3751 * pool, image, and snapshot ids (not names). This function fills
3752 * in the names for those ids. (It's OK if we can't figure out the
3753 * name for an image id, but the pool and snapshot ids should always
3754 * exist and have names.) All names in an rbd spec are dynamically
3755 * allocated.
e1d4213f
AE
3756 *
3757 * When an image being mapped (not a parent) is probed, we have the
3758 * pool name and pool id, image name and image id, and the snapshot
3759 * name. The only thing we're missing is the snapshot id.
2e9f7f1c
AE
3760 *
3761 * The set of snapshots for an image is not known until they have
3762 * been read by rbd_dev_snaps_update(), so we can't completely fill
3763 * in this information until after that has been called.
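 *
 * For example (all values hypothetical): a parent spec may arrive
 * holding only pool_id 2, image_id "29ab6e8c2d51" and snap_id 4;
 * this function would fill in pool_name "rbd" and snap_name
 * "snap1", and would tolerate the image name lookup failing by
 * leaving image_name NULL.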
9e15b77d 3764 */
2e9f7f1c 3765static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3766{
2e9f7f1c
AE
3767 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3768 struct rbd_spec *spec = rbd_dev->spec;
3769 const char *pool_name;
3770 const char *image_name;
3771 const char *snap_name;
9e15b77d
AE
3772 int ret;
3773
e1d4213f
AE
3774 /*
3775 * An image being mapped will have the pool name (etc.), but
3776 * we need to look up the snapshot id.
3777 */
2e9f7f1c
AE
3778 if (spec->pool_name) {
3779 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
e1d4213f
AE
3780 struct rbd_snap *snap;
3781
2e9f7f1c 3782 snap = snap_by_name(rbd_dev, spec->snap_name);
e1d4213f
AE
3783 if (!snap)
3784 return -ENOENT;
2e9f7f1c 3785 spec->snap_id = snap->id;
e1d4213f 3786 } else {
2e9f7f1c 3787 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
3788 }
3789
3790 return 0;
3791 }
9e15b77d 3792
2e9f7f1c 3793 /* Get the pool name; we have to make our own copy of this */
9e15b77d 3794
2e9f7f1c
AE
3795 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3796 if (!pool_name) {
3797 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
3798 return -EIO;
3799 }
2e9f7f1c
AE
3800 pool_name = kstrdup(pool_name, GFP_KERNEL);
3801 if (!pool_name)
9e15b77d
AE
3802 return -ENOMEM;
3803
3804 /* Fetch the image name; tolerate failure here */
3805
2e9f7f1c
AE
3806 image_name = rbd_dev_image_name(rbd_dev);
3807 if (!image_name)
06ecc6cb 3808 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 3809
2e9f7f1c 3810 /* Look up the snapshot name, and make a copy */
9e15b77d 3811
2e9f7f1c
AE
3812 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3813 if (!snap_name) {
3814 rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
9e15b77d
AE
3815 ret = -EIO;
3816 goto out_err;
3817 }
2e9f7f1c
AE
3818 snap_name = kstrdup(snap_name, GFP_KERNEL);
3819 if (!snap_name) {
3820 ret = -ENOMEM;
9e15b77d 3821 goto out_err;
2e9f7f1c
AE
3822 }
3823
3824 spec->pool_name = pool_name;
3825 spec->image_name = image_name;
3826 spec->snap_name = snap_name;
9e15b77d
AE
3827
3828 return 0;
3829out_err:
2e9f7f1c
AE
3830 kfree(image_name);
3831 kfree(pool_name);
9e15b77d
AE
3832
3833 return ret;
3834}
3835
6e14b1a6 3836static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
35d489f9
AE
3837{
3838 size_t size;
3839 int ret;
3840 void *reply_buf;
3841 void *p;
3842 void *end;
3843 u64 seq;
3844 u32 snap_count;
3845 struct ceph_snap_context *snapc;
3846 u32 i;
3847
3848 /*
3849 * We'll need room for the seq value (maximum snapshot id),
3850 * snapshot count, and array of that many snapshot ids.
3851 * For now we have a fixed upper limit on the number we're
3852 * prepared to receive.
3853 */
3854 size = sizeof (__le64) + sizeof (__le32) +
3855 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3856 reply_buf = kzalloc(size, GFP_KERNEL);
3857 if (!reply_buf)
3858 return -ENOMEM;
3859
36be9a76 3860 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3861 "rbd", "get_snapcontext", NULL, 0,
07b2391f 3862 reply_buf, size, ver);
36be9a76 3863 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3864 if (ret < 0)
3865 goto out;
3866
35d489f9 3867 p = reply_buf;
57385b51
AE
3868 end = reply_buf + ret;
3869 ret = -ERANGE;
35d489f9
AE
3870 ceph_decode_64_safe(&p, end, seq, out);
3871 ceph_decode_32_safe(&p, end, snap_count, out);
3872
3873 /*
3874 * Make sure the reported number of snapshot ids wouldn't go
3875 * beyond the end of our buffer. But before checking that,
3876 * make sure the computed size of the snapshot context we
3877 * allocate is representable in a size_t.
3878 */
3879 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3880 / sizeof (u64)) {
3881 ret = -EINVAL;
3882 goto out;
3883 }
3884 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3885 goto out;
468521c1 3886 ret = 0;
35d489f9 3887
812164f8 3888 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
3889 if (!snapc) {
3890 ret = -ENOMEM;
3891 goto out;
3892 }
35d489f9 3893 snapc->seq = seq;
35d489f9
AE
3894 for (i = 0; i < snap_count; i++)
3895 snapc->snaps[i] = ceph_decode_64(&p);
3896
3897 rbd_dev->header.snapc = snapc;
3898
3899 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 3900 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
3901out:
3902 kfree(reply_buf);
3903
57385b51 3904 return ret;
35d489f9
AE
3905}
3906
b8b1e2db
AE
3907static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3908{
3909 size_t size;
3910 void *reply_buf;
3911 __le64 snap_id;
3912 int ret;
3913 void *p;
3914 void *end;
b8b1e2db
AE
3915 char *snap_name;
3916
3917 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3918 reply_buf = kmalloc(size, GFP_KERNEL);
3919 if (!reply_buf)
3920 return ERR_PTR(-ENOMEM);
3921
acb1b6ca 3922 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
b8b1e2db 3923 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
36be9a76 3924 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 3925 "rbd", "get_snapshot_name",
4157976b 3926 &snap_id, sizeof (snap_id),
07b2391f 3927 reply_buf, size, NULL);
36be9a76 3928 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
3929 if (ret < 0) {
3930 snap_name = ERR_PTR(ret);
b8b1e2db 3931 goto out;
f40eb349 3932 }
b8b1e2db
AE
3933
3934 p = reply_buf;
f40eb349 3935 end = reply_buf + ret;
e5c35534 3936 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 3937 if (IS_ERR(snap_name))
b8b1e2db 3938 goto out;
b8b1e2db 3939
f40eb349
AE
3940 dout(" snap_id 0x%016llx snap_name = %s\n",
3941 (unsigned long long)le64_to_cpu(snap_id), snap_name);
b8b1e2db
AE
3942out:
3943 kfree(reply_buf);
3944
f40eb349 3945 return snap_name;
b8b1e2db
AE
3946}
3947
3948static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3949 u64 *snap_size, u64 *snap_features)
3950{
e0b49868 3951 u64 snap_id;
acb1b6ca
AE
3952 u64 size;
3953 u64 features;
3954 char *snap_name;
b8b1e2db
AE
3955 int ret;
3956
acb1b6ca 3957 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
b8b1e2db 3958 snap_id = rbd_dev->header.snapc->snaps[which];
acb1b6ca 3959 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
b8b1e2db 3960 if (ret)
acb1b6ca
AE
3961 goto out_err;
3962
3963 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
b8b1e2db 3964 if (ret)
acb1b6ca
AE
3965 goto out_err;
3966
3967 snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3968 if (!IS_ERR(snap_name)) {
3969 *snap_size = size;
3970 *snap_features = features;
3971 }
b8b1e2db 3972
acb1b6ca
AE
3973 return snap_name;
3974out_err:
3975 return ERR_PTR(ret);
b8b1e2db
AE
3976}
3977
3978static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3979 u64 *snap_size, u64 *snap_features)
3980{
3981 if (rbd_dev->image_format == 1)
3982 return rbd_dev_v1_snap_info(rbd_dev, which,
3983 snap_size, snap_features);
3984 if (rbd_dev->image_format == 2)
3985 return rbd_dev_v2_snap_info(rbd_dev, which,
3986 snap_size, snap_features);
3987 return ERR_PTR(-EINVAL);
3988}
3989
117973fb
AE
3990static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3991{
3992 int ret;
117973fb
AE
3993
3994 down_write(&rbd_dev->header_rwsem);
3995
117973fb
AE
3996 ret = rbd_dev_v2_image_size(rbd_dev);
3997 if (ret)
3998 goto out;
117973fb
AE
3999 rbd_update_mapping_size(rbd_dev);
4000
4001 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4002 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4003 if (ret)
4004 goto out;
4005 ret = rbd_dev_snaps_update(rbd_dev);
4006 dout("rbd_dev_snaps_update returned %d\n", ret);
4007 if (ret)
4008 goto out;
117973fb
AE
4009out:
4010 up_write(&rbd_dev->header_rwsem);
4011
4012 return ret;
4013}
4014
dfc5606d 4015/*
35938150
AE
4016 * Scan the rbd device's current snapshot list and compare it to the
4017 * newly-received snapshot context. Remove any existing snapshots
4018 * not present in the new snapshot context. Add a new snapshot for
4019 * any snapshots in the snapshot context not in the current list.
4020 * And verify there are no changes to snapshots we already know
4021 * about.
4022 *
4023 * Assumes the snapshots in the snapshot context are sorted by
4024 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
4025 * are also maintained in that order.)
522a0cc0
AE
4026 *
4027 * Note that any error that occurs while updating the snapshot list
4028 * aborts the update, and the entire list is cleared. The snapshot
4029 * list becomes inconsistent at that point anyway, so it might as
4030 * well be empty.
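 *
 * For example (hypothetical ids): if the current list holds
 * snapshots { 8, 5, 2 } and the new context holds { 8, 6, 2 },
 * snapshot 5 is destroyed, snapshot 6 is created and inserted in
 * order, and snapshots 8 and 2 are verified unchanged.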
dfc5606d 4031 */
304f6808 4032static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
dfc5606d 4033{
35938150
AE
4034 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4035 const u32 snap_count = snapc->num_snaps;
35938150
AE
4036 struct list_head *head = &rbd_dev->snaps;
4037 struct list_head *links = head->next;
4038 u32 index = 0;
522a0cc0 4039 int ret = 0;
dfc5606d 4040
522a0cc0 4041 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
35938150
AE
4042 while (index < snap_count || links != head) {
4043 u64 snap_id;
4044 struct rbd_snap *snap;
cd892126
AE
4045 char *snap_name;
4046 u64 snap_size = 0;
4047 u64 snap_features = 0;
dfc5606d 4048
35938150
AE
4049 snap_id = index < snap_count ? snapc->snaps[index]
4050 : CEPH_NOSNAP;
4051 snap = links != head ? list_entry(links, struct rbd_snap, node)
4052 : NULL;
aafb230e 4053 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
dfc5606d 4054
35938150
AE
4055 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4056 struct list_head *next = links->next;
dfc5606d 4057
6d292906
AE
4058 /*
4059 * A previously-existing snapshot is not in
4060 * the new snap context.
4061 *
522a0cc0
AE
4062 * If the now-missing snapshot is the one
4063 * the image represents, clear its existence
4064 * flag so we can avoid sending any more
4065 * requests to it.
6d292906 4066 */
0d7dbfce 4067 if (rbd_dev->spec->snap_id == snap->id)
6d292906 4068 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3e83b65b 4069 dout("removing %ssnap id %llu\n",
0d7dbfce
AE
4070 rbd_dev->spec->snap_id == snap->id ?
4071 "mapped " : "",
522a0cc0 4072 (unsigned long long)snap->id);
6087b51b
AE
4073
4074 list_del(&snap->node);
4075 rbd_snap_destroy(snap);
35938150
AE
4076
4077 /* Done with this list entry; advance */
4078
4079 links = next;
dfc5606d
YS
4080 continue;
4081 }
35938150 4082
b8b1e2db
AE
4083 snap_name = rbd_dev_snap_info(rbd_dev, index,
4084 &snap_size, &snap_features);
522a0cc0
AE
4085 if (IS_ERR(snap_name)) {
4086 ret = PTR_ERR(snap_name);
4087 dout("failed to get snap info, error %d\n", ret);
4088 goto out_err;
4089 }
cd892126 4090
522a0cc0
AE
4091 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count,
4092 (unsigned long long)snap_id);
35938150
AE
4093 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4094 struct rbd_snap *new_snap;
4095
4096 /* We haven't seen this snapshot before */
4097
6087b51b 4098 new_snap = rbd_snap_create(rbd_dev, snap_name,
cd892126 4099 snap_id, snap_size, snap_features);
9fcbb800 4100 if (IS_ERR(new_snap)) {
522a0cc0
AE
4101 ret = PTR_ERR(new_snap);
4102 dout(" failed to add dev, error %d\n", ret);
4103 goto out_err;
9fcbb800 4104 }
35938150
AE
4105
4106 /* New goes before existing, or at end of list */
4107
9fcbb800 4108 dout(" added dev%s\n", snap ? "" : " at end");
35938150
AE
4109 if (snap)
4110 list_add_tail(&new_snap->node, &snap->node);
4111 else
523f3258 4112 list_add_tail(&new_snap->node, head);
35938150
AE
4113 } else {
4114 /* Already have this one */
4115
9fcbb800
AE
4116 dout(" already present\n");
4117
cd892126 4118 rbd_assert(snap->size == snap_size);
aafb230e 4119 rbd_assert(!strcmp(snap->name, snap_name));
cd892126 4120 rbd_assert(snap->features == snap_features);
35938150
AE
4121
4122 /* Done with this list entry; advance */
4123
4124 links = links->next;
dfc5606d 4125 }
35938150
AE
4126
4127 /* Advance to the next entry in the snapshot context */
4128
4129 index++;
dfc5606d 4130 }
9fcbb800 4131 dout("%s: done\n", __func__);
dfc5606d
YS
4132
4133 return 0;
522a0cc0
AE
4134out_err:
4135 rbd_remove_all_snaps(rbd_dev);
4136
4137 return ret;
dfc5606d
YS
4138}
4139
dfc5606d
YS
4140static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4141{
dfc5606d 4142 struct device *dev;
cd789ab9 4143 int ret;
dfc5606d
YS
4144
4145 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4146
cd789ab9 4147 dev = &rbd_dev->dev;
dfc5606d
YS
4148 dev->bus = &rbd_bus_type;
4149 dev->type = &rbd_device_type;
4150 dev->parent = &rbd_root_dev;
200a6a8b 4151 dev->release = rbd_dev_device_release;
de71a297 4152 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4153 ret = device_register(dev);
dfc5606d 4154
dfc5606d 4155 mutex_unlock(&ctl_mutex);
cd789ab9 4156
dfc5606d 4157 return ret;
602adf40
YS
4158}
4159
dfc5606d
YS
4160static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4161{
4162 device_unregister(&rbd_dev->dev);
4163}
4164
e2839308 4165static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4166
4167/*
499afd5b
AE
4168 * Get a unique rbd identifier for the given new rbd_dev, and add
4169 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4170 */
e2839308 4171static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4172{
e2839308 4173 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4174
4175 spin_lock(&rbd_dev_list_lock);
4176 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4177 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4178 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4179 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4180}
b7f23c36 4181
1ddbe94e 4182/*
499afd5b
AE
4183 * Remove an rbd_dev from the global list, and record that its
4184 * identifier is no longer in use.
1ddbe94e 4185 */
e2839308 4186static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4187{
d184f6bf 4188 struct list_head *tmp;
de71a297 4189 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4190 int max_id;
4191
aafb230e 4192 rbd_assert(rbd_id > 0);
499afd5b 4193
e2839308
AE
4194 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4195 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4196 spin_lock(&rbd_dev_list_lock);
4197 list_del_init(&rbd_dev->node);
d184f6bf
AE
4198
4199 /*
4200 * If the id being "put" is not the current maximum, there
4201 * is nothing special we need to do.
4202 */
e2839308 4203 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4204 spin_unlock(&rbd_dev_list_lock);
4205 return;
4206 }
4207
4208 /*
4209 * We need to update the current maximum id. Search the
4210 * list to find out what it is. We're more likely to find
4211 * the maximum at the end, so search the list backward.
4212 */
4213 max_id = 0;
4214 list_for_each_prev(tmp, &rbd_dev_list) {
4215 struct rbd_device *rbd_dev;
4216
4217 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4218 if (rbd_dev->dev_id > max_id)
4219 max_id = rbd_dev->dev_id;
d184f6bf 4220 }
499afd5b 4221 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4222
1ddbe94e 4223 /*
e2839308 4224 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4225 * which case it now accurately reflects the new maximum.
4226 * Be careful not to overwrite the maximum value in that
4227 * case.
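 *
 * For example (hypothetical ids): after putting id 3 we may
 * compute max_id = 2; if rbd_dev_id_get() concurrently handed
 * out id 4 first, rbd_dev_id_max is already 4 and the cmpxchg
 * below fails, preserving the new maximum.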
1ddbe94e 4228 */
e2839308
AE
4229 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4230 dout(" max dev id has been reset\n");
b7f23c36
AE
4231}
4232
e28fff26
AE
4233/*
4234 * Skips over white space at *buf, and updates *buf to point to the
4235 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4236 * the token (string of non-white space characters) found. Note
4237 * that *buf must be terminated with '\0'.
e28fff26
AE
4238 */
4239static inline size_t next_token(const char **buf)
4240{
4241 /*
4242 * These are the characters that produce nonzero for
4243 * isspace() in the "C" and "POSIX" locales.
4244 */
4245 const char *spaces = " \f\n\r\t\v";
4246
4247 *buf += strspn(*buf, spaces); /* Find start of token */
4248
4249 return strcspn(*buf, spaces); /* Return token length */
4250}
4251
4252/*
4253 * Finds the next token in *buf, and if the provided token buffer is
4254 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4255 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4256 * must be terminated with '\0' on entry.
e28fff26
AE
4257 *
4258 * Returns the length of the token found (not including the '\0').
4259 * Return value will be 0 if no token is found, and it will be >=
4260 * token_size if the token would not fit.
4261 *
593a9e7b 4262 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4263 * found token. Note that this occurs even if the token buffer is
4264 * too small to hold it.
4265 */
4266static inline size_t copy_token(const char **buf,
4267 char *token,
4268 size_t token_size)
4269{
4270 size_t len;
4271
4272 len = next_token(buf);
4273 if (len < token_size) {
4274 memcpy(token, *buf, len);
4275 *(token + len) = '\0';
4276 }
4277 *buf += len;
4278
4279 return len;
4280}
4281
ea3352f4
AE
4282/*
4283 * Finds the next token in *buf, dynamically allocates a buffer big
4284 * enough to hold a copy of it, and copies the token into the new
4285 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4286 * that a duplicate buffer is created even for a zero-length token.
4287 *
4288 * Returns a pointer to the newly-allocated duplicate, or a null
4289 * pointer if memory for the duplicate was not available. If
4290 * the lenp argument is a non-null pointer, the length of the token
4291 * (not including the '\0') is returned in *lenp.
4292 *
4293 * If successful, the *buf pointer will be updated to point beyond
4294 * the end of the found token.
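 *
 * A minimal usage sketch (input hypothetical): with *buf pointing
 * at " rbd  myimage", a first call returns "rbd" and leaves *buf
 * at "  myimage"; a second call returns "myimage".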
4295 *
4296 * Note: uses GFP_KERNEL for allocation.
4297 */
4298static inline char *dup_token(const char **buf, size_t *lenp)
4299{
4300 char *dup;
4301 size_t len;
4302
4303 len = next_token(buf);
4caf35f9 4304 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4305 if (!dup)
4306 return NULL;
ea3352f4
AE
4307 *(dup + len) = '\0';
4308 *buf += len;
4309
4310 if (lenp)
4311 *lenp = len;
4312
4313 return dup;
4314}
4315
a725f65e 4316/*
859c31df
AE
4317 * Parse the options provided for an "rbd add" (i.e., rbd image
4318 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4319 * and the data written is passed here via a NUL-terminated buffer.
4320 * Returns 0 if successful or an error code otherwise.
d22f76e7 4321 *
859c31df
AE
4322 * The information extracted from these options is recorded in
4323 * the other parameters which return dynamically-allocated
4324 * structures:
4325 * ceph_opts
4326 * The address of a pointer that will refer to a ceph options
4327 * structure. Caller must release the returned pointer using
4328 * ceph_destroy_options() when it is no longer needed.
4329 * rbd_opts
4330 * Address of an rbd options pointer. Fully initialized by
4331 * this function; caller must release with kfree().
4332 * spec
4333 * Address of an rbd image specification pointer. Fully
4334 * initialized by this function based on parsed options.
4335 * Caller must release with rbd_spec_put().
4336 *
4337 * The options passed take this form:
4338 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4339 * where:
4340 * <mon_addrs>
4341 * A comma-separated list of one or more monitor addresses.
4342 * A monitor address is an ip address, optionally followed
4343 * by a port number (separated by a colon).
4344 * I.e.: ip1[:port1][,ip2[:port2]...]
4345 * <options>
4346 * A comma-separated list of ceph and/or rbd options.
4347 * <pool_name>
4348 * The name of the rados pool containing the rbd image.
4349 * <image_name>
4350 * The name of the image in that pool to map.
4351 * <snap_id>
4352 * An optional snapshot id. If provided, the mapping will
4353 * present data from the image at the time that snapshot was
4354 * created. The image head is used if no snapshot id is
4355 * provided. Snapshot mappings are always read-only.
a725f65e 4356 */
859c31df 4357static int rbd_add_parse_args(const char *buf,
dc79b113 4358 struct ceph_options **ceph_opts,
859c31df
AE
4359 struct rbd_options **opts,
4360 struct rbd_spec **rbd_spec)
e28fff26 4361{
d22f76e7 4362 size_t len;
859c31df 4363 char *options;
0ddebc0c 4364 const char *mon_addrs;
ecb4dc22 4365 char *snap_name;
0ddebc0c 4366 size_t mon_addrs_size;
859c31df 4367 struct rbd_spec *spec = NULL;
4e9afeba 4368 struct rbd_options *rbd_opts = NULL;
859c31df 4369 struct ceph_options *copts;
dc79b113 4370 int ret;
e28fff26
AE
4371
4372 /* The first four tokens are required */
4373
7ef3214a 4374 len = next_token(&buf);
4fb5d671
AE
4375 if (!len) {
4376 rbd_warn(NULL, "no monitor address(es) provided");
4377 return -EINVAL;
4378 }
0ddebc0c 4379 mon_addrs = buf;
f28e565a 4380 mon_addrs_size = len + 1;
7ef3214a 4381 buf += len;
a725f65e 4382
dc79b113 4383 ret = -EINVAL;
f28e565a
AE
4384 options = dup_token(&buf, NULL);
4385 if (!options)
dc79b113 4386 return -ENOMEM;
4fb5d671
AE
4387 if (!*options) {
4388 rbd_warn(NULL, "no options provided");
4389 goto out_err;
4390 }
e28fff26 4391
859c31df
AE
4392 spec = rbd_spec_alloc();
4393 if (!spec)
f28e565a 4394 goto out_mem;
859c31df
AE
4395
4396 spec->pool_name = dup_token(&buf, NULL);
4397 if (!spec->pool_name)
4398 goto out_mem;
4fb5d671
AE
4399 if (!*spec->pool_name) {
4400 rbd_warn(NULL, "no pool name provided");
4401 goto out_err;
4402 }
e28fff26 4403
69e7a02f 4404 spec->image_name = dup_token(&buf, NULL);
859c31df 4405 if (!spec->image_name)
f28e565a 4406 goto out_mem;
4fb5d671
AE
4407 if (!*spec->image_name) {
4408 rbd_warn(NULL, "no image name provided");
4409 goto out_err;
4410 }
d4b125e9 4411
f28e565a
AE
4412 /*
4413 * Snapshot name is optional; default is to use "-"
4414 * (indicating the head/no snapshot).
4415 */
3feeb894 4416 len = next_token(&buf);
820a5f3e 4417 if (!len) {
3feeb894
AE
4418 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4419 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4420 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4421 ret = -ENAMETOOLONG;
f28e565a 4422 goto out_err;
849b4260 4423 }
ecb4dc22
AE
4424 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4425 if (!snap_name)
f28e565a 4426 goto out_mem;
ecb4dc22
AE
4427 *(snap_name + len) = '\0';
4428 spec->snap_name = snap_name;
e5c35534 4429
0ddebc0c 4430 /* Initialize all rbd options to the defaults */
e28fff26 4431
4e9afeba
AE
4432 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4433 if (!rbd_opts)
4434 goto out_mem;
4435
4436 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4437
859c31df 4438 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4439 mon_addrs + mon_addrs_size - 1,
4e9afeba 4440 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4441 if (IS_ERR(copts)) {
4442 ret = PTR_ERR(copts);
dc79b113
AE
4443 goto out_err;
4444 }
859c31df
AE
4445 kfree(options);
4446
4447 *ceph_opts = copts;
4e9afeba 4448 *opts = rbd_opts;
859c31df 4449 *rbd_spec = spec;
0ddebc0c 4450
dc79b113 4451 return 0;
f28e565a 4452out_mem:
dc79b113 4453 ret = -ENOMEM;
d22f76e7 4454out_err:
859c31df
AE
4455 kfree(rbd_opts);
4456 rbd_spec_put(spec);
f28e565a 4457 kfree(options);
d22f76e7 4458
dc79b113 4459 return ret;
a725f65e
AE
4460}
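
/*
 * A minimal user-space sketch (not part of this driver) showing how
 * an "rbd add" request of the form described above reaches rbd_add()
 * via sysfs. The monitor address, credentials, pool, image, and
 * snapshot names are all hypothetical examples; writing a device id
 * to /sys/bus/rbd/remove undoes the mapping via rbd_remove().
 *
 *	#include <fcntl.h>
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int main(void)
 *	{
 *		const char *spec =
 *			"1.2.3.4:6789 name=admin rbd myimage mysnap";
 *		int fd = open("/sys/bus/rbd/add", O_WRONLY);
 *
 *		if (fd < 0)
 *			return 1;
 *		if (write(fd, spec, strlen(spec)) < 0) {
 *			close(fd);
 *			return 1;
 *		}
 *		close(fd);
 *		return 0;
 *	}
 */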
4461
589d30e0
AE
4462/*
4463 * An rbd format 2 image has a unique identifier, distinct from the
4464 * name given to it by the user. Internally, that identifier is
4465 * what's used to specify the names of objects related to the image.
4466 *
4467 * A special "rbd id" object is used to map an rbd image name to its
4468 * id. If that object doesn't exist, then there is no v2 rbd image
4469 * with the supplied name.
4470 *
4471 * This function will record the given rbd_dev's image_id field if
4472 * it can be determined, and in that case will return 0. If any
4473 * errors occur a negative errno will be returned and the rbd_dev's
4474 * image_id field will be unchanged (and should be NULL).
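 *
 * For example (image name hypothetical): probing image "foo"
 * invokes the "get_id" class method on the object named
 * RBD_ID_PREFIX followed by "foo"; a format 2 image's id comes
 * back in the reply, while -ENOENT means "foo" is format 1.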
4475 */
4476static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4477{
4478 int ret;
4479 size_t size;
4480 char *object_name;
4481 void *response;
c0fba368 4482 char *image_id;
2f82ee54 4483
2c0d0a10
AE
4484 /*
4485 * When probing a parent image, the image id is already
4486 * known (and the image name likely is not). There's no
c0fba368
AE
4487 * need to fetch the image id again in this case. We
4488 * do still need to set the image format though.
2c0d0a10 4489 */
c0fba368
AE
4490 if (rbd_dev->spec->image_id) {
4491 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4492
2c0d0a10 4493 return 0;
c0fba368 4494 }
2c0d0a10 4495
589d30e0
AE
4496 /*
4497 * First, see if the format 2 image id file exists, and if
4498 * so, get the image's persistent id from it.
4499 */
69e7a02f 4500 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4501 object_name = kmalloc(size, GFP_NOIO);
4502 if (!object_name)
4503 return -ENOMEM;
0d7dbfce 4504 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4505 dout("rbd id object name is %s\n", object_name);
4506
4507 /* Response will be an encoded string, which includes a length */
4508
4509 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4510 response = kzalloc(size, GFP_NOIO);
4511 if (!response) {
4512 ret = -ENOMEM;
4513 goto out;
4514 }
4515
c0fba368
AE
4516 /* If it doesn't exist we'll assume it's a format 1 image */
4517
36be9a76 4518 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4519 "rbd", "get_id", NULL, 0,
07b2391f 4520 response, RBD_IMAGE_ID_LEN_MAX, NULL);
36be9a76 4521 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4522 if (ret == -ENOENT) {
4523 image_id = kstrdup("", GFP_KERNEL);
4524 ret = image_id ? 0 : -ENOMEM;
4525 if (!ret)
4526 rbd_dev->image_format = 1;
4527 } else if (ret > sizeof (__le32)) {
4528 void *p = response;
4529
4530 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4531 NULL, GFP_NOIO);
c0fba368
AE
4532 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4533 if (!ret)
4534 rbd_dev->image_format = 2;
589d30e0 4535 } else {
c0fba368
AE
4536 ret = -EINVAL;
4537 }
4538
4539 if (!ret) {
4540 rbd_dev->spec->image_id = image_id;
4541 dout("image_id is %s\n", image_id);
589d30e0
AE
4542 }
4543out:
4544 kfree(response);
4545 kfree(object_name);
4546
4547 return ret;
4548}
4549
6fd48b3b
AE
4550/* Undo whatever state changes are made by v1 or v2 image probe */
4551
4552static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4553{
4554 struct rbd_image_header *header;
4555
4556 rbd_dev_remove_parent(rbd_dev);
4557 rbd_spec_put(rbd_dev->parent_spec);
4558 rbd_dev->parent_spec = NULL;
4559 rbd_dev->parent_overlap = 0;
4560
4561 /* Free dynamic fields from the header, then zero it out */
4562
4563 header = &rbd_dev->header;
812164f8 4564 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4565 kfree(header->snap_sizes);
4566 kfree(header->snap_names);
4567 kfree(header->object_prefix);
4568 memset(header, 0, sizeof (*header));
4569}
4570
a30b71b9
AE
4571static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4572{
4573 int ret;
a30b71b9
AE
4574
4575 /* Populate rbd image metadata */
4576
4577 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4578 if (ret < 0)
4579 goto out_err;
86b00e0d
AE
4580
4581 /* Version 1 images have no parent (no layering) */
4582
4583 rbd_dev->parent_spec = NULL;
4584 rbd_dev->parent_overlap = 0;
4585
a30b71b9
AE
4586 dout("discovered version 1 image, header name is %s\n",
4587 rbd_dev->header_name);
4588
4589 return 0;
4590
4591out_err:
4592 kfree(rbd_dev->header_name);
4593 rbd_dev->header_name = NULL;
0d7dbfce
AE
4594 kfree(rbd_dev->spec->image_id);
4595 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
4596
4597 return ret;
4598}
4599
4600static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4601{
9d475de5 4602 int ret;
6e14b1a6 4603 u64 ver = 0;
a30b71b9 4604
9d475de5 4605 ret = rbd_dev_v2_image_size(rbd_dev);
57385b51 4606 if (ret)
1e130199
AE
4607 goto out_err;
4608
4609 /* Get the object prefix (a.k.a. block_name) for the image */
4610
4611 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4612 if (ret)
b1b5402a
AE
4613 goto out_err;
4614
d889140c 4615 /* Get and check the features for the image */
b1b5402a
AE
4616
4617 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4618 if (ret)
9d475de5 4619 goto out_err;
35d489f9 4620
86b00e0d
AE
4621 /* If the image supports layering, get the parent info */
4622
4623 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4624 ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4625 if (ret)
86b00e0d 4626 goto out_err;
96882f55
AE
4627
4628 /*
4629 * Don't print a warning for parent images. We can
4630 * tell at this point because we won't know its pool
4631 * name yet (just its pool id).
4632 */
4633 if (rbd_dev->spec->pool_name)
4634 rbd_warn(rbd_dev, "WARNING: kernel layering "
4635 "is EXPERIMENTAL!");
86b00e0d
AE
4636 }
4637
cc070d59
AE
4638 /* If the image supports fancy striping, get its parameters */
4639
4640 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4641 ret = rbd_dev_v2_striping_info(rbd_dev);
4642 if (ret < 0)
4643 goto out_err;
4644 }
4645
6e14b1a6
AE
4646 /* crypto and compression types aren't (yet) supported for v2 images */
4647
4648 rbd_dev->header.crypt_type = 0;
4649 rbd_dev->header.comp_type = 0;
35d489f9 4650
6e14b1a6
AE
4651 /* Get the snapshot context, plus the header version */
4652
4653 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
35d489f9
AE
4654 if (ret)
4655 goto out_err;
6e14b1a6
AE
4656 rbd_dev->header.obj_version = ver;
4657
a30b71b9
AE
4658 dout("discovered version 2 image, header name is %s\n",
4659 rbd_dev->header_name);
4660
35152979 4661 return 0;
9d475de5 4662out_err:
86b00e0d
AE
4663 rbd_dev->parent_overlap = 0;
4664 rbd_spec_put(rbd_dev->parent_spec);
4665 rbd_dev->parent_spec = NULL;
9d475de5
AE
4666 kfree(rbd_dev->header_name);
4667 rbd_dev->header_name = NULL;
1e130199
AE
4668 kfree(rbd_dev->header.object_prefix);
4669 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4670
4671 return ret;
a30b71b9
AE
4672}
4673
124afba2 4674static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4675{
2f82ee54 4676 struct rbd_device *parent = NULL;
124afba2
AE
4677 struct rbd_spec *parent_spec;
4678 struct rbd_client *rbdc;
4679 int ret;
4680
4681 if (!rbd_dev->parent_spec)
4682 return 0;
4683 /*
4684 * We need to pass a reference to the client and the parent
4685 * spec when creating the parent rbd_dev. Images related by
4686 * parent/child relationships always share both.
4687 */
4688 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4689 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4690
4691 ret = -ENOMEM;
4692 parent = rbd_dev_create(rbdc, parent_spec);
4693 if (!parent)
4694 goto out_err;
4695
4696 ret = rbd_dev_image_probe(parent);
4697 if (ret < 0)
4698 goto out_err;
4699 rbd_dev->parent = parent;
4700
4701 return 0;
4702out_err:
4703 if (parent) {
4704 rbd_spec_put(rbd_dev->parent_spec);
4705 kfree(rbd_dev->header_name);
4706 rbd_dev_destroy(parent);
4707 } else {
4708 rbd_put_client(rbdc);
4709 rbd_spec_put(parent_spec);
4710 }
4711
4712 return ret;
4713}
4714
200a6a8b 4715static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4716{
83a06263 4717 int ret;
d1cf5788
AE
4718
4719 ret = rbd_dev_mapping_set(rbd_dev);
83a06263 4720 if (ret)
9bb81c9b 4721 return ret;
5de10f3b 4722
83a06263
AE
4723 /* generate unique id: find highest unique id, add one */
4724 rbd_dev_id_get(rbd_dev);
4725
4726 /* Fill in the device name, now that we have its id. */
4727 BUILD_BUG_ON(DEV_NAME_LEN
4728 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4729 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4730
4731 /* Get our block major device number. */
4732
4733 ret = register_blkdev(0, rbd_dev->name);
4734 if (ret < 0)
4735 goto err_out_id;
4736 rbd_dev->major = ret;
4737
4738 /* Set up the blkdev mapping. */
4739
4740 ret = rbd_init_disk(rbd_dev);
4741 if (ret)
4742 goto err_out_blkdev;
4743
4744 ret = rbd_bus_add_dev(rbd_dev);
4745 if (ret)
4746 goto err_out_disk;
4747
83a06263
AE
4748 /* Everything's ready. Announce the disk to the world. */
4749
b5156e76 4750 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
129b79d4 4751 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4752 add_disk(rbd_dev->disk);
4753
4754 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4755 (unsigned long long) rbd_dev->mapping.size);
4756
4757 return ret;
2f82ee54 4758
83a06263
AE
4759err_out_disk:
4760 rbd_free_disk(rbd_dev);
4761err_out_blkdev:
4762 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4763err_out_id:
4764 rbd_dev_id_put(rbd_dev);
d1cf5788 4765 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4766
4767 return ret;
4768}
4769
332bb12d
AE
4770static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4771{
4772 struct rbd_spec *spec = rbd_dev->spec;
4773 size_t size;
4774
4775 /* Record the header object name for this rbd image. */
4776
4777 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4778
4779 if (rbd_dev->image_format == 1)
4780 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4781 else
4782 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4783
4784 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4785 if (!rbd_dev->header_name)
4786 return -ENOMEM;
4787
4788 if (rbd_dev->image_format == 1)
4789 sprintf(rbd_dev->header_name, "%s%s",
4790 spec->image_name, RBD_SUFFIX);
4791 else
4792 sprintf(rbd_dev->header_name, "%s%s",
4793 RBD_HEADER_PREFIX, spec->image_id);
4794 return 0;
4795}
4796
200a6a8b
AE
4797static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4798{
6fd48b3b
AE
4799 int ret;
4800
4801 rbd_remove_all_snaps(rbd_dev);
4802 rbd_dev_unprobe(rbd_dev);
4803 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4804 if (ret)
4805 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
200a6a8b 4806 kfree(rbd_dev->header_name);
6fd48b3b
AE
4807 rbd_dev->header_name = NULL;
4808 rbd_dev->image_format = 0;
4809 kfree(rbd_dev->spec->image_id);
4810 rbd_dev->spec->image_id = NULL;
4811
200a6a8b
AE
4812 rbd_dev_destroy(rbd_dev);
4813}
4814
a30b71b9
AE
4815/*
4816 * Probe for the existence of the header object for the given rbd
4817 * device. For format 2 images this includes determining the image
4818 * id.
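 *
 * On success the device is fully probed: its header has been read
 * (v1 or v2), a watch is registered on the header object, the
 * snapshot list and the names in the spec are up to date, and any
 * parent images have been probed recursively.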
4819 */
71f293e2 4820static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
a30b71b9
AE
4821{
4822 int ret;
b644de2b 4823 int tmp;
a30b71b9
AE
4824
4825 /*
4826 * Get the id from the image id object. If it's not a
4827 * format 2 image, we'll get ENOENT back, and we'll assume
4828 * it's a format 1 image.
4829 */
4830 ret = rbd_dev_image_id(rbd_dev);
4831 if (ret)
c0fba368
AE
4832 return ret;
4833 rbd_assert(rbd_dev->spec->image_id);
4834 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4835
332bb12d
AE
4836 ret = rbd_dev_header_name(rbd_dev);
4837 if (ret)
4838 goto err_out_format;
4839
b644de2b
AE
4840 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4841 if (ret)
4842 goto out_header_name;
4843
c0fba368 4844 if (rbd_dev->image_format == 1)
a30b71b9
AE
4845 ret = rbd_dev_v1_probe(rbd_dev);
4846 else
4847 ret = rbd_dev_v2_probe(rbd_dev);
5655c4d9 4848 if (ret)
b644de2b 4849 goto err_out_watch;
83a06263 4850
9bb81c9b
AE
4851 ret = rbd_dev_snaps_update(rbd_dev);
4852 if (ret)
6fd48b3b 4853 goto err_out_probe;
9bb81c9b
AE
4854
4855 ret = rbd_dev_spec_update(rbd_dev);
4856 if (ret)
4857 goto err_out_snaps;
4858
4859 ret = rbd_dev_probe_parent(rbd_dev);
6fd48b3b
AE
4860 if (!ret)
4861 return 0;
83a06263 4862
9bb81c9b
AE
4863err_out_snaps:
4864 rbd_remove_all_snaps(rbd_dev);
6fd48b3b
AE
4865err_out_probe:
4866 rbd_dev_unprobe(rbd_dev);
b644de2b
AE
4867err_out_watch:
4868 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4869 if (tmp)
4870 rbd_warn(rbd_dev, "unable to tear down watch request\n");
332bb12d
AE
4871out_header_name:
4872 kfree(rbd_dev->header_name);
4873 rbd_dev->header_name = NULL;
4874err_out_format:
4875 rbd_dev->image_format = 0;
5655c4d9
AE
4876 kfree(rbd_dev->spec->image_id);
4877 rbd_dev->spec->image_id = NULL;
4878
4879 dout("probe failed, returning %d\n", ret);
4880
a30b71b9
AE
4881 return ret;
4882}
4883
59c2be1e
YS
4884static ssize_t rbd_add(struct bus_type *bus,
4885 const char *buf,
4886 size_t count)
602adf40 4887{
cb8627c7 4888 struct rbd_device *rbd_dev = NULL;
dc79b113 4889 struct ceph_options *ceph_opts = NULL;
4e9afeba 4890 struct rbd_options *rbd_opts = NULL;
859c31df 4891 struct rbd_spec *spec = NULL;
9d3997fd 4892 struct rbd_client *rbdc;
27cc2594
AE
4893 struct ceph_osd_client *osdc;
4894 int rc = -ENOMEM;
602adf40
YS
4895
4896 if (!try_module_get(THIS_MODULE))
4897 return -ENODEV;
4898
602adf40 4899 /* parse add command */
859c31df 4900 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4901 if (rc < 0)
bd4ba655 4902 goto err_out_module;
78cea76e 4903
9d3997fd
AE
4904 rbdc = rbd_get_client(ceph_opts);
4905 if (IS_ERR(rbdc)) {
4906 rc = PTR_ERR(rbdc);
0ddebc0c 4907 goto err_out_args;
9d3997fd 4908 }
c53d5893 4909 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4910
602adf40 4911 /* pick the pool */
9d3997fd 4912 osdc = &rbdc->client->osdc;
859c31df 4913 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4914 if (rc < 0)
4915 goto err_out_client;
c0cd10db 4916 spec->pool_id = (u64)rc;
859c31df 4917
0903e875
AE
4918 /* The ceph file layout needs to fit pool id in 32 bits */
4919
c0cd10db
AE
4920 if (spec->pool_id > (u64)U32_MAX) {
4921 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4922 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4923 rc = -EIO;
4924 goto err_out_client;
4925 }
4926
c53d5893 4927 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4928 if (!rbd_dev) {
 rc = -ENOMEM;
4929 goto err_out_client;
 }
c53d5893
AE
4930 rbdc = NULL; /* rbd_dev now owns this */
4931 spec = NULL; /* rbd_dev now owns this */
602adf40 4932
bd4ba655 4933 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4934 kfree(rbd_opts);
4935 rbd_opts = NULL; /* done with this */
bd4ba655 4936
71f293e2 4937 rc = rbd_dev_image_probe(rbd_dev);
a30b71b9 4938 if (rc < 0)
c53d5893 4939 goto err_out_rbd_dev;
05fd6f6f 4940
b536f69a
AE
4941 rc = rbd_dev_device_setup(rbd_dev);
4942 if (!rc)
4943 return count;
4944
4945 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4946err_out_rbd_dev:
4947 rbd_dev_destroy(rbd_dev);
bd4ba655 4948err_out_client:
9d3997fd 4949 rbd_put_client(rbdc);
0ddebc0c 4950err_out_args:
78cea76e
AE
4951 if (ceph_opts)
4952 ceph_destroy_options(ceph_opts);
4e9afeba 4953 kfree(rbd_opts);
859c31df 4954 rbd_spec_put(spec);
bd4ba655
AE
4955err_out_module:
4956 module_put(THIS_MODULE);
27cc2594 4957
602adf40 4958 dout("Error adding device %s\n", buf);
27cc2594 4959
c0cd10db 4960 return (ssize_t)rc;
602adf40
YS
4961}
4962
de71a297 4963static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4964{
4965 struct list_head *tmp;
4966 struct rbd_device *rbd_dev;
4967
e124a82f 4968 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4969 list_for_each(tmp, &rbd_dev_list) {
4970 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4971 if (rbd_dev->dev_id == dev_id) {
e124a82f 4972 spin_unlock(&rbd_dev_list_lock);
602adf40 4973 return rbd_dev;
e124a82f 4974 }
602adf40 4975 }
e124a82f 4976 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4977 return NULL;
4978}
4979
200a6a8b 4980static void rbd_dev_device_release(struct device *dev)
602adf40 4981{
593a9e7b 4982 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4983
602adf40 4984 rbd_free_disk(rbd_dev);
200a6a8b
AE
4985 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4986 rbd_dev_clear_mapping(rbd_dev);
602adf40 4987 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4988 rbd_dev->major = 0;
e2839308 4989 rbd_dev_id_put(rbd_dev);
d1cf5788 4990 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
4991}
4992
05a46afd
AE
4993static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4994{
ad945fc1 4995 while (rbd_dev->parent) {
05a46afd
AE
4996 struct rbd_device *first = rbd_dev;
4997 struct rbd_device *second = first->parent;
4998 struct rbd_device *third;
4999
5000 /*
5001 * Follow to the parent with no grandparent and
5002 * remove it.
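 *
 * For example (a hypothetical chain): with base -> p1 -> p2,
 * the first pass of the outer loop releases p2 and the second
 * pass releases p1.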
5003 */
5004 while (second && (third = second->parent)) {
5005 first = second;
5006 second = third;
5007 }
ad945fc1 5008 rbd_assert(second);
8ad42cd0 5009 rbd_dev_image_release(second);
ad945fc1
AE
5010 first->parent = NULL;
5011 first->parent_overlap = 0;
5012
5013 rbd_assert(first->parent_spec);
05a46afd
AE
5014 rbd_spec_put(first->parent_spec);
5015 first->parent_spec = NULL;
05a46afd
AE
5016 }
5017}
5018
dfc5606d
YS
5019static ssize_t rbd_remove(struct bus_type *bus,
5020 const char *buf,
5021 size_t count)
602adf40
YS
5022{
5023 struct rbd_device *rbd_dev = NULL;
0d8189e1 5024 int target_id;
602adf40 5025 unsigned long ul;
0d8189e1 5026 int ret;
602adf40 5027
0d8189e1
AE
5028 ret = strict_strtoul(buf, 10, &ul);
5029 if (ret)
5030 return ret;
602adf40
YS
5031
5032 /* convert to int; abort if we lost anything in the conversion */
5033 target_id = (int) ul;
5034 if (target_id != ul)
5035 return -EINVAL;
5036
5037 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
5038
5039 rbd_dev = __rbd_get_dev(target_id);
5040 if (!rbd_dev) {
5041 ret = -ENOENT;
5042 goto done;
42382b70
AE
5043 }
5044
a14ea269 5045 spin_lock_irq(&rbd_dev->lock);
b82d167b 5046 if (rbd_dev->open_count)
42382b70 5047 ret = -EBUSY;
b82d167b
AE
5048 else
5049 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 5050 spin_unlock_irq(&rbd_dev->lock);
b82d167b 5051 if (ret < 0)
42382b70 5052 goto done;
0d8189e1 5053 ret = count;
b480815a 5054 rbd_bus_del_dev(rbd_dev);
8ad42cd0 5055 rbd_dev_image_release(rbd_dev);
79ab7558 5056 module_put(THIS_MODULE);
602adf40
YS
5057done:
5058 mutex_unlock(&ctl_mutex);
aafb230e 5059
602adf40
YS
5060 return ret;
5061}
5062
602adf40
YS
5063/*
5064 * create control files in sysfs
dfc5606d 5065 * /sys/bus/rbd/...
602adf40
YS
5066 */
5067static int rbd_sysfs_init(void)
5068{
dfc5606d 5069 int ret;
602adf40 5070
fed4c143 5071 ret = device_register(&rbd_root_dev);
21079786 5072 if (ret < 0)
dfc5606d 5073 return ret;
602adf40 5074
fed4c143
AE
5075 ret = bus_register(&rbd_bus_type);
5076 if (ret < 0)
5077 device_unregister(&rbd_root_dev);
602adf40 5078
602adf40
YS
5079 return ret;
5080}
5081
5082static void rbd_sysfs_cleanup(void)
5083{
dfc5606d 5084 bus_unregister(&rbd_bus_type);
fed4c143 5085 device_unregister(&rbd_root_dev);
602adf40
YS
5086}
5087
cc344fa1 5088static int __init rbd_init(void)
602adf40
YS
5089{
5090 int rc;
5091
1e32d34c
AE
5092 if (!libceph_compatible(NULL)) {
5093 rbd_warn(NULL, "libceph incompatibility (quitting)");
5094
5095 return -EINVAL;
5096 }
602adf40
YS
5097 rc = rbd_sysfs_init();
5098 if (rc)
5099 return rc;
f0f8cef5 5100 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
5101 return 0;
5102}
5103
cc344fa1 5104static void __exit rbd_exit(void)
602adf40
YS
5105{
5106 rbd_sysfs_cleanup();
5107}
5108
5109module_init(rbd_init);
5110module_exit(rbd_exit);
5111
5112MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5113MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5114MODULE_DESCRIPTION("rados block device");
5115
5116/* following authorship retained from original osdblk.c */
5117MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5118
5119MODULE_LICENSE("GPL");