/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
ed95b21a 34#include <linux/ceph/cls_lock_client.h>
602adf40 35#include <linux/ceph/decode.h>
59c2be1e 36#include <linux/parser.h>
30d1cff8 37#include <linux/bsearch.h>
602adf40
YS
38
39#include <linux/kernel.h>
40#include <linux/device.h>
41#include <linux/module.h>
7ad18afa 42#include <linux/blk-mq.h>
602adf40
YS
43#include <linux/fs.h>
44#include <linux/blkdev.h>
1c2a9dfe 45#include <linux/slab.h>
f8a22fc2 46#include <linux/idr.h>
bc1ecc65 47#include <linux/workqueue.h>
602adf40
YS
48
49#include "rbd_types.h"
50
#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR		256
#define RBD_SINGLE_MAJOR_PART_SHIFT	4

#define RBD_MAX_PARENT_CHAIN_LEN	16

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define BAD_SNAP_INDEX	U32_MAX	/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)

/* Feature bits */

#define RBD_FEATURE_LAYERING		(1ULL<<0)
#define RBD_FEATURE_STRIPINGV2		(1ULL<<1)
#define RBD_FEATURE_EXCLUSIVE_LOCK	(1ULL<<2)
#define RBD_FEATURE_DATA_POOL		(1ULL<<7)
#define RBD_FEATURE_OPERATIONS		(1ULL<<8)

#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
				 RBD_FEATURE_STRIPINGV2 |	\
				 RBD_FEATURE_EXCLUSIVE_LOCK |	\
				 RBD_FEATURE_DATA_POOL |	\
				 RBD_FEATURE_OPERATIONS)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 */
#define DEV_NAME_LEN		32

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	u64 stripe_unit;
	u64 stripe_count;
	s64 data_pool_id;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_operation_type {
	OBJ_OP_WRITE,
	OBJ_OP_READ,
	OBJ_OP_DISCARD,
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	u64			object_no;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;	/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

enum rbd_watch_state {
	RBD_WATCH_STATE_UNREGISTERED,
	RBD_WATCH_STATE_REGISTERED,
	RBD_WATCH_STATE_ERROR,
};

enum rbd_lock_state {
	RBD_LOCK_STATE_UNLOCKED,
	RBD_LOCK_STATE_LOCKED,
	RBD_LOCK_STATE_RELEASING,
};

/* WatchNotify::ClientId */
struct rbd_client_id {
	u64 gid;
	u64 handle;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;
	struct rbd_options	*opts;
	char			*config_info;	/* add{,_single_major} string */

	struct ceph_object_id	header_oid;
	struct ceph_object_locator header_oloc;

	struct ceph_file_layout	layout;		/* used for all rbd requests */

	struct mutex		watch_mutex;
	enum rbd_watch_state	watch_state;
	struct ceph_osd_linger_request *watch_handle;
	u64			watch_cookie;
	struct delayed_work	watch_dwork;

	struct rw_semaphore	lock_rwsem;
	enum rbd_lock_state	lock_state;
	char			lock_cookie[32];
	struct rbd_client_id	owner_cid;
	struct work_struct	acquired_lock_work;
	struct work_struct	released_lock_work;
	struct delayed_work	lock_dwork;
	struct work_struct	unlock_work;
	wait_queue_head_t	lock_waitq;

	struct workqueue_struct	*task_wq;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* Block layer tags. */
	struct blk_mq_tag_set	tag_set;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags:
 * - REMOVING (which is coupled with rbd_dev->open_count) is protected
 *   by rbd_dev->lock
 * - BLACKLISTED is protected by rbd_dev->lock_rwsem
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
	RBD_DEV_FLAG_BLACKLISTED, /* our ceph_client is blacklisted */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;

static struct bio_set		*rbd_bio_clone;

static int rbd_major;
static DEFINE_IDA(rbd_dev_id_ida);

static struct workqueue_struct *rbd_wq;

/*
 * Default to false for now, as single-major requires >= 0.75 version of
 * userspace rbd utility.
 */
static bool single_major = false;
module_param(single_major, bool, S_IRUGO);
MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
				    size_t count);
static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
				       size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth);
static void rbd_spec_put(struct rbd_spec *spec);

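/*
 * In single-major mode all images share one major number; the device id
 * is carried in the upper bits of the minor, leaving
 * 1 << RBD_SINGLE_MAJOR_PART_SHIFT (16) minors per image for partitions.
 */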
static int rbd_dev_id_to_minor(int dev_id)
{
	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
}

static int minor_to_rbd_dev_id(int minor)
{
	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
}

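/*
 * The double-underscore variant expects lock_rwsem to be held by the
 * caller.  RBD_LOCK_STATE_RELEASING still counts as owning the lock:
 * the lock has not yet been handed back at that point.
 */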
static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
}

static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
{
	bool is_lock_owner;

	down_read(&rbd_dev->lock_rwsem);
	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
	up_read(&rbd_dev->lock_rwsem);
	return is_lock_owner;
}

static ssize_t rbd_supported_features_show(struct bus_type *bus, char *buf)
{
	return sprintf(buf, "0x%llx\n", RBD_FEATURES_SUPPORTED);
}

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
static BUS_ATTR(supported_features, S_IRUGO, rbd_supported_features_show, NULL);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	&bus_attr_add_single_major.attr,
	&bus_attr_remove_single_major.attr,
	&bus_attr_supported_features.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	if (!single_major &&
	    (attr == &bus_attr_add_single_major.attr ||
	     attr == &bus_attr_remove_single_major.attr))
		return 0;

	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_osd_copyup_callback(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_header_info(struct rbd_device *rbd_dev);
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);

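/*
 * Opening pins the mapping: once RBD_DEV_FLAG_REMOVING is set,
 * rbd_open() fails with -ENOENT, so a mapping that is being removed
 * cannot gain new users.
 */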
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

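/* BLKROSET handler: switch the mapping between read-only and read-write */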
static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
{
	int ret = 0;
	int val;
	bool ro;
	bool ro_changed = false;

	/* get_user() may sleep, so call it before taking rbd_dev->lock */
	if (get_user(val, (int __user *)(arg)))
		return -EFAULT;

	ro = val ? true : false;
	/* Snapshots can't be written to */
	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	/* prevent others from opening this device */
	if (rbd_dev->open_count > 1) {
		ret = -EBUSY;
		goto out;
	}

	if (rbd_dev->mapping.read_only != ro) {
		rbd_dev->mapping.read_only = ro;
		ro_changed = true;
	}

out:
	spin_unlock_irq(&rbd_dev->lock);
	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
	if (ret == 0 && ro_changed)
		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);

	return ret;
}

static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
			unsigned int cmd, unsigned long arg)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	int ret = 0;

	switch (cmd) {
	case BLKROSET:
		ret = rbd_ioctl_set_ro(rbd_dev, arg);
		break;
	default:
		ret = -ENOTTY;
	}

	return ret;
}

#ifdef CONFIG_COMPAT
static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
				unsigned int cmd, unsigned long arg)
{
	return rbd_ioctl(bdev, mode, cmd, arg);
}
#endif /* CONFIG_COMPAT */

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
	.ioctl			= rbd_ioctl,
#ifdef CONFIG_COMPAT
	.compat_ioctl		= rbd_compat_ioctl,
#endif
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * (Per device) rbd map options
 */
enum {
	Opt_queue_depth,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	Opt_lock_on_read,
	Opt_exclusive,
	Opt_err
};

static match_table_t rbd_opts_tokens = {
	{Opt_queue_depth, "queue_depth=%d"},
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	{Opt_lock_on_read, "lock_on_read"},
	{Opt_exclusive, "exclusive"},
	{Opt_err, NULL}
};

struct rbd_options {
	int	queue_depth;
	bool	read_only;
	bool	lock_on_read;
	bool	exclusive;
};

#define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
#define RBD_READ_ONLY_DEFAULT	false
#define RBD_LOCK_ON_READ_DEFAULT false
#define RBD_EXCLUSIVE_DEFAULT	false

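/*
 * Parse a single map option token.  Integer arguments are validated
 * here; an unrecognized token fails the map request (libceph has
 * already reported anything it did not consume itself).
 */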
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token, argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_queue_depth:
		if (intval < 1) {
			pr_err("queue_depth out of range\n");
			return -EINVAL;
		}
		rbd_opts->queue_depth = intval;
		break;
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	case Opt_lock_on_read:
		rbd_opts->lock_on_read = true;
		break;
	case Opt_exclusive:
		rbd_opts->exclusive = true;
		break;
	default:
		/* libceph prints "bad option" msg */
		return -EINVAL;
	}

	return 0;
}

static char *obj_op_name(enum obj_operation_type op_type)
{
	switch (op_type) {
	case OBJ_OP_READ:
		return "read";
	case OBJ_OP_WRITE:
		return "write";
	case OBJ_OP_DISCARD:
		return "discard";
	default:
		return "???";
	}
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * returns the size of an object in the image
 */
static u32 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1U << header->obj_order;
}

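/*
 * Seed the file layout used for all requests on this image.  Plain
 * (non-fancy-striped) images report stripe_unit/stripe_count as 0,
 * which is normalized to one full object per stripe; data is directed
 * to a separate pool only if the DATA_POOL feature supplied one.
 */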
static void rbd_init_layout(struct rbd_device *rbd_dev)
{
	if (rbd_dev->header.stripe_unit == 0 ||
	    rbd_dev->header.stripe_count == 0) {
		rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header);
		rbd_dev->header.stripe_count = 1;
	}

	rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit;
	rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count;
	rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header);
	rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ?
			  rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id;
	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		object_prefix = kstrndup(ondisk->object_prefix,
					 sizeof(ondisk->object_prefix),
					 GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */
		snap_sizes = kmalloc_array(snap_count,
					   sizeof(*header->snap_sizes),
					   GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		rbd_init_layout(rbd_dev);
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

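/*
 * Look up the size of the image at the given snapshot id (or of the
 * head, for CEPH_NOSNAP).  Format 1 answers from the cached header;
 * format 2 has to ask the OSDs.
 */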
static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

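/*
 * Image-to-object segmentation helpers.  The object size is always a
 * power of two, so with the default order of 22 (4 MiB objects) an
 * image offset of 5 MiB maps to byte 1 MiB within its object, and a
 * request at that offset is clipped to the 3 MiB left in the object.
 */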
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = rbd_obj_bytes(&rbd_dev->header);

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec bv;
	struct bvec_iter iter;
	unsigned long flags;
	void *buf;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, iter) {
			if (pos + bv.bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(&bv, &flags);
				memset(buf + remainder, 0,
				       bv.bv_len - remainder);
				flush_dcache_page(bv.bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv.bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bio;

	bio = bio_clone_fast(bio_src, gfpmask, rbd_bio_clone);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio_advance(bio, offset);
	bio->bi_iter.bi_size = len;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_iter.bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bios */
		}
		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_iter.bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

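/*
 * True if this object's image extent starts below the parent overlap.
 * The overlap is rounded up to a whole object so that an object only
 * partially covered by the overlap is still treated as overlapping.
 */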
static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;

	return obj_request->img_offset <
	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		kref_read(&obj_request->kref));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
	     kref_read(&img_request->kref));
	kref_get(&img_request->kref);
}

static bool img_request_child_test(struct rbd_img_request *img_request);
static void rbd_parent_request_destroy(struct kref *kref);
static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		kref_read(&img_request->kref));
	if (img_request_child_test(img_request))
		kref_put(&img_request->kref, rbd_parent_request_destroy);
	else
		kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);

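/*
 * Hand an object request to the OSD client.  For image object requests
 * an extra reference is taken on the image request here; the matching
 * put happens in rbd_img_obj_callback() once the object completes.
 */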
static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct ceph_osd_request *osd_req = obj_request->osd_req;

	dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__,
	     obj_request, obj_request->object_no, obj_request->offset,
	     obj_request->length, osd_req);
	if (obj_request_img_data_test(obj_request)) {
		WARN_ON(obj_request->callback != rbd_img_obj_callback);
		rbd_img_request_get(obj_request->img_request);
	}
	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

/*
 * Set the discard flag when the img_request is a discard request
 */
static void img_request_discard_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_DISCARD, &img_request->flags);
	smp_mb();
}

static bool img_request_discard_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static void img_request_child_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static void img_request_layered_clear(struct rbd_img_request *img_request)
{
	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static enum obj_operation_type
rbd_img_request_op_type(struct rbd_img_request *img_request)
{
	if (img_request_write_test(img_request))
		return OBJ_OP_WRITE;
	else if (img_request_discard_test(img_request))
		return OBJ_OP_DISCARD;
	else
		return OBJ_OP_READ;
}

6e2a4505
AE
1719static void
1720rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1721{
b9434c5b
AE
1722 u64 xferred = obj_request->xferred;
1723 u64 length = obj_request->length;
1724
6e2a4505
AE
1725 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1726 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1727 xferred, length);
6e2a4505 1728 /*
17c1cc1d
JD
1729 * ENOENT means a hole in the image. We zero-fill the entire
1730 * length of the request. A short read also implies zero-fill
1731 * to the end of the request. An error requires the whole
1732 * length of the request to be reported finished with an error
1733 * to the block layer. In each case we update the xferred
1734 * count to indicate the whole request was satisfied.
6e2a4505 1735 */
b9434c5b 1736 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1737 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1738 if (obj_request->type == OBJ_REQUEST_BIO)
1739 zero_bio_chain(obj_request->bio_list, 0);
1740 else
1741 zero_pages(obj_request->pages, 0, length);
6e2a4505 1742 obj_request->result = 0;
b9434c5b
AE
1743 } else if (xferred < length && !obj_request->result) {
1744 if (obj_request->type == OBJ_REQUEST_BIO)
1745 zero_bio_chain(obj_request->bio_list, xferred);
1746 else
1747 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1748 }
17c1cc1d 1749 obj_request->xferred = length;
6e2a4505
AE
1750 obj_request_done_set(obj_request);
1751}
1752
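/*
 * Illustrative userspace model of the three completion cases handled
 * above: ENOENT zero-fills the whole request, a successful short read
 * zero-fills the tail, and an error leaves the data alone while the
 * full length is still reported back to the block layer.
 */
#include <errno.h>
#include <stdint.h>
#include <string.h>

static void model_read_done(char *buf, uint64_t length,
			    uint64_t xferred, int result)
{
	if (result == -ENOENT)			/* hole in the image */
		memset(buf, 0, length);
	else if (!result && xferred < length)	/* short read */
		memset(buf + xferred, 0, length - xferred);
	/* in every case the caller reports xferred = length */
}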
bf0d5f50
AE
1753static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1754{
37206ee5
AE
1755 dout("%s: obj %p cb %p\n", __func__, obj_request,
1756 obj_request->callback);
bf0d5f50
AE
1757 if (obj_request->callback)
1758 obj_request->callback(obj_request);
788e2df3
AE
1759 else
1760 complete_all(&obj_request->completion);
bf0d5f50
AE
1761}
1762
0dcc685e
ID
1763static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
1764{
1765 obj_request->result = err;
1766 obj_request->xferred = 0;
1767 /*
1768 * kludge - mirror rbd_obj_request_submit() to match a put in
1769 * rbd_img_obj_callback()
1770 */
1771 if (obj_request_img_data_test(obj_request)) {
1772 WARN_ON(obj_request->callback != rbd_img_obj_callback);
1773 rbd_img_request_get(obj_request->img_request);
1774 }
1775 obj_request_done_set(obj_request);
1776 rbd_obj_request_complete(obj_request);
1777}
1778
c47f9371 1779static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1780{
57acbaa7 1781 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1782 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1783 bool layered = false;
1784
1785 if (obj_request_img_data_test(obj_request)) {
1786 img_request = obj_request->img_request;
1787 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1788 rbd_dev = img_request->rbd_dev;
57acbaa7 1789 }
8b3e1a56
AE
1790
1791 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1792 obj_request, img_request, obj_request->result,
1793 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1794 if (layered && obj_request->result == -ENOENT &&
1795 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1796 rbd_img_parent_read(obj_request);
1797 else if (img_request)
6e2a4505
AE
1798 rbd_img_obj_request_read_callback(obj_request);
1799 else
1800 obj_request_done_set(obj_request);
bf0d5f50
AE
1801}
1802
c47f9371 1803static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1804{
1b83bef2
SW
1805 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1806 obj_request->result, obj_request->length);
1807 /*
8b3e1a56
AE
1808 * There is no such thing as a successful short write. Set
1809 * it to our originally-requested length.
1b83bef2
SW
1810 */
1811 obj_request->xferred = obj_request->length;
07741308 1812 obj_request_done_set(obj_request);
bf0d5f50
AE
1813}
1814
90e98c52
GZ
1815static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1816{
1817 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1818 obj_request->result, obj_request->length);
1819 /*
1820 * There is no such thing as a successful short discard. Set
1821 * it to our originally-requested length.
1822 */
1823 obj_request->xferred = obj_request->length;
d0265de7
JD
1824 /* discarding a non-existent object is not a problem */
1825 if (obj_request->result == -ENOENT)
1826 obj_request->result = 0;
90e98c52
GZ
1827 obj_request_done_set(obj_request);
1828}
1829
fbfab539
AE
1830/*
1831 * For a simple stat call there's nothing to do. We'll do more if
1832 * this is part of a write sequence for a layered image.
1833 */
c47f9371 1834static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1835{
37206ee5 1836 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1837 obj_request_done_set(obj_request);
1838}
1839
2761713d
ID
1840static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
1841{
1842 dout("%s: obj %p\n", __func__, obj_request);
1843
1844 if (obj_request_img_data_test(obj_request))
1845 rbd_osd_copyup_callback(obj_request);
1846 else
1847 obj_request_done_set(obj_request);
1848}
1849
85e084fe 1850static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
bf0d5f50
AE
1851{
1852 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1853 u16 opcode;
1854
85e084fe 1855 dout("%s: osd_req %p\n", __func__, osd_req);
bf0d5f50 1856 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1857 if (obj_request_img_data_test(obj_request)) {
1858 rbd_assert(obj_request->img_request);
1859 rbd_assert(obj_request->which != BAD_WHICH);
1860 } else {
1861 rbd_assert(obj_request->which == BAD_WHICH);
1862 }
bf0d5f50 1863
1b83bef2
SW
1864 if (osd_req->r_result < 0)
1865 obj_request->result = osd_req->r_result;
bf0d5f50 1866
c47f9371
AE
1867 /*
1868 * We support a 64-bit length, but ultimately it has to be
7ad18afa
CH
1869 * passed to the block layer, which just supports a 32-bit
1870 * length field.
c47f9371 1871 */
7665d85b 1872 obj_request->xferred = osd_req->r_ops[0].outdata_len;
8b3e1a56 1873 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
0ccd5926 1874
79528734 1875 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1876 switch (opcode) {
1877 case CEPH_OSD_OP_READ:
c47f9371 1878 rbd_osd_read_callback(obj_request);
bf0d5f50 1879 break;
0ccd5926 1880 case CEPH_OSD_OP_SETALLOCHINT:
e30b7577
ID
1881 rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
1882 osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
0ccd5926 1883 /* fall through */
bf0d5f50 1884 case CEPH_OSD_OP_WRITE:
e30b7577 1885 case CEPH_OSD_OP_WRITEFULL:
c47f9371 1886 rbd_osd_write_callback(obj_request);
bf0d5f50 1887 break;
fbfab539 1888 case CEPH_OSD_OP_STAT:
c47f9371 1889 rbd_osd_stat_callback(obj_request);
fbfab539 1890 break;
90e98c52
GZ
1891 case CEPH_OSD_OP_DELETE:
1892 case CEPH_OSD_OP_TRUNCATE:
1893 case CEPH_OSD_OP_ZERO:
1894 rbd_osd_discard_callback(obj_request);
1895 break;
36be9a76 1896 case CEPH_OSD_OP_CALL:
2761713d
ID
1897 rbd_osd_call_callback(obj_request);
1898 break;
bf0d5f50 1899 default:
a90bb0c1
ID
1900 rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
1901 obj_request->object_no, opcode);
bf0d5f50
AE
1902 break;
1903 }
1904
07741308 1905 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1906 rbd_obj_request_complete(obj_request);
1907}
1908
9d4df01f 1909static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
430c28c3 1910{
8c042b0d 1911 struct ceph_osd_request *osd_req = obj_request->osd_req;
430c28c3 1912
7c84883a
ID
1913 rbd_assert(obj_request_img_data_test(obj_request));
1914 osd_req->r_snapid = obj_request->img_request->snap_id;
9d4df01f
AE
1915}
1916
1917static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1918{
9d4df01f 1919 struct ceph_osd_request *osd_req = obj_request->osd_req;
9d4df01f 1920
1134e091 1921 ktime_get_real_ts(&osd_req->r_mtime);
bb873b53 1922 osd_req->r_data_offset = obj_request->offset;
430c28c3
AE
1923}
1924
bc81207e
ID
1925static struct ceph_osd_request *
1926__rbd_osd_req_create(struct rbd_device *rbd_dev,
1927 struct ceph_snap_context *snapc,
1928 int num_ops, unsigned int flags,
1929 struct rbd_obj_request *obj_request)
1930{
1931 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1932 struct ceph_osd_request *req;
a90bb0c1
ID
1933 const char *name_format = rbd_dev->image_format == 1 ?
1934 RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT;
bc81207e
ID
1935
1936 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO);
1937 if (!req)
1938 return NULL;
1939
1940 req->r_flags = flags;
1941 req->r_callback = rbd_osd_req_callback;
1942 req->r_priv = obj_request;
1943
1944 req->r_base_oloc.pool = rbd_dev->layout.pool_id;
a90bb0c1
ID
1945 if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format,
1946 rbd_dev->header.object_prefix, obj_request->object_no))
bc81207e
ID
1947 goto err_req;
1948
1949 if (ceph_osdc_alloc_messages(req, GFP_NOIO))
1950 goto err_req;
1951
1952 return req;
1953
1954err_req:
1955 ceph_osdc_put_request(req);
1956 return NULL;
1957}
1958
0ccd5926
ID
1959/*
1960 * Create an osd request. A read request has one osd op (read).
1961 * A write request has either one (watch) or two (hint+write) osd ops.
1962 * (All rbd data writes are prefixed with an allocation hint op, but
1963 * technically osd watch is a write request, hence this distinction.)
1964 */
bf0d5f50
AE
1965static struct ceph_osd_request *rbd_osd_req_create(
1966 struct rbd_device *rbd_dev,
6d2940c8 1967 enum obj_operation_type op_type,
deb236b3 1968 unsigned int num_ops,
430c28c3 1969 struct rbd_obj_request *obj_request)
bf0d5f50 1970{
bf0d5f50 1971 struct ceph_snap_context *snapc = NULL;
bf0d5f50 1972
90e98c52
GZ
1973 if (obj_request_img_data_test(obj_request) &&
1974 (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
6365d33a 1975 struct rbd_img_request *img_request = obj_request->img_request;
90e98c52
GZ
1976 if (op_type == OBJ_OP_WRITE) {
1977 rbd_assert(img_request_write_test(img_request));
1978 } else {
1979 rbd_assert(img_request_discard_test(img_request));
1980 }
6d2940c8 1981 snapc = img_request->snapc;
bf0d5f50
AE
1982 }
1983
6d2940c8 1984 rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
deb236b3 1985
bc81207e
ID
1986 return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
1987 (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
54ea0046 1988 CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
bf0d5f50
AE
1989}
1990
0eefd470 1991/*
d3246fb0
JD
1992 * Create a copyup osd request based on the information in the object
1993 * request supplied. A copyup request has two or three osd ops, a
1994 * copyup method call, potentially a hint op, and a write or truncate
1995 * or zero op.
0eefd470
AE
1996 */
1997static struct ceph_osd_request *
1998rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1999{
2000 struct rbd_img_request *img_request;
d3246fb0 2001 int num_osd_ops = 3;
0eefd470
AE
2002
2003 rbd_assert(obj_request_img_data_test(obj_request));
2004 img_request = obj_request->img_request;
2005 rbd_assert(img_request);
d3246fb0
JD
2006 rbd_assert(img_request_write_test(img_request) ||
2007 img_request_discard_test(img_request));
0eefd470 2008
d3246fb0
JD
2009 if (img_request_discard_test(img_request))
2010 num_osd_ops = 2;
2011
bc81207e
ID
2012 return __rbd_osd_req_create(img_request->rbd_dev,
2013 img_request->snapc, num_osd_ops,
54ea0046 2014 CEPH_OSD_FLAG_WRITE, obj_request);
0eefd470
AE
2015}
2016
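/*
 * The op-count rule implemented by the two constructors above, as an
 * illustrative helper (simplified; the enum is local to this sketch):
 * plain reads and discards carry one op, plain writes two (hint +
 * write), and a copyup request adds the method call on top, except
 * that a discard copyup carries no hint.
 */
#include <stdbool.h>

enum sketch_op_type { SKETCH_READ, SKETCH_WRITE, SKETCH_DISCARD };

static int sketch_num_osd_ops(enum sketch_op_type op_type, bool copyup)
{
	if (copyup)
		return op_type == SKETCH_DISCARD ? 2 : 3;
	return op_type == SKETCH_WRITE ? 2 : 1;
}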
bf0d5f50
AE
2017static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
2018{
2019 ceph_osdc_put_request(osd_req);
2020}
2021
6c696d85
ID
2022static struct rbd_obj_request *
2023rbd_obj_request_create(enum obj_request_type type)
bf0d5f50
AE
2024{
2025 struct rbd_obj_request *obj_request;
bf0d5f50
AE
2026
2027 rbd_assert(obj_request_type_valid(type));
2028
5a60e876 2029 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO);
6c696d85 2030 if (!obj_request)
f907ad55 2031 return NULL;
f907ad55 2032
bf0d5f50
AE
2033 obj_request->which = BAD_WHICH;
2034 obj_request->type = type;
2035 INIT_LIST_HEAD(&obj_request->links);
788e2df3 2036 init_completion(&obj_request->completion);
bf0d5f50
AE
2037 kref_init(&obj_request->kref);
2038
67e2b652 2039 dout("%s %p\n", __func__, obj_request);
bf0d5f50
AE
2040 return obj_request;
2041}
2042
2043static void rbd_obj_request_destroy(struct kref *kref)
2044{
2045 struct rbd_obj_request *obj_request;
2046
2047 obj_request = container_of(kref, struct rbd_obj_request, kref);
2048
37206ee5
AE
2049 dout("%s: obj %p\n", __func__, obj_request);
2050
bf0d5f50
AE
2051 rbd_assert(obj_request->img_request == NULL);
2052 rbd_assert(obj_request->which == BAD_WHICH);
2053
2054 if (obj_request->osd_req)
2055 rbd_osd_req_destroy(obj_request->osd_req);
2056
2057 rbd_assert(obj_request_type_valid(obj_request->type));
2058 switch (obj_request->type) {
9969ebc5
AE
2059 case OBJ_REQUEST_NODATA:
2060 break; /* Nothing to do */
bf0d5f50
AE
2061 case OBJ_REQUEST_BIO:
2062 if (obj_request->bio_list)
2063 bio_chain_put(obj_request->bio_list);
2064 break;
788e2df3 2065 case OBJ_REQUEST_PAGES:
04dc923c
ID
2066 /* img_data requests don't own their page array */
2067 if (obj_request->pages &&
2068 !obj_request_img_data_test(obj_request))
788e2df3
AE
2069 ceph_release_page_vector(obj_request->pages,
2070 obj_request->page_count);
2071 break;
bf0d5f50
AE
2072 }
2073
868311b1 2074 kmem_cache_free(rbd_obj_request_cache, obj_request);
bf0d5f50
AE
2075}
2076
fb65d228
AE
2077/* It's OK to call this for a device with no parent */
2078
2079static void rbd_spec_put(struct rbd_spec *spec);
2080static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2081{
2082 rbd_dev_remove_parent(rbd_dev);
2083 rbd_spec_put(rbd_dev->parent_spec);
2084 rbd_dev->parent_spec = NULL;
2085 rbd_dev->parent_overlap = 0;
2086}
2087
a2acd00e
AE
2088/*
2089 * Parent image reference counting is used to determine when an
2090 * image's parent fields can be safely torn down--after there are no
2091 * more in-flight requests to the parent image. When the last
2092 * reference is dropped, cleaning them up is safe.
2093 */
2094static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2095{
2096 int counter;
2097
2098 if (!rbd_dev->parent_spec)
2099 return;
2100
2101 counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2102 if (counter > 0)
2103 return;
2104
2105 /* Last reference; clean up parent data structures */
2106
2107 if (!counter)
2108 rbd_dev_unparent(rbd_dev);
2109 else
9584d508 2110 rbd_warn(rbd_dev, "parent reference underflow");
a2acd00e
AE
2111}
2112
2113/*
2114 * If an image has a non-zero parent overlap, get a reference to its
2115 * parent.
2116 *
2117 * Returns true if the rbd device has a parent with a non-zero
2118 * overlap and a reference for it was successfully taken, or
2119 * false otherwise.
2120 */
2121static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2122{
ae43e9d0 2123 int counter = 0;
a2acd00e
AE
2124
2125 if (!rbd_dev->parent_spec)
2126 return false;
2127
ae43e9d0
ID
2128 down_read(&rbd_dev->header_rwsem);
2129 if (rbd_dev->parent_overlap)
2130 counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2131 up_read(&rbd_dev->header_rwsem);
a2acd00e
AE
2132
2133 if (counter < 0)
9584d508 2134 rbd_warn(rbd_dev, "parent reference overflow");
a2acd00e 2135
ae43e9d0 2136 return counter > 0;
a2acd00e
AE
2137}
2138
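/*
 * Userspace model (illustrative, C11 atomics) of the saturating
 * get/put pair above: once the count has fallen to zero, e.g. after a
 * flatten, no new parent reference can be taken, and exactly one
 * caller of put sees the transition to zero and may tear the parent
 * down.  The driver's underflow/overflow warnings are omitted here.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int model_parent_ref = 1;	/* "has a parent" baseline */

static bool model_parent_get(void)
{
	int old = atomic_load(&model_parent_ref);

	while (old > 0)
		if (atomic_compare_exchange_weak(&model_parent_ref,
						 &old, old + 1))
			return true;
	return false;		/* saturated at 0: parent is gone */
}

static bool model_parent_put(void)
{
	return atomic_fetch_sub(&model_parent_ref, 1) == 1; /* last ref? */
}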
bf0d5f50
AE
2139/*
2140 * Caller is responsible for filling in the list of object requests
2141 * that comprises the image request, and the Linux request pointer
2142 * (if there is one).
2143 */
cc344fa1
AE
2144static struct rbd_img_request *rbd_img_request_create(
2145 struct rbd_device *rbd_dev,
bf0d5f50 2146 u64 offset, u64 length,
6d2940c8 2147 enum obj_operation_type op_type,
4e752f0a 2148 struct ceph_snap_context *snapc)
bf0d5f50
AE
2149{
2150 struct rbd_img_request *img_request;
bf0d5f50 2151
7a716aac 2152 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
bf0d5f50
AE
2153 if (!img_request)
2154 return NULL;
2155
bf0d5f50
AE
2156 img_request->rq = NULL;
2157 img_request->rbd_dev = rbd_dev;
2158 img_request->offset = offset;
2159 img_request->length = length;
0c425248 2160 img_request->flags = 0;
90e98c52
GZ
2161 if (op_type == OBJ_OP_DISCARD) {
2162 img_request_discard_set(img_request);
2163 img_request->snapc = snapc;
2164 } else if (op_type == OBJ_OP_WRITE) {
0c425248 2165 img_request_write_set(img_request);
4e752f0a 2166 img_request->snapc = snapc;
0c425248 2167 } else {
bf0d5f50 2168 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 2169 }
a2acd00e 2170 if (rbd_dev_parent_get(rbd_dev))
d0b2e944 2171 img_request_layered_set(img_request);
bf0d5f50
AE
2172 spin_lock_init(&img_request->completion_lock);
2173 img_request->next_completion = 0;
2174 img_request->callback = NULL;
a5a337d4 2175 img_request->result = 0;
bf0d5f50
AE
2176 img_request->obj_request_count = 0;
2177 INIT_LIST_HEAD(&img_request->obj_requests);
2178 kref_init(&img_request->kref);
2179
37206ee5 2180 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
6d2940c8 2181 obj_op_name(op_type), offset, length, img_request);
37206ee5 2182
bf0d5f50
AE
2183 return img_request;
2184}
2185
2186static void rbd_img_request_destroy(struct kref *kref)
2187{
2188 struct rbd_img_request *img_request;
2189 struct rbd_obj_request *obj_request;
2190 struct rbd_obj_request *next_obj_request;
2191
2192 img_request = container_of(kref, struct rbd_img_request, kref);
2193
37206ee5
AE
2194 dout("%s: img %p\n", __func__, img_request);
2195
bf0d5f50
AE
2196 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2197 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 2198 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 2199
a2acd00e
AE
2200 if (img_request_layered_test(img_request)) {
2201 img_request_layered_clear(img_request);
2202 rbd_dev_parent_put(img_request->rbd_dev);
2203 }
2204
bef95455
JD
2205 if (img_request_write_test(img_request) ||
2206 img_request_discard_test(img_request))
812164f8 2207 ceph_put_snap_context(img_request->snapc);
bf0d5f50 2208
1c2a9dfe 2209 kmem_cache_free(rbd_img_request_cache, img_request);
bf0d5f50
AE
2210}
2211
e93f3152
AE
2212static struct rbd_img_request *rbd_parent_request_create(
2213 struct rbd_obj_request *obj_request,
2214 u64 img_offset, u64 length)
2215{
2216 struct rbd_img_request *parent_request;
2217 struct rbd_device *rbd_dev;
2218
2219 rbd_assert(obj_request->img_request);
2220 rbd_dev = obj_request->img_request->rbd_dev;
2221
4e752f0a 2222 parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
6d2940c8 2223 length, OBJ_OP_READ, NULL);
e93f3152
AE
2224 if (!parent_request)
2225 return NULL;
2226
2227 img_request_child_set(parent_request);
2228 rbd_obj_request_get(obj_request);
2229 parent_request->obj_request = obj_request;
2230
2231 return parent_request;
2232}
2233
2234static void rbd_parent_request_destroy(struct kref *kref)
2235{
2236 struct rbd_img_request *parent_request;
2237 struct rbd_obj_request *orig_request;
2238
2239 parent_request = container_of(kref, struct rbd_img_request, kref);
2240 orig_request = parent_request->obj_request;
2241
2242 parent_request->obj_request = NULL;
2243 rbd_obj_request_put(orig_request);
2244 img_request_child_clear(parent_request);
2245
2246 rbd_img_request_destroy(kref);
2247}
2248
1217857f
AE
2249static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2250{
6365d33a 2251 struct rbd_img_request *img_request;
1217857f
AE
2252 unsigned int xferred;
2253 int result;
8b3e1a56 2254 bool more;
1217857f 2255
6365d33a
AE
2256 rbd_assert(obj_request_img_data_test(obj_request));
2257 img_request = obj_request->img_request;
2258
1217857f
AE
2259 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2260 xferred = (unsigned int)obj_request->xferred;
2261 result = obj_request->result;
2262 if (result) {
2263 struct rbd_device *rbd_dev = img_request->rbd_dev;
6d2940c8
GZ
2264 enum obj_operation_type op_type;
2265
90e98c52
GZ
2266 if (img_request_discard_test(img_request))
2267 op_type = OBJ_OP_DISCARD;
2268 else if (img_request_write_test(img_request))
2269 op_type = OBJ_OP_WRITE;
2270 else
2271 op_type = OBJ_OP_READ;
1217857f 2272
9584d508 2273 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
6d2940c8
GZ
2274 obj_op_name(op_type), obj_request->length,
2275 obj_request->img_offset, obj_request->offset);
9584d508 2276 rbd_warn(rbd_dev, " result %d xferred %x",
1217857f
AE
2277 result, xferred);
2278 if (!img_request->result)
2279 img_request->result = result;
082a75da
ID
2280 /*
2281 * Need to end I/O on the entire obj_request worth of
2282 * bytes in case of error.
2283 */
2284 xferred = obj_request->length;
1217857f
AE
2285 }
2286
8b3e1a56
AE
2287 if (img_request_child_test(img_request)) {
2288 rbd_assert(img_request->obj_request != NULL);
2289 more = obj_request->which < img_request->obj_request_count - 1;
2290 } else {
2a842aca
CH
2291 blk_status_t status = errno_to_blk_status(result);
2292
8b3e1a56 2293 rbd_assert(img_request->rq != NULL);
7ad18afa 2294
2a842aca 2295 more = blk_update_request(img_request->rq, status, xferred);
7ad18afa 2296 if (!more)
2a842aca 2297 __blk_mq_end_request(img_request->rq, status);
8b3e1a56
AE
2298 }
2299
2300 return more;
1217857f
AE
2301}
2302
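/*
 * Illustrative model, not kernel code, of the accounting handed to
 * blk_update_request() above: each object request ends "xferred"
 * bytes of the block request, and "more" means bytes still remain.
 */
#include <stdbool.h>
#include <stdint.h>

struct model_rq {
	uint64_t remaining;		/* bytes not yet completed */
};

static bool model_update_request(struct model_rq *rq, uint64_t xferred)
{
	rq->remaining -= xferred < rq->remaining ? xferred : rq->remaining;
	return rq->remaining > 0;	/* rq is ended when this is false */
}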
2169238d
AE
2303static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2304{
2305 struct rbd_img_request *img_request;
2306 u32 which = obj_request->which;
2307 bool more = true;
2308
6365d33a 2309 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
2310 img_request = obj_request->img_request;
2311
2312 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2313 rbd_assert(img_request != NULL);
2169238d
AE
2314 rbd_assert(img_request->obj_request_count > 0);
2315 rbd_assert(which != BAD_WHICH);
2316 rbd_assert(which < img_request->obj_request_count);
2169238d
AE
2317
2318 spin_lock_irq(&img_request->completion_lock);
2319 if (which != img_request->next_completion)
2320 goto out;
2321
2322 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
2323 rbd_assert(more);
2324 rbd_assert(which < img_request->obj_request_count);
2325
2326 if (!obj_request_done_test(obj_request))
2327 break;
1217857f 2328 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
2329 which++;
2330 }
2331
2332 rbd_assert(more ^ (which == img_request->obj_request_count));
2333 img_request->next_completion = which;
2334out:
2335 spin_unlock_irq(&img_request->completion_lock);
0f2d5be7 2336 rbd_img_request_put(img_request);
2169238d
AE
2337
2338 if (!more)
2339 rbd_img_request_complete(img_request);
2340}
2341
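/*
 * Sketch of the ordering algorithm above, as a userspace model:
 * object requests can finish in any order, but the block request is
 * completed strictly in order.  Each completion advances a cursor
 * over the run of consecutively finished requests starting at
 * next_completion.  Locking is omitted; the driver holds
 * completion_lock for this walk.
 */
#include <stdbool.h>

#define MODEL_NUM_OBJ 8

static bool model_done[MODEL_NUM_OBJ];
static unsigned int model_next_completion;

static void model_obj_complete(unsigned int which)
{
	model_done[which] = true;
	if (which != model_next_completion)
		return;		/* an earlier request is still in flight */

	while (model_next_completion < MODEL_NUM_OBJ &&
	       model_done[model_next_completion])
		model_next_completion++;	/* end this span in order */
}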
3b434a2a
JD
2342/*
2343 * Add individual osd ops to the given ceph_osd_request and prepare
2344 * them for submission. num_ops is the current number of
2345 * osd operations already added to the object request.
2346 */
2347static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2348 struct ceph_osd_request *osd_request,
2349 enum obj_operation_type op_type,
2350 unsigned int num_ops)
2351{
2352 struct rbd_img_request *img_request = obj_request->img_request;
2353 struct rbd_device *rbd_dev = img_request->rbd_dev;
2354 u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2355 u64 offset = obj_request->offset;
2356 u64 length = obj_request->length;
2357 u64 img_end;
2358 u16 opcode;
2359
2360 if (op_type == OBJ_OP_DISCARD) {
d3246fb0
JD
2361 if (!offset && length == object_size &&
2362 (!img_request_layered_test(img_request) ||
2363 !obj_request_overlaps_parent(obj_request))) {
3b434a2a
JD
2364 opcode = CEPH_OSD_OP_DELETE;
2365 } else if ((offset + length == object_size)) {
2366 opcode = CEPH_OSD_OP_TRUNCATE;
2367 } else {
2368 down_read(&rbd_dev->header_rwsem);
2369 img_end = rbd_dev->header.image_size;
2370 up_read(&rbd_dev->header_rwsem);
2371
2372 if (obj_request->img_offset + length == img_end)
2373 opcode = CEPH_OSD_OP_TRUNCATE;
2374 else
2375 opcode = CEPH_OSD_OP_ZERO;
2376 }
2377 } else if (op_type == OBJ_OP_WRITE) {
e30b7577
ID
2378 if (!offset && length == object_size)
2379 opcode = CEPH_OSD_OP_WRITEFULL;
2380 else
2381 opcode = CEPH_OSD_OP_WRITE;
3b434a2a
JD
2382 osd_req_op_alloc_hint_init(osd_request, num_ops,
2383 object_size, object_size);
2384 num_ops++;
2385 } else {
2386 opcode = CEPH_OSD_OP_READ;
2387 }
2388
7e868b6e 2389 if (opcode == CEPH_OSD_OP_DELETE)
144cba14 2390 osd_req_op_init(osd_request, num_ops, opcode, 0);
7e868b6e
ID
2391 else
2392 osd_req_op_extent_init(osd_request, num_ops, opcode,
2393 offset, length, 0, 0);
2394
3b434a2a
JD
2395 if (obj_request->type == OBJ_REQUEST_BIO)
2396 osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2397 obj_request->bio_list, length);
2398 else if (obj_request->type == OBJ_REQUEST_PAGES)
2399 osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2400 obj_request->pages, length,
2401 offset & ~PAGE_MASK, false, false);
2402
2403 /* Discards are also writes */
2404 if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2405 rbd_osd_req_format_write(obj_request);
2406 else
2407 rbd_osd_req_format_read(obj_request);
2408}
2409
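/*
 * Decision table for the discard path above, as an illustrative
 * helper.  Assumption: simplified, ignoring the layered-image overlap
 * check that also gates DELETE.  A whole object maps to DELETE, the
 * tail of an object or of the image maps to TRUNCATE, and anything
 * else maps to ZERO.
 */
#include <stdint.h>

enum sketch_discard { SKETCH_DELETE, SKETCH_TRUNCATE, SKETCH_ZERO };

static enum sketch_discard sketch_discard_opcode(uint64_t offset,
						 uint64_t length,
						 uint64_t object_size,
						 uint64_t img_offset,
						 uint64_t img_end)
{
	if (!offset && length == object_size)
		return SKETCH_DELETE;
	if (offset + length == object_size)
		return SKETCH_TRUNCATE;
	if (img_offset + length == img_end)
		return SKETCH_TRUNCATE;
	return SKETCH_ZERO;
}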
f1a4739f
AE
2410/*
2411 * Split up an image request into one or more object requests, each
2412 * to a different object. The "type" parameter indicates whether
2413 * "data_desc" is the pointer to the head of a list of bio
2414 * structures, or the base of a page array. In either case this
2415 * function assumes data_desc describes memory sufficient to hold
2416 * all data described by the image request.
2417 */
2418static int rbd_img_request_fill(struct rbd_img_request *img_request,
2419 enum obj_request_type type,
2420 void *data_desc)
bf0d5f50
AE
2421{
2422 struct rbd_device *rbd_dev = img_request->rbd_dev;
2423 struct rbd_obj_request *obj_request = NULL;
2424 struct rbd_obj_request *next_obj_request;
a158073c 2425 struct bio *bio_list = NULL;
f1a4739f 2426 unsigned int bio_offset = 0;
a158073c 2427 struct page **pages = NULL;
6d2940c8 2428 enum obj_operation_type op_type;
7da22d29 2429 u64 img_offset;
bf0d5f50 2430 u64 resid;
bf0d5f50 2431
f1a4739f
AE
2432 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2433 (int)type, data_desc);
37206ee5 2434
7da22d29 2435 img_offset = img_request->offset;
bf0d5f50 2436 resid = img_request->length;
4dda41d3 2437 rbd_assert(resid > 0);
3b434a2a 2438 op_type = rbd_img_request_op_type(img_request);
f1a4739f
AE
2439
2440 if (type == OBJ_REQUEST_BIO) {
2441 bio_list = data_desc;
4f024f37
KO
2442 rbd_assert(img_offset ==
2443 bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
90e98c52 2444 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2445 pages = data_desc;
2446 }
2447
bf0d5f50 2448 while (resid) {
2fa12320 2449 struct ceph_osd_request *osd_req;
a90bb0c1 2450 u64 object_no = img_offset >> rbd_dev->header.obj_order;
67e2b652
ID
2451 u64 offset = rbd_segment_offset(rbd_dev, img_offset);
2452 u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2453
6c696d85 2454 obj_request = rbd_obj_request_create(type);
bf0d5f50
AE
2455 if (!obj_request)
2456 goto out_unwind;
62054da6 2457
a90bb0c1 2458 obj_request->object_no = object_no;
67e2b652
ID
2459 obj_request->offset = offset;
2460 obj_request->length = length;
2461
03507db6
JD
2462 /*
2463 * set obj_request->img_request before creating the
2464 * osd_request so that it gets the right snapc
2465 */
2466 rbd_img_obj_request_add(img_request, obj_request);
bf0d5f50 2467
f1a4739f
AE
2468 if (type == OBJ_REQUEST_BIO) {
2469 unsigned int clone_size;
2470
2471 rbd_assert(length <= (u64)UINT_MAX);
2472 clone_size = (unsigned int)length;
2473 obj_request->bio_list =
2474 bio_chain_clone_range(&bio_list,
2475 &bio_offset,
2476 clone_size,
2224d879 2477 GFP_NOIO);
f1a4739f 2478 if (!obj_request->bio_list)
62054da6 2479 goto out_unwind;
90e98c52 2480 } else if (type == OBJ_REQUEST_PAGES) {
f1a4739f
AE
2481 unsigned int page_count;
2482
2483 obj_request->pages = pages;
2484 page_count = (u32)calc_pages_for(offset, length);
2485 obj_request->page_count = page_count;
2486 if ((offset + length) & ~PAGE_MASK)
2487 page_count--; /* more on last page */
2488 pages += page_count;
2489 }
bf0d5f50 2490
6d2940c8
GZ
2491 osd_req = rbd_osd_req_create(rbd_dev, op_type,
2492 (op_type == OBJ_OP_WRITE) ? 2 : 1,
2493 obj_request);
2fa12320 2494 if (!osd_req)
62054da6 2495 goto out_unwind;
3b434a2a 2496
2fa12320 2497 obj_request->osd_req = osd_req;
2169238d 2498 obj_request->callback = rbd_img_obj_callback;
3b434a2a 2499 obj_request->img_offset = img_offset;
9d4df01f 2500
3b434a2a 2501 rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
430c28c3 2502
7da22d29 2503 img_offset += length;
bf0d5f50
AE
2504 resid -= length;
2505 }
2506
2507 return 0;
2508
bf0d5f50
AE
2509out_unwind:
2510 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
42dd037c 2511 rbd_img_obj_request_del(img_request, obj_request);
bf0d5f50
AE
2512
2513 return -ENOMEM;
2514}
2515
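/*
 * Illustrative model of how the loop above walks the image extent:
 * the object number comes from shifting img_offset by obj_order, the
 * in-object offset is the low bits, and each object request covers at
 * most the remainder of that object.  Assumption: this mirrors what
 * rbd_segment_offset()/rbd_segment_length() compute.
 */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	const unsigned int obj_order = 22;		/* 4 MiB objects */
	const uint64_t obj_size = 1ULL << obj_order;
	uint64_t img_offset = 3 * obj_size + 12345;	/* sample extent */
	uint64_t resid = 2 * obj_size;

	while (resid) {
		uint64_t object_no = img_offset >> obj_order;
		uint64_t offset = img_offset & (obj_size - 1);
		uint64_t length = obj_size - offset;

		if (length > resid)
			length = resid;
		printf("obj %llu off %llu len %llu\n",
		       (unsigned long long)object_no,
		       (unsigned long long)offset,
		       (unsigned long long)length);
		img_offset += length;
		resid -= length;
	}
	return 0;
}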
0eefd470 2516static void
2761713d 2517rbd_osd_copyup_callback(struct rbd_obj_request *obj_request)
0eefd470
AE
2518{
2519 struct rbd_img_request *img_request;
2520 struct rbd_device *rbd_dev;
ebda6408 2521 struct page **pages;
0eefd470
AE
2522 u32 page_count;
2523
2761713d
ID
2524 dout("%s: obj %p\n", __func__, obj_request);
2525
d3246fb0
JD
2526 rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2527 obj_request->type == OBJ_REQUEST_NODATA);
0eefd470
AE
2528 rbd_assert(obj_request_img_data_test(obj_request));
2529 img_request = obj_request->img_request;
2530 rbd_assert(img_request);
2531
2532 rbd_dev = img_request->rbd_dev;
2533 rbd_assert(rbd_dev);
0eefd470 2534
ebda6408
AE
2535 pages = obj_request->copyup_pages;
2536 rbd_assert(pages != NULL);
0eefd470 2537 obj_request->copyup_pages = NULL;
ebda6408
AE
2538 page_count = obj_request->copyup_page_count;
2539 rbd_assert(page_count);
2540 obj_request->copyup_page_count = 0;
2541 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2542
2543 /*
2544 * We want the transfer count to reflect the size of the
2545 * original write request. There is no such thing as a
2546 * successful short write, so if the request was successful
2547 * we can just set it to the originally-requested length.
2548 */
2549 if (!obj_request->result)
2550 obj_request->xferred = obj_request->length;
2551
2761713d 2552 obj_request_done_set(obj_request);
0eefd470
AE
2553}
2554
3d7efd18
AE
2555static void
2556rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2557{
2558 struct rbd_obj_request *orig_request;
0eefd470 2559 struct ceph_osd_request *osd_req;
0eefd470 2560 struct rbd_device *rbd_dev;
3d7efd18 2561 struct page **pages;
d3246fb0 2562 enum obj_operation_type op_type;
ebda6408 2563 u32 page_count;
bbea1c1a 2564 int img_result;
ebda6408 2565 u64 parent_length;
3d7efd18
AE
2566
2567 rbd_assert(img_request_child_test(img_request));
2568
2569 /* First get what we need from the image request */
2570
2571 pages = img_request->copyup_pages;
2572 rbd_assert(pages != NULL);
2573 img_request->copyup_pages = NULL;
ebda6408
AE
2574 page_count = img_request->copyup_page_count;
2575 rbd_assert(page_count);
2576 img_request->copyup_page_count = 0;
3d7efd18
AE
2577
2578 orig_request = img_request->obj_request;
2579 rbd_assert(orig_request != NULL);
b91f09f1 2580 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2581 img_result = img_request->result;
ebda6408 2582 parent_length = img_request->length;
fa355112 2583 rbd_assert(img_result || parent_length == img_request->xferred);
91c6febb 2584 rbd_img_request_put(img_request);
3d7efd18 2585
91c6febb
AE
2586 rbd_assert(orig_request->img_request);
2587 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2588 rbd_assert(rbd_dev);
0eefd470 2589
bbea1c1a
AE
2590 /*
2591 * If the overlap has become 0 (most likely because the
2592 * image has been flattened) we need to free the pages
2593 * and re-submit the original write request.
2594 */
2595 if (!rbd_dev->parent_overlap) {
bbea1c1a 2596 ceph_release_page_vector(pages, page_count);
980917fc
ID
2597 rbd_obj_request_submit(orig_request);
2598 return;
bbea1c1a 2599 }
0eefd470 2600
bbea1c1a 2601 if (img_result)
0eefd470 2602 goto out_err;
0eefd470 2603
8785b1d4
AE
2604 /*
2605 * The original osd request is of no use to us any more.
0ccd5926 2606 * We need a new one that can hold the three ops in a copyup
8785b1d4
AE
2607 * request. Allocate the new copyup osd request for the
2608 * original request, and release the old one.
2609 */
bbea1c1a 2610 img_result = -ENOMEM;
0eefd470
AE
2611 osd_req = rbd_osd_req_create_copyup(orig_request);
2612 if (!osd_req)
2613 goto out_err;
8785b1d4 2614 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2615 orig_request->osd_req = osd_req;
2616 orig_request->copyup_pages = pages;
ebda6408 2617 orig_request->copyup_page_count = page_count;
3d7efd18 2618
0eefd470 2619 /* Initialize the copyup op */
3d7efd18 2620
0eefd470 2621 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2622 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2623 false, false);
3d7efd18 2624
d3246fb0 2625 /* Add the other op(s) */
0eefd470 2626
d3246fb0
JD
2627 op_type = rbd_img_request_op_type(orig_request->img_request);
2628 rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
0eefd470
AE
2629
2630 /* All set, send it off. */
2631
980917fc
ID
2632 rbd_obj_request_submit(orig_request);
2633 return;
0eefd470 2634
0eefd470 2635out_err:
fa355112 2636 ceph_release_page_vector(pages, page_count);
0dcc685e 2637 rbd_obj_request_error(orig_request, img_result);
3d7efd18
AE
2638}
2639
2640/*
2641 * Read from the parent image the range of data that covers the
2642 * entire target of the given object request. This is used for
2643 * satisfying a layered image write request when the target of an
2644 * object request from the image request does not exist.
2645 *
2646 * A page array big enough to hold the returned data is allocated
2647 * and supplied to rbd_img_request_fill() as the "data descriptor."
2648 * When the read completes, this page array will be transferred to
2649 * the original object request for the copyup operation.
2650 *
c2e82414
ID
2651 * If an error occurs, it is recorded as the result of the original
2652 * object request in rbd_img_obj_exists_callback().
3d7efd18
AE
2653 */
2654static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2655{
058aa991 2656 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
3d7efd18 2657 struct rbd_img_request *parent_request = NULL;
3d7efd18
AE
2658 u64 img_offset;
2659 u64 length;
2660 struct page **pages = NULL;
2661 u32 page_count;
2662 int result;
2663
3d7efd18
AE
2664 rbd_assert(rbd_dev->parent != NULL);
2665
2666 /*
2667 * Determine the byte range covered by the object in the
2668 * child image to which the original request was to be sent.
2669 */
2670 img_offset = obj_request->img_offset - obj_request->offset;
5bc3fb17 2671 length = rbd_obj_bytes(&rbd_dev->header);
3d7efd18 2672
a9e8ba2c
AE
2673 /*
2674 * There is no defined parent data beyond the parent
2675 * overlap, so limit what we read at that boundary if
2676 * necessary.
2677 */
2678 if (img_offset + length > rbd_dev->parent_overlap) {
2679 rbd_assert(img_offset < rbd_dev->parent_overlap);
2680 length = rbd_dev->parent_overlap - img_offset;
2681 }
2682
3d7efd18
AE
2683 /*
2684 * Allocate a page array big enough to receive the data read
2685 * from the parent.
2686 */
2687 page_count = (u32)calc_pages_for(0, length);
1e37f2f8 2688 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
3d7efd18
AE
2689 if (IS_ERR(pages)) {
2690 result = PTR_ERR(pages);
2691 pages = NULL;
2692 goto out_err;
2693 }
2694
2695 result = -ENOMEM;
e93f3152
AE
2696 parent_request = rbd_parent_request_create(obj_request,
2697 img_offset, length);
3d7efd18
AE
2698 if (!parent_request)
2699 goto out_err;
3d7efd18
AE
2700
2701 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2702 if (result)
2703 goto out_err;
058aa991 2704
3d7efd18 2705 parent_request->copyup_pages = pages;
ebda6408 2706 parent_request->copyup_page_count = page_count;
3d7efd18 2707 parent_request->callback = rbd_img_obj_parent_read_full_callback;
058aa991 2708
3d7efd18
AE
2709 result = rbd_img_request_submit(parent_request);
2710 if (!result)
2711 return 0;
2712
2713 parent_request->copyup_pages = NULL;
ebda6408 2714 parent_request->copyup_page_count = 0;
3d7efd18
AE
2715 parent_request->obj_request = NULL;
2716 rbd_obj_request_put(obj_request);
2717out_err:
2718 if (pages)
2719 ceph_release_page_vector(pages, page_count);
2720 if (parent_request)
2721 rbd_img_request_put(parent_request);
3d7efd18
AE
2722 return result;
2723}
2724
c5b5ef6c
AE
2725static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2726{
c5b5ef6c 2727 struct rbd_obj_request *orig_request;
638f5abe 2728 struct rbd_device *rbd_dev;
c5b5ef6c
AE
2729 int result;
2730
2731 rbd_assert(!obj_request_img_data_test(obj_request));
2732
2733 /*
2734 * All we need from the object request is the original
2735 * request and the result of the STAT op. Grab those, then
2736 * we're done with the request.
2737 */
2738 orig_request = obj_request->obj_request;
2739 obj_request->obj_request = NULL;
912c317d 2740 rbd_obj_request_put(orig_request);
c5b5ef6c
AE
2741 rbd_assert(orig_request);
2742 rbd_assert(orig_request->img_request);
2743
2744 result = obj_request->result;
2745 obj_request->result = 0;
2746
2747 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2748 obj_request, orig_request, result,
2749 obj_request->xferred, obj_request->length);
2750 rbd_obj_request_put(obj_request);
2751
638f5abe
AE
2752 /*
2753 * If the overlap has become 0 (most likely because the
980917fc
ID
2754 * image has been flattened) we need to re-submit the
2755 * original request.
638f5abe
AE
2756 */
2757 rbd_dev = orig_request->img_request->rbd_dev;
2758 if (!rbd_dev->parent_overlap) {
980917fc
ID
2759 rbd_obj_request_submit(orig_request);
2760 return;
638f5abe 2761 }
c5b5ef6c
AE
2762
2763 /*
2764 * Our only purpose here is to determine whether the object
2765 * exists, and we don't want to treat the non-existence as
2766 * an error. If something else comes back, transfer the
2767 * error to the original request and complete it now.
2768 */
2769 if (!result) {
2770 obj_request_existence_set(orig_request, true);
2771 } else if (result == -ENOENT) {
2772 obj_request_existence_set(orig_request, false);
c2e82414
ID
2773 } else {
2774 goto fail_orig_request;
c5b5ef6c
AE
2775 }
2776
2777 /*
2778 * Resubmit the original request now that we have recorded
2779 * whether the target object exists.
2780 */
c2e82414
ID
2781 result = rbd_img_obj_request_submit(orig_request);
2782 if (result)
2783 goto fail_orig_request;
2784
2785 return;
2786
2787fail_orig_request:
0dcc685e 2788 rbd_obj_request_error(orig_request, result);
c5b5ef6c
AE
2789}
2790
2791static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2792{
058aa991 2793 struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
c5b5ef6c 2794 struct rbd_obj_request *stat_request;
710214e3 2795 struct page **pages;
c5b5ef6c
AE
2796 u32 page_count;
2797 size_t size;
2798 int ret;
2799
6c696d85 2800 stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES);
710214e3
ID
2801 if (!stat_request)
2802 return -ENOMEM;
2803
a90bb0c1
ID
2804 stat_request->object_no = obj_request->object_no;
2805
710214e3
ID
2806 stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2807 stat_request);
2808 if (!stat_request->osd_req) {
2809 ret = -ENOMEM;
2810 goto fail_stat_request;
2811 }
2812
c5b5ef6c
AE
2813 /*
2814 * The response data for a STAT call consists of:
2815 * le64 length;
2816 * struct {
2817 * le32 tv_sec;
2818 * le32 tv_nsec;
2819 * } mtime;
2820 */
2821 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2822 page_count = (u32)calc_pages_for(0, size);
1e37f2f8 2823 pages = ceph_alloc_page_vector(page_count, GFP_NOIO);
710214e3
ID
2824 if (IS_ERR(pages)) {
2825 ret = PTR_ERR(pages);
2826 goto fail_stat_request;
2827 }
c5b5ef6c 2828
710214e3
ID
2829 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
2830 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2831 false, false);
c5b5ef6c
AE
2832
2833 rbd_obj_request_get(obj_request);
2834 stat_request->obj_request = obj_request;
2835 stat_request->pages = pages;
2836 stat_request->page_count = page_count;
c5b5ef6c
AE
2837 stat_request->callback = rbd_img_obj_exists_callback;
2838
980917fc
ID
2839 rbd_obj_request_submit(stat_request);
2840 return 0;
c5b5ef6c 2841
710214e3
ID
2842fail_stat_request:
2843 rbd_obj_request_put(stat_request);
c5b5ef6c
AE
2844 return ret;
2845}
2846
70d045f6 2847static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
b454e36d 2848{
058aa991
ID
2849 struct rbd_img_request *img_request = obj_request->img_request;
2850 struct rbd_device *rbd_dev = img_request->rbd_dev;
b454e36d 2851
70d045f6 2852 /* Reads */
1c220881
JD
2853 if (!img_request_write_test(img_request) &&
2854 !img_request_discard_test(img_request))
70d045f6
ID
2855 return true;
2856
2857 /* Non-layered writes */
2858 if (!img_request_layered_test(img_request))
2859 return true;
2860
b454e36d 2861 /*
70d045f6
ID
2862 * Layered writes outside of the parent overlap range don't
2863 * share any data with the parent.
b454e36d 2864 */
70d045f6
ID
2865 if (!obj_request_overlaps_parent(obj_request))
2866 return true;
b454e36d 2867
c622d226
GZ
2868 /*
2869 * Entire-object layered writes - we will overwrite whatever
2870 * parent data there is anyway.
2871 */
2872 if (!obj_request->offset &&
2873 obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2874 return true;
2875
70d045f6
ID
2876 /*
2877 * If the object is known to already exist, its parent data has
2878 * already been copied.
2879 */
2880 if (obj_request_known_test(obj_request) &&
2881 obj_request_exists_test(obj_request))
2882 return true;
2883
2884 return false;
2885}
2886
2887static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2888{
058aa991
ID
2889 rbd_assert(obj_request_img_data_test(obj_request));
2890 rbd_assert(obj_request_type_valid(obj_request->type));
2891 rbd_assert(obj_request->img_request);
b454e36d 2892
70d045f6 2893 if (img_obj_request_simple(obj_request)) {
980917fc
ID
2894 rbd_obj_request_submit(obj_request);
2895 return 0;
b454e36d
AE
2896 }
2897
2898 /*
3d7efd18
AE
2899 * It's a layered write. The target object might exist but
2900 * we may not know that yet. If we know it doesn't exist,
2901 * start by reading the data for the full target object from
2902 * the parent so we can use it for a copyup to the target.
b454e36d 2903 */
70d045f6 2904 if (obj_request_known_test(obj_request))
3d7efd18
AE
2905 return rbd_img_obj_parent_read_full(obj_request);
2906
2907 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2908
2909 return rbd_img_obj_exists_submit(obj_request);
2910}
2911
bf0d5f50
AE
2912static int rbd_img_request_submit(struct rbd_img_request *img_request)
2913{
bf0d5f50 2914 struct rbd_obj_request *obj_request;
46faeed4 2915 struct rbd_obj_request *next_obj_request;
663ae2cc 2916 int ret = 0;
bf0d5f50 2917
37206ee5 2918 dout("%s: img %p\n", __func__, img_request);
bf0d5f50 2919
663ae2cc
ID
2920 rbd_img_request_get(img_request);
2921 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
b454e36d 2922 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50 2923 if (ret)
663ae2cc 2924 goto out_put_ireq;
bf0d5f50
AE
2925 }
2926
663ae2cc
ID
2927out_put_ireq:
2928 rbd_img_request_put(img_request);
2929 return ret;
bf0d5f50 2930}
8b3e1a56
AE
2931
2932static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2933{
2934 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2935 struct rbd_device *rbd_dev;
2936 u64 obj_end;
02c74fba
AE
2937 u64 img_xferred;
2938 int img_result;
8b3e1a56
AE
2939
2940 rbd_assert(img_request_child_test(img_request));
2941
02c74fba
AE
2942 /* First get what we need from the image request and release it */
2943
8b3e1a56 2944 obj_request = img_request->obj_request;
02c74fba
AE
2945 img_xferred = img_request->xferred;
2946 img_result = img_request->result;
2947 rbd_img_request_put(img_request);
2948
2949 /*
2950 * If the overlap has become 0 (most likely because the
2951 * image has been flattened) we need to re-submit the
2952 * original request.
2953 */
a9e8ba2c
AE
2954 rbd_assert(obj_request);
2955 rbd_assert(obj_request->img_request);
02c74fba
AE
2956 rbd_dev = obj_request->img_request->rbd_dev;
2957 if (!rbd_dev->parent_overlap) {
980917fc
ID
2958 rbd_obj_request_submit(obj_request);
2959 return;
02c74fba 2960 }
a9e8ba2c 2961
02c74fba 2962 obj_request->result = img_result;
a9e8ba2c
AE
2963 if (obj_request->result)
2964 goto out;
2965
2966 /*
2967 * We need to zero anything beyond the parent overlap
2968 * boundary. Since rbd_img_obj_request_read_callback()
2969 * will zero anything beyond the end of a short read, an
2970 * easy way to do this is to pretend the data from the
2971 * parent came up short--ending at the overlap boundary.
2972 */
2973 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2974 obj_end = obj_request->img_offset + obj_request->length;
a9e8ba2c
AE
2975 if (obj_end > rbd_dev->parent_overlap) {
2976 u64 xferred = 0;
2977
2978 if (obj_request->img_offset < rbd_dev->parent_overlap)
2979 xferred = rbd_dev->parent_overlap -
2980 obj_request->img_offset;
8b3e1a56 2981
02c74fba 2982 obj_request->xferred = min(img_xferred, xferred);
a9e8ba2c 2983 } else {
02c74fba 2984 obj_request->xferred = img_xferred;
a9e8ba2c
AE
2985 }
2986out:
8b3e1a56
AE
2987 rbd_img_obj_request_read_callback(obj_request);
2988 rbd_obj_request_complete(obj_request);
2989}
2990
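/*
 * Sketch of the overlap clamp above (illustrative): a parent read
 * that extends past parent_overlap is reported as short, so the
 * zero-fill in the read callback covers the tail beyond the overlap.
 */
#include <stdint.h>

static uint64_t model_clamp_xferred(uint64_t img_offset, uint64_t length,
				    uint64_t parent_overlap,
				    uint64_t img_xferred)
{
	uint64_t obj_end = img_offset + length;
	uint64_t xferred = 0;

	if (obj_end <= parent_overlap)
		return img_xferred;
	if (img_offset < parent_overlap)
		xferred = parent_overlap - img_offset;
	return xferred < img_xferred ? xferred : img_xferred;
}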
2991static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2992{
8b3e1a56
AE
2993 struct rbd_img_request *img_request;
2994 int result;
2995
2996 rbd_assert(obj_request_img_data_test(obj_request));
2997 rbd_assert(obj_request->img_request != NULL);
2998 rbd_assert(obj_request->result == (s32) -ENOENT);
5b2ab72d 2999 rbd_assert(obj_request_type_valid(obj_request->type));
8b3e1a56 3000
8b3e1a56 3001 /* rbd_read_finish(obj_request, obj_request->length); */
e93f3152 3002 img_request = rbd_parent_request_create(obj_request,
8b3e1a56 3003 obj_request->img_offset,
e93f3152 3004 obj_request->length);
8b3e1a56
AE
3005 result = -ENOMEM;
3006 if (!img_request)
3007 goto out_err;
3008
5b2ab72d
AE
3009 if (obj_request->type == OBJ_REQUEST_BIO)
3010 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3011 obj_request->bio_list);
3012 else
3013 result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3014 obj_request->pages);
8b3e1a56
AE
3015 if (result)
3016 goto out_err;
3017
3018 img_request->callback = rbd_img_parent_read_callback;
3019 result = rbd_img_request_submit(img_request);
3020 if (result)
3021 goto out_err;
3022
3023 return;
3024out_err:
3025 if (img_request)
3026 rbd_img_request_put(img_request);
3027 obj_request->result = result;
3028 obj_request->xferred = 0;
3029 obj_request_done_set(obj_request);
3030}
bf0d5f50 3031
ed95b21a 3032static const struct rbd_client_id rbd_empty_cid;
b8d70035 3033
ed95b21a
ID
3034static bool rbd_cid_equal(const struct rbd_client_id *lhs,
3035 const struct rbd_client_id *rhs)
3036{
3037 return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
3038}
3039
3040static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
3041{
3042 struct rbd_client_id cid;
3043
3044 mutex_lock(&rbd_dev->watch_mutex);
3045 cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
3046 cid.handle = rbd_dev->watch_cookie;
3047 mutex_unlock(&rbd_dev->watch_mutex);
3048 return cid;
3049}
3050
3051/*
3052 * lock_rwsem must be held for write
3053 */
3054static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
3055 const struct rbd_client_id *cid)
3056{
3057 dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
3058 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
3059 cid->gid, cid->handle);
3060 rbd_dev->owner_cid = *cid; /* struct */
3061}
3062
3063static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
3064{
3065 mutex_lock(&rbd_dev->watch_mutex);
3066 sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
3067 mutex_unlock(&rbd_dev->watch_mutex);
3068}
3069
e068cdee
FM
3070static void __rbd_lock(struct rbd_device *rbd_dev, const char *cookie)
3071{
3072 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3073
3074 strcpy(rbd_dev->lock_cookie, cookie);
3075 rbd_set_owner_cid(rbd_dev, &cid);
3076 queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
3077}
3078
ed95b21a
ID
3079/*
3080 * lock_rwsem must be held for write
3081 */
3082static int rbd_lock(struct rbd_device *rbd_dev)
b8d70035 3083{
922dab61 3084 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a 3085 char cookie[32];
e627db08 3086 int ret;
b8d70035 3087
cbbfb0ff
ID
3088 WARN_ON(__rbd_is_lock_owner(rbd_dev) ||
3089 rbd_dev->lock_cookie[0] != '\0');
52bb1f9b 3090
ed95b21a
ID
3091 format_lock_cookie(rbd_dev, cookie);
3092 ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
3093 RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
3094 RBD_LOCK_TAG, "", 0);
e627db08 3095 if (ret)
ed95b21a 3096 return ret;
b8d70035 3097
ed95b21a 3098 rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
e068cdee 3099 __rbd_lock(rbd_dev, cookie);
ed95b21a 3100 return 0;
b8d70035
AE
3101}
3102
ed95b21a
ID
3103/*
3104 * lock_rwsem must be held for write
3105 */
bbead745 3106static void rbd_unlock(struct rbd_device *rbd_dev)
bb040aa0 3107{
922dab61 3108 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
bb040aa0
ID
3109 int ret;
3110
cbbfb0ff
ID
3111 WARN_ON(!__rbd_is_lock_owner(rbd_dev) ||
3112 rbd_dev->lock_cookie[0] == '\0');
bb040aa0 3113
ed95b21a 3114 ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
cbbfb0ff 3115 RBD_LOCK_NAME, rbd_dev->lock_cookie);
bbead745
ID
3116 if (ret && ret != -ENOENT)
3117 rbd_warn(rbd_dev, "failed to unlock: %d", ret);
bb040aa0 3118
bbead745
ID
3119 /* treat errors as the image is unlocked */
3120 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
cbbfb0ff 3121 rbd_dev->lock_cookie[0] = '\0';
ed95b21a
ID
3122 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3123 queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
bb040aa0
ID
3124}
3125
ed95b21a
ID
3126static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
3127 enum rbd_notify_op notify_op,
3128 struct page ***preply_pages,
3129 size_t *preply_len)
9969ebc5
AE
3130{
3131 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
ed95b21a
ID
3132 struct rbd_client_id cid = rbd_get_cid(rbd_dev);
3133 int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
3134 char buf[buf_size];
3135 void *p = buf;
9969ebc5 3136
ed95b21a 3137 dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
9969ebc5 3138
ed95b21a
ID
3139 /* encode *LockPayload NotifyMessage (op + ClientId) */
3140 ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
3141 ceph_encode_32(&p, notify_op);
3142 ceph_encode_64(&p, cid.gid);
3143 ceph_encode_64(&p, cid.handle);
8eb87565 3144
ed95b21a
ID
3145 return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
3146 &rbd_dev->header_oloc, buf, buf_size,
3147 RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
b30a01f2
ID
3148}
3149
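/*
 * Byte-level model of the NotifyMessage payload built above.
 * Assumptions: a little-endian host, and that ceph_start_encoding()
 * emits a 6-byte struct_v/compat/len header, giving the layout
 *
 *   u8 struct_v=2 | u8 compat=1 | le32 len=20 | le32 op | le64 gid | le64 handle
 */
#include <stddef.h>
#include <stdint.h>
#include <string.h>

static size_t model_encode_notify(uint8_t *p, uint32_t op,
				  uint64_t gid, uint64_t handle)
{
	uint32_t len = 4 + 8 + 8;	/* payload after the header */

	p[0] = 2;			/* struct_v */
	p[1] = 1;			/* struct_compat */
	memcpy(p + 2, &len, 4);
	memcpy(p + 6, &op, 4);
	memcpy(p + 10, &gid, 8);
	memcpy(p + 18, &handle, 8);
	return 26;			/* header + payload bytes */
}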
ed95b21a
ID
3150static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
3151 enum rbd_notify_op notify_op)
b30a01f2 3152{
ed95b21a
ID
3153 struct page **reply_pages;
3154 size_t reply_len;
b30a01f2 3155
ed95b21a
ID
3156 __rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
3157 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3158}
b30a01f2 3159
ed95b21a
ID
3160static void rbd_notify_acquired_lock(struct work_struct *work)
3161{
3162 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3163 acquired_lock_work);
76756a51 3164
ed95b21a 3165 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
c525f036
ID
3166}
3167
ed95b21a 3168static void rbd_notify_released_lock(struct work_struct *work)
c525f036 3169{
ed95b21a
ID
3170 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3171 released_lock_work);
811c6688 3172
ed95b21a 3173 rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
fca27065
ID
3174}
3175
ed95b21a 3176static int rbd_request_lock(struct rbd_device *rbd_dev)
36be9a76 3177{
ed95b21a
ID
3178 struct page **reply_pages;
3179 size_t reply_len;
3180 bool lock_owner_responded = false;
36be9a76
AE
3181 int ret;
3182
ed95b21a 3183 dout("%s rbd_dev %p\n", __func__, rbd_dev);
36be9a76 3184
ed95b21a
ID
3185 ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
3186 &reply_pages, &reply_len);
3187 if (ret && ret != -ETIMEDOUT) {
3188 rbd_warn(rbd_dev, "failed to request lock: %d", ret);
36be9a76 3189 goto out;
ed95b21a 3190 }
36be9a76 3191
ed95b21a
ID
3192 if (reply_len > 0 && reply_len <= PAGE_SIZE) {
3193 void *p = page_address(reply_pages[0]);
3194 void *const end = p + reply_len;
3195 u32 n;
36be9a76 3196
ed95b21a
ID
3197 ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
3198 while (n--) {
3199 u8 struct_v;
3200 u32 len;
36be9a76 3201
ed95b21a
ID
3202 ceph_decode_need(&p, end, 8 + 8, e_inval);
3203 p += 8 + 8; /* skip gid and cookie */
04017e29 3204
ed95b21a
ID
3205 ceph_decode_32_safe(&p, end, len, e_inval);
3206 if (!len)
3207 continue;
3208
3209 if (lock_owner_responded) {
3210 rbd_warn(rbd_dev,
3211 "duplicate lock owners detected");
3212 ret = -EIO;
3213 goto out;
3214 }
3215
3216 lock_owner_responded = true;
3217 ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
3218 &struct_v, &len);
3219 if (ret) {
3220 rbd_warn(rbd_dev,
3221 "failed to decode ResponseMessage: %d",
3222 ret);
3223 goto e_inval;
3224 }
3225
3226 ret = ceph_decode_32(&p);
3227 }
3228 }
3229
3230 if (!lock_owner_responded) {
3231 rbd_warn(rbd_dev, "no lock owners detected");
3232 ret = -ETIMEDOUT;
3233 }
3234
3235out:
3236 ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
3237 return ret;
3238
3239e_inval:
3240 ret = -EINVAL;
3241 goto out;
3242}
3243
3244static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
3245{
3246 dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
3247
3248 cancel_delayed_work(&rbd_dev->lock_dwork);
3249 if (wake_all)
3250 wake_up_all(&rbd_dev->lock_waitq);
3251 else
3252 wake_up(&rbd_dev->lock_waitq);
3253}
3254
3255static int get_lock_owner_info(struct rbd_device *rbd_dev,
3256 struct ceph_locker **lockers, u32 *num_lockers)
3257{
3258 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3259 u8 lock_type;
3260 char *lock_tag;
3261 int ret;
3262
3263 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3264
3265 ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
3266 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3267 &lock_type, &lock_tag, lockers, num_lockers);
3268 if (ret)
3269 return ret;
3270
3271 if (*num_lockers == 0) {
3272 dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
3273 goto out;
3274 }
3275
3276 if (strcmp(lock_tag, RBD_LOCK_TAG)) {
3277 rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
3278 lock_tag);
3279 ret = -EBUSY;
3280 goto out;
3281 }
3282
3283 if (lock_type == CEPH_CLS_LOCK_SHARED) {
3284 rbd_warn(rbd_dev, "shared lock type detected");
3285 ret = -EBUSY;
3286 goto out;
3287 }
3288
3289 if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
3290 strlen(RBD_LOCK_COOKIE_PREFIX))) {
3291 rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
3292 (*lockers)[0].id.cookie);
3293 ret = -EBUSY;
3294 goto out;
3295 }
3296
3297out:
3298 kfree(lock_tag);
3299 return ret;
3300}
3301
3302static int find_watcher(struct rbd_device *rbd_dev,
3303 const struct ceph_locker *locker)
3304{
3305 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3306 struct ceph_watch_item *watchers;
3307 u32 num_watchers;
3308 u64 cookie;
3309 int i;
3310 int ret;
3311
3312 ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
3313 &rbd_dev->header_oloc, &watchers,
3314 &num_watchers);
3315 if (ret)
3316 return ret;
3317
3318 sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
3319 for (i = 0; i < num_watchers; i++) {
3320 if (!memcmp(&watchers[i].addr, &locker->info.addr,
3321 sizeof(locker->info.addr)) &&
3322 watchers[i].cookie == cookie) {
3323 struct rbd_client_id cid = {
3324 .gid = le64_to_cpu(watchers[i].name.num),
3325 .handle = cookie,
3326 };
3327
3328 dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
3329 rbd_dev, cid.gid, cid.handle);
3330 rbd_set_owner_cid(rbd_dev, &cid);
3331 ret = 1;
3332 goto out;
3333 }
3334 }
3335
3336 dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
3337 ret = 0;
3338out:
3339 kfree(watchers);
3340 return ret;
3341}
3342
3343/*
3344 * lock_rwsem must be held for write
3345 */
3346static int rbd_try_lock(struct rbd_device *rbd_dev)
3347{
3348 struct ceph_client *client = rbd_dev->rbd_client->client;
3349 struct ceph_locker *lockers;
3350 u32 num_lockers;
3351 int ret;
3352
3353 for (;;) {
3354 ret = rbd_lock(rbd_dev);
3355 if (ret != -EBUSY)
3356 return ret;
3357
3358 /* determine if the current lock holder is still alive */
3359 ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
3360 if (ret)
3361 return ret;
3362
3363 if (num_lockers == 0)
3364 goto again;
3365
3366 ret = find_watcher(rbd_dev, lockers);
3367 if (ret) {
3368 if (ret > 0)
3369 ret = 0; /* have to request lock */
3370 goto out;
3371 }
3372
3373 rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
3374 ENTITY_NAME(lockers[0].id.name));
3375
3376 ret = ceph_monc_blacklist_add(&client->monc,
3377 &lockers[0].info.addr);
3378 if (ret) {
3379 rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
3380 ENTITY_NAME(lockers[0].id.name), ret);
3381 goto out;
3382 }
3383
3384 ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
3385 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3386 lockers[0].id.cookie,
3387 &lockers[0].id.name);
3388 if (ret && ret != -ENOENT)
3389 goto out;
3390
3391again:
3392 ceph_free_lockers(lockers, num_lockers);
3393 }
3394
3395out:
3396 ceph_free_lockers(lockers, num_lockers);
3397 return ret;
3398}
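/*
 * Rough outline of the loop above (a sketch of the control flow, not
 * additional behavior):
 *
 *	rbd_lock() succeeds --------------------> done
 *	  -EBUSY: get_lock_owner_info()
 *	    no lockers -------------------------> retry rbd_lock()
 *	    find_watcher() > 0 (owner alive) ---> return 0 so the
 *	                                          caller requests the lock
 *	    owner dead ---------------------------> blacklist it, break
 *	                                            its lock, retry
 */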
3399
3400/*
3401 * *pret is set only if the returned lock_state is RBD_LOCK_STATE_UNLOCKED
3402 */
3403static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
3404 int *pret)
3405{
3406 enum rbd_lock_state lock_state;
3407
3408 down_read(&rbd_dev->lock_rwsem);
3409 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3410 rbd_dev->lock_state);
3411 if (__rbd_is_lock_owner(rbd_dev)) {
3412 lock_state = rbd_dev->lock_state;
3413 up_read(&rbd_dev->lock_rwsem);
3414 return lock_state;
3415 }
3416
3417 up_read(&rbd_dev->lock_rwsem);
3418 down_write(&rbd_dev->lock_rwsem);
3419 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3420 rbd_dev->lock_state);
3421 if (!__rbd_is_lock_owner(rbd_dev)) {
3422 *pret = rbd_try_lock(rbd_dev);
3423 if (*pret)
3424 rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
3425 }
3426
3427 lock_state = rbd_dev->lock_state;
3428 up_write(&rbd_dev->lock_rwsem);
3429 return lock_state;
3430}
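/*
 * Note the check/recheck pattern above: lock_state is sampled under
 * the read lock first, and only if we are not already the owner is the
 * write lock taken and the test repeated before calling rbd_try_lock(),
 * since ownership may have changed while lock_rwsem was dropped.
 */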
3431
3432static void rbd_acquire_lock(struct work_struct *work)
3433{
3434 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3435 struct rbd_device, lock_dwork);
3436 enum rbd_lock_state lock_state;
37f13252 3437 int ret = 0;
3438
3439 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3440again:
3441 lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
3442 if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
3443 if (lock_state == RBD_LOCK_STATE_LOCKED)
3444 wake_requests(rbd_dev, true);
3445 dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
3446 rbd_dev, lock_state, ret);
3447 return;
3448 }
3449
3450 ret = rbd_request_lock(rbd_dev);
3451 if (ret == -ETIMEDOUT) {
3452 goto again; /* treat this as a dead client */
3453 } else if (ret == -EROFS) {
3454 rbd_warn(rbd_dev, "peer will not release lock");
3455 /*
3456 * If this is rbd_add_acquire_lock(), we want to fail
3457 * immediately -- reuse BLACKLISTED flag. Otherwise we
3458 * want to block.
3459 */
3460 if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
3461 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
3462 /* wake "rbd map --exclusive" process */
3463 wake_requests(rbd_dev, false);
3464 }
3465 } else if (ret < 0) {
3466 rbd_warn(rbd_dev, "error requesting lock: %d", ret);
3467 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3468 RBD_RETRY_DELAY);
3469 } else {
3470 /*
3471 * lock owner acked, but resend if we don't see them
3472 * release the lock
3473 */
3474 dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
3475 rbd_dev);
3476 mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
3477 msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
3478 }
3479}
3480
3481/*
3482 * lock_rwsem must be held for write
3483 */
3484static bool rbd_release_lock(struct rbd_device *rbd_dev)
3485{
3486 dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
3487 rbd_dev->lock_state);
3488 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
3489 return false;
3490
3491 rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
3492 downgrade_write(&rbd_dev->lock_rwsem);
52bb1f9b 3493 /*
ed95b21a 3494 * Ensure that all in-flight IO is flushed.
52bb1f9b 3495 *
3496 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
3497 * may be shared with other devices.
52bb1f9b 3498 */
3499 ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
3500 up_read(&rbd_dev->lock_rwsem);
3501
3502 down_write(&rbd_dev->lock_rwsem);
3503 dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
3504 rbd_dev->lock_state);
3505 if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
3506 return false;
3507
3508 rbd_unlock(rbd_dev);
3509 /*
3510 * Give others a chance to grab the lock - we would re-acquire
3511 * almost immediately if we got new IO during ceph_osdc_sync()
3512 * otherwise. We need to ack our own notifications, so this
3513 * lock_dwork will be requeued from rbd_wait_state_locked()
3514 * after wake_requests() in rbd_handle_released_lock().
3515 */
3516 cancel_delayed_work(&rbd_dev->lock_dwork);
3517 return true;
3518}
3519
3520static void rbd_release_lock_work(struct work_struct *work)
3521{
3522 struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
3523 unlock_work);
3524
3525 down_write(&rbd_dev->lock_rwsem);
3526 rbd_release_lock(rbd_dev);
3527 up_write(&rbd_dev->lock_rwsem);
3528}
3529
3530static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
3531 void **p)
3532{
3533 struct rbd_client_id cid = { 0 };
3534
3535 if (struct_v >= 2) {
3536 cid.gid = ceph_decode_64(p);
3537 cid.handle = ceph_decode_64(p);
3538 }
3539
3540 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3541 cid.handle);
3542 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3543 down_write(&rbd_dev->lock_rwsem);
3544 if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3545 /*
3546 * we already know that the remote client is
3547 * the owner
3548 */
3549 up_write(&rbd_dev->lock_rwsem);
3550 return;
3551 }
3552
3553 rbd_set_owner_cid(rbd_dev, &cid);
3554 downgrade_write(&rbd_dev->lock_rwsem);
3555 } else {
3556 down_read(&rbd_dev->lock_rwsem);
3557 }
3558
3559 if (!__rbd_is_lock_owner(rbd_dev))
3560 wake_requests(rbd_dev, false);
3561 up_read(&rbd_dev->lock_rwsem);
3562}
3563
3564static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
3565 void **p)
3566{
3567 struct rbd_client_id cid = { 0 };
3568
3569 if (struct_v >= 2) {
3570 cid.gid = ceph_decode_64(p);
3571 cid.handle = ceph_decode_64(p);
3572 }
3573
3574 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3575 cid.handle);
3576 if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
3577 down_write(&rbd_dev->lock_rwsem);
3578 if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
3579 dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
3580 __func__, rbd_dev, cid.gid, cid.handle,
3581 rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
3582 up_write(&rbd_dev->lock_rwsem);
3583 return;
3584 }
3585
3586 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3587 downgrade_write(&rbd_dev->lock_rwsem);
3588 } else {
3589 down_read(&rbd_dev->lock_rwsem);
3590 }
3591
3592 if (!__rbd_is_lock_owner(rbd_dev))
3593 wake_requests(rbd_dev, false);
3594 up_read(&rbd_dev->lock_rwsem);
3595}
3596
3597/*
3598 * Returns result for ResponseMessage to be encoded (<= 0), or 1 if no
3599 * ResponseMessage is needed.
3600 */
3601static int rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
3602 void **p)
3603{
3604 struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
3605 struct rbd_client_id cid = { 0 };
3b77faa0 3606 int result = 1;
3607
3608 if (struct_v >= 2) {
3609 cid.gid = ceph_decode_64(p);
3610 cid.handle = ceph_decode_64(p);
3611 }
3612
3613 dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
3614 cid.handle);
3615 if (rbd_cid_equal(&cid, &my_cid))
3b77faa0 3616 return result;
3617
3618 down_read(&rbd_dev->lock_rwsem);
3619 if (__rbd_is_lock_owner(rbd_dev)) {
3620 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED &&
3621 rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid))
3622 goto out_unlock;
3623
3624 /*
3625 * encode ResponseMessage(0) so the peer can detect
3626 * a missing owner
3627 */
3628 result = 0;
3629
3630 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
3631 if (!rbd_dev->opts->exclusive) {
3632 dout("%s rbd_dev %p queueing unlock_work\n",
3633 __func__, rbd_dev);
3634 queue_work(rbd_dev->task_wq,
3635 &rbd_dev->unlock_work);
3636 } else {
3637 /* refuse to release the lock */
3638 result = -EROFS;
3639 }
3640 }
3641 }
3642
3643out_unlock:
ed95b21a 3644 up_read(&rbd_dev->lock_rwsem);
3b77faa0 3645 return result;
3646}
3647
3648static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
3649 u64 notify_id, u64 cookie, s32 *result)
3650{
3651 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3652 int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
3653 char buf[buf_size];
3654 int ret;
3655
3656 if (result) {
3657 void *p = buf;
3658
3659 /* encode ResponseMessage */
3660 ceph_start_encoding(&p, 1, 1,
3661 buf_size - CEPH_ENCODING_START_BLK_LEN);
3662 ceph_encode_32(&p, *result);
3663 } else {
3664 buf_size = 0;
3665 }
b8d70035 3666
3667 ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
3668 &rbd_dev->header_oloc, notify_id, cookie,
ed95b21a 3669 buf, buf_size);
52bb1f9b 3670 if (ret)
3671 rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
3672}
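/*
 * Layout of the ResponseMessage payload encoded above, assuming the
 * usual 6-byte ceph start block (1 + 1 + 4):
 *
 *	u8   struct_v = 1
 *	u8   struct_compat = 1
 *	le32 struct_len = 4
 *	le32 result
 *
 * When result is NULL no payload is sent at all (buf_size = 0).
 */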
3673
3674static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
3675 u64 cookie)
3676{
3677 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3678 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
3679}
3680
3681static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
3682 u64 notify_id, u64 cookie, s32 result)
3683{
3684 dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
3685 __rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
3686}
3687
3688static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
3689 u64 notifier_id, void *data, size_t data_len)
3690{
3691 struct rbd_device *rbd_dev = arg;
3692 void *p = data;
3693 void *const end = p + data_len;
d4c2269b 3694 u8 struct_v = 0;
3695 u32 len;
3696 u32 notify_op;
3697 int ret;
3698
3699 dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
3700 __func__, rbd_dev, cookie, notify_id, data_len);
3701 if (data_len) {
3702 ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
3703 &struct_v, &len);
3704 if (ret) {
3705 rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
3706 ret);
3707 return;
3708 }
3709
3710 notify_op = ceph_decode_32(&p);
3711 } else {
3712 /* legacy notification for header updates */
3713 notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
3714 len = 0;
3715 }
3716
3717 dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
3718 switch (notify_op) {
3719 case RBD_NOTIFY_OP_ACQUIRED_LOCK:
3720 rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
3721 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3722 break;
3723 case RBD_NOTIFY_OP_RELEASED_LOCK:
3724 rbd_handle_released_lock(rbd_dev, struct_v, &p);
3725 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3726 break;
3727 case RBD_NOTIFY_OP_REQUEST_LOCK:
3728 ret = rbd_handle_request_lock(rbd_dev, struct_v, &p);
3729 if (ret <= 0)
ed95b21a 3730 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3b77faa0 3731 cookie, ret);
3732 else
3733 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3734 break;
3735 case RBD_NOTIFY_OP_HEADER_UPDATE:
3736 ret = rbd_dev_refresh(rbd_dev);
3737 if (ret)
3738 rbd_warn(rbd_dev, "refresh failed: %d", ret);
3739
3740 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3741 break;
3742 default:
3743 if (rbd_is_lock_owner(rbd_dev))
3744 rbd_acknowledge_notify_result(rbd_dev, notify_id,
3745 cookie, -EOPNOTSUPP);
3746 else
3747 rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
3748 break;
3749 }
3750}
3751
3752static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
3753
922dab61 3754static void rbd_watch_errcb(void *arg, u64 cookie, int err)
bb040aa0 3755{
922dab61 3756 struct rbd_device *rbd_dev = arg;
bb040aa0 3757
922dab61 3758 rbd_warn(rbd_dev, "encountered watch error: %d", err);
bb040aa0 3759
3760 down_write(&rbd_dev->lock_rwsem);
3761 rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
3762 up_write(&rbd_dev->lock_rwsem);
3763
3764 mutex_lock(&rbd_dev->watch_mutex);
3765 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
3766 __rbd_unregister_watch(rbd_dev);
3767 rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
bb040aa0 3768
99d16943 3769 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
bb040aa0 3770 }
99d16943 3771 mutex_unlock(&rbd_dev->watch_mutex);
3772}
3773
9969ebc5 3774/*
99d16943 3775 * watch_mutex must be locked
9969ebc5 3776 */
99d16943 3777static int __rbd_register_watch(struct rbd_device *rbd_dev)
3778{
3779 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
922dab61 3780 struct ceph_osd_linger_request *handle;
9969ebc5 3781
922dab61 3782 rbd_assert(!rbd_dev->watch_handle);
99d16943 3783 dout("%s rbd_dev %p\n", __func__, rbd_dev);
9969ebc5 3784
3785 handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
3786 &rbd_dev->header_oloc, rbd_watch_cb,
3787 rbd_watch_errcb, rbd_dev);
3788 if (IS_ERR(handle))
3789 return PTR_ERR(handle);
8eb87565 3790
922dab61 3791 rbd_dev->watch_handle = handle;
b30a01f2 3792 return 0;
3793}
3794
3795/*
3796 * watch_mutex must be locked
3797 */
3798static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
b30a01f2 3799{
3800 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3801 int ret;
b30a01f2 3802
3803 rbd_assert(rbd_dev->watch_handle);
3804 dout("%s rbd_dev %p\n", __func__, rbd_dev);
b30a01f2 3805
3806 ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
3807 if (ret)
3808 rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
76756a51 3809
922dab61 3810 rbd_dev->watch_handle = NULL;
3811}
3812
3813static int rbd_register_watch(struct rbd_device *rbd_dev)
3814{
3815 int ret;
3816
3817 mutex_lock(&rbd_dev->watch_mutex);
3818 rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
3819 ret = __rbd_register_watch(rbd_dev);
3820 if (ret)
3821 goto out;
3822
3823 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3824 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3825
3826out:
3827 mutex_unlock(&rbd_dev->watch_mutex);
3828 return ret;
3829}
3830
3831static void cancel_tasks_sync(struct rbd_device *rbd_dev)
c525f036 3832{
3833 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3834
3835 cancel_work_sync(&rbd_dev->acquired_lock_work);
3836 cancel_work_sync(&rbd_dev->released_lock_work);
3837 cancel_delayed_work_sync(&rbd_dev->lock_dwork);
3838 cancel_work_sync(&rbd_dev->unlock_work);
3839}
3840
3841/*
3842 * header_rwsem must not be held to avoid a deadlock with
3843 * rbd_dev_refresh() when flushing notifies.
3844 */
3845static void rbd_unregister_watch(struct rbd_device *rbd_dev)
3846{
ed95b21a 3847 WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
3848 cancel_tasks_sync(rbd_dev);
3849
3850 mutex_lock(&rbd_dev->watch_mutex);
3851 if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
3852 __rbd_unregister_watch(rbd_dev);
3853 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
3854 mutex_unlock(&rbd_dev->watch_mutex);
811c6688 3855
76022230 3856 cancel_delayed_work_sync(&rbd_dev->watch_dwork);
811c6688 3857 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
3858}
3859
3860/*
3861 * lock_rwsem must be held for write
3862 */
3863static void rbd_reacquire_lock(struct rbd_device *rbd_dev)
3864{
3865 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3866 char cookie[32];
3867 int ret;
3868
3869 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
3870
3871 format_lock_cookie(rbd_dev, cookie);
3872 ret = ceph_cls_set_cookie(osdc, &rbd_dev->header_oid,
3873 &rbd_dev->header_oloc, RBD_LOCK_NAME,
3874 CEPH_CLS_LOCK_EXCLUSIVE, rbd_dev->lock_cookie,
3875 RBD_LOCK_TAG, cookie);
3876 if (ret) {
3877 if (ret != -EOPNOTSUPP)
3878 rbd_warn(rbd_dev, "failed to update lock cookie: %d",
3879 ret);
3880
3881 /*
3882 * Lock cookie cannot be updated on older OSDs, so do
3883 * a manual release and queue an acquire.
3884 */
3885 if (rbd_release_lock(rbd_dev))
3886 queue_delayed_work(rbd_dev->task_wq,
3887 &rbd_dev->lock_dwork, 0);
3888 } else {
e068cdee 3889 __rbd_lock(rbd_dev, cookie);
3890 }
3891}
3892
3893static void rbd_reregister_watch(struct work_struct *work)
3894{
3895 struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
3896 struct rbd_device, watch_dwork);
3897 int ret;
3898
3899 dout("%s rbd_dev %p\n", __func__, rbd_dev);
3900
3901 mutex_lock(&rbd_dev->watch_mutex);
3902 if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR) {
3903 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3904 return;
87c0fded 3905 }
3906
3907 ret = __rbd_register_watch(rbd_dev);
3908 if (ret) {
3909 rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
4d73644b 3910 if (ret == -EBLACKLISTED || ret == -ENOENT) {
87c0fded 3911 set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
14bb211d 3912 wake_requests(rbd_dev, true);
87c0fded 3913 } else {
3914 queue_delayed_work(rbd_dev->task_wq,
3915 &rbd_dev->watch_dwork,
3916 RBD_RETRY_DELAY);
3917 }
3918 mutex_unlock(&rbd_dev->watch_mutex);
14bb211d 3919 return;
3920 }
3921
3922 rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
3923 rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
3924 mutex_unlock(&rbd_dev->watch_mutex);
3925
3926 down_write(&rbd_dev->lock_rwsem);
3927 if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
3928 rbd_reacquire_lock(rbd_dev);
3929 up_write(&rbd_dev->lock_rwsem);
3930
3931 ret = rbd_dev_refresh(rbd_dev);
3932 if (ret)
3933 rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
3934}
3935
36be9a76 3936/*
3937 * Synchronous osd object method call. Returns the number of bytes
3938 * returned in the outbound buffer, or a negative error code.
3939 */
3940static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3941 struct ceph_object_id *oid,
3942 struct ceph_object_locator *oloc,
36be9a76 3943 const char *method_name,
4157976b 3944 const void *outbound,
36be9a76 3945 size_t outbound_size,
4157976b 3946 void *inbound,
e2a58ee5 3947 size_t inbound_size)
36be9a76 3948{
3949 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3950 struct page *req_page = NULL;
3951 struct page *reply_page;
3952 int ret;
3953
3954 /*
3955 * Method calls are ultimately read operations. The result
3956 * should be placed into the inbound buffer provided. Callers
3957 * may also supply outbound data--parameters for the object
3958 * method. Currently if this is present it will be a
3959 * snapshot id.
36be9a76 3960 */
3961 if (outbound) {
3962 if (outbound_size > PAGE_SIZE)
3963 return -E2BIG;
36be9a76 3964
3965 req_page = alloc_page(GFP_KERNEL);
3966 if (!req_page)
3967 return -ENOMEM;
04017e29 3968
ecd4a68a 3969 memcpy(page_address(req_page), outbound, outbound_size);
04017e29 3970 }
36be9a76 3971
3972 reply_page = alloc_page(GFP_KERNEL);
3973 if (!reply_page) {
3974 if (req_page)
3975 __free_page(req_page);
3976 return -ENOMEM;
3977 }
57385b51 3978
3979 ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name,
3980 CEPH_OSD_FLAG_READ, req_page, outbound_size,
3981 reply_page, &inbound_size);
3982 if (!ret) {
3983 memcpy(inbound, page_address(reply_page), inbound_size);
3984 ret = inbound_size;
3985 }
36be9a76 3986
3987 if (req_page)
3988 __free_page(req_page);
3989 __free_page(reply_page);
3990 return ret;
3991}
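/*
 * Typical call pattern (this mirrors the "get_size" caller further
 * below); a non-negative return is the number of inbound bytes:
 *
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *
 *	ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
 *				  &rbd_dev->header_oloc, "get_size",
 *				  &snapid, sizeof(snapid),
 *				  &size_buf, sizeof(size_buf));
 */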
3992
3993/*
3994 * lock_rwsem must be held for read
3995 */
3996static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
3997{
3998 DEFINE_WAIT(wait);
3999
4000 do {
4001 /*
4002 * Note the use of mod_delayed_work() in rbd_acquire_lock()
4003 * and cancel_delayed_work() in wake_requests().
4004 */
4005 dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
4006 queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
4007 prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
4008 TASK_UNINTERRUPTIBLE);
4009 up_read(&rbd_dev->lock_rwsem);
4010 schedule();
4011 down_read(&rbd_dev->lock_rwsem);
4012 } while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4013 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags));
4014
4015 finish_wait(&rbd_dev->lock_waitq, &wait);
4016}
4017
7ad18afa 4018static void rbd_queue_workfn(struct work_struct *work)
bf0d5f50 4019{
4020 struct request *rq = blk_mq_rq_from_pdu(work);
4021 struct rbd_device *rbd_dev = rq->q->queuedata;
bc1ecc65 4022 struct rbd_img_request *img_request;
4e752f0a 4023 struct ceph_snap_context *snapc = NULL;
4024 u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
4025 u64 length = blk_rq_bytes(rq);
6d2940c8 4026 enum obj_operation_type op_type;
4e752f0a 4027 u64 mapping_size;
80de1912 4028 bool must_be_locked;
4029 int result;
4030
4031 switch (req_op(rq)) {
4032 case REQ_OP_DISCARD:
6ac56951 4033 case REQ_OP_WRITE_ZEROES:
90e98c52 4034 op_type = OBJ_OP_DISCARD;
4035 break;
4036 case REQ_OP_WRITE:
6d2940c8 4037 op_type = OBJ_OP_WRITE;
4038 break;
4039 case REQ_OP_READ:
6d2940c8 4040 op_type = OBJ_OP_READ;
4041 break;
4042 default:
4043 dout("%s: non-fs request type %d\n", __func__, req_op(rq));
4044 result = -EIO;
4045 goto err;
4046 }
6d2940c8 4047
bc1ecc65 4048 /* Ignore/skip any zero-length requests */
bf0d5f50 4049
4050 if (!length) {
4051 dout("%s: zero-length request\n", __func__);
4052 result = 0;
4053 goto err_rq;
4054 }
bf0d5f50 4055
6d2940c8 4056 /* Only reads are allowed to a read-only device */
bc1ecc65 4057
6d2940c8 4058 if (op_type != OBJ_OP_READ) {
4059 if (rbd_dev->mapping.read_only) {
4060 result = -EROFS;
4061 goto err_rq;
4dda41d3 4062 }
4063 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
4064 }
4dda41d3 4065
4066 /*
4067 * Quit early if the mapped snapshot no longer exists. It's
4068 * still possible the snapshot will have disappeared by the
4069 * time our request arrives at the osd, but there's no sense in
4070 * sending it if we already know.
4071 */
4072 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
4073 dout("request for non-existent snapshot");
4074 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
4075 result = -ENXIO;
4076 goto err_rq;
4077 }
4dda41d3 4078
4079 if (offset && length > U64_MAX - offset + 1) {
4080 rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
4081 length);
4082 result = -EINVAL;
4083 goto err_rq; /* Shouldn't happen */
4084 }
4dda41d3 4085
4086 blk_mq_start_request(rq);
4087
4088 down_read(&rbd_dev->header_rwsem);
4089 mapping_size = rbd_dev->mapping.size;
6d2940c8 4090 if (op_type != OBJ_OP_READ) {
4091 snapc = rbd_dev->header.snapc;
4092 ceph_get_snap_context(snapc);
4093 }
4094 up_read(&rbd_dev->header_rwsem);
4095
4096 if (offset + length > mapping_size) {
bc1ecc65 4097 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
4e752f0a 4098 length, mapping_size);
4099 result = -EIO;
4100 goto err_rq;
4101 }
bf0d5f50 4102
4103 must_be_locked =
4104 (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
4105 (op_type != OBJ_OP_READ || rbd_dev->opts->lock_on_read);
4106 if (must_be_locked) {
4107 down_read(&rbd_dev->lock_rwsem);
87c0fded 4108 if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
4109 !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4110 if (rbd_dev->opts->exclusive) {
4111 rbd_warn(rbd_dev, "exclusive lock required");
4112 result = -EROFS;
4113 goto err_unlock;
4114 }
ed95b21a 4115 rbd_wait_state_locked(rbd_dev);
e010dd0a 4116 }
4117 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
4118 result = -EBLACKLISTED;
4119 goto err_unlock;
4120 }
4121 }
4122
6d2940c8 4123 img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
4e752f0a 4124 snapc);
4125 if (!img_request) {
4126 result = -ENOMEM;
ed95b21a 4127 goto err_unlock;
4128 }
4129 img_request->rq = rq;
70b16db8 4130 snapc = NULL; /* img_request consumes a ref */
bf0d5f50 4131
4132 if (op_type == OBJ_OP_DISCARD)
4133 result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
4134 NULL);
4135 else
4136 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
4137 rq->bio);
4138 if (result)
4139 goto err_img_request;
bf0d5f50 4140
4141 result = rbd_img_request_submit(img_request);
4142 if (result)
4143 goto err_img_request;
bf0d5f50 4144
4145 if (must_be_locked)
4146 up_read(&rbd_dev->lock_rwsem);
bc1ecc65 4147 return;
bf0d5f50 4148
4149err_img_request:
4150 rbd_img_request_put(img_request);
4151err_unlock:
4152 if (must_be_locked)
4153 up_read(&rbd_dev->lock_rwsem);
4154err_rq:
4155 if (result)
4156 rbd_warn(rbd_dev, "%s %llx at %llx result %d",
6d2940c8 4157 obj_op_name(op_type), length, offset, result);
e96a650a 4158 ceph_put_snap_context(snapc);
7ad18afa 4159err:
2a842aca 4160 blk_mq_end_request(rq, errno_to_blk_status(result));
bc1ecc65 4161}
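/*
 * To summarize the function above: every block request funnels through
 * rbd_queue_workfn() -- classify the op, reject zero-length and
 * out-of-range requests, snapshot the mapping size and snap context,
 * optionally take lock_rwsem for exclusive-lock images, then build and
 * submit an img_request.  The err_* labels unwind partially constructed
 * state in reverse order of acquisition.
 */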
bf0d5f50 4162
fc17b653 4163static blk_status_t rbd_queue_rq(struct blk_mq_hw_ctx *hctx,
7ad18afa 4164 const struct blk_mq_queue_data *bd)
bc1ecc65 4165{
4166 struct request *rq = bd->rq;
4167 struct work_struct *work = blk_mq_rq_to_pdu(rq);
bf0d5f50 4168
7ad18afa 4169 queue_work(rbd_wq, work);
fc17b653 4170 return BLK_STS_OK;
4171}
4172
4173static void rbd_free_disk(struct rbd_device *rbd_dev)
4174{
4175 blk_cleanup_queue(rbd_dev->disk->queue);
4176 blk_mq_free_tag_set(&rbd_dev->tag_set);
4177 put_disk(rbd_dev->disk);
a0cab924 4178 rbd_dev->disk = NULL;
4179}
4180
788e2df3 4181static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
4182 struct ceph_object_id *oid,
4183 struct ceph_object_locator *oloc,
4184 void *buf, int buf_len)
4185
4186{
4187 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4188 struct ceph_osd_request *req;
4189 struct page **pages;
4190 int num_pages = calc_pages_for(0, buf_len);
4191 int ret;
4192
4193 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL);
4194 if (!req)
4195 return -ENOMEM;
788e2df3 4196
4197 ceph_oid_copy(&req->r_base_oid, oid);
4198 ceph_oloc_copy(&req->r_base_oloc, oloc);
4199 req->r_flags = CEPH_OSD_FLAG_READ;
430c28c3 4200
fe5478e0 4201 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
788e2df3 4202 if (ret)
fe5478e0 4203 goto out_req;
788e2df3 4204
4205 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
4206 if (IS_ERR(pages)) {
4207 ret = PTR_ERR(pages);
4208 goto out_req;
4209 }
1ceae7ef 4210
4211 osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0);
4212 osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
4213 true);
4214
4215 ceph_osdc_start_request(osdc, req, false);
4216 ret = ceph_osdc_wait_request(osdc, req);
4217 if (ret >= 0)
4218 ceph_copy_from_page_vector(pages, buf, 0, ret);
788e2df3 4219
4220out_req:
4221 ceph_osdc_put_request(req);
4222 return ret;
4223}
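/*
 * Note: rbd_obj_read_sync() is a one-shot synchronous read of up to
 * buf_len bytes from an arbitrary object; a short read is not an error,
 * the non-negative return value is the byte count.  The v1 header probe
 * below relies on that to detect a truncated header.
 */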
4224
602adf40 4225/*
4226 * Read the complete header for the given rbd device. On successful
4227 * return, the rbd_dev->header field will contain up-to-date
4228 * information about the image.
602adf40 4229 */
99a41ebc 4230static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
602adf40 4231{
4156d998 4232 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 4233 u32 snap_count = 0;
4234 u64 names_size = 0;
4235 u32 want_count;
4236 int ret;
602adf40 4237
00f1f36f 4238 /*
4239 * The complete header will include an array of its 64-bit
4240 * snapshot ids, followed by the names of those snapshots as
4241 * a contiguous block of NUL-terminated strings. Note that
4242 * the number of snapshots could change by the time we read
4243 * it in, in which case we re-read it.
00f1f36f 4244 */
4245 do {
4246 size_t size;
4247
4248 kfree(ondisk);
4249
4250 size = sizeof (*ondisk);
4251 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
4252 size += names_size;
4253 ondisk = kmalloc(size, GFP_KERNEL);
4254 if (!ondisk)
662518b1 4255 return -ENOMEM;
4156d998 4256
4257 ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid,
4258 &rbd_dev->header_oloc, ondisk, size);
4156d998 4259 if (ret < 0)
662518b1 4260 goto out;
c0cd10db 4261 if ((size_t)ret < size) {
4156d998 4262 ret = -ENXIO;
4263 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
4264 size, ret);
662518b1 4265 goto out;
4156d998
AE
4266 }
4267 if (!rbd_dev_ondisk_valid(ondisk)) {
4268 ret = -ENXIO;
06ecc6cb 4269 rbd_warn(rbd_dev, "invalid header");
662518b1 4270 goto out;
81e759fb 4271 }
602adf40 4272
4273 names_size = le64_to_cpu(ondisk->snap_names_len);
4274 want_count = snap_count;
4275 snap_count = le32_to_cpu(ondisk->snap_count);
4276 } while (snap_count != want_count);
00f1f36f 4277
4278 ret = rbd_header_from_disk(rbd_dev, ondisk);
4279out:
4280 kfree(ondisk);
4281
4282 return ret;
4283}
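/*
 * On-disk layout read above (sizes inferred from the sizing logic,
 * see struct rbd_image_header_ondisk in rbd_types.h):
 *
 *	[ struct rbd_image_header_ondisk               ]
 *	[ snap_count * struct rbd_image_snap_ondisk    ]
 *	[ snap_names_len bytes of NUL-terminated names ]
 *
 * snap_count and names_size are only known after a read, so the buffer
 * is grown and the object re-read until two passes agree.
 */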
4284
4285/*
4286 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
4287 * has disappeared from the (just updated) snapshot context.
4288 */
4289static void rbd_exists_validate(struct rbd_device *rbd_dev)
4290{
4291 u64 snap_id;
4292
4293 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
4294 return;
4295
4296 snap_id = rbd_dev->spec->snap_id;
4297 if (snap_id == CEPH_NOSNAP)
4298 return;
4299
4300 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
4301 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4302}
4303
4304static void rbd_dev_update_size(struct rbd_device *rbd_dev)
4305{
4306 sector_t size;
4307
4308 /*
4309 * If EXISTS is not set, rbd_dev->disk may be NULL, so don't
4310 * try to update its size. If REMOVING is set, updating size
4311 * is just useless work since the device can't be opened.
9875201e 4312 */
4313 if (test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags) &&
4314 !test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) {
4315 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
4316 dout("setting size to %llu sectors", (unsigned long long)size);
4317 set_capacity(rbd_dev->disk, size);
4318 revalidate_disk(rbd_dev->disk);
4319 }
4320}
4321
cc4a38bd 4322static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 4323{
e627db08 4324 u64 mapping_size;
4325 int ret;
4326
cfbf6377 4327 down_write(&rbd_dev->header_rwsem);
3b5cf2a2 4328 mapping_size = rbd_dev->mapping.size;
4329
4330 ret = rbd_dev_header_info(rbd_dev);
52bb1f9b 4331 if (ret)
73e39e4d 4332 goto out;
15228ede 4333
4334 /*
4335 * If there is a parent, see if it has disappeared due to the
4336 * mapped image getting flattened.
4337 */
4338 if (rbd_dev->parent) {
4339 ret = rbd_dev_v2_parent_info(rbd_dev);
4340 if (ret)
73e39e4d 4341 goto out;
4342 }
4343
5ff1108c 4344 if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
73e39e4d 4345 rbd_dev->mapping.size = rbd_dev->header.image_size;
4346 } else {
4347 /* validate mapped snapshot's EXISTS flag */
4348 rbd_exists_validate(rbd_dev);
4349 }
15228ede 4350
73e39e4d 4351out:
cfbf6377 4352 up_write(&rbd_dev->header_rwsem);
73e39e4d 4353 if (!ret && mapping_size != rbd_dev->mapping.size)
9875201e 4354 rbd_dev_update_size(rbd_dev);
1fe5e993 4355
73e39e4d 4356 return ret;
4357}
4358
4359static int rbd_init_request(struct blk_mq_tag_set *set, struct request *rq,
4360 unsigned int hctx_idx, unsigned int numa_node)
4361{
4362 struct work_struct *work = blk_mq_rq_to_pdu(rq);
4363
4364 INIT_WORK(work, rbd_queue_workfn);
4365 return 0;
4366}
4367
f363b089 4368static const struct blk_mq_ops rbd_mq_ops = {
7ad18afa 4369 .queue_rq = rbd_queue_rq,
4370 .init_request = rbd_init_request,
4371};
4372
4373static int rbd_init_disk(struct rbd_device *rbd_dev)
4374{
4375 struct gendisk *disk;
4376 struct request_queue *q;
593a9e7b 4377 u64 segment_size;
7ad18afa 4378 int err;
602adf40 4379
602adf40 4380 /* create gendisk info */
4381 disk = alloc_disk(single_major ?
4382 (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
4383 RBD_MINORS_PER_MAJOR);
602adf40 4384 if (!disk)
1fcdb8aa 4385 return -ENOMEM;
602adf40 4386
f0f8cef5 4387 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 4388 rbd_dev->dev_id);
602adf40 4389 disk->major = rbd_dev->major;
dd82fff1 4390 disk->first_minor = rbd_dev->minor;
4391 if (single_major)
4392 disk->flags |= GENHD_FL_EXT_DEVT;
4393 disk->fops = &rbd_bd_ops;
4394 disk->private_data = rbd_dev;
4395
4396 memset(&rbd_dev->tag_set, 0, sizeof(rbd_dev->tag_set));
4397 rbd_dev->tag_set.ops = &rbd_mq_ops;
b5584180 4398 rbd_dev->tag_set.queue_depth = rbd_dev->opts->queue_depth;
7ad18afa 4399 rbd_dev->tag_set.numa_node = NUMA_NO_NODE;
b5584180 4400 rbd_dev->tag_set.flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
4401 rbd_dev->tag_set.nr_hw_queues = 1;
4402 rbd_dev->tag_set.cmd_size = sizeof(struct work_struct);
4403
4404 err = blk_mq_alloc_tag_set(&rbd_dev->tag_set);
4405 if (err)
602adf40 4406 goto out_disk;
029bcbd8 4407
4408 q = blk_mq_init_queue(&rbd_dev->tag_set);
4409 if (IS_ERR(q)) {
4410 err = PTR_ERR(q);
4411 goto out_tag_set;
4412 }
4413
4414 queue_flag_set_unlocked(QUEUE_FLAG_NONROT, q);
4415 /* QUEUE_FLAG_ADD_RANDOM is off by default for blk-mq */
593a9e7b 4416
029bcbd8 4417 /* set io sizes to object size */
4418 segment_size = rbd_obj_bytes(&rbd_dev->header);
4419 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
0d9fde4f 4420 q->limits.max_sectors = queue_max_hw_sectors(q);
0b5e3dbf 4421 blk_queue_max_segments(q, USHRT_MAX);
4422 blk_queue_max_segment_size(q, segment_size);
4423 blk_queue_io_min(q, segment_size);
4424 blk_queue_io_opt(q, segment_size);
029bcbd8 4425
4426 /* enable the discard support */
4427 queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
4428 q->limits.discard_granularity = segment_size;
4429 q->limits.discard_alignment = segment_size;
2bb4cd5c 4430 blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
6ac56951 4431 blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
90e98c52 4432
bae818ee 4433 if (!ceph_test_opt(rbd_dev->rbd_client->client, NOCRC))
dc3b17cc 4434 q->backing_dev_info->capabilities |= BDI_CAP_STABLE_WRITES;
bae818ee 4435
4436 /*
4437 * disk_release() expects a queue ref from add_disk() and will
4438 * put it. Hold an extra ref until add_disk() is called.
4439 */
4440 WARN_ON(!blk_get_queue(q));
602adf40 4441 disk->queue = q;
4442 q->queuedata = rbd_dev;
4443
4444 rbd_dev->disk = disk;
602adf40 4445
602adf40 4446 return 0;
4447out_tag_set:
4448 blk_mq_free_tag_set(&rbd_dev->tag_set);
4449out_disk:
4450 put_disk(disk);
7ad18afa 4451 return err;
4452}
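/*
 * Worked example (assuming the default 4 MiB object size): segment_size
 * is 4194304 bytes, so max_hw_sectors and max_discard_sectors come out
 * to 8192 512-byte sectors, and io_min/io_opt are both 4 MiB.  All of
 * the queue limits above scale with the image's object order.
 */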
4453
4454/*
4455 sysfs
4456*/
4457
4458static struct rbd_device *dev_to_rbd_dev(struct device *dev)
4459{
4460 return container_of(dev, struct rbd_device, dev);
4461}
4462
4463static ssize_t rbd_size_show(struct device *dev,
4464 struct device_attribute *attr, char *buf)
4465{
593a9e7b 4466 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 4467
4468 return sprintf(buf, "%llu\n",
4469 (unsigned long long)rbd_dev->mapping.size);
4470}
4471
4472/*
4473 * Note this shows the features for whatever's mapped, which is not
4474 * necessarily the base image.
4475 */
4476static ssize_t rbd_features_show(struct device *dev,
4477 struct device_attribute *attr, char *buf)
4478{
4479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4480
4481 return sprintf(buf, "0x%016llx\n",
fc71d833 4482 (unsigned long long)rbd_dev->mapping.features);
4483}
4484
4485static ssize_t rbd_major_show(struct device *dev,
4486 struct device_attribute *attr, char *buf)
4487{
593a9e7b 4488 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4489
4490 if (rbd_dev->major)
4491 return sprintf(buf, "%d\n", rbd_dev->major);
4492
4493 return sprintf(buf, "(none)\n");
4494}
4495
4496static ssize_t rbd_minor_show(struct device *dev,
4497 struct device_attribute *attr, char *buf)
4498{
4499 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 4500
dd82fff1 4501 return sprintf(buf, "%d\n", rbd_dev->minor);
4502}
4503
4504static ssize_t rbd_client_addr_show(struct device *dev,
4505 struct device_attribute *attr, char *buf)
4506{
4507 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4508 struct ceph_entity_addr *client_addr =
4509 ceph_client_addr(rbd_dev->rbd_client->client);
4510
4511 return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
4512 le32_to_cpu(client_addr->nonce));
4513}
4514
4515static ssize_t rbd_client_id_show(struct device *dev,
4516 struct device_attribute *attr, char *buf)
602adf40 4517{
593a9e7b 4518 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4519
1dbb4399 4520 return sprintf(buf, "client%lld\n",
033268a5 4521 ceph_client_gid(rbd_dev->rbd_client->client));
4522}
4523
4524static ssize_t rbd_cluster_fsid_show(struct device *dev,
4525 struct device_attribute *attr, char *buf)
4526{
4527 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4528
4529 return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
4530}
4531
4532static ssize_t rbd_config_info_show(struct device *dev,
4533 struct device_attribute *attr, char *buf)
4534{
4535 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4536
4537 if (!capable(CAP_SYS_ADMIN))
4538 return -EPERM;
4539
0d6d1e9c 4540 return sprintf(buf, "%s\n", rbd_dev->config_info);
4541}
4542
4543static ssize_t rbd_pool_show(struct device *dev,
4544 struct device_attribute *attr, char *buf)
602adf40 4545{
593a9e7b 4546 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4547
0d7dbfce 4548 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
4549}
4550
4551static ssize_t rbd_pool_id_show(struct device *dev,
4552 struct device_attribute *attr, char *buf)
4553{
4554 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4555
0d7dbfce 4556 return sprintf(buf, "%llu\n",
fc71d833 4557 (unsigned long long) rbd_dev->spec->pool_id);
4558}
4559
4560static ssize_t rbd_name_show(struct device *dev,
4561 struct device_attribute *attr, char *buf)
4562{
593a9e7b 4563 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4564
4565 if (rbd_dev->spec->image_name)
4566 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
4567
4568 return sprintf(buf, "(unknown)\n");
4569}
4570
4571static ssize_t rbd_image_id_show(struct device *dev,
4572 struct device_attribute *attr, char *buf)
4573{
4574 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4575
0d7dbfce 4576 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
4577}
4578
4579/*
4580 * Shows the name of the currently-mapped snapshot (or
4581 * RBD_SNAP_HEAD_NAME for the base image).
4582 */
4583static ssize_t rbd_snap_show(struct device *dev,
4584 struct device_attribute *attr,
4585 char *buf)
4586{
593a9e7b 4587 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 4588
0d7dbfce 4589 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
4590}
4591
4592static ssize_t rbd_snap_id_show(struct device *dev,
4593 struct device_attribute *attr, char *buf)
4594{
4595 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4596
4597 return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
4598}
4599
86b00e0d 4600/*
4601 * For a v2 image, shows the chain of parent images, separated by empty
4602 * lines. For v1 images or if there is no parent, shows "(no parent
4603 * image)".
4604 */
4605static ssize_t rbd_parent_show(struct device *dev,
4606 struct device_attribute *attr,
4607 char *buf)
4608{
4609 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
ff96128f 4610 ssize_t count = 0;
86b00e0d 4611
ff96128f 4612 if (!rbd_dev->parent)
4613 return sprintf(buf, "(no parent image)\n");
4614
4615 for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
4616 struct rbd_spec *spec = rbd_dev->parent_spec;
4617
4618 count += sprintf(&buf[count], "%s"
4619 "pool_id %llu\npool_name %s\n"
4620 "image_id %s\nimage_name %s\n"
4621 "snap_id %llu\nsnap_name %s\n"
4622 "overlap %llu\n",
4623 !count ? "" : "\n", /* first? */
4624 spec->pool_id, spec->pool_name,
4625 spec->image_id, spec->image_name ?: "(unknown)",
4626 spec->snap_id, spec->snap_name,
4627 rbd_dev->parent_overlap);
4628 }
4629
4630 return count;
4631}
4632
4633static ssize_t rbd_image_refresh(struct device *dev,
4634 struct device_attribute *attr,
4635 const char *buf,
4636 size_t size)
4637{
593a9e7b 4638 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 4639 int ret;
602adf40 4640
4641 if (!capable(CAP_SYS_ADMIN))
4642 return -EPERM;
4643
cc4a38bd 4644 ret = rbd_dev_refresh(rbd_dev);
e627db08 4645 if (ret)
52bb1f9b 4646 return ret;
b813623a 4647
52bb1f9b 4648 return size;
dfc5606d 4649}
602adf40 4650
dfc5606d 4651static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 4652static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 4653static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 4654static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
005a07bf 4655static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
dfc5606d 4656static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
267fb90b 4657static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
0d6d1e9c 4658static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
dfc5606d 4659static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 4660static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 4661static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 4662static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
4663static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
4664static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
92a58671 4665static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
86b00e0d 4666static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
4667
4668static struct attribute *rbd_attrs[] = {
4669 &dev_attr_size.attr,
34b13184 4670 &dev_attr_features.attr,
dfc5606d 4671 &dev_attr_major.attr,
dd82fff1 4672 &dev_attr_minor.attr,
005a07bf 4673 &dev_attr_client_addr.attr,
dfc5606d 4674 &dev_attr_client_id.attr,
267fb90b 4675 &dev_attr_cluster_fsid.attr,
0d6d1e9c 4676 &dev_attr_config_info.attr,
dfc5606d 4677 &dev_attr_pool.attr,
9bb2f334 4678 &dev_attr_pool_id.attr,
dfc5606d 4679 &dev_attr_name.attr,
589d30e0 4680 &dev_attr_image_id.attr,
dfc5606d 4681 &dev_attr_current_snap.attr,
92a58671 4682 &dev_attr_snap_id.attr,
86b00e0d 4683 &dev_attr_parent.attr,
dfc5606d 4684 &dev_attr_refresh.attr,
4685 NULL
4686};
4687
4688static struct attribute_group rbd_attr_group = {
4689 .attrs = rbd_attrs,
4690};
4691
4692static const struct attribute_group *rbd_attr_groups[] = {
4693 &rbd_attr_group,
4694 NULL
4695};
4696
6cac4695 4697static void rbd_dev_release(struct device *dev);
dfc5606d 4698
b9942bc9 4699static const struct device_type rbd_device_type = {
4700 .name = "rbd",
4701 .groups = rbd_attr_groups,
6cac4695 4702 .release = rbd_dev_release,
4703};
4704
4705static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
4706{
4707 kref_get(&spec->kref);
4708
4709 return spec;
4710}
4711
4712static void rbd_spec_free(struct kref *kref);
4713static void rbd_spec_put(struct rbd_spec *spec)
4714{
4715 if (spec)
4716 kref_put(&spec->kref, rbd_spec_free);
4717}
4718
4719static struct rbd_spec *rbd_spec_alloc(void)
4720{
4721 struct rbd_spec *spec;
4722
4723 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4724 if (!spec)
4725 return NULL;
4726
4727 spec->pool_id = CEPH_NOPOOL;
4728 spec->snap_id = CEPH_NOSNAP;
4729 kref_init(&spec->kref);
4730
4731 return spec;
4732}
4733
4734static void rbd_spec_free(struct kref *kref)
4735{
4736 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4737
4738 kfree(spec->pool_name);
4739 kfree(spec->image_id);
4740 kfree(spec->image_name);
4741 kfree(spec->snap_name);
4742 kfree(spec);
4743}
4744
1643dfa4 4745static void rbd_dev_free(struct rbd_device *rbd_dev)
dd5ac32d 4746{
99d16943 4747 WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
ed95b21a 4748 WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
dd5ac32d 4749
c41d13a3 4750 ceph_oid_destroy(&rbd_dev->header_oid);
6b6dddbe 4751 ceph_oloc_destroy(&rbd_dev->header_oloc);
0d6d1e9c 4752 kfree(rbd_dev->config_info);
c41d13a3 4753
4754 rbd_put_client(rbd_dev->rbd_client);
4755 rbd_spec_put(rbd_dev->spec);
4756 kfree(rbd_dev->opts);
4757 kfree(rbd_dev);
4758}
4759
4760static void rbd_dev_release(struct device *dev)
4761{
4762 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4763 bool need_put = !!rbd_dev->opts;
4764
4765 if (need_put) {
4766 destroy_workqueue(rbd_dev->task_wq);
4767 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4768 }
4769
4770 rbd_dev_free(rbd_dev);
4771
4772 /*
4773 * This is racy, but way better than putting the module ref outside of
4774 * the release callback. The race window is pretty small, so
4775 * doing something similar to dm (dm-builtin.c) is overkill.
4776 */
4777 if (need_put)
4778 module_put(THIS_MODULE);
4779}
4780
4781static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
4782 struct rbd_spec *spec)
4783{
4784 struct rbd_device *rbd_dev;
4785
1643dfa4 4786 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
4787 if (!rbd_dev)
4788 return NULL;
4789
4790 spin_lock_init(&rbd_dev->lock);
4791 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
4792 init_rwsem(&rbd_dev->header_rwsem);
4793
7e97332e 4794 rbd_dev->header.data_pool_id = CEPH_NOPOOL;
c41d13a3 4795 ceph_oid_init(&rbd_dev->header_oid);
431a02cd 4796 rbd_dev->header_oloc.pool = spec->pool_id;
c41d13a3 4797
4798 mutex_init(&rbd_dev->watch_mutex);
4799 rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
4800 INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
4801
4802 init_rwsem(&rbd_dev->lock_rwsem);
4803 rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
4804 INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
4805 INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
4806 INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
4807 INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
4808 init_waitqueue_head(&rbd_dev->lock_waitq);
4809
4810 rbd_dev->dev.bus = &rbd_bus_type;
4811 rbd_dev->dev.type = &rbd_device_type;
4812 rbd_dev->dev.parent = &rbd_root_dev;
4813 device_initialize(&rbd_dev->dev);
4814
c53d5893 4815 rbd_dev->rbd_client = rbdc;
d147543d 4816 rbd_dev->spec = spec;
0903e875 4817
4818 return rbd_dev;
4819}
4820
4821/*
4822 * Create a mapping rbd_dev.
4823 */
4824static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4825 struct rbd_spec *spec,
4826 struct rbd_options *opts)
4827{
4828 struct rbd_device *rbd_dev;
4829
4830 rbd_dev = __rbd_dev_create(rbdc, spec);
4831 if (!rbd_dev)
4832 return NULL;
4833
4834 rbd_dev->opts = opts;
4835
4836 /* get an id and fill in device name */
4837 rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
4838 minor_to_rbd_dev_id(1 << MINORBITS),
4839 GFP_KERNEL);
4840 if (rbd_dev->dev_id < 0)
4841 goto fail_rbd_dev;
4842
4843 sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
4844 rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
4845 rbd_dev->name);
4846 if (!rbd_dev->task_wq)
4847 goto fail_dev_id;
dd5ac32d 4848
4849 /* we have a ref from do_rbd_add() */
4850 __module_get(THIS_MODULE);
dd5ac32d 4851
1643dfa4 4852 dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
c53d5893 4853 return rbd_dev;
4854
4855fail_dev_id:
4856 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4857fail_rbd_dev:
4858 rbd_dev_free(rbd_dev);
4859 return NULL;
4860}
4861
4862static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4863{
4864 if (rbd_dev)
4865 put_device(&rbd_dev->dev);
4866}
4867
4868/*
4869 * Get the size and object order for an image snapshot, or if
4870 * snap_id is CEPH_NOSNAP, gets this information for the base
4871 * image.
4872 */
4873static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4874 u8 *order, u64 *snap_size)
4875{
4876 __le64 snapid = cpu_to_le64(snap_id);
4877 int ret;
4878 struct {
4879 u8 order;
4880 __le64 size;
4881 } __attribute__ ((packed)) size_buf = { 0 };
4882
4883 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4884 &rbd_dev->header_oloc, "get_size",
4885 &snapid, sizeof(snapid),
4886 &size_buf, sizeof(size_buf));
36be9a76 4887 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4888 if (ret < 0)
4889 return ret;
4890 if (ret < sizeof (size_buf))
4891 return -ERANGE;
9d475de5 4892
c3545579 4893 if (order) {
c86f86e9 4894 *order = size_buf.order;
4895 dout(" order %u", (unsigned int)*order);
4896 }
4897 *snap_size = le64_to_cpu(size_buf.size);
4898
4899 dout(" snap_id 0x%016llx snap_size = %llu\n",
4900 (unsigned long long)snap_id,
57385b51 4901 (unsigned long long)*snap_size);
4902
4903 return 0;
4904}
4905
4906static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4907{
4908 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4909 &rbd_dev->header.obj_order,
4910 &rbd_dev->header.image_size);
4911}
4912
4913static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4914{
4915 void *reply_buf;
4916 int ret;
4917 void *p;
4918
4919 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4920 if (!reply_buf)
4921 return -ENOMEM;
4922
4923 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4924 &rbd_dev->header_oloc, "get_object_prefix",
4925 NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 4926 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4927 if (ret < 0)
4928 goto out;
4929
4930 p = reply_buf;
4931 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4932 p + ret, NULL, GFP_NOIO);
4933 ret = 0;
4934
4935 if (IS_ERR(rbd_dev->header.object_prefix)) {
4936 ret = PTR_ERR(rbd_dev->header.object_prefix);
4937 rbd_dev->header.object_prefix = NULL;
4938 } else {
4939 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
4940 }
4941out:
4942 kfree(reply_buf);
4943
4944 return ret;
4945}
4946
4947static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4948 u64 *snap_features)
4949{
4950 __le64 snapid = cpu_to_le64(snap_id);
4951 struct {
4952 __le64 features;
4953 __le64 incompat;
4157976b 4954 } __attribute__ ((packed)) features_buf = { 0 };
d3767f0f 4955 u64 unsup;
4956 int ret;
4957
4958 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
4959 &rbd_dev->header_oloc, "get_features",
4960 &snapid, sizeof(snapid),
4961 &features_buf, sizeof(features_buf));
36be9a76 4962 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4963 if (ret < 0)
4964 return ret;
4965 if (ret < sizeof (features_buf))
4966 return -ERANGE;
d889140c 4967
4968 unsup = le64_to_cpu(features_buf.incompat) & ~RBD_FEATURES_SUPPORTED;
4969 if (unsup) {
4970 rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
4971 unsup);
b8f5c6ed 4972 return -ENXIO;
d3767f0f 4973 }
d889140c 4974
4975 *snap_features = le64_to_cpu(features_buf.features);
4976
4977 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4978 (unsigned long long)snap_id,
4979 (unsigned long long)*snap_features,
4980 (unsigned long long)le64_to_cpu(features_buf.incompat));
4981
4982 return 0;
4983}
4984
4985static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4986{
4987 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4988 &rbd_dev->header.features);
4989}
4990
4991static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4992{
4993 struct rbd_spec *parent_spec;
4994 size_t size;
4995 void *reply_buf = NULL;
4996 __le64 snapid;
4997 void *p;
4998 void *end;
642a2537 4999 u64 pool_id;
86b00e0d 5000 char *image_id;
3b5cf2a2 5001 u64 snap_id;
86b00e0d 5002 u64 overlap;
5003 int ret;
5004
5005 parent_spec = rbd_spec_alloc();
5006 if (!parent_spec)
5007 return -ENOMEM;
5008
5009 size = sizeof (__le64) + /* pool_id */
5010 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
5011 sizeof (__le64) + /* snap_id */
5012 sizeof (__le64); /* overlap */
5013 reply_buf = kmalloc(size, GFP_KERNEL);
5014 if (!reply_buf) {
5015 ret = -ENOMEM;
5016 goto out_err;
5017 }
5018
4d9b67cd 5019 snapid = cpu_to_le64(rbd_dev->spec->snap_id);
5020 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5021 &rbd_dev->header_oloc, "get_parent",
5022 &snapid, sizeof(snapid), reply_buf, size);
36be9a76 5023 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5024 if (ret < 0)
5025 goto out_err;
5026
86b00e0d 5027 p = reply_buf;
5028 end = reply_buf + ret;
5029 ret = -ERANGE;
642a2537 5030 ceph_decode_64_safe(&p, end, pool_id, out_err);
5031 if (pool_id == CEPH_NOPOOL) {
5032 /*
5033 * Either the parent never existed, or we have a
5034 * record of it but the image got flattened so it no
5035 * longer has a parent. When the parent of a
5036 * layered image disappears we immediately set the
5037 * overlap to 0. The effect of this is that all new
5038 * requests will be treated as if the image had no
5039 * parent.
5040 */
5041 if (rbd_dev->parent_overlap) {
5042 rbd_dev->parent_overlap = 0;
5043 rbd_dev_parent_put(rbd_dev);
5044 pr_info("%s: clone image has been flattened\n",
5045 rbd_dev->disk->disk_name);
5046 }
5047
86b00e0d 5048 goto out; /* No parent? No problem. */
392a9dad 5049 }
86b00e0d 5050
5051 /* The ceph file layout needs to fit pool id in 32 bits */
5052
5053 ret = -EIO;
642a2537 5054 if (pool_id > (u64)U32_MAX) {
9584d508 5055 rbd_warn(NULL, "parent pool id too large (%llu > %u)",
642a2537 5056 (unsigned long long)pool_id, U32_MAX);
57385b51 5057 goto out_err;
c0cd10db 5058 }
0903e875 5059
979ed480 5060 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5061 if (IS_ERR(image_id)) {
5062 ret = PTR_ERR(image_id);
5063 goto out_err;
5064 }
3b5cf2a2 5065 ceph_decode_64_safe(&p, end, snap_id, out_err);
5066 ceph_decode_64_safe(&p, end, overlap, out_err);
5067
5068 /*
5069 * The parent won't change (except when the clone is
5070 * flattened, which is handled above). So we only need to
5071 * record the parent spec if we have not already done so.
5072 */
5073 if (!rbd_dev->parent_spec) {
5074 parent_spec->pool_id = pool_id;
5075 parent_spec->image_id = image_id;
5076 parent_spec->snap_id = snap_id;
5077 rbd_dev->parent_spec = parent_spec;
5078 parent_spec = NULL; /* rbd_dev now owns this */
5079 } else {
5080 kfree(image_id);
5081 }
5082
5083 /*
5084 * We always update the parent overlap. If it's zero we issue
5085 * a warning, as we will proceed as if there was no parent.
5086 */
5087 if (!overlap) {
5088 if (parent_spec) {
5089 /* refresh, careful to warn just once */
5090 if (rbd_dev->parent_overlap)
5091 rbd_warn(rbd_dev,
5092 "clone now standalone (overlap became 0)");
5093 } else {
5094 /* initial probe */
5095 rbd_warn(rbd_dev, "clone is standalone (overlap 0)");
5096 }
5097 }
5098 rbd_dev->parent_overlap = overlap;
5099
5100out:
5101 ret = 0;
5102out_err:
5103 kfree(reply_buf);
5104 rbd_spec_put(parent_spec);
5105
5106 return ret;
5107}
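/*
 * Editor's illustration (not part of the driver): a standalone userspace
 * sketch of the "get_parent" reply layout decoded above -- a little-endian
 * u64 pool id, a le32-length-prefixed image id string (at most
 * RBD_IMAGE_ID_LEN_MAX = 64 bytes), then the snap id and overlap as le64.
 * The bounds checks mirror the ceph_decode_*_safe() calls; all names below
 * are hypothetical.
 */
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct parent_info {
	uint64_t pool_id;
	char image_id[64 + 1];
	uint64_t snap_id;
	uint64_t overlap;
};

static uint64_t get_le64(const uint8_t *p)
{
	uint64_t v = 0;

	for (int i = 7; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

static uint32_t get_le32(const uint8_t *p)
{
	uint32_t v = 0;

	for (int i = 3; i >= 0; i--)
		v = (v << 8) | p[i];
	return v;
}

/* Returns 0 on success, -1 if the reply is truncated or the id too long. */
static int decode_parent_reply(const uint8_t *buf, size_t len,
			       struct parent_info *out)
{
	const uint8_t *p = buf;
	const uint8_t *end = buf + len;
	uint32_t slen;

	if (end - p < 8)
		return -1;
	out->pool_id = get_le64(p);
	p += 8;
	if (end - p < 4)
		return -1;
	slen = get_le32(p);
	p += 4;
	if (slen > 64 || (size_t)(end - p) < slen)
		return -1;
	memcpy(out->image_id, p, slen);
	out->image_id[slen] = '\0';
	p += slen;
	if (end - p < 16)
		return -1;
	out->snap_id = get_le64(p);
	out->overlap = get_le64(p + 8);
	return 0;
}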
5108
5109static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
5110{
5111 struct {
5112 __le64 stripe_unit;
5113 __le64 stripe_count;
5114 } __attribute__ ((packed)) striping_info_buf = { 0 };
5115 size_t size = sizeof (striping_info_buf);
5116 void *p;
5117 u64 obj_size;
5118 u64 stripe_unit;
5119 u64 stripe_count;
5120 int ret;
5121
5122 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5123 &rbd_dev->header_oloc, "get_stripe_unit_count",
5124 NULL, 0, &striping_info_buf, size);
5125 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5126 if (ret < 0)
5127 return ret;
5128 if (ret < size)
5129 return -ERANGE;
5130
5131 /*
5132 * We don't actually support the "fancy striping" feature
5133 * (STRIPINGV2) yet, but if the striping sizes are the
5134 * defaults the behavior is the same as before. So find
5135 * out, and only fail if the image has non-default values.
5136 */
5137 ret = -EINVAL;
5138 obj_size = rbd_obj_bytes(&rbd_dev->header);
5139 p = &striping_info_buf;
5140 stripe_unit = ceph_decode_64(&p);
5141 if (stripe_unit != obj_size) {
5142 rbd_warn(rbd_dev, "unsupported stripe unit "
5143 "(got %llu want %llu)",
5144 stripe_unit, obj_size);
5145 return -EINVAL;
5146 }
5147 stripe_count = ceph_decode_64(&p);
5148 if (stripe_count != 1) {
5149 rbd_warn(rbd_dev, "unsupported stripe count "
5150 "(got %llu want 1)", stripe_count);
5151 return -EINVAL;
5152 }
5153 rbd_dev->header.stripe_unit = stripe_unit;
5154 rbd_dev->header.stripe_count = stripe_count;
5155
5156 return 0;
5157}
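/*
 * Editor's illustration (not part of the driver): the only striping layout
 * accepted above is the default one -- stripe_unit equal to the object
 * size and stripe_count of 1.  A sketch of that predicate, assuming the
 * object size is 1 << obj_order bytes as in rbd_obj_bytes():
 */
#include <stdbool.h>
#include <stdint.h>

static bool striping_is_default(uint64_t stripe_unit, uint64_t stripe_count,
				uint8_t obj_order)
{
	/* e.g. obj_order 22 -> 4 MiB objects */
	return stripe_unit == (1ULL << obj_order) && stripe_count == 1;
}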
5158
5159static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev)
5160{
5161 __le64 data_pool_id;
5162 int ret;
5163
5164 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5165 &rbd_dev->header_oloc, "get_data_pool",
5166 NULL, 0, &data_pool_id, sizeof(data_pool_id));
5167 if (ret < 0)
5168 return ret;
5169 if (ret < sizeof(data_pool_id))
5170 return -EBADMSG;
5171
5172 rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id);
5173 WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL);
5174 return 0;
5175}
5176
5177static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
5178{
5179 CEPH_DEFINE_OID_ONSTACK(oid);
5180 size_t image_id_size;
5181 char *image_id;
5182 void *p;
5183 void *end;
5184 size_t size;
5185 void *reply_buf = NULL;
5186 size_t len = 0;
5187 char *image_name = NULL;
5188 int ret;
5189
5190 rbd_assert(!rbd_dev->spec->image_name);
5191
5192 len = strlen(rbd_dev->spec->image_id);
5193 image_id_size = sizeof (__le32) + len;
5194 image_id = kmalloc(image_id_size, GFP_KERNEL);
5195 if (!image_id)
5196 return NULL;
5197
5198 p = image_id;
5199 end = image_id + image_id_size;
5200 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
5201
5202 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
5203 reply_buf = kmalloc(size, GFP_KERNEL);
5204 if (!reply_buf)
5205 goto out;
5206
5207 ceph_oid_printf(&oid, "%s", RBD_DIRECTORY);
5208 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5209 "dir_get_name", image_id, image_id_size,
5210 reply_buf, size);
5211 if (ret < 0)
5212 goto out;
5213 p = reply_buf;
5214 end = reply_buf + ret;
5215
5216 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
5217 if (IS_ERR(image_name))
5218 image_name = NULL;
5219 else
5220 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
5221out:
5222 kfree(reply_buf);
5223 kfree(image_id);
5224
5225 return image_name;
5226}
5227
5228static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5229{
5230 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5231 const char *snap_name;
5232 u32 which = 0;
5233
5234 /* Skip over names until we find the one we are looking for */
5235
5236 snap_name = rbd_dev->header.snap_names;
5237 while (which < snapc->num_snaps) {
5238 if (!strcmp(name, snap_name))
5239 return snapc->snaps[which];
5240 snap_name += strlen(snap_name) + 1;
5241 which++;
5242 }
5243 return CEPH_NOSNAP;
5244}
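/*
 * Editor's illustration (not part of the driver): format 1 stores snapshot
 * names as consecutive NUL-terminated strings, parallel to the snap id
 * array, which is why the lookup above advances by strlen() + 1.  The same
 * walk as a standalone helper:
 */
#include <string.h>

/* Returns the index of @name among @count packed names, or -1. */
static int packed_name_index(const char *names, int count, const char *name)
{
	for (int i = 0; i < count; i++) {
		if (!strcmp(names, name))
			return i;
		names += strlen(names) + 1;
	}
	return -1;
}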
5245
5246static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5247{
5248 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
5249 u32 which;
5250 bool found = false;
5251 u64 snap_id;
5252
5253 for (which = 0; !found && which < snapc->num_snaps; which++) {
5254 const char *snap_name;
5255
5256 snap_id = snapc->snaps[which];
5257 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
5258 if (IS_ERR(snap_name)) {
5259 /* ignore no-longer existing snapshots */
5260 if (PTR_ERR(snap_name) == -ENOENT)
5261 continue;
5262 else
5263 break;
5264 }
5265 found = !strcmp(name, snap_name);
5266 kfree(snap_name);
5267 }
5268 return found ? snap_id : CEPH_NOSNAP;
5269}
5270
5271/*
5272 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
5273 * no snapshot by that name is found, or if an error occurs.
5274 */
5275static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
5276{
5277 if (rbd_dev->image_format == 1)
5278 return rbd_v1_snap_id_by_name(rbd_dev, name);
5279
5280 return rbd_v2_snap_id_by_name(rbd_dev, name);
5281}
5282
5283 /*
5284 * An image being mapped will have everything but the snap id.
5285 */
5286static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
5287{
5288 struct rbd_spec *spec = rbd_dev->spec;
5289
5290 rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
5291 rbd_assert(spec->image_id && spec->image_name);
5292 rbd_assert(spec->snap_name);
5293
5294 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
5295 u64 snap_id;
5296
5297 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
5298 if (snap_id == CEPH_NOSNAP)
5299 return -ENOENT;
5300
5301 spec->snap_id = snap_id;
5302 } else {
5303 spec->snap_id = CEPH_NOSNAP;
5304 }
5305
5306 return 0;
5307}
5308
5309/*
5310 * A parent image will have all ids but none of the names.
5311 *
5312 * All names in an rbd spec are dynamically allocated. It's OK if we
5313 * can't figure out the name for an image id.
5314 */
5315 static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
5316 {
5317 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
5318 struct rbd_spec *spec = rbd_dev->spec;
5319 const char *pool_name;
5320 const char *image_name;
5321 const char *snap_name;
5322 int ret;
5323
5324 rbd_assert(spec->pool_id != CEPH_NOPOOL);
5325 rbd_assert(spec->image_id);
5326 rbd_assert(spec->snap_id != CEPH_NOSNAP);
5327
5328 /* Get the pool name; we have to make our own copy of this */
5329
5330 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
5331 if (!pool_name) {
5332 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
5333 return -EIO;
5334 }
5335 pool_name = kstrdup(pool_name, GFP_KERNEL);
5336 if (!pool_name)
5337 return -ENOMEM;
5338
5339 /* Fetch the image name; tolerate failure here */
5340
5341 image_name = rbd_dev_image_name(rbd_dev);
5342 if (!image_name)
5343 rbd_warn(rbd_dev, "unable to get image name");
5344
5345 /* Fetch the snapshot name */
5346
5347 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
5348 if (IS_ERR(snap_name)) {
5349 ret = PTR_ERR(snap_name);
5350 goto out_err;
5351 }
5352
5353 spec->pool_name = pool_name;
5354 spec->image_name = image_name;
5355 spec->snap_name = snap_name;
5356
5357 return 0;
5358
5359 out_err:
5360 kfree(image_name);
5361 kfree(pool_name);
5362 return ret;
5363}
5364
5365 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
5366{
5367 size_t size;
5368 int ret;
5369 void *reply_buf;
5370 void *p;
5371 void *end;
5372 u64 seq;
5373 u32 snap_count;
5374 struct ceph_snap_context *snapc;
5375 u32 i;
5376
5377 /*
5378 * We'll need room for the seq value (maximum snapshot id),
5379 * snapshot count, and array of that many snapshot ids.
5380 * For now we have a fixed upper limit on the number we're
5381 * prepared to receive.
5382 */
5383 size = sizeof (__le64) + sizeof (__le32) +
5384 RBD_MAX_SNAP_COUNT * sizeof (__le64);
5385 reply_buf = kzalloc(size, GFP_KERNEL);
5386 if (!reply_buf)
5387 return -ENOMEM;
5388
5389 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5390 &rbd_dev->header_oloc, "get_snapcontext",
5391 NULL, 0, reply_buf, size);
5392 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5393 if (ret < 0)
5394 goto out;
5395
5396 p = reply_buf;
5397 end = reply_buf + ret;
5398 ret = -ERANGE;
5399 ceph_decode_64_safe(&p, end, seq, out);
5400 ceph_decode_32_safe(&p, end, snap_count, out);
5401
5402 /*
5403 * Make sure the reported number of snapshot ids wouldn't go
5404 * beyond the end of our buffer. But before checking that,
5405 * make sure the computed size of the snapshot context we
5406 * allocate is representable in a size_t.
5407 */
5408 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
5409 / sizeof (u64)) {
5410 ret = -EINVAL;
5411 goto out;
5412 }
5413 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
5414 goto out;
5415 ret = 0;
5416
5417 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
5418 if (!snapc) {
5419 ret = -ENOMEM;
5420 goto out;
5421 }
5422 snapc->seq = seq;
5423 for (i = 0; i < snap_count; i++)
5424 snapc->snaps[i] = ceph_decode_64(&p);
5425
5426 ceph_put_snap_context(rbd_dev->header.snapc);
5427 rbd_dev->header.snapc = snapc;
5428
5429 dout(" snap context seq = %llu, snap_count = %u\n",
5430 (unsigned long long)seq, (unsigned int)snap_count);
5431out:
5432 kfree(reply_buf);
5433
5434 return ret;
5435}
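/*
 * Editor's illustration (not part of the driver): the "get_snapcontext"
 * reply decoded above is a le64 seq, a le32 count, then count le64
 * snapshot ids.  This standalone sketch keeps the two guards that matter:
 * the count must not overflow the allocation size, and the id array must
 * fit inside the bytes actually received.  It assumes a little-endian
 * host, so plain memcpy() matches the wire encoding.
 */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

struct snapctx {
	uint64_t seq;
	uint32_t count;
	uint64_t ids[];		/* count entries */
};

static struct snapctx *decode_snapctx(const uint8_t *buf, size_t len)
{
	struct snapctx *sc;
	uint64_t seq;
	uint32_t count;

	if (len < 8 + 4)
		return NULL;
	memcpy(&seq, buf, 8);
	memcpy(&count, buf + 8, 4);

	/* would "count" ids overflow size_t, or run past the reply? */
	if (count > (SIZE_MAX - sizeof(*sc)) / sizeof(uint64_t))
		return NULL;
	if ((size_t)count * sizeof(uint64_t) > len - 12)
		return NULL;

	sc = malloc(sizeof(*sc) + count * sizeof(uint64_t));
	if (!sc)
		return NULL;
	sc->seq = seq;
	sc->count = count;
	memcpy(sc->ids, buf + 12, count * sizeof(uint64_t));
	return sc;
}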
5436
5437static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
5438 u64 snap_id)
5439{
5440 size_t size;
5441 void *reply_buf;
5442 __le64 snapid;
5443 int ret;
5444 void *p;
5445 void *end;
5446 char *snap_name;
5447
5448 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
5449 reply_buf = kmalloc(size, GFP_KERNEL);
5450 if (!reply_buf)
5451 return ERR_PTR(-ENOMEM);
5452
5453 snapid = cpu_to_le64(snap_id);
5454 ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid,
5455 &rbd_dev->header_oloc, "get_snapshot_name",
5456 &snapid, sizeof(snapid), reply_buf, size);
5457 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5458 if (ret < 0) {
5459 snap_name = ERR_PTR(ret);
5460 goto out;
5461 }
5462
5463 p = reply_buf;
5464 end = reply_buf + ret;
5465 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
5466 if (IS_ERR(snap_name))
5467 goto out;
5468
5469 dout(" snap_id 0x%016llx snap_name = %s\n",
5470 (unsigned long long)snap_id, snap_name);
5471out:
5472 kfree(reply_buf);
5473
5474 return snap_name;
5475}
5476
5477 static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
5478 {
5479 bool first_time = rbd_dev->header.object_prefix == NULL;
5480 int ret;
5481
5482 ret = rbd_dev_v2_image_size(rbd_dev);
5483 if (ret)
5484 return ret;
5485
5486 if (first_time) {
5487 ret = rbd_dev_v2_header_onetime(rbd_dev);
5488 if (ret)
5489 return ret;
5490 }
5491
5492 ret = rbd_dev_v2_snap_context(rbd_dev);
5493 if (ret && first_time) {
5494 kfree(rbd_dev->header.object_prefix);
5495 rbd_dev->header.object_prefix = NULL;
5496 }
5497
5498 return ret;
5499}
5500
5501static int rbd_dev_header_info(struct rbd_device *rbd_dev)
5502{
5503 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5504
5505 if (rbd_dev->image_format == 1)
5506 return rbd_dev_v1_header_info(rbd_dev);
5507
5508 return rbd_dev_v2_header_info(rbd_dev);
5509}
5510
5511/*
5512 * Skips over white space at *buf, and updates *buf to point to the
5513 * first found non-space character (if any). Returns the length of
5514 * the token (string of non-white space characters) found. Note
5515 * that *buf must be terminated with '\0'.
5516 */
5517static inline size_t next_token(const char **buf)
5518{
5519 /*
5520 * These are the characters that produce nonzero for
5521 * isspace() in the "C" and "POSIX" locales.
5522 */
5523 const char *spaces = " \f\n\r\t\v";
5524
5525 *buf += strspn(*buf, spaces); /* Find start of token */
5526
5527 return strcspn(*buf, spaces); /* Return token length */
5528}
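/*
 * Editor's illustration (not part of the driver): next_token() is the
 * classic strspn()/strcspn() pair.  A tiny standalone harness showing how
 * it splits an "rbd add" style line (the spec string is made up):
 */
#include <stdio.h>
#include <string.h>

int main(void)
{
	const char *buf = "1.2.3.4:6789 name=admin rbd myimage -";
	const char *spaces = " \f\n\r\t\v";

	while (*buf) {
		buf += strspn(buf, spaces);		/* skip whitespace */
		size_t len = strcspn(buf, spaces);	/* token length */

		if (!len)
			break;
		printf("token: %.*s\n", (int)len, buf);
		buf += len;
	}
	return 0;
}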
5529
5530/*
5531 * Finds the next token in *buf, dynamically allocates a buffer big
5532 * enough to hold a copy of it, and copies the token into the new
5533 * buffer. The copy is guaranteed to be terminated with '\0'. Note
5534 * that a duplicate buffer is created even for a zero-length token.
5535 *
5536 * Returns a pointer to the newly-allocated duplicate, or a null
5537 * pointer if memory for the duplicate was not available. If
5538 * the lenp argument is a non-null pointer, the length of the token
5539 * (not including the '\0') is returned in *lenp.
5540 *
5541 * If successful, the *buf pointer will be updated to point beyond
5542 * the end of the found token.
5543 *
5544 * Note: uses GFP_KERNEL for allocation.
5545 */
5546static inline char *dup_token(const char **buf, size_t *lenp)
5547{
5548 char *dup;
5549 size_t len;
5550
5551 len = next_token(buf);
5552 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
5553 if (!dup)
5554 return NULL;
5555 *(dup + len) = '\0';
5556 *buf += len;
5557
5558 if (lenp)
5559 *lenp = len;
5560
5561 return dup;
5562}
5563
5564 /*
5565 * Parse the options provided for an "rbd add" (i.e., rbd image
5566 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
5567 * and the data written is passed here via a NUL-terminated buffer.
5568 * Returns 0 if successful or an error code otherwise.
5569 *
5570 * The information extracted from these options is recorded in
5571 * the other parameters which return dynamically-allocated
5572 * structures:
5573 * ceph_opts
5574 * The address of a pointer that will refer to a ceph options
5575 * structure. Caller must release the returned pointer using
5576 * ceph_destroy_options() when it is no longer needed.
5577 * rbd_opts
5578 * Address of an rbd options pointer. Fully initialized by
5579 * this function; caller must release with kfree().
5580 * spec
5581 * Address of an rbd image specification pointer. Fully
5582 * initialized by this function based on parsed options.
5583 * Caller must release with rbd_spec_put().
5584 *
5585 * The options passed take this form:
5586 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
5587 * where:
5588 * <mon_addrs>
5589 * A comma-separated list of one or more monitor addresses.
5590 * A monitor address is an ip address, optionally followed
5591 * by a port number (separated by a colon).
5592 * I.e.: ip1[:port1][,ip2[:port2]...]
5593 * <options>
5594 * A comma-separated list of ceph and/or rbd options.
5595 * <pool_name>
5596 * The name of the rados pool containing the rbd image.
5597 * <image_name>
5598 * The name of the image in that pool to map.
5599 * <snap_name>
5600 * An optional snapshot name. If provided, the mapping will
5601 * present data from the image at the time that snapshot was
5602 * created. The image head is used if no snapshot name is
5603 * provided. Snapshot mappings are always read-only.
5604 */
5605 static int rbd_add_parse_args(const char *buf,
5606 struct ceph_options **ceph_opts,
5607 struct rbd_options **opts,
5608 struct rbd_spec **rbd_spec)
5609 {
5610 size_t len;
5611 char *options;
5612 const char *mon_addrs;
5613 char *snap_name;
5614 size_t mon_addrs_size;
5615 struct rbd_spec *spec = NULL;
5616 struct rbd_options *rbd_opts = NULL;
5617 struct ceph_options *copts;
5618 int ret;
5619
5620 /* The first four tokens are required */
5621
5622 len = next_token(&buf);
5623 if (!len) {
5624 rbd_warn(NULL, "no monitor address(es) provided");
5625 return -EINVAL;
5626 }
5627 mon_addrs = buf;
5628 mon_addrs_size = len + 1;
5629 buf += len;
5630
5631 ret = -EINVAL;
5632 options = dup_token(&buf, NULL);
5633 if (!options)
5634 return -ENOMEM;
5635 if (!*options) {
5636 rbd_warn(NULL, "no options provided");
5637 goto out_err;
5638 }
5639
5640 spec = rbd_spec_alloc();
5641 if (!spec)
5642 goto out_mem;
5643
5644 spec->pool_name = dup_token(&buf, NULL);
5645 if (!spec->pool_name)
5646 goto out_mem;
5647 if (!*spec->pool_name) {
5648 rbd_warn(NULL, "no pool name provided");
5649 goto out_err;
5650 }
5651
5652 spec->image_name = dup_token(&buf, NULL);
5653 if (!spec->image_name)
5654 goto out_mem;
5655 if (!*spec->image_name) {
5656 rbd_warn(NULL, "no image name provided");
5657 goto out_err;
5658 }
5659
5660 /*
5661 * Snapshot name is optional; default is to use "-"
5662 * (indicating the head/no snapshot).
5663 */
5664 len = next_token(&buf);
5665 if (!len) {
5666 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
5667 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
5668 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
5669 ret = -ENAMETOOLONG;
5670 goto out_err;
5671 }
5672 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
5673 if (!snap_name)
5674 goto out_mem;
5675 *(snap_name + len) = '\0';
5676 spec->snap_name = snap_name;
5677
5678 /* Initialize all rbd options to the defaults */
5679
5680 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
5681 if (!rbd_opts)
5682 goto out_mem;
5683
5684 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
5685 rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
5686 rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
5687 rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
5688
5689 copts = ceph_parse_options(options, mon_addrs,
5690 mon_addrs + mon_addrs_size - 1,
5691 parse_rbd_opts_token, rbd_opts);
5692 if (IS_ERR(copts)) {
5693 ret = PTR_ERR(copts);
5694 goto out_err;
5695 }
5696 kfree(options);
5697
5698 *ceph_opts = copts;
5699 *opts = rbd_opts;
5700 *rbd_spec = spec;
5701
5702 return 0;
5703 out_mem:
5704 ret = -ENOMEM;
5705 out_err:
5706 kfree(rbd_opts);
5707 rbd_spec_put(spec);
5708 kfree(options);
5709
5710 return ret;
5711}
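/*
 * Editor's illustration (not part of the driver): a minimal userspace
 * mapper that feeds the grammar parsed above into /sys/bus/rbd/add.  The
 * monitor address, credentials, pool and image names are made-up examples;
 * "-" asks for the image head rather than a named snapshot.  Needs
 * privileges (the handler requires CAP_SYS_ADMIN).
 */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	/* <mon_addrs> <options> <pool_name> <image_name> [<snap_name>] */
	const char *spec = "1.2.3.4:6789 name=admin rbd myimage -";
	int fd = open("/sys/bus/rbd/add", O_WRONLY);

	if (fd < 0) {
		perror("open /sys/bus/rbd/add");
		return 1;
	}
	if (write(fd, spec, strlen(spec)) < 0)
		perror("write");
	close(fd);
	return 0;
}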
5712
5713/*
5714 * Return pool id (>= 0) or a negative error code.
5715 */
5716static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
5717{
5718 struct ceph_options *opts = rbdc->client->options;
5719 u64 newest_epoch;
5720 int tries = 0;
5721 int ret;
5722
5723again:
5724 ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5725 if (ret == -ENOENT && tries++ < 1) {
5726 ret = ceph_monc_get_version(&rbdc->client->monc, "osdmap",
5727 &newest_epoch);
5728 if (ret < 0)
5729 return ret;
5730
5731 if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5732 ceph_osdc_maybe_request_map(&rbdc->client->osdc);
5733 (void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5734 newest_epoch,
5735 opts->mount_timeout);
5736 goto again;
5737 } else {
5738 /* the osdmap we have is new enough */
5739 return -ENOENT;
5740 }
5741 }
5742
5743 return ret;
5744}
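/*
 * Editor's illustration (not part of the driver): rbd_add_get_pool_id()
 * retries a failed name lookup exactly once, after pulling in a newer
 * osdmap.  The shape of that "retry once with fresher state" loop, with
 * stub helpers standing in for the osdmap machinery:
 */
#include <errno.h>

static int lookup(const char *name)	/* stub: -ENOENT = not found */
{
	(void)name;
	return -ENOENT;
}

static int refresh_state(void)		/* stub: fetch/wait for newer map */
{
	return 0;
}

static int lookup_with_refresh(const char *name)
{
	int tries = 0;
	int ret;

again:
	ret = lookup(name);
	if (ret == -ENOENT && tries++ < 1) {
		ret = refresh_state();
		if (ret < 0)
			return ret;
		goto again;
	}
	return ret;
}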
5745
5746static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
5747{
5748 down_write(&rbd_dev->lock_rwsem);
5749 if (__rbd_is_lock_owner(rbd_dev))
5750 rbd_unlock(rbd_dev);
5751 up_write(&rbd_dev->lock_rwsem);
5752}
5753
5754static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
5755{
5756 if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
5757 rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
5758 return -EINVAL;
5759 }
5760
5761 /* FIXME: "rbd map --exclusive" should be interruptible */
5762 down_read(&rbd_dev->lock_rwsem);
5763 rbd_wait_state_locked(rbd_dev);
5764 up_read(&rbd_dev->lock_rwsem);
5765 if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
5766 rbd_warn(rbd_dev, "failed to acquire exclusive lock");
5767 return -EROFS;
5768 }
5769
5770 return 0;
5771}
5772
5773/*
5774 * An rbd format 2 image has a unique identifier, distinct from the
5775 * name given to it by the user. Internally, that identifier is
5776 * what's used to specify the names of objects related to the image.
5777 *
5778 * A special "rbd id" object is used to map an rbd image name to its
5779 * id. If that object doesn't exist, then there is no v2 rbd image
5780 * with the supplied name.
5781 *
5782 * This function will record the given rbd_dev's image_id field if
5783 * it can be determined, and in that case will return 0. If any
5784 * errors occur a negative errno will be returned and the rbd_dev's
5785 * image_id field will be unchanged (and should be NULL).
5786 */
5787static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5788{
5789 int ret;
5790 size_t size;
5791 CEPH_DEFINE_OID_ONSTACK(oid);
5792 void *response;
5793 char *image_id;
5794
5795 /*
5796 * When probing a parent image, the image id is already
5797 * known (and the image name likely is not). There's no
5798 * need to fetch the image id again in this case. We
5799 * do still need to set the image format though.
5800 */
5801 if (rbd_dev->spec->image_id) {
5802 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5803
5804 return 0;
5805 }
5806
5807 /*
5808 * First, see if the format 2 image id file exists, and if
5809 * so, get the image's persistent id from it.
5810 */
5811 ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX,
5812 rbd_dev->spec->image_name);
5813 if (ret)
5814 return ret;
5815
5816 dout("rbd id object name is %s\n", oid.name);
5817
5818 /* Response will be an encoded string, which includes a length */
5819
5820 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5821 response = kzalloc(size, GFP_NOIO);
5822 if (!response) {
5823 ret = -ENOMEM;
5824 goto out;
5825 }
5826
5827 /* If it doesn't exist we'll assume it's a format 1 image */
5828
5829 ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc,
5830 "get_id", NULL, 0,
5831 response, RBD_IMAGE_ID_LEN_MAX);
5832 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5833 if (ret == -ENOENT) {
5834 image_id = kstrdup("", GFP_KERNEL);
5835 ret = image_id ? 0 : -ENOMEM;
5836 if (!ret)
5837 rbd_dev->image_format = 1;
5838 } else if (ret >= 0) {
5839 void *p = response;
5840
5841 image_id = ceph_extract_encoded_string(&p, p + ret,
5842 NULL, GFP_NOIO);
5843 ret = PTR_ERR_OR_ZERO(image_id);
5844 if (!ret)
5845 rbd_dev->image_format = 2;
5846 }
5847
5848 if (!ret) {
5849 rbd_dev->spec->image_id = image_id;
5850 dout("image_id is %s\n", image_id);
5851 }
5852out:
5853 kfree(response);
5854 ceph_oid_destroy(&oid);
5855 return ret;
5856}
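/*
 * Editor's illustration (not part of the driver): the probe above reads
 * object "rbd_id.<image_name>" (RBD_ID_PREFIX); -ENOENT means a format 1
 * image with no id object, while a successful read yields the format 2
 * image id.  A sketch of that decision, with a stub reader:
 */
#include <errno.h>
#include <stddef.h>
#include <stdio.h>

static int read_id_object(const char *oid, char *id, size_t idlen)
{
	(void)oid; (void)id; (void)idlen;
	return -ENOENT;		/* stub: pretend the id object is missing */
}

static int detect_format(const char *image_name, int *format)
{
	char oid[128];
	char id[64 + 1];
	int ret;

	snprintf(oid, sizeof(oid), "rbd_id.%s", image_name);
	ret = read_id_object(oid, id, sizeof(id));
	if (ret == -ENOENT) {
		*format = 1;	/* no id object: format 1 */
		return 0;
	}
	if (ret < 0)
		return ret;
	*format = 2;
	return 0;
}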
5857
5858/*
5859 * Undo whatever state changes are made by v1 or v2 header info
5860 * call.
5861 */
5862static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5863{
5864 struct rbd_image_header *header;
5865
5866 rbd_dev_parent_put(rbd_dev);
5867
5868 /* Free dynamic fields from the header, then zero it out */
5869
5870 header = &rbd_dev->header;
5871 ceph_put_snap_context(header->snapc);
5872 kfree(header->snap_sizes);
5873 kfree(header->snap_names);
5874 kfree(header->object_prefix);
5875 memset(header, 0, sizeof (*header));
5876}
5877
5878 static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5879{
5880 int ret;
5881
5882 ret = rbd_dev_v2_object_prefix(rbd_dev);
5883 if (ret)
5884 goto out_err;
5885
5886 /*
5887 * Get and check the features for the image. Currently the
5888 * features are assumed to never change.
5889 */
5890 ret = rbd_dev_v2_features(rbd_dev);
5891 if (ret)
5892 goto out_err;
5893
5894 /* If the image supports fancy striping, get its parameters */
5895
5896 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5897 ret = rbd_dev_v2_striping_info(rbd_dev);
5898 if (ret < 0)
5899 goto out_err;
5900 }
5901
5902 if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) {
5903 ret = rbd_dev_v2_data_pool(rbd_dev);
5904 if (ret)
5905 goto out_err;
5906 }
5907
5908 rbd_init_layout(rbd_dev);
5909 return 0;
5910
5911 out_err:
5912 rbd_dev->header.features = 0;
5913 kfree(rbd_dev->header.object_prefix);
5914 rbd_dev->header.object_prefix = NULL;
5915 return ret;
5916}
5917
5918/*
5919 * @depth is rbd_dev_image_probe() -> rbd_dev_probe_parent() ->
5920 * rbd_dev_image_probe() recursion depth, which means it's also the
5921 * length of the already discovered part of the parent chain.
5922 */
5923static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5924 {
5925 struct rbd_device *parent = NULL;
5926 int ret;
5927
5928 if (!rbd_dev->parent_spec)
5929 return 0;
5930
5931 if (++depth > RBD_MAX_PARENT_CHAIN_LEN) {
5932 pr_info("parent chain is too long (%d)\n", depth);
5933 ret = -EINVAL;
5934 goto out_err;
5935 }
5936
5937 parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
5938 if (!parent) {
5939 ret = -ENOMEM;
5940 goto out_err;
5941 }
5942
5943 /*
5944 * Images related by parent/child relationships always share
5945 * rbd_client and spec/parent_spec, so bump their refcounts.
5946 */
5947 __rbd_get_client(rbd_dev->rbd_client);
5948 rbd_spec_get(rbd_dev->parent_spec);
5949
5950 ret = rbd_dev_image_probe(parent, depth);
5951 if (ret < 0)
5952 goto out_err;
5953
5954 rbd_dev->parent = parent;
5955 atomic_set(&rbd_dev->parent_ref, 1);
5956 return 0;
5957
5958 out_err:
5959 rbd_dev_unparent(rbd_dev);
5960 rbd_dev_destroy(parent);
5961 return ret;
5962}
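/*
 * Editor's illustration (not part of the driver): parent probing recurses
 * once per layer and refuses chains longer than RBD_MAX_PARENT_CHAIN_LEN.
 * The control flow, reduced to its skeleton over a toy node type:
 */
#include <errno.h>

#define MAX_CHAIN 16	/* mirrors RBD_MAX_PARENT_CHAIN_LEN */

struct layer {
	struct layer *parent;	/* NULL for a base image */
};

static int probe_parents(struct layer *l, int depth)
{
	if (!l->parent)
		return 0;		/* chain ends here */
	if (++depth > MAX_CHAIN)
		return -EINVAL;		/* chain too long */
	return probe_parents(l->parent, depth);
}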
5963
5964static void rbd_dev_device_release(struct rbd_device *rbd_dev)
5965{
5966 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5967 rbd_dev_mapping_clear(rbd_dev);
5968 rbd_free_disk(rbd_dev);
5969 if (!single_major)
5970 unregister_blkdev(rbd_dev->major, rbd_dev->name);
5971}
5972
5973/*
5974 * rbd_dev->header_rwsem must be locked for write and will be unlocked
5975 * upon return.
5976 */
5977 static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5978 {
5979 int ret;
5980
5981 /* Record our major and minor device numbers. */
5982
5983 if (!single_major) {
5984 ret = register_blkdev(0, rbd_dev->name);
5985 if (ret < 0)
5986 goto err_out_unlock;
5987
5988 rbd_dev->major = ret;
5989 rbd_dev->minor = 0;
5990 } else {
5991 rbd_dev->major = rbd_major;
5992 rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5993 }
5994
5995 /* Set up the blkdev mapping. */
5996
5997 ret = rbd_init_disk(rbd_dev);
5998 if (ret)
5999 goto err_out_blkdev;
6000
6001 ret = rbd_dev_mapping_set(rbd_dev);
6002 if (ret)
6003 goto err_out_disk;
6004
6005 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
6006 set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
6007
6008 ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
6009 if (ret)
6010 goto err_out_mapping;
6011
6012 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6013 up_write(&rbd_dev->header_rwsem);
6014 return 0;
6015
6016err_out_mapping:
6017 rbd_dev_mapping_clear(rbd_dev);
6018err_out_disk:
6019 rbd_free_disk(rbd_dev);
6020err_out_blkdev:
6021 if (!single_major)
6022 unregister_blkdev(rbd_dev->major, rbd_dev->name);
6023err_out_unlock:
6024 up_write(&rbd_dev->header_rwsem);
6025 return ret;
6026}
6027
6028static int rbd_dev_header_name(struct rbd_device *rbd_dev)
6029{
6030 struct rbd_spec *spec = rbd_dev->spec;
6031 int ret;
6032
6033 /* Record the header object name for this rbd image. */
6034
6035 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
6036 if (rbd_dev->image_format == 1)
6037 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6038 spec->image_name, RBD_SUFFIX);
6039 else
6040 ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
6041 RBD_HEADER_PREFIX, spec->image_id);
6042
6043 return ret;
6044}
6045
6046static void rbd_dev_image_release(struct rbd_device *rbd_dev)
6047{
6048 if (rbd_dev->opts)
6049 rbd_unregister_watch(rbd_dev);
6050
6051 rbd_dev_unprobe(rbd_dev);
6052 rbd_dev->image_format = 0;
6053 kfree(rbd_dev->spec->image_id);
6054 rbd_dev->spec->image_id = NULL;
6055}
6056
6057/*
6058 * Probe for the existence of the header object for the given rbd
6059 * device. If this image is the one being mapped (i.e., not a
6060 * parent), initiate a watch on its header object before using that
6061 * object to get detailed information about the rbd image.
6062 *
6063 * On success, returns with header_rwsem held for write if called
6064 * with @depth == 0.
6065 */
6066 static int rbd_dev_image_probe(struct rbd_device *rbd_dev, int depth)
6067{
6068 int ret;
6069
6070 /*
3abef3b3
AE
6071 * Get the id from the image id object. Unless there's an
6072 * error, rbd_dev->spec->image_id will be filled in with
6073 * a dynamically-allocated string, and rbd_dev->image_format
6074 * will be set to either 1 or 2.
a30b71b9
AE
6075 */
6076 ret = rbd_dev_image_id(rbd_dev);
6077 if (ret)
6078 return ret;
6079
6080 ret = rbd_dev_header_name(rbd_dev);
6081 if (ret)
6082 goto err_out_format;
6083
6084 if (!depth) {
6085 ret = rbd_register_watch(rbd_dev);
6086 if (ret) {
6087 if (ret == -ENOENT)
6088 pr_info("image %s/%s does not exist\n",
6089 rbd_dev->spec->pool_name,
6090 rbd_dev->spec->image_name);
6091 goto err_out_format;
6092 }
6093 }
6094
6095 if (!depth)
6096 down_write(&rbd_dev->header_rwsem);
6097
6098 ret = rbd_dev_header_info(rbd_dev);
6099 if (ret)
6100 goto err_out_probe;
6101
6102 /*
6103 * If this image is the one being mapped, we have pool name and
6104 * id, image name and id, and snap name - need to fill snap id.
6105 * Otherwise this is a parent image, identified by pool, image
6106 * and snap ids - need to fill in names for those ids.
6107 */
6108 if (!depth)
6109 ret = rbd_spec_fill_snap_id(rbd_dev);
6110 else
6111 ret = rbd_spec_fill_names(rbd_dev);
6112 if (ret) {
6113 if (ret == -ENOENT)
6114 pr_info("snap %s/%s@%s does not exist\n",
6115 rbd_dev->spec->pool_name,
6116 rbd_dev->spec->image_name,
6117 rbd_dev->spec->snap_name);
6118 goto err_out_probe;
6119 }
6120
6121 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
6122 ret = rbd_dev_v2_parent_info(rbd_dev);
6123 if (ret)
6124 goto err_out_probe;
6125
6126 /*
6127 * Need to warn users if this image is the one being
6128 * mapped and has a parent.
6129 */
6130 if (!depth && rbd_dev->parent_spec)
6131 rbd_warn(rbd_dev,
6132 "WARNING: kernel layering is EXPERIMENTAL!");
6133 }
6134
6135 ret = rbd_dev_probe_parent(rbd_dev, depth);
6136 if (ret)
6137 goto err_out_probe;
6138
6139 dout("discovered format %u image, header name is %s\n",
6140 rbd_dev->image_format, rbd_dev->header_oid.name);
6141 return 0;
6142
6143 err_out_probe:
6144 if (!depth)
6145 up_write(&rbd_dev->header_rwsem);
6146 if (!depth)
6147 rbd_unregister_watch(rbd_dev);
6148 rbd_dev_unprobe(rbd_dev);
6149err_out_format:
6150 rbd_dev->image_format = 0;
6151 kfree(rbd_dev->spec->image_id);
6152 rbd_dev->spec->image_id = NULL;
6153 return ret;
6154}
6155
6156static ssize_t do_rbd_add(struct bus_type *bus,
6157 const char *buf,
6158 size_t count)
6159 {
6160 struct rbd_device *rbd_dev = NULL;
6161 struct ceph_options *ceph_opts = NULL;
6162 struct rbd_options *rbd_opts = NULL;
6163 struct rbd_spec *spec = NULL;
6164 struct rbd_client *rbdc;
6165 bool read_only;
6166 int rc;
6167
6168 if (!capable(CAP_SYS_ADMIN))
6169 return -EPERM;
6170
6171 if (!try_module_get(THIS_MODULE))
6172 return -ENODEV;
6173
6174 /* parse add command */
6175 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
6176 if (rc < 0)
6177 goto out;
6178
6179 rbdc = rbd_get_client(ceph_opts);
6180 if (IS_ERR(rbdc)) {
6181 rc = PTR_ERR(rbdc);
6182 goto err_out_args;
6183 }
6184
6185 /* pick the pool */
6186 rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
6187 if (rc < 0) {
6188 if (rc == -ENOENT)
6189 pr_info("pool %s does not exist\n", spec->pool_name);
6190 goto err_out_client;
6191 }
6192 spec->pool_id = (u64)rc;
6193
6194 rbd_dev = rbd_dev_create(rbdc, spec, rbd_opts);
6195 if (!rbd_dev) {
6196 rc = -ENOMEM;
6197 goto err_out_client;
6198 }
6199 rbdc = NULL; /* rbd_dev now owns this */
6200 spec = NULL; /* rbd_dev now owns this */
6201 rbd_opts = NULL; /* rbd_dev now owns this */
6202
6203 rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
6204 if (!rbd_dev->config_info) {
6205 rc = -ENOMEM;
6206 goto err_out_rbd_dev;
6207 }
6208
6209 rc = rbd_dev_image_probe(rbd_dev, 0);
6210 if (rc < 0)
6211 goto err_out_rbd_dev;
6212
6213 /* If we are mapping a snapshot it must be marked read-only */
6214
6215 read_only = rbd_dev->opts->read_only;
6216 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
6217 read_only = true;
6218 rbd_dev->mapping.read_only = read_only;
6219
6220 rc = rbd_dev_device_setup(rbd_dev);
6221 if (rc)
6222 goto err_out_image_probe;
6223
6224 if (rbd_dev->opts->exclusive) {
6225 rc = rbd_add_acquire_lock(rbd_dev);
6226 if (rc)
6227 goto err_out_device_setup;
6228 }
6229
6230 /* Everything's ready. Announce the disk to the world. */
6231
6232 rc = device_add(&rbd_dev->dev);
6233 if (rc)
6234 goto err_out_image_lock;
6235
6236 add_disk(rbd_dev->disk);
6237 /* see rbd_init_disk() */
6238 blk_put_queue(rbd_dev->disk->queue);
6239
6240 spin_lock(&rbd_dev_list_lock);
6241 list_add_tail(&rbd_dev->node, &rbd_dev_list);
6242 spin_unlock(&rbd_dev_list_lock);
6243
6244 pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
6245 (unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
6246 rbd_dev->header.features);
6247 rc = count;
6248out:
6249 module_put(THIS_MODULE);
6250 return rc;
6251
6252err_out_image_lock:
6253 rbd_dev_image_unlock(rbd_dev);
6254err_out_device_setup:
6255 rbd_dev_device_release(rbd_dev);
6256err_out_image_probe:
6257 rbd_dev_image_release(rbd_dev);
6258err_out_rbd_dev:
6259 rbd_dev_destroy(rbd_dev);
6260 err_out_client:
6261 rbd_put_client(rbdc);
6262 err_out_args:
6263 rbd_spec_put(spec);
6264 kfree(rbd_opts);
6265 goto out;
6266}
6267
6268static ssize_t rbd_add(struct bus_type *bus,
6269 const char *buf,
6270 size_t count)
6271{
6272 if (single_major)
6273 return -EINVAL;
6274
6275 return do_rbd_add(bus, buf, count);
6276}
6277
6278static ssize_t rbd_add_single_major(struct bus_type *bus,
6279 const char *buf,
6280 size_t count)
6281{
6282 return do_rbd_add(bus, buf, count);
6283}
6284
6285static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
6286{
6287 while (rbd_dev->parent) {
6288 struct rbd_device *first = rbd_dev;
6289 struct rbd_device *second = first->parent;
6290 struct rbd_device *third;
6291
6292 /*
6293 * Follow to the parent with no grandparent and
6294 * remove it.
6295 */
6296 while (second && (third = second->parent)) {
6297 first = second;
6298 second = third;
6299 }
6300 rbd_assert(second);
6301 rbd_dev_image_release(second);
6302 rbd_dev_destroy(second);
6303 first->parent = NULL;
6304 first->parent_overlap = 0;
6305
6306 rbd_assert(first->parent_spec);
6307 rbd_spec_put(first->parent_spec);
6308 first->parent_spec = NULL;
6309 }
6310}
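/*
 * Editor's illustration (not part of the driver): teardown above always
 * walks to the deepest ancestor (the parent with no grandparent), frees
 * that, and repeats -- so a child is never left pointing at a freed
 * parent.  The same walk over a toy malloc'd chain:
 */
#include <stdlib.h>

struct dev {
	struct dev *parent;
};

static void remove_parents(struct dev *d)
{
	while (d->parent) {
		struct dev *first = d;
		struct dev *second = first->parent;
		struct dev *third;

		/* follow to the parent with no grandparent */
		while (second && (third = second->parent)) {
			first = second;
			second = third;
		}
		free(second);
		first->parent = NULL;
	}
}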
6311
6312static ssize_t do_rbd_remove(struct bus_type *bus,
6313 const char *buf,
6314 size_t count)
6315{
6316 struct rbd_device *rbd_dev = NULL;
6317 struct list_head *tmp;
6318 int dev_id;
6319 char opt_buf[6];
6320 bool force = false;
6321 int ret;
6322
6323 if (!capable(CAP_SYS_ADMIN))
6324 return -EPERM;
6325
6326 dev_id = -1;
6327 opt_buf[0] = '\0';
6328 sscanf(buf, "%d %5s", &dev_id, opt_buf);
6329 if (dev_id < 0) {
6330 pr_err("dev_id out of range\n");
6331 return -EINVAL;
6332 }
6333 if (opt_buf[0] != '\0') {
6334 if (!strcmp(opt_buf, "force")) {
6335 force = true;
6336 } else {
6337 pr_err("bad remove option at '%s'\n", opt_buf);
6338 return -EINVAL;
6339 }
6340 }
6341
6342 ret = -ENOENT;
6343 spin_lock(&rbd_dev_list_lock);
6344 list_for_each(tmp, &rbd_dev_list) {
6345 rbd_dev = list_entry(tmp, struct rbd_device, node);
6346 if (rbd_dev->dev_id == dev_id) {
6347 ret = 0;
6348 break;
6349 }
6350 }
6351 if (!ret) {
6352 spin_lock_irq(&rbd_dev->lock);
6353 if (rbd_dev->open_count && !force)
6354 ret = -EBUSY;
6355 else if (test_and_set_bit(RBD_DEV_FLAG_REMOVING,
6356 &rbd_dev->flags))
6357 ret = -EINPROGRESS;
6358 spin_unlock_irq(&rbd_dev->lock);
6359 }
6360 spin_unlock(&rbd_dev_list_lock);
6361 if (ret)
6362 return ret;
6363
6364 if (force) {
6365 /*
6366 * Prevent new IO from being queued and wait for existing
6367 * IO to complete/fail.
6368 */
6369 blk_mq_freeze_queue(rbd_dev->disk->queue);
6370 blk_set_queue_dying(rbd_dev->disk->queue);
6371 }
6372
6373 del_gendisk(rbd_dev->disk);
6374 spin_lock(&rbd_dev_list_lock);
6375 list_del_init(&rbd_dev->node);
6376 spin_unlock(&rbd_dev_list_lock);
6377 device_del(&rbd_dev->dev);
6378
6379 rbd_dev_image_unlock(rbd_dev);
6380 rbd_dev_device_release(rbd_dev);
6381 rbd_dev_image_release(rbd_dev);
6382 rbd_dev_destroy(rbd_dev);
6383 return count;
6384}
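/*
 * Editor's illustration (not part of the driver): unmapping device id 0
 * through /sys/bus/rbd/remove, with the optional "force" flag that fails
 * outstanding I/O instead of returning -EBUSY while the device is open.
 * The id is an example; the handler also requires CAP_SYS_ADMIN.
 */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	const char *req = "0 force";	/* "<dev-id> [force]" */
	int fd = open("/sys/bus/rbd/remove", O_WRONLY);

	if (fd < 0) {
		perror("open /sys/bus/rbd/remove");
		return 1;
	}
	if (write(fd, req, strlen(req)) < 0)
		perror("write");
	close(fd);
	return 0;
}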
6385
6386static ssize_t rbd_remove(struct bus_type *bus,
6387 const char *buf,
6388 size_t count)
6389{
6390 if (single_major)
6391 return -EINVAL;
6392
6393 return do_rbd_remove(bus, buf, count);
6394}
6395
6396static ssize_t rbd_remove_single_major(struct bus_type *bus,
6397 const char *buf,
6398 size_t count)
6399{
6400 return do_rbd_remove(bus, buf, count);
6401}
6402
6403/*
6404 * create control files in sysfs
6405 * /sys/bus/rbd/...
6406 */
6407static int rbd_sysfs_init(void)
6408{
6409 int ret;
6410
6411 ret = device_register(&rbd_root_dev);
6412 if (ret < 0)
6413 return ret;
6414
6415 ret = bus_register(&rbd_bus_type);
6416 if (ret < 0)
6417 device_unregister(&rbd_root_dev);
6418
6419 return ret;
6420}
6421
6422static void rbd_sysfs_cleanup(void)
6423{
6424 bus_unregister(&rbd_bus_type);
6425 device_unregister(&rbd_root_dev);
6426}
6427
6428static int rbd_slab_init(void)
6429{
6430 rbd_assert(!rbd_img_request_cache);
6431 rbd_img_request_cache = KMEM_CACHE(rbd_img_request, 0);
6432 if (!rbd_img_request_cache)
6433 return -ENOMEM;
6434
6435 rbd_assert(!rbd_obj_request_cache);
6436 rbd_obj_request_cache = KMEM_CACHE(rbd_obj_request, 0);
6437 if (!rbd_obj_request_cache)
6438 goto out_err;
6439
6440 rbd_assert(!rbd_bio_clone);
6441 rbd_bio_clone = bioset_create(BIO_POOL_SIZE, 0, 0);
6442 if (!rbd_bio_clone)
6443 goto out_err_clone;
6444
6445 return 0;
6446
6447out_err_clone:
6448 kmem_cache_destroy(rbd_obj_request_cache);
6449 rbd_obj_request_cache = NULL;
6450 out_err:
6451 kmem_cache_destroy(rbd_img_request_cache);
6452 rbd_img_request_cache = NULL;
6453 return -ENOMEM;
6454}
6455
6456static void rbd_slab_exit(void)
6457{
6458 rbd_assert(rbd_obj_request_cache);
6459 kmem_cache_destroy(rbd_obj_request_cache);
6460 rbd_obj_request_cache = NULL;
6461
6462 rbd_assert(rbd_img_request_cache);
6463 kmem_cache_destroy(rbd_img_request_cache);
6464 rbd_img_request_cache = NULL;
6465
6466 rbd_assert(rbd_bio_clone);
6467 bioset_free(rbd_bio_clone);
6468 rbd_bio_clone = NULL;
6469}
6470
6471 static int __init rbd_init(void)
6472{
6473 int rc;
6474
6475 if (!libceph_compatible(NULL)) {
6476 rbd_warn(NULL, "libceph incompatibility (quitting)");
6477 return -EINVAL;
6478 }
6479
6480 rc = rbd_slab_init();
6481 if (rc)
6482 return rc;
6483
6484 /*
6485 * The number of active work items is limited by the number of
6486 * rbd devices * queue depth, so leave @max_active at default.
6487 */
6488 rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
6489 if (!rbd_wq) {
6490 rc = -ENOMEM;
6491 goto err_out_slab;
6492 }
6493
6494 if (single_major) {
6495 rbd_major = register_blkdev(0, RBD_DRV_NAME);
6496 if (rbd_major < 0) {
6497 rc = rbd_major;
6498 goto err_out_wq;
6499 }
6500 }
6501
6502 rc = rbd_sysfs_init();
6503 if (rc)
6504 goto err_out_blkdev;
6505
6506 if (single_major)
6507 pr_info("loaded (major %d)\n", rbd_major);
6508 else
6509 pr_info("loaded\n");
6510
6511 return 0;
6512
6513err_out_blkdev:
6514 if (single_major)
6515 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6516err_out_wq:
6517 destroy_workqueue(rbd_wq);
6518err_out_slab:
6519 rbd_slab_exit();
6520 return rc;
6521}
6522
6523 static void __exit rbd_exit(void)
6524 {
6525 ida_destroy(&rbd_dev_id_ida);
6526 rbd_sysfs_cleanup();
6527 if (single_major)
6528 unregister_blkdev(rbd_major, RBD_DRV_NAME);
6529 destroy_workqueue(rbd_wq);
6530 rbd_slab_exit();
6531}
6532
6533module_init(rbd_init);
6534module_exit(rbd_exit);
6535
6536 MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
6537MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
6538MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
6539/* following authorship retained from original osdblk.c */
6540MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
6541
6542 MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
6543 MODULE_LICENSE("GPL");