rbd.c revision 4d9b67cddd9b9bc320473a334cc8023a4186092f
1
2/*
3   rbd.c -- Export ceph rados objects as a Linux block device
4
5
6   based on drivers/block/osdblk.c:
7
8   Copyright 2009 Red Hat, Inc.
9
10   This program is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation.
13
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License for more details.
18
19   You should have received a copy of the GNU General Public License
20   along with this program; see the file COPYING.  If not, write to
21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25   For usage instructions, please refer to:
26
27                 Documentation/ABI/testing/sysfs-bus-rbd
28
29 */
30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
34#include <linux/ceph/decode.h>
35#include <linux/parser.h>
36#include <linux/bsearch.h>
37
38#include <linux/kernel.h>
39#include <linux/device.h>
40#include <linux/module.h>
41#include <linux/fs.h>
42#include <linux/blkdev.h>
43#include <linux/slab.h>
44#include <linux/idr.h>
45
46#include "rbd_types.h"
47
48#define RBD_DEBUG	/* Activate rbd_assert() calls */
49
50/*
51 * The basic unit of block I/O is a sector.  It is interpreted in a
52 * number of contexts in Linux (blk, bio, genhd), but the default is
53 * universally 512 bytes.  These symbols are just slightly more
54 * meaningful than the bare numbers they represent.
55 */
56#define	SECTOR_SHIFT	9
57#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
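
/*
 * A small worked example, kept here purely for illustration: with
 * SECTOR_SHIFT of 9 a 1 MiB request covers 1 MiB >> 9 = 2048
 * sectors, and a byte offset converts to a sector number with the
 * same shift, e.g. offset 4096 lands in sector 4096 >> 9 = 8.
 */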
58
59/*
60 * Increment the given counter and return its updated value.
61 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value, -EINVAL is
 * returned without updating it.
64 */
65static int atomic_inc_return_safe(atomic_t *v)
66{
67	unsigned int counter;
68
69	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
70	if (counter <= (unsigned int)INT_MAX)
71		return (int)counter;
72
73	atomic_dec(v);
74
75	return -EINVAL;
76}
77
78/* Decrement the counter.  Return the resulting value, or -EINVAL */
79static int atomic_dec_return_safe(atomic_t *v)
80{
81	int counter;
82
83	counter = atomic_dec_return(v);
84	if (counter >= 0)
85		return counter;
86
87	atomic_inc(v);
88
89	return -EINVAL;
90}
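
/*
 * Illustrative usage sketch (not driver code): these helpers suit
 * reference-count style fields such as parent_ref below, where
 * overflow or underflow must fail gracefully rather than wrap.  A
 * caller might pair them roughly like this:
 *
 *	int counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
 *
 *	if (counter > 0) {
 *		...				(reference is held; use it)
 *		if (atomic_dec_return_safe(&rbd_dev->parent_ref) < 0)
 *			rbd_warn(rbd_dev, "parent_ref underflow");
 *	}
 */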
91
92#define RBD_DRV_NAME "rbd"
93
94#define RBD_MINORS_PER_MAJOR		256
95#define RBD_SINGLE_MAJOR_PART_SHIFT	4
96
97#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
98#define RBD_MAX_SNAP_NAME_LEN	\
99			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
100
101#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
102
103#define RBD_SNAP_HEAD_NAME	"-"
104
105#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
106
107/* This allows a single page to hold an image name sent by OSD */
108#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
109#define RBD_IMAGE_ID_LEN_MAX	64
110
111#define RBD_OBJ_PREFIX_LEN_MAX	64
112
113/* Feature bits */
114
115#define RBD_FEATURE_LAYERING	(1<<0)
116#define RBD_FEATURE_STRIPINGV2	(1<<1)
117#define RBD_FEATURES_ALL \
118	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
119
120/* Features supported by this (client software) implementation. */
121
122#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
123
124/*
125 * An RBD device name will be "rbd#", where the "rbd" comes from
126 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
129 */
130#define DEV_NAME_LEN		32
131#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
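
/*
 * For example, with a 4-byte int MAX_INT_FORMAT_WIDTH works out to
 * (5 * 4) / 2 + 1 = 11 characters, enough for "-2147483648" (a sign
 * plus ten digits), so DEV_NAME_LEN (32) easily covers "rbd" plus
 * any decimal device id and a terminating NUL.
 */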
132
133/*
134 * block device image metadata (in-memory version)
135 */
136struct rbd_image_header {
137	/* These six fields never change for a given rbd image */
138	char *object_prefix;
139	__u8 obj_order;
140	__u8 crypt_type;
141	__u8 comp_type;
142	u64 stripe_unit;
143	u64 stripe_count;
144	u64 features;		/* Might be changeable someday? */
145
146	/* The remaining fields need to be updated occasionally */
147	u64 image_size;
148	struct ceph_snap_context *snapc;
149	char *snap_names;	/* format 1 only */
150	u64 *snap_sizes;	/* format 1 only */
151};
152
153/*
154 * An rbd image specification.
155 *
156 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
157 * identify an image.  Each rbd_dev structure includes a pointer to
158 * an rbd_spec structure that encapsulates this identity.
159 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
163 * defined by the tuple, and the names are looked up.
164 *
165 * An rbd_dev structure contains a parent_spec pointer which is
166 * non-null if the image it represents is a child in a layered
167 * image.  This pointer will refer to the rbd_spec structure used
168 * by the parent rbd_dev for its own identity (i.e., the structure
169 * is shared between the parent and child).
170 *
171 * Since these structures are populated once, during the discovery
172 * phase of image construction, they are effectively immutable so
173 * we make no effort to synchronize access to them.
174 *
175 * Note that code herein does not assume the image name is known (it
176 * could be a null pointer).
177 */
178struct rbd_spec {
179	u64		pool_id;
180	const char	*pool_name;
181
182	const char	*image_id;
183	const char	*image_name;
184
185	u64		snap_id;
186	const char	*snap_name;
187
188	struct kref	kref;
189};
190
191/*
192 * an instance of the client.  multiple devices may share an rbd client.
193 */
194struct rbd_client {
195	struct ceph_client	*client;
196	struct kref		kref;
197	struct list_head	node;
198};
199
200struct rbd_img_request;
201typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
202
203#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
204
205struct rbd_obj_request;
206typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
207
208enum obj_request_type {
209	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
210};
211
212enum obj_req_flags {
213	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
214	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
215	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
216	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
217};
218
219struct rbd_obj_request {
220	const char		*object_name;
221	u64			offset;		/* object start byte */
222	u64			length;		/* bytes from offset */
223	unsigned long		flags;
224
225	/*
226	 * An object request associated with an image will have its
227	 * img_data flag set; a standalone object request will not.
228	 *
229	 * A standalone object request will have which == BAD_WHICH
230	 * and a null obj_request pointer.
231	 *
232	 * An object request initiated in support of a layered image
233	 * object (to check for its existence before a write) will
234	 * have which == BAD_WHICH and a non-null obj_request pointer.
235	 *
236	 * Finally, an object request for rbd image data will have
237	 * which != BAD_WHICH, and will have a non-null img_request
238	 * pointer.  The value of which will be in the range
239	 * 0..(img_request->obj_request_count-1).
240	 */
241	union {
242		struct rbd_obj_request	*obj_request;	/* STAT op */
243		struct {
244			struct rbd_img_request	*img_request;
245			u64			img_offset;
246			/* links for img_request->obj_requests list */
247			struct list_head	links;
248		};
249	};
250	u32			which;		/* posn image request list */
251
252	enum obj_request_type	type;
253	union {
254		struct bio	*bio_list;
255		struct {
256			struct page	**pages;
257			u32		page_count;
258		};
259	};
260	struct page		**copyup_pages;
261	u32			copyup_page_count;
262
263	struct ceph_osd_request	*osd_req;
264
265	u64			xferred;	/* bytes transferred */
266	int			result;
267
268	rbd_obj_callback_t	callback;
269	struct completion	completion;
270
271	struct kref		kref;
272};
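
/*
 * A hypothetical helper, shown only as a comment and not part of the
 * driver, restating the invariants described above:
 *
 *	static const char *obj_request_kind(struct rbd_obj_request *req)
 *	{
 *		if (req->which != BAD_WHICH)
 *			return "image data";
 *		if (req->obj_request)
 *			return "layered existence check (STAT)";
 *		return "stand-alone";
 *	}
 */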
273
274enum img_req_flags {
275	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
276	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
277	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
278};
279
280struct rbd_img_request {
281	struct rbd_device	*rbd_dev;
282	u64			offset;	/* starting image byte offset */
283	u64			length;	/* byte count from offset */
284	unsigned long		flags;
285	union {
286		u64			snap_id;	/* for reads */
287		struct ceph_snap_context *snapc;	/* for writes */
288	};
289	union {
290		struct request		*rq;		/* block request */
291		struct rbd_obj_request	*obj_request;	/* obj req initiator */
292	};
293	struct page		**copyup_pages;
294	u32			copyup_page_count;
295	spinlock_t		completion_lock;/* protects next_completion */
296	u32			next_completion;
297	rbd_img_callback_t	callback;
298	u64			xferred;/* aggregate bytes transferred */
299	int			result;	/* first nonzero obj_request result */
300
301	u32			obj_request_count;
302	struct list_head	obj_requests;	/* rbd_obj_request structs */
303
304	struct kref		kref;
305};
306
307#define for_each_obj_request(ireq, oreq) \
308	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
309#define for_each_obj_request_from(ireq, oreq) \
310	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
311#define for_each_obj_request_safe(ireq, oreq, n) \
312	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
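
/*
 * Typical use of these iterators (rbd_img_request_complete() below
 * does exactly this): walk an image request's object requests and
 * aggregate their byte counts.
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */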
313
314struct rbd_mapping {
315	u64                     size;
316	u64                     features;
317	bool			read_only;
318};
319
320/*
321 * a single device
322 */
323struct rbd_device {
324	int			dev_id;		/* blkdev unique id */
325
326	int			major;		/* blkdev assigned major */
327	int			minor;
328	struct gendisk		*disk;		/* blkdev's gendisk and rq */
329
330	u32			image_format;	/* Either 1 or 2 */
331	struct rbd_client	*rbd_client;
332
333	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
334
335	spinlock_t		lock;		/* queue, flags, open_count */
336
337	struct rbd_image_header	header;
338	unsigned long		flags;		/* possibly lock protected */
339	struct rbd_spec		*spec;
340
341	char			*header_name;
342
343	struct ceph_file_layout	layout;
344
345	struct ceph_osd_event   *watch_event;
346	struct rbd_obj_request	*watch_request;
347
348	struct rbd_spec		*parent_spec;
349	u64			parent_overlap;
350	atomic_t		parent_ref;
351	struct rbd_device	*parent;
352
353	/* protects updating the header */
354	struct rw_semaphore     header_rwsem;
355
356	struct rbd_mapping	mapping;
357
358	struct list_head	node;
359
360	/* sysfs related */
361	struct device		dev;
362	unsigned long		open_count;	/* protected by lock */
363};
364
365/*
366 * Flag bits for rbd_dev->flags.  If atomicity is required,
367 * rbd_dev->lock is used to protect access.
368 *
369 * Currently, only the "removing" flag (which is coupled with the
370 * "open_count" field) requires atomic access.
371 */
372enum rbd_dev_flags {
373	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
374	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
375};
376
377static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
378
379static LIST_HEAD(rbd_dev_list);    /* devices */
380static DEFINE_SPINLOCK(rbd_dev_list_lock);
381
382static LIST_HEAD(rbd_client_list);		/* clients */
383static DEFINE_SPINLOCK(rbd_client_list_lock);
384
385/* Slab caches for frequently-allocated structures */
386
387static struct kmem_cache	*rbd_img_request_cache;
388static struct kmem_cache	*rbd_obj_request_cache;
389static struct kmem_cache	*rbd_segment_name_cache;
390
391static int rbd_major;
392static DEFINE_IDA(rbd_dev_id_ida);
393
394/*
395 * Default to false for now, as single-major requires >= 0.75 version of
396 * userspace rbd utility.
397 */
398static bool single_major = false;
399module_param(single_major, bool, S_IRUGO);
400MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
401
402static int rbd_img_request_submit(struct rbd_img_request *img_request);
403
404static void rbd_dev_device_release(struct device *dev);
405
406static ssize_t rbd_add(struct bus_type *bus, const char *buf,
407		       size_t count);
408static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
409			  size_t count);
410static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
411				    size_t count);
412static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
413				       size_t count);
414static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
415static void rbd_spec_put(struct rbd_spec *spec);
416
417static int rbd_dev_id_to_minor(int dev_id)
418{
419	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
420}
421
422static int minor_to_rbd_dev_id(int minor)
423{
424	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
425}
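
/*
 * Worked example: with RBD_SINGLE_MAJOR_PART_SHIFT of 4 each device
 * owns 16 minors, so dev_id 0 maps to minor 0, dev_id 1 to minor 16,
 * and any minor in 16..31 (the whole device plus up to 15 partitions)
 * maps back to dev_id 1.
 */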
426
427static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
428static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
429static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
430static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
431
432static struct attribute *rbd_bus_attrs[] = {
433	&bus_attr_add.attr,
434	&bus_attr_remove.attr,
435	&bus_attr_add_single_major.attr,
436	&bus_attr_remove_single_major.attr,
437	NULL,
438};
439
440static umode_t rbd_bus_is_visible(struct kobject *kobj,
441				  struct attribute *attr, int index)
442{
443	if (!single_major &&
444	    (attr == &bus_attr_add_single_major.attr ||
445	     attr == &bus_attr_remove_single_major.attr))
446		return 0;
447
448	return attr->mode;
449}
450
451static const struct attribute_group rbd_bus_group = {
452	.attrs = rbd_bus_attrs,
453	.is_visible = rbd_bus_is_visible,
454};
455__ATTRIBUTE_GROUPS(rbd_bus);
456
457static struct bus_type rbd_bus_type = {
458	.name		= "rbd",
459	.bus_groups	= rbd_bus_groups,
460};
461
462static void rbd_root_dev_release(struct device *dev)
463{
464}
465
466static struct device rbd_root_dev = {
467	.init_name =    "rbd",
468	.release =      rbd_root_dev_release,
469};
470
471static __printf(2, 3)
472void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
473{
474	struct va_format vaf;
475	va_list args;
476
477	va_start(args, fmt);
478	vaf.fmt = fmt;
479	vaf.va = &args;
480
481	if (!rbd_dev)
482		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
483	else if (rbd_dev->disk)
484		printk(KERN_WARNING "%s: %s: %pV\n",
485			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
486	else if (rbd_dev->spec && rbd_dev->spec->image_name)
487		printk(KERN_WARNING "%s: image %s: %pV\n",
488			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
489	else if (rbd_dev->spec && rbd_dev->spec->image_id)
490		printk(KERN_WARNING "%s: id %s: %pV\n",
491			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
492	else	/* punt */
493		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
494			RBD_DRV_NAME, rbd_dev, &vaf);
495	va_end(args);
496}
497
498#ifdef RBD_DEBUG
499#define rbd_assert(expr)						\
500		if (unlikely(!(expr))) {				\
501			printk(KERN_ERR "\nAssertion failure in %s() "	\
502						"at line %d:\n\n"	\
503					"\trbd_assert(%s);\n\n",	\
504					__func__, __LINE__, #expr);	\
505			BUG();						\
506		}
507#else /* !RBD_DEBUG */
508#  define rbd_assert(expr)	((void) 0)
509#endif /* !RBD_DEBUG */
510
511static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
512static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
513static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
514
515static int rbd_dev_refresh(struct rbd_device *rbd_dev);
516static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
517static int rbd_dev_header_info(struct rbd_device *rbd_dev);
518static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
519static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
520					u64 snap_id);
521static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
522				u8 *order, u64 *snap_size);
523static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
524		u64 *snap_features);
525static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
526
527static int rbd_open(struct block_device *bdev, fmode_t mode)
528{
529	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
530	bool removing = false;
531
532	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
533		return -EROFS;
534
535	spin_lock_irq(&rbd_dev->lock);
536	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
537		removing = true;
538	else
539		rbd_dev->open_count++;
540	spin_unlock_irq(&rbd_dev->lock);
541	if (removing)
542		return -ENOENT;
543
544	(void) get_device(&rbd_dev->dev);
545
546	return 0;
547}
548
549static void rbd_release(struct gendisk *disk, fmode_t mode)
550{
551	struct rbd_device *rbd_dev = disk->private_data;
552	unsigned long open_count_before;
553
554	spin_lock_irq(&rbd_dev->lock);
555	open_count_before = rbd_dev->open_count--;
556	spin_unlock_irq(&rbd_dev->lock);
557	rbd_assert(open_count_before > 0);
558
559	put_device(&rbd_dev->dev);
560}
561
562static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
563{
564	int ret = 0;
565	int val;
566	bool ro;
567	bool ro_changed = false;
568
569	/* get_user() may sleep, so call it before taking rbd_dev->lock */
570	if (get_user(val, (int __user *)(arg)))
571		return -EFAULT;
572
573	ro = val ? true : false;
	/* a mapped snapshot can't be made writable */
575	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
576		return -EROFS;
577
578	spin_lock_irq(&rbd_dev->lock);
	/* refuse if anyone else has this device open */
580	if (rbd_dev->open_count > 1) {
581		ret = -EBUSY;
582		goto out;
583	}
584
585	if (rbd_dev->mapping.read_only != ro) {
586		rbd_dev->mapping.read_only = ro;
587		ro_changed = true;
588	}
589
590out:
591	spin_unlock_irq(&rbd_dev->lock);
592	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
593	if (ret == 0 && ro_changed)
594		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
595
596	return ret;
597}
598
599static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
600			unsigned int cmd, unsigned long arg)
601{
602	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
603	int ret = 0;
604
605	switch (cmd) {
606	case BLKROSET:
607		ret = rbd_ioctl_set_ro(rbd_dev, arg);
608		break;
609	default:
610		ret = -ENOTTY;
611	}
612
613	return ret;
614}
615
616#ifdef CONFIG_COMPAT
617static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
618				unsigned int cmd, unsigned long arg)
619{
620	return rbd_ioctl(bdev, mode, cmd, arg);
621}
622#endif /* CONFIG_COMPAT */
623
624static const struct block_device_operations rbd_bd_ops = {
625	.owner			= THIS_MODULE,
626	.open			= rbd_open,
627	.release		= rbd_release,
628	.ioctl			= rbd_ioctl,
629#ifdef CONFIG_COMPAT
630	.compat_ioctl		= rbd_compat_ioctl,
631#endif
632};
633
634/*
635 * Initialize an rbd client instance.  Success or not, this function
636 * consumes ceph_opts.  Caller holds client_mutex.
637 */
638static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
639{
640	struct rbd_client *rbdc;
641	int ret = -ENOMEM;
642
643	dout("%s:\n", __func__);
644	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
645	if (!rbdc)
646		goto out_opt;
647
648	kref_init(&rbdc->kref);
649	INIT_LIST_HEAD(&rbdc->node);
650
651	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
652	if (IS_ERR(rbdc->client))
653		goto out_rbdc;
654	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
655
656	ret = ceph_open_session(rbdc->client);
657	if (ret < 0)
658		goto out_client;
659
660	spin_lock(&rbd_client_list_lock);
661	list_add_tail(&rbdc->node, &rbd_client_list);
662	spin_unlock(&rbd_client_list_lock);
663
664	dout("%s: rbdc %p\n", __func__, rbdc);
665
666	return rbdc;
667out_client:
668	ceph_destroy_client(rbdc->client);
669out_rbdc:
670	kfree(rbdc);
671out_opt:
672	if (ceph_opts)
673		ceph_destroy_options(ceph_opts);
674	dout("%s: error %d\n", __func__, ret);
675
676	return ERR_PTR(ret);
677}
678
679static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
680{
681	kref_get(&rbdc->kref);
682
683	return rbdc;
684}
685
686/*
687 * Find a ceph client with specific addr and configuration.  If
688 * found, bump its reference count.
689 */
690static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
691{
692	struct rbd_client *client_node;
693	bool found = false;
694
695	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
696		return NULL;
697
698	spin_lock(&rbd_client_list_lock);
699	list_for_each_entry(client_node, &rbd_client_list, node) {
700		if (!ceph_compare_options(ceph_opts, client_node->client)) {
701			__rbd_get_client(client_node);
702
703			found = true;
704			break;
705		}
706	}
707	spin_unlock(&rbd_client_list_lock);
708
709	return found ? client_node : NULL;
710}
711
712/*
713 * mount options
714 */
715enum {
716	Opt_last_int,
717	/* int args above */
718	Opt_last_string,
719	/* string args above */
720	Opt_read_only,
721	Opt_read_write,
722	/* Boolean args above */
723	Opt_last_bool,
724};
725
726static match_table_t rbd_opts_tokens = {
727	/* int args above */
728	/* string args above */
729	{Opt_read_only, "read_only"},
730	{Opt_read_only, "ro"},		/* Alternate spelling */
731	{Opt_read_write, "read_write"},
732	{Opt_read_write, "rw"},		/* Alternate spelling */
733	/* Boolean args above */
734	{-1, NULL}
735};
736
737struct rbd_options {
738	bool	read_only;
739};
740
741#define RBD_READ_ONLY_DEFAULT	false
742
743static int parse_rbd_opts_token(char *c, void *private)
744{
745	struct rbd_options *rbd_opts = private;
746	substring_t argstr[MAX_OPT_ARGS];
747	int token, intval, ret;
748
749	token = match_token(c, rbd_opts_tokens, argstr);
750	if (token < 0)
751		return -EINVAL;
752
753	if (token < Opt_last_int) {
754		ret = match_int(&argstr[0], &intval);
755		if (ret < 0) {
756			pr_err("bad mount option arg (not int) "
757			       "at '%s'\n", c);
758			return ret;
759		}
760		dout("got int token %d val %d\n", token, intval);
761	} else if (token > Opt_last_int && token < Opt_last_string) {
762		dout("got string token %d val %s\n", token,
763		     argstr[0].from);
764	} else if (token > Opt_last_string && token < Opt_last_bool) {
765		dout("got Boolean token %d\n", token);
766	} else {
767		dout("got token %d\n", token);
768	}
769
770	switch (token) {
771	case Opt_read_only:
772		rbd_opts->read_only = true;
773		break;
774	case Opt_read_write:
775		rbd_opts->read_only = false;
776		break;
777	default:
778		rbd_assert(false);
779		break;
780	}
781	return 0;
782}
783
784/*
785 * Get a ceph client with specific addr and configuration, if one does
786 * not exist create it.  Either way, ceph_opts is consumed by this
787 * function.
788 */
789static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
790{
791	struct rbd_client *rbdc;
792
793	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
794	rbdc = rbd_client_find(ceph_opts);
795	if (rbdc)	/* using an existing client */
796		ceph_destroy_options(ceph_opts);
797	else
798		rbdc = rbd_client_create(ceph_opts);
799	mutex_unlock(&client_mutex);
800
801	return rbdc;
802}
803
804/*
805 * Destroy ceph client
806 *
807 * Caller must hold rbd_client_list_lock.
808 */
809static void rbd_client_release(struct kref *kref)
810{
811	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
812
813	dout("%s: rbdc %p\n", __func__, rbdc);
814	spin_lock(&rbd_client_list_lock);
815	list_del(&rbdc->node);
816	spin_unlock(&rbd_client_list_lock);
817
818	ceph_destroy_client(rbdc->client);
819	kfree(rbdc);
820}
821
822/*
823 * Drop reference to ceph client node. If it's not referenced anymore, release
824 * it.
825 */
826static void rbd_put_client(struct rbd_client *rbdc)
827{
828	if (rbdc)
829		kref_put(&rbdc->kref, rbd_client_release);
830}
831
832static bool rbd_image_format_valid(u32 image_format)
833{
834	return image_format == 1 || image_format == 2;
835}
836
837static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
838{
839	size_t size;
840	u32 snap_count;
841
842	/* The header has to start with the magic rbd header text */
843	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
844		return false;
845
846	/* The bio layer requires at least sector-sized I/O */
847
848	if (ondisk->options.order < SECTOR_SHIFT)
849		return false;
850
851	/* If we use u64 in a few spots we may be able to loosen this */
852
853	if (ondisk->options.order > 8 * sizeof (int) - 1)
854		return false;
855
856	/*
857	 * The size of a snapshot header has to fit in a size_t, and
858	 * that limits the number of snapshots.
859	 */
860	snap_count = le32_to_cpu(ondisk->snap_count);
861	size = SIZE_MAX - sizeof (struct ceph_snap_context);
862	if (snap_count > size / sizeof (__le64))
863		return false;
864
865	/*
	 * Not only that, but the size of the entire snapshot
867	 * header must also be representable in a size_t.
868	 */
869	size -= snap_count * sizeof (__le64);
870	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
871		return false;
872
873	return true;
874}
875
876/*
877 * Fill an rbd image header with information from the given format 1
878 * on-disk header.
879 */
880static int rbd_header_from_disk(struct rbd_device *rbd_dev,
881				 struct rbd_image_header_ondisk *ondisk)
882{
883	struct rbd_image_header *header = &rbd_dev->header;
884	bool first_time = header->object_prefix == NULL;
885	struct ceph_snap_context *snapc;
886	char *object_prefix = NULL;
887	char *snap_names = NULL;
888	u64 *snap_sizes = NULL;
889	u32 snap_count;
890	size_t size;
891	int ret = -ENOMEM;
892	u32 i;
893
894	/* Allocate this now to avoid having to handle failure below */
895
896	if (first_time) {
897		size_t len;
898
899		len = strnlen(ondisk->object_prefix,
900				sizeof (ondisk->object_prefix));
901		object_prefix = kmalloc(len + 1, GFP_KERNEL);
902		if (!object_prefix)
903			return -ENOMEM;
904		memcpy(object_prefix, ondisk->object_prefix, len);
905		object_prefix[len] = '\0';
906	}
907
908	/* Allocate the snapshot context and fill it in */
909
910	snap_count = le32_to_cpu(ondisk->snap_count);
911	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
912	if (!snapc)
913		goto out_err;
914	snapc->seq = le64_to_cpu(ondisk->snap_seq);
915	if (snap_count) {
916		struct rbd_image_snap_ondisk *snaps;
917		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
918
919		/* We'll keep a copy of the snapshot names... */
920
921		if (snap_names_len > (u64)SIZE_MAX)
922			goto out_2big;
923		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
924		if (!snap_names)
925			goto out_err;
926
927		/* ...as well as the array of their sizes. */
928
929		size = snap_count * sizeof (*header->snap_sizes);
930		snap_sizes = kmalloc(size, GFP_KERNEL);
931		if (!snap_sizes)
932			goto out_err;
933
934		/*
935		 * Copy the names, and fill in each snapshot's id
936		 * and size.
937		 *
938		 * Note that rbd_dev_v1_header_info() guarantees the
939		 * ondisk buffer we're working with has
940		 * snap_names_len bytes beyond the end of the
941		 * snapshot id array, this memcpy() is safe.
942		 */
943		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
944		snaps = ondisk->snaps;
945		for (i = 0; i < snap_count; i++) {
946			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
947			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
948		}
949	}
950
951	/* We won't fail any more, fill in the header */
952
953	if (first_time) {
954		header->object_prefix = object_prefix;
955		header->obj_order = ondisk->options.order;
956		header->crypt_type = ondisk->options.crypt_type;
957		header->comp_type = ondisk->options.comp_type;
958		/* The rest aren't used for format 1 images */
959		header->stripe_unit = 0;
960		header->stripe_count = 0;
961		header->features = 0;
962	} else {
963		ceph_put_snap_context(header->snapc);
964		kfree(header->snap_names);
965		kfree(header->snap_sizes);
966	}
967
968	/* The remaining fields always get updated (when we refresh) */
969
970	header->image_size = le64_to_cpu(ondisk->image_size);
971	header->snapc = snapc;
972	header->snap_names = snap_names;
973	header->snap_sizes = snap_sizes;
974
975	return 0;
976out_2big:
977	ret = -EIO;
978out_err:
979	kfree(snap_sizes);
980	kfree(snap_names);
981	ceph_put_snap_context(snapc);
982	kfree(object_prefix);
983
984	return ret;
985}
986
987static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
988{
989	const char *snap_name;
990
991	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
992
993	/* Skip over names until we find the one we are looking for */
994
995	snap_name = rbd_dev->header.snap_names;
996	while (which--)
997		snap_name += strlen(snap_name) + 1;
998
999	return kstrdup(snap_name, GFP_KERNEL);
1000}
1001
1002/*
1003 * Snapshot id comparison function for use with qsort()/bsearch().
1004 * Note that result is for snapshots in *descending* order.
1005 */
1006static int snapid_compare_reverse(const void *s1, const void *s2)
1007{
1008	u64 snap_id1 = *(u64 *)s1;
1009	u64 snap_id2 = *(u64 *)s2;
1010
1011	if (snap_id1 < snap_id2)
1012		return 1;
1013	return snap_id1 == snap_id2 ? 0 : -1;
1014}
1015
1016/*
1017 * Search a snapshot context to see if the given snapshot id is
1018 * present.
1019 *
1020 * Returns the position of the snapshot id in the array if it's found,
1021 * or BAD_SNAP_INDEX otherwise.
1022 *
 * Note: The snapshot array is kept sorted (by the osd) in
1024 * reverse order, highest snapshot id first.
1025 */
1026static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1027{
1028	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1029	u64 *found;
1030
1031	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1032				sizeof (snap_id), snapid_compare_reverse);
1033
1034	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1035}
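
/*
 * A worked example of the reverse-ordered search: with
 * snapc->snaps[] = { 8, 5, 2 } (newest id first), looking up
 * snap_id 5 yields index 1, while looking up 3 matches nothing and
 * BAD_SNAP_INDEX is returned.  snapid_compare_reverse() is what lets
 * bsearch(), which expects ascending order, work on this array.
 */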
1036
1037static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1038					u64 snap_id)
1039{
1040	u32 which;
1041	const char *snap_name;
1042
1043	which = rbd_dev_snap_index(rbd_dev, snap_id);
1044	if (which == BAD_SNAP_INDEX)
1045		return ERR_PTR(-ENOENT);
1046
1047	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1048	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1049}
1050
1051static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1052{
1053	if (snap_id == CEPH_NOSNAP)
1054		return RBD_SNAP_HEAD_NAME;
1055
1056	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1057	if (rbd_dev->image_format == 1)
1058		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1059
1060	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1061}
1062
1063static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1064				u64 *snap_size)
1065{
1066	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1067	if (snap_id == CEPH_NOSNAP) {
1068		*snap_size = rbd_dev->header.image_size;
1069	} else if (rbd_dev->image_format == 1) {
1070		u32 which;
1071
1072		which = rbd_dev_snap_index(rbd_dev, snap_id);
1073		if (which == BAD_SNAP_INDEX)
1074			return -ENOENT;
1075
1076		*snap_size = rbd_dev->header.snap_sizes[which];
1077	} else {
1078		u64 size = 0;
1079		int ret;
1080
1081		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1082		if (ret)
1083			return ret;
1084
1085		*snap_size = size;
1086	}
1087	return 0;
1088}
1089
1090static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1091			u64 *snap_features)
1092{
1093	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1094	if (snap_id == CEPH_NOSNAP) {
1095		*snap_features = rbd_dev->header.features;
1096	} else if (rbd_dev->image_format == 1) {
1097		*snap_features = 0;	/* No features for format 1 */
1098	} else {
1099		u64 features = 0;
1100		int ret;
1101
1102		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1103		if (ret)
1104			return ret;
1105
1106		*snap_features = features;
1107	}
1108	return 0;
1109}
1110
1111static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1112{
1113	u64 snap_id = rbd_dev->spec->snap_id;
1114	u64 size = 0;
1115	u64 features = 0;
1116	int ret;
1117
1118	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1119	if (ret)
1120		return ret;
1121	ret = rbd_snap_features(rbd_dev, snap_id, &features);
1122	if (ret)
1123		return ret;
1124
1125	rbd_dev->mapping.size = size;
1126	rbd_dev->mapping.features = features;
1127
1128	return 0;
1129}
1130
1131static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1132{
1133	rbd_dev->mapping.size = 0;
1134	rbd_dev->mapping.features = 0;
1135}
1136
1137static void rbd_segment_name_free(const char *name)
1138{
1139	/* The explicit cast here is needed to drop the const qualifier */
1140
1141	kmem_cache_free(rbd_segment_name_cache, (void *)name);
1142}
1143
1144static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1145{
1146	char *name;
1147	u64 segment;
1148	int ret;
1149	char *name_format;
1150
1151	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1152	if (!name)
1153		return NULL;
1154	segment = offset >> rbd_dev->header.obj_order;
1155	name_format = "%s.%012llx";
1156	if (rbd_dev->image_format == 2)
1157		name_format = "%s.%016llx";
1158	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1159			rbd_dev->header.object_prefix, segment);
1160	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1161		pr_err("error formatting segment name for #%llu (%d)\n",
1162			segment, ret);
1163		rbd_segment_name_free(name);
1164		name = NULL;
1165	}
1166
1167	return name;
1168}
1169
1170static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1171{
1172	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1173
1174	return offset & (segment_size - 1);
1175}
1176
1177static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1178				u64 offset, u64 length)
1179{
1180	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1181
1182	offset &= segment_size - 1;
1183
1184	rbd_assert(length <= U64_MAX - offset);
1185	if (offset + length > segment_size)
1186		length = segment_size - offset;
1187
1188	return length;
1189}
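
/*
 * A worked example of the segment arithmetic, assuming the common
 * obj_order of 22 (4 MiB objects): image offset 10 MiB falls in
 * segment 10 MiB >> 22 = 2 (object "<object_prefix>.000000000002"
 * for a format 1 image), at offset 10 MiB & (4 MiB - 1) = 2 MiB
 * within it, and a 3 MiB request starting there is clipped by
 * rbd_segment_length() to the 2 MiB remaining in the segment.
 */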
1190
1191/*
1192 * returns the size of an object in the image
1193 */
1194static u64 rbd_obj_bytes(struct rbd_image_header *header)
1195{
1196	return 1 << header->obj_order;
1197}
1198
1199/*
1200 * bio helpers
1201 */
1202
1203static void bio_chain_put(struct bio *chain)
1204{
1205	struct bio *tmp;
1206
1207	while (chain) {
1208		tmp = chain;
1209		chain = chain->bi_next;
1210		bio_put(tmp);
1211	}
1212}
1213
1214/*
 * zeros a bio chain, starting at a specific offset
1216 */
1217static void zero_bio_chain(struct bio *chain, int start_ofs)
1218{
1219	struct bio_vec bv;
1220	struct bvec_iter iter;
1221	unsigned long flags;
1222	void *buf;
1223	int pos = 0;
1224
1225	while (chain) {
1226		bio_for_each_segment(bv, chain, iter) {
1227			if (pos + bv.bv_len > start_ofs) {
1228				int remainder = max(start_ofs - pos, 0);
1229				buf = bvec_kmap_irq(&bv, &flags);
1230				memset(buf + remainder, 0,
1231				       bv.bv_len - remainder);
1232				flush_dcache_page(bv.bv_page);
1233				bvec_kunmap_irq(buf, &flags);
1234			}
1235			pos += bv.bv_len;
1236		}
1237
1238		chain = chain->bi_next;
1239	}
1240}
1241
1242/*
1243 * similar to zero_bio_chain(), zeros data defined by a page array,
1244 * starting at the given byte offset from the start of the array and
1245 * continuing up to the given end offset.  The pages array is
1246 * assumed to be big enough to hold all bytes up to the end.
1247 */
1248static void zero_pages(struct page **pages, u64 offset, u64 end)
1249{
1250	struct page **page = &pages[offset >> PAGE_SHIFT];
1251
1252	rbd_assert(end > offset);
1253	rbd_assert(end - offset <= (u64)SIZE_MAX);
1254	while (offset < end) {
1255		size_t page_offset;
1256		size_t length;
1257		unsigned long flags;
1258		void *kaddr;
1259
1260		page_offset = offset & ~PAGE_MASK;
1261		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1262		local_irq_save(flags);
1263		kaddr = kmap_atomic(*page);
1264		memset(kaddr + page_offset, 0, length);
1265		flush_dcache_page(*page);
1266		kunmap_atomic(kaddr);
1267		local_irq_restore(flags);
1268
1269		offset += length;
1270		page++;
1271	}
1272}
1273
1274/*
1275 * Clone a portion of a bio, starting at the given byte offset
1276 * and continuing for the number of bytes indicated.
1277 */
1278static struct bio *bio_clone_range(struct bio *bio_src,
1279					unsigned int offset,
1280					unsigned int len,
1281					gfp_t gfpmask)
1282{
1283	struct bio *bio;
1284
1285	bio = bio_clone(bio_src, gfpmask);
1286	if (!bio)
1287		return NULL;	/* ENOMEM */
1288
1289	bio_advance(bio, offset);
1290	bio->bi_iter.bi_size = len;
1291
1292	return bio;
1293}
1294
1295/*
1296 * Clone a portion of a bio chain, starting at the given byte offset
1297 * into the first bio in the source chain and continuing for the
1298 * number of bytes indicated.  The result is another bio chain of
1299 * exactly the given length, or a null pointer on error.
1300 *
1301 * The bio_src and offset parameters are both in-out.  On entry they
1302 * refer to the first source bio and the offset into that bio where
1303 * the start of data to be cloned is located.
1304 *
1305 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
1307 * contain the offset of that byte within that bio.
1308 */
1309static struct bio *bio_chain_clone_range(struct bio **bio_src,
1310					unsigned int *offset,
1311					unsigned int len,
1312					gfp_t gfpmask)
1313{
1314	struct bio *bi = *bio_src;
1315	unsigned int off = *offset;
1316	struct bio *chain = NULL;
1317	struct bio **end;
1318
1319	/* Build up a chain of clone bios up to the limit */
1320
1321	if (!bi || off >= bi->bi_iter.bi_size || !len)
1322		return NULL;		/* Nothing to clone */
1323
1324	end = &chain;
1325	while (len) {
1326		unsigned int bi_size;
1327		struct bio *bio;
1328
1329		if (!bi) {
1330			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1331			goto out_err;	/* EINVAL; ran out of bio's */
1332		}
1333		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1334		bio = bio_clone_range(bi, off, bi_size, gfpmask);
1335		if (!bio)
1336			goto out_err;	/* ENOMEM */
1337
1338		*end = bio;
1339		end = &bio->bi_next;
1340
1341		off += bi_size;
1342		if (off == bi->bi_iter.bi_size) {
1343			bi = bi->bi_next;
1344			off = 0;
1345		}
1346		len -= bi_size;
1347	}
1348	*bio_src = bi;
1349	*offset = off;
1350
1351	return chain;
1352out_err:
1353	bio_chain_put(chain);
1354
1355	return NULL;
1356}
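
/*
 * Sketch of the intended calling pattern (illustrative only; the
 * names "remaining", "rq_bio_chain" and "seg_len" are made up):
 * carve successive pieces off one source chain, letting the in-out
 * arguments track the clone point.
 *
 *	struct bio *remaining = rq_bio_chain;
 *	unsigned int off = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&remaining, &off, seg_len, GFP_NOIO);
 *	if (!clone)
 *		return -ENOMEM;
 *
 * On success, remaining/off identify the first byte not yet cloned,
 * ready for the next call.
 */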
1357
1358/*
1359 * The default/initial value for all object request flags is 0.  For
1360 * each flag, once its value is set to 1 it is never reset to 0
1361 * again.
1362 */
1363static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1364{
1365	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1366		struct rbd_device *rbd_dev;
1367
1368		rbd_dev = obj_request->img_request->rbd_dev;
1369		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1370			obj_request);
1371	}
1372}
1373
1374static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1375{
1376	smp_mb();
1377	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1378}
1379
1380static void obj_request_done_set(struct rbd_obj_request *obj_request)
1381{
1382	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1383		struct rbd_device *rbd_dev = NULL;
1384
1385		if (obj_request_img_data_test(obj_request))
1386			rbd_dev = obj_request->img_request->rbd_dev;
1387		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1388			obj_request);
1389	}
1390}
1391
1392static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1393{
1394	smp_mb();
1395	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1396}
1397
1398/*
1399 * This sets the KNOWN flag after (possibly) setting the EXISTS
1400 * flag.  The latter is set based on the "exists" value provided.
1401 *
1402 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
1404 * checks are separated by the creation of the target object, and
1405 * the first ("doesn't exist") response arrives *after* the second
1406 * ("does exist").  In that case we ignore the second one.
1407 */
1408static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1409				bool exists)
1410{
1411	if (exists)
1412		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1413	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1414	smp_mb();
1415}
1416
1417static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1418{
1419	smp_mb();
1420	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1421}
1422
1423static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1424{
1425	smp_mb();
1426	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1427}
1428
1429static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1430{
1431	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1432
1433	return obj_request->img_offset <
1434	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1435}
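
/*
 * For example, with 4 MiB objects and a parent overlap of 6 MiB,
 * round_up() yields 8 MiB, so object requests whose img_offset is
 * 0 or 4 MiB are treated as overlapping the parent while one at
 * 8 MiB is not.
 */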
1436
1437static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1438{
1439	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1440		atomic_read(&obj_request->kref.refcount));
1441	kref_get(&obj_request->kref);
1442}
1443
1444static void rbd_obj_request_destroy(struct kref *kref);
1445static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1446{
1447	rbd_assert(obj_request != NULL);
1448	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1449		atomic_read(&obj_request->kref.refcount));
1450	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1451}
1452
1453static void rbd_img_request_get(struct rbd_img_request *img_request)
1454{
1455	dout("%s: img %p (was %d)\n", __func__, img_request,
1456	     atomic_read(&img_request->kref.refcount));
1457	kref_get(&img_request->kref);
1458}
1459
1460static bool img_request_child_test(struct rbd_img_request *img_request);
1461static void rbd_parent_request_destroy(struct kref *kref);
1462static void rbd_img_request_destroy(struct kref *kref);
1463static void rbd_img_request_put(struct rbd_img_request *img_request)
1464{
1465	rbd_assert(img_request != NULL);
1466	dout("%s: img %p (was %d)\n", __func__, img_request,
1467		atomic_read(&img_request->kref.refcount));
1468	if (img_request_child_test(img_request))
1469		kref_put(&img_request->kref, rbd_parent_request_destroy);
1470	else
1471		kref_put(&img_request->kref, rbd_img_request_destroy);
1472}
1473
1474static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1475					struct rbd_obj_request *obj_request)
1476{
1477	rbd_assert(obj_request->img_request == NULL);
1478
1479	/* Image request now owns object's original reference */
1480	obj_request->img_request = img_request;
1481	obj_request->which = img_request->obj_request_count;
1482	rbd_assert(!obj_request_img_data_test(obj_request));
1483	obj_request_img_data_set(obj_request);
1484	rbd_assert(obj_request->which != BAD_WHICH);
1485	img_request->obj_request_count++;
1486	list_add_tail(&obj_request->links, &img_request->obj_requests);
1487	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1488		obj_request->which);
1489}
1490
1491static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1492					struct rbd_obj_request *obj_request)
1493{
1494	rbd_assert(obj_request->which != BAD_WHICH);
1495
1496	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1497		obj_request->which);
1498	list_del(&obj_request->links);
1499	rbd_assert(img_request->obj_request_count > 0);
1500	img_request->obj_request_count--;
1501	rbd_assert(obj_request->which == img_request->obj_request_count);
1502	obj_request->which = BAD_WHICH;
1503	rbd_assert(obj_request_img_data_test(obj_request));
1504	rbd_assert(obj_request->img_request == img_request);
1505	obj_request->img_request = NULL;
1506	obj_request->callback = NULL;
1507	rbd_obj_request_put(obj_request);
1508}
1509
1510static bool obj_request_type_valid(enum obj_request_type type)
1511{
1512	switch (type) {
1513	case OBJ_REQUEST_NODATA:
1514	case OBJ_REQUEST_BIO:
1515	case OBJ_REQUEST_PAGES:
1516		return true;
1517	default:
1518		return false;
1519	}
1520}
1521
1522static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1523				struct rbd_obj_request *obj_request)
1524{
1525	dout("%s %p\n", __func__, obj_request);
1526	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1527}
1528
1529static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1530{
1531	dout("%s %p\n", __func__, obj_request);
1532	ceph_osdc_cancel_request(obj_request->osd_req);
1533}
1534
1535/*
1536 * Wait for an object request to complete.  If interrupted, cancel the
1537 * underlying osd request.
1538 */
1539static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1540{
1541	int ret;
1542
1543	dout("%s %p\n", __func__, obj_request);
1544
1545	ret = wait_for_completion_interruptible(&obj_request->completion);
1546	if (ret < 0) {
1547		dout("%s %p interrupted\n", __func__, obj_request);
1548		rbd_obj_request_end(obj_request);
1549		return ret;
1550	}
1551
1552	dout("%s %p done\n", __func__, obj_request);
1553	return 0;
1554}
1555
1556static void rbd_img_request_complete(struct rbd_img_request *img_request)
1557{
1558
1559	dout("%s: img %p\n", __func__, img_request);
1560
1561	/*
1562	 * If no error occurred, compute the aggregate transfer
1563	 * count for the image request.  We could instead use
1564	 * atomic64_cmpxchg() to update it as each object request
	 * completes; it's not clear which way is better offhand.
1566	 */
1567	if (!img_request->result) {
1568		struct rbd_obj_request *obj_request;
1569		u64 xferred = 0;
1570
1571		for_each_obj_request(img_request, obj_request)
1572			xferred += obj_request->xferred;
1573		img_request->xferred = xferred;
1574	}
1575
1576	if (img_request->callback)
1577		img_request->callback(img_request);
1578	else
1579		rbd_img_request_put(img_request);
1580}
1581
1582/*
1583 * The default/initial value for all image request flags is 0.  Each
1584 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
1586 */
1587static void img_request_write_set(struct rbd_img_request *img_request)
1588{
1589	set_bit(IMG_REQ_WRITE, &img_request->flags);
1590	smp_mb();
1591}
1592
1593static bool img_request_write_test(struct rbd_img_request *img_request)
1594{
1595	smp_mb();
1596	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1597}
1598
1599static void img_request_child_set(struct rbd_img_request *img_request)
1600{
1601	set_bit(IMG_REQ_CHILD, &img_request->flags);
1602	smp_mb();
1603}
1604
1605static void img_request_child_clear(struct rbd_img_request *img_request)
1606{
1607	clear_bit(IMG_REQ_CHILD, &img_request->flags);
1608	smp_mb();
1609}
1610
1611static bool img_request_child_test(struct rbd_img_request *img_request)
1612{
1613	smp_mb();
1614	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1615}
1616
1617static void img_request_layered_set(struct rbd_img_request *img_request)
1618{
1619	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1620	smp_mb();
1621}
1622
1623static void img_request_layered_clear(struct rbd_img_request *img_request)
1624{
1625	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1626	smp_mb();
1627}
1628
1629static bool img_request_layered_test(struct rbd_img_request *img_request)
1630{
1631	smp_mb();
1632	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1633}
1634
1635static void
1636rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1637{
1638	u64 xferred = obj_request->xferred;
1639	u64 length = obj_request->length;
1640
1641	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1642		obj_request, obj_request->img_request, obj_request->result,
1643		xferred, length);
1644	/*
1645	 * ENOENT means a hole in the image.  We zero-fill the entire
1646	 * length of the request.  A short read also implies zero-fill
1647	 * to the end of the request.  An error requires the whole
1648	 * length of the request to be reported finished with an error
1649	 * to the block layer.  In each case we update the xferred
1650	 * count to indicate the whole request was satisfied.
1651	 */
1652	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1653	if (obj_request->result == -ENOENT) {
1654		if (obj_request->type == OBJ_REQUEST_BIO)
1655			zero_bio_chain(obj_request->bio_list, 0);
1656		else
1657			zero_pages(obj_request->pages, 0, length);
1658		obj_request->result = 0;
1659	} else if (xferred < length && !obj_request->result) {
1660		if (obj_request->type == OBJ_REQUEST_BIO)
1661			zero_bio_chain(obj_request->bio_list, xferred);
1662		else
1663			zero_pages(obj_request->pages, xferred, length);
1664	}
1665	obj_request->xferred = length;
1666	obj_request_done_set(obj_request);
1667}
1668
1669static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1670{
1671	dout("%s: obj %p cb %p\n", __func__, obj_request,
1672		obj_request->callback);
1673	if (obj_request->callback)
1674		obj_request->callback(obj_request);
1675	else
1676		complete_all(&obj_request->completion);
1677}
1678
1679static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1680{
1681	dout("%s: obj %p\n", __func__, obj_request);
1682	obj_request_done_set(obj_request);
1683}
1684
1685static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1686{
1687	struct rbd_img_request *img_request = NULL;
1688	struct rbd_device *rbd_dev = NULL;
1689	bool layered = false;
1690
1691	if (obj_request_img_data_test(obj_request)) {
1692		img_request = obj_request->img_request;
1693		layered = img_request && img_request_layered_test(img_request);
1694		rbd_dev = img_request->rbd_dev;
1695	}
1696
1697	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1698		obj_request, img_request, obj_request->result,
1699		obj_request->xferred, obj_request->length);
1700	if (layered && obj_request->result == -ENOENT &&
1701			obj_request->img_offset < rbd_dev->parent_overlap)
1702		rbd_img_parent_read(obj_request);
1703	else if (img_request)
1704		rbd_img_obj_request_read_callback(obj_request);
1705	else
1706		obj_request_done_set(obj_request);
1707}
1708
1709static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1710{
1711	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1712		obj_request->result, obj_request->length);
1713	/*
	 * There is no such thing as a successful short write.  Set
	 * the transferred count to our originally-requested length.
1716	 */
1717	obj_request->xferred = obj_request->length;
1718	obj_request_done_set(obj_request);
1719}
1720
1721/*
1722 * For a simple stat call there's nothing to do.  We'll do more if
1723 * this is part of a write sequence for a layered image.
1724 */
1725static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1726{
1727	dout("%s: obj %p\n", __func__, obj_request);
1728	obj_request_done_set(obj_request);
1729}
1730
1731static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1732				struct ceph_msg *msg)
1733{
1734	struct rbd_obj_request *obj_request = osd_req->r_priv;
1735	u16 opcode;
1736
1737	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1738	rbd_assert(osd_req == obj_request->osd_req);
1739	if (obj_request_img_data_test(obj_request)) {
1740		rbd_assert(obj_request->img_request);
1741		rbd_assert(obj_request->which != BAD_WHICH);
1742	} else {
1743		rbd_assert(obj_request->which == BAD_WHICH);
1744	}
1745
1746	if (osd_req->r_result < 0)
1747		obj_request->result = osd_req->r_result;
1748
1749	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1750
1751	/*
1752	 * We support a 64-bit length, but ultimately it has to be
1753	 * passed to blk_end_request(), which takes an unsigned int.
1754	 */
1755	obj_request->xferred = osd_req->r_reply_op_len[0];
1756	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1757
1758	opcode = osd_req->r_ops[0].op;
1759	switch (opcode) {
1760	case CEPH_OSD_OP_READ:
1761		rbd_osd_read_callback(obj_request);
1762		break;
1763	case CEPH_OSD_OP_SETALLOCHINT:
1764		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1765		/* fall through */
1766	case CEPH_OSD_OP_WRITE:
1767		rbd_osd_write_callback(obj_request);
1768		break;
1769	case CEPH_OSD_OP_STAT:
1770		rbd_osd_stat_callback(obj_request);
1771		break;
1772	case CEPH_OSD_OP_CALL:
1773	case CEPH_OSD_OP_NOTIFY_ACK:
1774	case CEPH_OSD_OP_WATCH:
1775		rbd_osd_trivial_callback(obj_request);
1776		break;
1777	default:
1778		rbd_warn(NULL, "%s: unsupported op %hu\n",
1779			obj_request->object_name, (unsigned short) opcode);
1780		break;
1781	}
1782
1783	if (obj_request_done_test(obj_request))
1784		rbd_obj_request_complete(obj_request);
1785}
1786
1787static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1788{
1789	struct rbd_img_request *img_request = obj_request->img_request;
1790	struct ceph_osd_request *osd_req = obj_request->osd_req;
1791	u64 snap_id;
1792
1793	rbd_assert(osd_req != NULL);
1794
1795	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1796	ceph_osdc_build_request(osd_req, obj_request->offset,
1797			NULL, snap_id, NULL);
1798}
1799
1800static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1801{
1802	struct rbd_img_request *img_request = obj_request->img_request;
1803	struct ceph_osd_request *osd_req = obj_request->osd_req;
1804	struct ceph_snap_context *snapc;
1805	struct timespec mtime = CURRENT_TIME;
1806
1807	rbd_assert(osd_req != NULL);
1808
1809	snapc = img_request ? img_request->snapc : NULL;
1810	ceph_osdc_build_request(osd_req, obj_request->offset,
1811			snapc, CEPH_NOSNAP, &mtime);
1812}
1813
1814/*
1815 * Create an osd request.  A read request has one osd op (read).
1816 * A write request has either one (watch) or two (hint+write) osd ops.
1817 * (All rbd data writes are prefixed with an allocation hint op, but
1818 * technically osd watch is a write request, hence this distinction.)
1819 */
1820static struct ceph_osd_request *rbd_osd_req_create(
1821					struct rbd_device *rbd_dev,
1822					bool write_request,
1823					unsigned int num_ops,
1824					struct rbd_obj_request *obj_request)
1825{
1826	struct ceph_snap_context *snapc = NULL;
1827	struct ceph_osd_client *osdc;
1828	struct ceph_osd_request *osd_req;
1829
1830	if (obj_request_img_data_test(obj_request)) {
1831		struct rbd_img_request *img_request = obj_request->img_request;
1832
1833		rbd_assert(write_request ==
1834				img_request_write_test(img_request));
1835		if (write_request)
1836			snapc = img_request->snapc;
1837	}
1838
1839	rbd_assert(num_ops == 1 || (write_request && num_ops == 2));
1840
1841	/* Allocate and initialize the request, for the num_ops ops */
1842
1843	osdc = &rbd_dev->rbd_client->client->osdc;
1844	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1845					  GFP_ATOMIC);
1846	if (!osd_req)
1847		return NULL;	/* ENOMEM */
1848
1849	if (write_request)
1850		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1851	else
1852		osd_req->r_flags = CEPH_OSD_FLAG_READ;
1853
1854	osd_req->r_callback = rbd_osd_req_callback;
1855	osd_req->r_priv = obj_request;
1856
1857	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1858	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1859
1860	return osd_req;
1861}
1862
1863/*
1864 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has three osd ops:
1866 * a copyup method call, a hint op, and a write op.
1867 */
1868static struct ceph_osd_request *
1869rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1870{
1871	struct rbd_img_request *img_request;
1872	struct ceph_snap_context *snapc;
1873	struct rbd_device *rbd_dev;
1874	struct ceph_osd_client *osdc;
1875	struct ceph_osd_request *osd_req;
1876
1877	rbd_assert(obj_request_img_data_test(obj_request));
1878	img_request = obj_request->img_request;
1879	rbd_assert(img_request);
1880	rbd_assert(img_request_write_test(img_request));
1881
1882	/* Allocate and initialize the request, for the three ops */
1883
1884	snapc = img_request->snapc;
1885	rbd_dev = img_request->rbd_dev;
1886	osdc = &rbd_dev->rbd_client->client->osdc;
1887	osd_req = ceph_osdc_alloc_request(osdc, snapc, 3, false, GFP_ATOMIC);
1888	if (!osd_req)
1889		return NULL;	/* ENOMEM */
1890
1891	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1892	osd_req->r_callback = rbd_osd_req_callback;
1893	osd_req->r_priv = obj_request;
1894
1895	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1896	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1897
1898	return osd_req;
1899}
1900
1901
1902static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1903{
1904	ceph_osdc_put_request(osd_req);
1905}
1906
1907/* object_name is assumed to be a non-null pointer and NUL-terminated */
1908
1909static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1910						u64 offset, u64 length,
1911						enum obj_request_type type)
1912{
1913	struct rbd_obj_request *obj_request;
1914	size_t size;
1915	char *name;
1916
1917	rbd_assert(obj_request_type_valid(type));
1918
1919	size = strlen(object_name) + 1;
1920	name = kmalloc(size, GFP_KERNEL);
1921	if (!name)
1922		return NULL;
1923
1924	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1925	if (!obj_request) {
1926		kfree(name);
1927		return NULL;
1928	}
1929
1930	obj_request->object_name = memcpy(name, object_name, size);
1931	obj_request->offset = offset;
1932	obj_request->length = length;
1933	obj_request->flags = 0;
1934	obj_request->which = BAD_WHICH;
1935	obj_request->type = type;
1936	INIT_LIST_HEAD(&obj_request->links);
1937	init_completion(&obj_request->completion);
1938	kref_init(&obj_request->kref);
1939
1940	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1941		offset, length, (int)type, obj_request);
1942
1943	return obj_request;
1944}
1945
1946static void rbd_obj_request_destroy(struct kref *kref)
1947{
1948	struct rbd_obj_request *obj_request;
1949
1950	obj_request = container_of(kref, struct rbd_obj_request, kref);
1951
1952	dout("%s: obj %p\n", __func__, obj_request);
1953
1954	rbd_assert(obj_request->img_request == NULL);
1955	rbd_assert(obj_request->which == BAD_WHICH);
1956
1957	if (obj_request->osd_req)
1958		rbd_osd_req_destroy(obj_request->osd_req);
1959
1960	rbd_assert(obj_request_type_valid(obj_request->type));
1961	switch (obj_request->type) {
1962	case OBJ_REQUEST_NODATA:
1963		break;		/* Nothing to do */
1964	case OBJ_REQUEST_BIO:
1965		if (obj_request->bio_list)
1966			bio_chain_put(obj_request->bio_list);
1967		break;
1968	case OBJ_REQUEST_PAGES:
1969		if (obj_request->pages)
1970			ceph_release_page_vector(obj_request->pages,
1971						obj_request->page_count);
1972		break;
1973	}
1974
1975	kfree(obj_request->object_name);
1976	obj_request->object_name = NULL;
1977	kmem_cache_free(rbd_obj_request_cache, obj_request);
1978}
1979
1980/* It's OK to call this for a device with no parent */
1981
1982static void rbd_spec_put(struct rbd_spec *spec);
1983static void rbd_dev_unparent(struct rbd_device *rbd_dev)
1984{
1985	rbd_dev_remove_parent(rbd_dev);
1986	rbd_spec_put(rbd_dev->parent_spec);
1987	rbd_dev->parent_spec = NULL;
1988	rbd_dev->parent_overlap = 0;
1989}
1990
1991/*
1992 * Parent image reference counting is used to determine when an
1993 * image's parent fields can be safely torn down--after there are no
1994 * more in-flight requests to the parent image.  When the last
1995 * reference is dropped, cleaning them up is safe.
1996 */
1997static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
1998{
1999	int counter;
2000
2001	if (!rbd_dev->parent_spec)
2002		return;
2003
2004	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2005	if (counter > 0)
2006		return;
2007
2008	/* Last reference; clean up parent data structures */
2009
2010	if (!counter)
2011		rbd_dev_unparent(rbd_dev);
2012	else
2013		rbd_warn(rbd_dev, "parent reference underflow\n");
2014}
2015
2016/*
2017 * If an image has a non-zero parent overlap, get a reference to its
2018 * parent.
2019 *
2020 * We must get the reference before checking for the overlap to
2021 * coordinate properly with zeroing the parent overlap in
2022 * rbd_dev_v2_parent_info() when an image gets flattened.  We
2023 * drop it again if there is no overlap.
2024 *
2025 * Returns true if the rbd device has a parent with a non-zero
2026 * overlap and a reference for it was successfully taken, or
2027 * false otherwise.
2028 */
2029static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2030{
2031	int counter;
2032
2033	if (!rbd_dev->parent_spec)
2034		return false;
2035
2036	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2037	if (counter > 0 && rbd_dev->parent_overlap)
2038		return true;
2039
2040	/* Image was flattened, but parent is not yet torn down */
2041
2042	if (counter < 0)
2043		rbd_warn(rbd_dev, "parent reference overflow\n");
2044
2045	return false;
2046}
2047
2048/*
2049 * Caller is responsible for filling in the list of object requests
2050 * that comprises the image request, and the Linux request pointer
2051 * (if there is one).
2052 */
2053static struct rbd_img_request *rbd_img_request_create(
2054					struct rbd_device *rbd_dev,
2055					u64 offset, u64 length,
2056					bool write_request)
2057{
2058	struct rbd_img_request *img_request;
2059
2060	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
2061	if (!img_request)
2062		return NULL;
2063
2064	if (write_request) {
2065		down_read(&rbd_dev->header_rwsem);
2066		ceph_get_snap_context(rbd_dev->header.snapc);
2067		up_read(&rbd_dev->header_rwsem);
2068	}
2069
2070	img_request->rq = NULL;
2071	img_request->rbd_dev = rbd_dev;
2072	img_request->offset = offset;
2073	img_request->length = length;
2074	img_request->flags = 0;
2075	if (write_request) {
2076		img_request_write_set(img_request);
2077		img_request->snapc = rbd_dev->header.snapc;
2078	} else {
2079		img_request->snap_id = rbd_dev->spec->snap_id;
2080	}
2081	if (rbd_dev_parent_get(rbd_dev))
2082		img_request_layered_set(img_request);
2083	spin_lock_init(&img_request->completion_lock);
2084	img_request->next_completion = 0;
2085	img_request->callback = NULL;
2086	img_request->result = 0;
2087	img_request->obj_request_count = 0;
2088	INIT_LIST_HEAD(&img_request->obj_requests);
2089	kref_init(&img_request->kref);
2090
2091	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2092		write_request ? "write" : "read", offset, length,
2093		img_request);
2094
2095	return img_request;
2096}
2097
2098static void rbd_img_request_destroy(struct kref *kref)
2099{
2100	struct rbd_img_request *img_request;
2101	struct rbd_obj_request *obj_request;
2102	struct rbd_obj_request *next_obj_request;
2103
2104	img_request = container_of(kref, struct rbd_img_request, kref);
2105
2106	dout("%s: img %p\n", __func__, img_request);
2107
2108	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2109		rbd_img_obj_request_del(img_request, obj_request);
2110	rbd_assert(img_request->obj_request_count == 0);
2111
2112	if (img_request_layered_test(img_request)) {
2113		img_request_layered_clear(img_request);
2114		rbd_dev_parent_put(img_request->rbd_dev);
2115	}
2116
2117	if (img_request_write_test(img_request))
2118		ceph_put_snap_context(img_request->snapc);
2119
2120	kmem_cache_free(rbd_img_request_cache, img_request);
2121}
2122
2123static struct rbd_img_request *rbd_parent_request_create(
2124					struct rbd_obj_request *obj_request,
2125					u64 img_offset, u64 length)
2126{
2127	struct rbd_img_request *parent_request;
2128	struct rbd_device *rbd_dev;
2129
2130	rbd_assert(obj_request->img_request);
2131	rbd_dev = obj_request->img_request->rbd_dev;
2132
2133	parent_request = rbd_img_request_create(rbd_dev->parent,
2134						img_offset, length, false);
2135	if (!parent_request)
2136		return NULL;
2137
2138	img_request_child_set(parent_request);
2139	rbd_obj_request_get(obj_request);
2140	parent_request->obj_request = obj_request;
2141
2142	return parent_request;
2143}
2144
2145static void rbd_parent_request_destroy(struct kref *kref)
2146{
2147	struct rbd_img_request *parent_request;
2148	struct rbd_obj_request *orig_request;
2149
2150	parent_request = container_of(kref, struct rbd_img_request, kref);
2151	orig_request = parent_request->obj_request;
2152
2153	parent_request->obj_request = NULL;
2154	rbd_obj_request_put(orig_request);
2155	img_request_child_clear(parent_request);
2156
2157	rbd_img_request_destroy(kref);
2158}
2159
2160static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2161{
2162	struct rbd_img_request *img_request;
2163	unsigned int xferred;
2164	int result;
2165	bool more;
2166
2167	rbd_assert(obj_request_img_data_test(obj_request));
2168	img_request = obj_request->img_request;
2169
2170	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2171	xferred = (unsigned int)obj_request->xferred;
2172	result = obj_request->result;
2173	if (result) {
2174		struct rbd_device *rbd_dev = img_request->rbd_dev;
2175
2176		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2177			img_request_write_test(img_request) ? "write" : "read",
2178			obj_request->length, obj_request->img_offset,
2179			obj_request->offset);
2180		rbd_warn(rbd_dev, "  result %d xferred %x\n",
2181			result, xferred);
2182		if (!img_request->result)
2183			img_request->result = result;
2184	}
2185
2186	/* Image object requests don't own their page array */
2187
2188	if (obj_request->type == OBJ_REQUEST_PAGES) {
2189		obj_request->pages = NULL;
2190		obj_request->page_count = 0;
2191	}
2192
2193	if (img_request_child_test(img_request)) {
2194		rbd_assert(img_request->obj_request != NULL);
2195		more = obj_request->which < img_request->obj_request_count - 1;
2196	} else {
2197		rbd_assert(img_request->rq != NULL);
2198		more = blk_end_request(img_request->rq, result, xferred);
2199	}
2200
2201	return more;
2202}
2203
2204static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2205{
2206	struct rbd_img_request *img_request;
2207	u32 which = obj_request->which;
2208	bool more = true;
2209
2210	rbd_assert(obj_request_img_data_test(obj_request));
2211	img_request = obj_request->img_request;
2212
2213	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2214	rbd_assert(img_request != NULL);
2215	rbd_assert(img_request->obj_request_count > 0);
2216	rbd_assert(which != BAD_WHICH);
2217	rbd_assert(which < img_request->obj_request_count);
2218
2219	spin_lock_irq(&img_request->completion_lock);
2220	if (which != img_request->next_completion)
2221		goto out;
2222
2223	for_each_obj_request_from(img_request, obj_request) {
2224		rbd_assert(more);
2225		rbd_assert(which < img_request->obj_request_count);
2226
2227		if (!obj_request_done_test(obj_request))
2228			break;
2229		more = rbd_img_obj_end_request(obj_request);
2230		which++;
2231	}
2232
2233	rbd_assert(more ^ (which == img_request->obj_request_count));
2234	img_request->next_completion = which;
2235out:
2236	spin_unlock_irq(&img_request->completion_lock);
2237	rbd_img_request_put(img_request);
2238
2239	if (!more)
2240		rbd_img_request_complete(img_request);
2241}
2242
2243/*
2244 * Split up an image request into one or more object requests, each
2245 * to a different object.  The "type" parameter indicates whether
2246 * "data_desc" is the pointer to the head of a list of bio
2247 * structures, or the base of a page array.  In either case this
2248 * function assumes data_desc describes memory sufficient to hold
2249 * all data described by the image request.
2250 */
2251static int rbd_img_request_fill(struct rbd_img_request *img_request,
2252					enum obj_request_type type,
2253					void *data_desc)
2254{
2255	struct rbd_device *rbd_dev = img_request->rbd_dev;
2256	struct rbd_obj_request *obj_request = NULL;
2257	struct rbd_obj_request *next_obj_request;
2258	bool write_request = img_request_write_test(img_request);
2259	struct bio *bio_list = NULL;
2260	unsigned int bio_offset = 0;
2261	struct page **pages = NULL;
2262	u64 img_offset;
2263	u64 resid;
2264	u16 opcode;
2265
2266	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2267		(int)type, data_desc);
2268
2269	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
2270	img_offset = img_request->offset;
2271	resid = img_request->length;
2272	rbd_assert(resid > 0);
2273
2274	if (type == OBJ_REQUEST_BIO) {
2275		bio_list = data_desc;
2276		rbd_assert(img_offset ==
2277			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2278	} else {
2279		rbd_assert(type == OBJ_REQUEST_PAGES);
2280		pages = data_desc;
2281	}
2282
2283	while (resid) {
2284		struct ceph_osd_request *osd_req;
2285		const char *object_name;
2286		u64 offset;
2287		u64 length;
2288		unsigned int which = 0;
2289
2290		object_name = rbd_segment_name(rbd_dev, img_offset);
2291		if (!object_name)
2292			goto out_unwind;
2293		offset = rbd_segment_offset(rbd_dev, img_offset);
2294		length = rbd_segment_length(rbd_dev, img_offset, resid);
2295		obj_request = rbd_obj_request_create(object_name,
2296						offset, length, type);
2297		/* object request has its own copy of the object name */
2298		rbd_segment_name_free(object_name);
2299		if (!obj_request)
2300			goto out_unwind;
2301
2302		/*
2303		 * set obj_request->img_request before creating the
2304		 * osd_request so that it gets the right snapc
2305		 */
2306		rbd_img_obj_request_add(img_request, obj_request);
2307
2308		if (type == OBJ_REQUEST_BIO) {
2309			unsigned int clone_size;
2310
2311			rbd_assert(length <= (u64)UINT_MAX);
2312			clone_size = (unsigned int)length;
2313			obj_request->bio_list =
2314					bio_chain_clone_range(&bio_list,
2315								&bio_offset,
2316								clone_size,
2317								GFP_ATOMIC);
2318			if (!obj_request->bio_list)
2319				goto out_unwind;
2320		} else {
2321			unsigned int page_count;
2322
2323			obj_request->pages = pages;
2324			page_count = (u32)calc_pages_for(offset, length);
2325			obj_request->page_count = page_count;
2326			if ((offset + length) & ~PAGE_MASK)
2327				page_count--;	/* more on last page */
2328			pages += page_count;
2329		}
2330
2331		osd_req = rbd_osd_req_create(rbd_dev, write_request,
2332					     (write_request ? 2 : 1),
2333					     obj_request);
2334		if (!osd_req)
2335			goto out_unwind;
2336		obj_request->osd_req = osd_req;
2337		obj_request->callback = rbd_img_obj_callback;
2338		rbd_img_request_get(img_request);
2339
2340		if (write_request) {
2341			osd_req_op_alloc_hint_init(osd_req, which,
2342					     rbd_obj_bytes(&rbd_dev->header),
2343					     rbd_obj_bytes(&rbd_dev->header));
2344			which++;
2345		}
2346
2347		osd_req_op_extent_init(osd_req, which, opcode, offset, length,
2348				       0, 0);
2349		if (type == OBJ_REQUEST_BIO)
2350			osd_req_op_extent_osd_data_bio(osd_req, which,
2351					obj_request->bio_list, length);
2352		else
2353			osd_req_op_extent_osd_data_pages(osd_req, which,
2354					obj_request->pages, length,
2355					offset & ~PAGE_MASK, false, false);
2356
2357		if (write_request)
2358			rbd_osd_req_format_write(obj_request);
2359		else
2360			rbd_osd_req_format_read(obj_request);
2361
2362		obj_request->img_offset = img_offset;
2363
2364		img_offset += length;
2365		resid -= length;
2366	}
2367
2368	return 0;
2369
2370out_unwind:
2371	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2372		rbd_img_obj_request_del(img_request, obj_request);
2373
2374	return -ENOMEM;
2375}
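
/*
 * A minimal sketch of the segment arithmetic relied on above (the
 * rbd_segment_name()/rbd_segment_offset()/rbd_segment_length() helpers
 * are defined earlier in this file; the numbers below are illustrative):
 *
 *	u64 seg_size = 1ULL << rbd_dev->header.obj_order;
 *	u64 segment  = img_offset >> rbd_dev->header.obj_order;
 *	u64 offset   = img_offset & (seg_size - 1);
 *	u64 length   = min(resid, seg_size - offset);
 *
 * With the default obj_order of 22 (4 MiB objects), a 6 MiB request
 * starting at image offset 3 MiB becomes three object requests: 1 MiB
 * at offset 3 MiB of segment 0, all 4 MiB of segment 1, and 1 MiB at
 * offset 0 of segment 2.
 */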
2376
2377static void
2378rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2379{
2380	struct rbd_img_request *img_request;
2381	struct rbd_device *rbd_dev;
2382	struct page **pages;
2383	u32 page_count;
2384
2385	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2386	rbd_assert(obj_request_img_data_test(obj_request));
2387	img_request = obj_request->img_request;
2388	rbd_assert(img_request);
2389
2390	rbd_dev = img_request->rbd_dev;
2391	rbd_assert(rbd_dev);
2392
2393	pages = obj_request->copyup_pages;
2394	rbd_assert(pages != NULL);
2395	obj_request->copyup_pages = NULL;
2396	page_count = obj_request->copyup_page_count;
2397	rbd_assert(page_count);
2398	obj_request->copyup_page_count = 0;
2399	ceph_release_page_vector(pages, page_count);
2400
2401	/*
2402	 * We want the transfer count to reflect the size of the
2403	 * original write request.  There is no such thing as a
2404	 * successful short write, so if the request was successful
2405	 * we can just set it to the originally-requested length.
2406	 */
2407	if (!obj_request->result)
2408		obj_request->xferred = obj_request->length;
2409
2410	/* Finish up with the normal image object callback */
2411
2412	rbd_img_obj_callback(obj_request);
2413}
2414
2415static void
2416rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2417{
2418	struct rbd_obj_request *orig_request;
2419	struct ceph_osd_request *osd_req;
2420	struct ceph_osd_client *osdc;
2421	struct rbd_device *rbd_dev;
2422	struct page **pages;
2423	u32 page_count;
2424	int img_result;
2425	u64 parent_length;
2426	u64 offset;
2427	u64 length;
2428
2429	rbd_assert(img_request_child_test(img_request));
2430
2431	/* First get what we need from the image request */
2432
2433	pages = img_request->copyup_pages;
2434	rbd_assert(pages != NULL);
2435	img_request->copyup_pages = NULL;
2436	page_count = img_request->copyup_page_count;
2437	rbd_assert(page_count);
2438	img_request->copyup_page_count = 0;
2439
2440	orig_request = img_request->obj_request;
2441	rbd_assert(orig_request != NULL);
2442	rbd_assert(obj_request_type_valid(orig_request->type));
2443	img_result = img_request->result;
2444	parent_length = img_request->length;
2445	rbd_assert(parent_length == img_request->xferred);
2446	rbd_img_request_put(img_request);
2447
2448	rbd_assert(orig_request->img_request);
2449	rbd_dev = orig_request->img_request->rbd_dev;
2450	rbd_assert(rbd_dev);
2451
2452	/*
2453	 * If the overlap has become 0 (most likely because the
2454	 * image has been flattened) we need to free the pages
2455	 * and re-submit the original write request.
2456	 */
2457	if (!rbd_dev->parent_overlap) {
2458		struct ceph_osd_client *osdc;
2459
2460		ceph_release_page_vector(pages, page_count);
2461		osdc = &rbd_dev->rbd_client->client->osdc;
2462		img_result = rbd_obj_request_submit(osdc, orig_request);
2463		if (!img_result)
2464			return;
2465	}
2466
2467	if (img_result)
2468		goto out_err;
2469
2470	/*
2471	 * The original osd request is of no use to us any more.
2472	 * We need a new one that can hold the three ops in a copyup
2473	 * request.  Allocate the new copyup osd request for the
2474	 * original request, and release the old one.
2475	 */
2476	img_result = -ENOMEM;
2477	osd_req = rbd_osd_req_create_copyup(orig_request);
2478	if (!osd_req)
2479		goto out_err;
2480	rbd_osd_req_destroy(orig_request->osd_req);
2481	orig_request->osd_req = osd_req;
2482	orig_request->copyup_pages = pages;
2483	orig_request->copyup_page_count = page_count;
2484
2485	/* Initialize the copyup op */
2486
2487	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2488	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2489						false, false);
2490
2491	/* Then the hint op */
2492
2493	osd_req_op_alloc_hint_init(osd_req, 1, rbd_obj_bytes(&rbd_dev->header),
2494				   rbd_obj_bytes(&rbd_dev->header));
2495
2496	/* And the original write request op */
2497
2498	offset = orig_request->offset;
2499	length = orig_request->length;
2500	osd_req_op_extent_init(osd_req, 2, CEPH_OSD_OP_WRITE,
2501					offset, length, 0, 0);
2502	if (orig_request->type == OBJ_REQUEST_BIO)
2503		osd_req_op_extent_osd_data_bio(osd_req, 2,
2504					orig_request->bio_list, length);
2505	else
2506		osd_req_op_extent_osd_data_pages(osd_req, 2,
2507					orig_request->pages, length,
2508					offset & ~PAGE_MASK, false, false);
2509
2510	rbd_osd_req_format_write(orig_request);
2511
2512	/* All set, send it off. */
2513
2514	orig_request->callback = rbd_img_obj_copyup_callback;
2515	osdc = &rbd_dev->rbd_client->client->osdc;
2516	img_result = rbd_obj_request_submit(osdc, orig_request);
2517	if (!img_result)
2518		return;
2519out_err:
2520	/* Record the error code and complete the request */
2521
2522	orig_request->result = img_result;
2523	orig_request->xferred = 0;
2524	obj_request_done_set(orig_request);
2525	rbd_obj_request_complete(orig_request);
2526}
2527
2528/*
2529 * Read from the parent image the range of data that covers the
2530 * entire target of the given object request.  This is used for
2531 * satisfying a layered image write request when the target of an
2532 * object request from the image request does not exist.
2533 *
2534 * A page array big enough to hold the returned data is allocated
2535 * and supplied to rbd_img_request_fill() as the "data descriptor."
2536 * When the read completes, this page array will be transferred to
2537 * the original object request for the copyup operation.
2538 *
2539 * If an error occurs, record it as the result of the original
2540 * object request and mark it done so it gets completed.
2541 */
2542static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2543{
2544	struct rbd_img_request *img_request = NULL;
2545	struct rbd_img_request *parent_request = NULL;
2546	struct rbd_device *rbd_dev;
2547	u64 img_offset;
2548	u64 length;
2549	struct page **pages = NULL;
2550	u32 page_count;
2551	int result;
2552
2553	rbd_assert(obj_request_img_data_test(obj_request));
2554	rbd_assert(obj_request_type_valid(obj_request->type));
2555
2556	img_request = obj_request->img_request;
2557	rbd_assert(img_request != NULL);
2558	rbd_dev = img_request->rbd_dev;
2559	rbd_assert(rbd_dev->parent != NULL);
2560
2561	/*
2562	 * Determine the byte range covered by the object in the
2563	 * child image to which the original request was to be sent.
2564	 */
2565	img_offset = obj_request->img_offset - obj_request->offset;
2566	length = (u64)1 << rbd_dev->header.obj_order;
2567
2568	/*
2569	 * There is no defined parent data beyond the parent
2570	 * overlap, so limit what we read at that boundary if
2571	 * necessary.
2572	 */
2573	if (img_offset + length > rbd_dev->parent_overlap) {
2574		rbd_assert(img_offset < rbd_dev->parent_overlap);
2575		length = rbd_dev->parent_overlap - img_offset;
2576	}
2577
2578	/*
2579	 * Allocate a page array big enough to receive the data read
2580	 * from the parent.
2581	 */
2582	page_count = (u32)calc_pages_for(0, length);
2583	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2584	if (IS_ERR(pages)) {
2585		result = PTR_ERR(pages);
2586		pages = NULL;
2587		goto out_err;
2588	}
2589
2590	result = -ENOMEM;
2591	parent_request = rbd_parent_request_create(obj_request,
2592						img_offset, length);
2593	if (!parent_request)
2594		goto out_err;
2595
2596	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2597	if (result)
2598		goto out_err;
2599	parent_request->copyup_pages = pages;
2600	parent_request->copyup_page_count = page_count;
2601
2602	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2603	result = rbd_img_request_submit(parent_request);
2604	if (!result)
2605		return 0;
2606
2607	parent_request->copyup_pages = NULL;
2608	parent_request->copyup_page_count = 0;
2609	parent_request->obj_request = NULL;
2610	rbd_obj_request_put(obj_request);
2611out_err:
2612	if (pages)
2613		ceph_release_page_vector(pages, page_count);
2614	if (parent_request)
2615		rbd_img_request_put(parent_request);
2616	obj_request->result = result;
2617	obj_request->xferred = 0;
2618	obj_request_done_set(obj_request);
2619
2620	return result;
2621}
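
/*
 * To make the range computation above concrete (a sketch assuming the
 * default 4 MiB objects): a write to bytes 1 MiB..2 MiB of the object
 * backing image range 8 MiB..12 MiB has img_offset = 9 MiB and
 * offset = 1 MiB, so the parent read covers img_offset - offset = 8 MiB
 * for a full object (4 MiB); if parent_overlap is only 10 MiB, the read
 * is clipped to 2 MiB.
 */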
2622
2623static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2624{
2625	struct rbd_obj_request *orig_request;
2626	struct rbd_device *rbd_dev;
2627	int result;
2628
2629	rbd_assert(!obj_request_img_data_test(obj_request));
2630
2631	/*
2632	 * All we need from the object request is the original
2633	 * request and the result of the STAT op.  Grab those, then
2634	 * we're done with the request.
2635	 */
2636	orig_request = obj_request->obj_request;
2637	obj_request->obj_request = NULL;
2638	rbd_obj_request_put(orig_request);
2639	rbd_assert(orig_request);
2640	rbd_assert(orig_request->img_request);
2641
2642	result = obj_request->result;
2643	obj_request->result = 0;
2644
2645	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2646		obj_request, orig_request, result,
2647		obj_request->xferred, obj_request->length);
2648	rbd_obj_request_put(obj_request);
2649
2650	/*
2651	 * If the overlap has become 0 (most likely because the
2652	 * image has been flattened) we need to free the pages
2653	 * and re-submit the original write request.
2654	 */
2655	rbd_dev = orig_request->img_request->rbd_dev;
2656	if (!rbd_dev->parent_overlap) {
2657		struct ceph_osd_client *osdc;
2658
2659		osdc = &rbd_dev->rbd_client->client->osdc;
2660		result = rbd_obj_request_submit(osdc, orig_request);
2661		if (!result)
2662			return;
2663	}
2664
2665	/*
2666	 * Our only purpose here is to determine whether the object
2667	 * exists, and we don't want to treat the non-existence as
2668	 * an error.  If something else comes back, transfer the
2669	 * error to the original request and complete it now.
2670	 */
2671	if (!result) {
2672		obj_request_existence_set(orig_request, true);
2673	} else if (result == -ENOENT) {
2674		obj_request_existence_set(orig_request, false);
2675	} else if (result) {
2676		orig_request->result = result;
2677		goto out;
2678	}
2679
2680	/*
2681	 * Resubmit the original request now that we have recorded
2682	 * whether the target object exists.
2683	 */
2684	orig_request->result = rbd_img_obj_request_submit(orig_request);
2685out:
2686	if (orig_request->result)
2687		rbd_obj_request_complete(orig_request);
2688}
2689
2690static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2691{
2692	struct rbd_obj_request *stat_request;
2693	struct rbd_device *rbd_dev;
2694	struct ceph_osd_client *osdc;
2695	struct page **pages = NULL;
2696	u32 page_count;
2697	size_t size;
2698	int ret;
2699
2700	/*
2701	 * The response data for a STAT call consists of:
2702	 *     le64 length;
2703	 *     struct {
2704	 *         le32 tv_sec;
2705	 *         le32 tv_nsec;
2706	 *     } mtime;
2707	 */
2708	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2709	page_count = (u32)calc_pages_for(0, size);
2710	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2711	if (IS_ERR(pages))
2712		return PTR_ERR(pages);
2713
2714	ret = -ENOMEM;
2715	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2716							OBJ_REQUEST_PAGES);
2717	if (!stat_request)
2718		goto out;
2719
2720	rbd_obj_request_get(obj_request);
2721	stat_request->obj_request = obj_request;
2722	stat_request->pages = pages;
2723	stat_request->page_count = page_count;
2724
2725	rbd_assert(obj_request->img_request);
2726	rbd_dev = obj_request->img_request->rbd_dev;
2727	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
2728						   stat_request);
2729	if (!stat_request->osd_req)
2730		goto out;
2731	stat_request->callback = rbd_img_obj_exists_callback;
2732
2733	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2734	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2735					false, false);
2736	rbd_osd_req_format_read(stat_request);
2737
2738	osdc = &rbd_dev->rbd_client->client->osdc;
2739	ret = rbd_obj_request_submit(osdc, stat_request);
2740out:
2741	if (ret)
2742		rbd_obj_request_put(obj_request);
2743
2744	return ret;
2745}
2746
2747static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2748{
2749	struct rbd_img_request *img_request;
2750	struct rbd_device *rbd_dev;
2751	bool known;
2752
2753	rbd_assert(obj_request_img_data_test(obj_request));
2754
2755	img_request = obj_request->img_request;
2756	rbd_assert(img_request);
2757	rbd_dev = img_request->rbd_dev;
2758
2759	/*
2760	 * Only writes to layered images need special handling.
2761	 * Reads and non-layered writes are simple object requests.
2762	 * Layered writes that start beyond the end of the overlap
2763	 * with the parent have no parent data, so they too are
2764	 * simple object requests.  Finally, if the target object is
2765	 * known to already exist, its parent data has already been
2766	 * copied, so a write to the object can also be handled as a
2767	 * simple object request.
2768	 */
2769	if (!img_request_write_test(img_request) ||
2770		!img_request_layered_test(img_request) ||
2771		!obj_request_overlaps_parent(obj_request) ||
2772		((known = obj_request_known_test(obj_request)) &&
2773			obj_request_exists_test(obj_request))) {
2774
2775		struct rbd_device *rbd_dev;
2776		struct ceph_osd_client *osdc;
2777
2778		rbd_dev = obj_request->img_request->rbd_dev;
2779		osdc = &rbd_dev->rbd_client->client->osdc;
2780
2781		return rbd_obj_request_submit(osdc, obj_request);
2782	}
2783
2784	/*
2785	 * It's a layered write.  The target object might exist but
2786	 * we may not know that yet.  If we know it doesn't exist,
2787	 * start by reading the data for the full target object from
2788	 * the parent so we can use it for a copyup to the target.
2789	 */
2790	if (known)
2791		return rbd_img_obj_parent_read_full(obj_request);
2792
2793	/* We don't know whether the target exists.  Go find out. */
2794
2795	return rbd_img_obj_exists_submit(obj_request);
2796}
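
/*
 * A condensed sketch of the dispatch above:
 *
 *	read, non-layered write, write past the parent overlap, or
 *	write to an object known to exist	-> rbd_obj_request_submit()
 *	layered write, known not to exist	-> rbd_img_obj_parent_read_full()
 *	layered write, existence unknown	-> rbd_img_obj_exists_submit()
 */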
2797
2798static int rbd_img_request_submit(struct rbd_img_request *img_request)
2799{
2800	struct rbd_obj_request *obj_request;
2801	struct rbd_obj_request *next_obj_request;
2802
2803	dout("%s: img %p\n", __func__, img_request);
2804	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2805		int ret;
2806
2807		ret = rbd_img_obj_request_submit(obj_request);
2808		if (ret)
2809			return ret;
2810	}
2811
2812	return 0;
2813}
2814
2815static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2816{
2817	struct rbd_obj_request *obj_request;
2818	struct rbd_device *rbd_dev;
2819	u64 obj_end;
2820	u64 img_xferred;
2821	int img_result;
2822
2823	rbd_assert(img_request_child_test(img_request));
2824
2825	/* First get what we need from the image request and release it */
2826
2827	obj_request = img_request->obj_request;
2828	img_xferred = img_request->xferred;
2829	img_result = img_request->result;
2830	rbd_img_request_put(img_request);
2831
2832	/*
2833	 * If the overlap has become 0 (most likely because the
2834	 * image has been flattened) we need to re-submit the
2835	 * original request.
2836	 */
2837	rbd_assert(obj_request);
2838	rbd_assert(obj_request->img_request);
2839	rbd_dev = obj_request->img_request->rbd_dev;
2840	if (!rbd_dev->parent_overlap) {
2841		struct ceph_osd_client *osdc;
2842
2843		osdc = &rbd_dev->rbd_client->client->osdc;
2844		img_result = rbd_obj_request_submit(osdc, obj_request);
2845		if (!img_result)
2846			return;
2847	}
2848
2849	obj_request->result = img_result;
2850	if (obj_request->result)
2851		goto out;
2852
2853	/*
2854	 * We need to zero anything beyond the parent overlap
2855	 * boundary.  Since rbd_img_obj_request_read_callback()
2856	 * will zero anything beyond the end of a short read, an
2857	 * easy way to do this is to pretend the data from the
2858	 * parent came up short--ending at the overlap boundary.
2859	 */
2860	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2861	obj_end = obj_request->img_offset + obj_request->length;
2862	if (obj_end > rbd_dev->parent_overlap) {
2863		u64 xferred = 0;
2864
2865		if (obj_request->img_offset < rbd_dev->parent_overlap)
2866			xferred = rbd_dev->parent_overlap -
2867					obj_request->img_offset;
2868
2869		obj_request->xferred = min(img_xferred, xferred);
2870	} else {
2871		obj_request->xferred = img_xferred;
2872	}
2873out:
2874	rbd_img_obj_request_read_callback(obj_request);
2875	rbd_obj_request_complete(obj_request);
2876}
2877
2878static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2879{
2880	struct rbd_img_request *img_request;
2881	int result;
2882
2883	rbd_assert(obj_request_img_data_test(obj_request));
2884	rbd_assert(obj_request->img_request != NULL);
2885	rbd_assert(obj_request->result == (s32) -ENOENT);
2886	rbd_assert(obj_request_type_valid(obj_request->type));
2887
2888	/* rbd_read_finish(obj_request, obj_request->length); */
2889	img_request = rbd_parent_request_create(obj_request,
2890						obj_request->img_offset,
2891						obj_request->length);
2892	result = -ENOMEM;
2893	if (!img_request)
2894		goto out_err;
2895
2896	if (obj_request->type == OBJ_REQUEST_BIO)
2897		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2898						obj_request->bio_list);
2899	else
2900		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
2901						obj_request->pages);
2902	if (result)
2903		goto out_err;
2904
2905	img_request->callback = rbd_img_parent_read_callback;
2906	result = rbd_img_request_submit(img_request);
2907	if (result)
2908		goto out_err;
2909
2910	return;
2911out_err:
2912	if (img_request)
2913		rbd_img_request_put(img_request);
2914	obj_request->result = result;
2915	obj_request->xferred = 0;
2916	obj_request_done_set(obj_request);
2917}
2918
2919static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
2920{
2921	struct rbd_obj_request *obj_request;
2922	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2923	int ret;
2924
2925	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2926							OBJ_REQUEST_NODATA);
2927	if (!obj_request)
2928		return -ENOMEM;
2929
2930	ret = -ENOMEM;
2931	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
2932						  obj_request);
2933	if (!obj_request->osd_req)
2934		goto out;
2935
2936	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2937					notify_id, 0, 0);
2938	rbd_osd_req_format_read(obj_request);
2939
2940	ret = rbd_obj_request_submit(osdc, obj_request);
2941	if (ret)
2942		goto out;
2943	ret = rbd_obj_request_wait(obj_request);
2944out:
2945	rbd_obj_request_put(obj_request);
2946
2947	return ret;
2948}
2949
2950static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2951{
2952	struct rbd_device *rbd_dev = (struct rbd_device *)data;
2953	int ret;
2954
2955	if (!rbd_dev)
2956		return;
2957
2958	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2959		rbd_dev->header_name, (unsigned long long)notify_id,
2960		(unsigned int)opcode);
2961
2962	/*
2963	 * Until adequate refresh error handling is in place, there is
2964	 * not much we can do here, except warn.
2965	 *
2966	 * See http://tracker.ceph.com/issues/5040
2967	 */
2968	ret = rbd_dev_refresh(rbd_dev);
2969	if (ret)
2970		rbd_warn(rbd_dev, "refresh failed: %d\n", ret);
2971
2972	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
2973	if (ret)
2974		rbd_warn(rbd_dev, "notify_ack ret %d\n", ret);
2975}
2976
2977/*
2978 * Send a (un)watch request and wait for the ack.  On success return
2979 * the request with a reference held; on error return an ERR_PTR.
2980 */
2981static struct rbd_obj_request *rbd_obj_watch_request_helper(
2982						struct rbd_device *rbd_dev,
2983						bool watch)
2984{
2985	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2986	struct rbd_obj_request *obj_request;
2987	int ret;
2988
2989	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2990					     OBJ_REQUEST_NODATA);
2991	if (!obj_request)
2992		return ERR_PTR(-ENOMEM);
2993
2994	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
2995						  obj_request);
2996	if (!obj_request->osd_req) {
2997		ret = -ENOMEM;
2998		goto out;
2999	}
3000
3001	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3002			      rbd_dev->watch_event->cookie, 0, watch);
3003	rbd_osd_req_format_write(obj_request);
3004
3005	if (watch)
3006		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3007
3008	ret = rbd_obj_request_submit(osdc, obj_request);
3009	if (ret)
3010		goto out;
3011
3012	ret = rbd_obj_request_wait(obj_request);
3013	if (ret)
3014		goto out;
3015
3016	ret = obj_request->result;
3017	if (ret) {
3018		if (watch)
3019			rbd_obj_request_end(obj_request);
3020		goto out;
3021	}
3022
3023	return obj_request;
3024
3025out:
3026	rbd_obj_request_put(obj_request);
3027	return ERR_PTR(ret);
3028}
3029
3030/*
3031 * Initiate a watch request, synchronously.
3032 */
3033static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3034{
3035	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3036	struct rbd_obj_request *obj_request;
3037	int ret;
3038
3039	rbd_assert(!rbd_dev->watch_event);
3040	rbd_assert(!rbd_dev->watch_request);
3041
3042	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3043				     &rbd_dev->watch_event);
3044	if (ret < 0)
3045		return ret;
3046
3047	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3048	if (IS_ERR(obj_request)) {
3049		ceph_osdc_cancel_event(rbd_dev->watch_event);
3050		rbd_dev->watch_event = NULL;
3051		return PTR_ERR(obj_request);
3052	}
3053
3054	/*
3055	 * A watch request is set to linger, so the underlying osd
3056	 * request won't go away until we unregister it.  We retain
3057	 * a pointer to the object request during that time (in
3058	 * rbd_dev->watch_request), so we'll keep a reference to it.
3059	 * We'll drop that reference after we've unregistered it in
3060	 * rbd_dev_header_unwatch_sync().
3061	 */
3062	rbd_dev->watch_request = obj_request;
3063
3064	return 0;
3065}
3066
3067/*
3068 * Tear down a watch request, synchronously.
3069 */
3070static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3071{
3072	struct rbd_obj_request *obj_request;
3073
3074	rbd_assert(rbd_dev->watch_event);
3075	rbd_assert(rbd_dev->watch_request);
3076
3077	rbd_obj_request_end(rbd_dev->watch_request);
3078	rbd_obj_request_put(rbd_dev->watch_request);
3079	rbd_dev->watch_request = NULL;
3080
3081	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3082	if (!IS_ERR(obj_request))
3083		rbd_obj_request_put(obj_request);
3084	else
3085		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3086			 PTR_ERR(obj_request));
3087
3088	ceph_osdc_cancel_event(rbd_dev->watch_event);
3089	rbd_dev->watch_event = NULL;
3090}
3091
3092/*
3093 * Synchronous osd object method call.  Returns the number of bytes
3094 * returned in the inbound buffer, or a negative error code.
3095 */
3096static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3097			     const char *object_name,
3098			     const char *class_name,
3099			     const char *method_name,
3100			     const void *outbound,
3101			     size_t outbound_size,
3102			     void *inbound,
3103			     size_t inbound_size)
3104{
3105	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3106	struct rbd_obj_request *obj_request;
3107	struct page **pages;
3108	u32 page_count;
3109	int ret;
3110
3111	/*
3112	 * Method calls are ultimately read operations.  The result
3113	 * should be placed into the inbound buffer provided.  They
3114	 * also supply outbound data--parameters for the object
3115	 * method.  Currently if this is present it will be a
3116	 * snapshot id.
3117	 */
3118	page_count = (u32)calc_pages_for(0, inbound_size);
3119	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3120	if (IS_ERR(pages))
3121		return PTR_ERR(pages);
3122
3123	ret = -ENOMEM;
3124	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3125							OBJ_REQUEST_PAGES);
3126	if (!obj_request)
3127		goto out;
3128
3129	obj_request->pages = pages;
3130	obj_request->page_count = page_count;
3131
3132	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
3133						  obj_request);
3134	if (!obj_request->osd_req)
3135		goto out;
3136
3137	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3138					class_name, method_name);
3139	if (outbound_size) {
3140		struct ceph_pagelist *pagelist;
3141
3142		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3143		if (!pagelist)
3144			goto out;
3145
3146		ceph_pagelist_init(pagelist);
3147		ceph_pagelist_append(pagelist, outbound, outbound_size);
3148		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3149						pagelist);
3150	}
3151	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3152					obj_request->pages, inbound_size,
3153					0, false, false);
3154	rbd_osd_req_format_read(obj_request);
3155
3156	ret = rbd_obj_request_submit(osdc, obj_request);
3157	if (ret)
3158		goto out;
3159	ret = rbd_obj_request_wait(obj_request);
3160	if (ret)
3161		goto out;
3162
3163	ret = obj_request->result;
3164	if (ret < 0)
3165		goto out;
3166
3167	rbd_assert(obj_request->xferred < (u64)INT_MAX);
3168	ret = (int)obj_request->xferred;
3169	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3170out:
3171	if (obj_request)
3172		rbd_obj_request_put(obj_request);
3173	else
3174		ceph_release_page_vector(pages, page_count);
3175
3176	return ret;
3177}
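
/*
 * Callers elsewhere in this file use this for the v2 image format's
 * class methods.  A rough sketch of such a call (response layout
 * assumed from the "rbd" class get_size method):
 *
 *	struct {
 *		u8 order;
 *		__le64 size;
 *	} __attribute__ ((packed)) size_buf = { 0 };
 *	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
 *	int ret;
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				  "rbd", "get_size",
 *				  &snapid, sizeof (snapid),
 *				  &size_buf, sizeof (size_buf));
 *
 * On success ret is the number of bytes the method returned; callers
 * check it against sizeof (size_buf) before decoding the fields.
 */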
3178
3179static void rbd_request_fn(struct request_queue *q)
3180		__releases(q->queue_lock) __acquires(q->queue_lock)
3181{
3182	struct rbd_device *rbd_dev = q->queuedata;
3183	struct request *rq;
3184	int result;
3185
3186	while ((rq = blk_fetch_request(q))) {
3187		bool write_request = rq_data_dir(rq) == WRITE;
3188		struct rbd_img_request *img_request;
3189		u64 offset;
3190		u64 length;
3191
3192		/* Ignore any non-FS requests that filter through. */
3193
3194		if (rq->cmd_type != REQ_TYPE_FS) {
3195			dout("%s: non-fs request type %d\n", __func__,
3196				(int) rq->cmd_type);
3197			__blk_end_request_all(rq, 0);
3198			continue;
3199		}
3200
3201		/* Ignore/skip any zero-length requests */
3202
3203		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
3204		length = (u64) blk_rq_bytes(rq);
3205
3206		if (!length) {
3207			dout("%s: zero-length request\n", __func__);
3208			__blk_end_request_all(rq, 0);
3209			continue;
3210		}
3211
3212		spin_unlock_irq(q->queue_lock);
3213
3214		/* Disallow writes to a read-only device */
3215
3216		if (write_request) {
3217			result = -EROFS;
3218			if (rbd_dev->mapping.read_only)
3219				goto end_request;
3220			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3221		}
3222
3223		/*
3224		 * Quit early if the mapped snapshot no longer
3225		 * exists.  It's still possible the snapshot will
3226		 * have disappeared by the time our request arrives
3227		 * at the osd, but there's no sense in sending it if
3228		 * we already know.
3229		 */
3230		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3231			dout("request for non-existent snapshot");
3232			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3233			result = -ENXIO;
3234			goto end_request;
3235		}
3236
3237		result = -EINVAL;
3238		if (offset && length > U64_MAX - offset + 1) {
3239			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
3240				offset, length);
3241			goto end_request;	/* Shouldn't happen */
3242		}
3243
3244		result = -EIO;
3245		if (offset + length > rbd_dev->mapping.size) {
3246			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
3247				offset, length, rbd_dev->mapping.size);
3248			goto end_request;
3249		}
3250
3251		result = -ENOMEM;
3252		img_request = rbd_img_request_create(rbd_dev, offset, length,
3253							write_request);
3254		if (!img_request)
3255			goto end_request;
3256
3257		img_request->rq = rq;
3258
3259		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3260						rq->bio);
3261		if (!result)
3262			result = rbd_img_request_submit(img_request);
3263		if (result)
3264			rbd_img_request_put(img_request);
3265end_request:
3266		spin_lock_irq(q->queue_lock);
3267		if (result < 0) {
3268			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
3269				write_request ? "write" : "read",
3270				length, offset, result);
3271
3272			__blk_end_request_all(rq, result);
3273		}
3274	}
3275}
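
/*
 * For scale: a 16 KiB read at device sector 2048 arrives here as
 * offset = 2048 << SECTOR_SHIFT = 1 MiB and length = 16384, and with
 * the default 4 MiB objects rbd_img_request_fill() turns it into a
 * single object request.
 */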
3276
3277/*
3278 * A queue callback.  Makes sure that we don't create a bio that spans
3279 * multiple osd objects.  One exception is a single-page bio, which we
3280 * handle later in bio_chain_clone_range().
3281 */
3282static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3283			  struct bio_vec *bvec)
3284{
3285	struct rbd_device *rbd_dev = q->queuedata;
3286	sector_t sector_offset;
3287	sector_t sectors_per_obj;
3288	sector_t obj_sector_offset;
3289	int ret;
3290
3291	/*
3292	 * Convert the partition-relative bio start sector to an offset
3293	 * relative to the enclosing device, then find how far into its
3294	 * rbd object that sector falls.
3295	 */
3296	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3297	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3298	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3299
3300	/*
3301	 * Compute the number of bytes from that offset to the end
3302	 * of the object.  Account for what's already used by the bio.
3303	 */
3304	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3305	if (ret > bmd->bi_size)
3306		ret -= bmd->bi_size;
3307	else
3308		ret = 0;
3309
3310	/*
3311	 * Don't send back more than was asked for.  And if the bio
3312	 * was empty, let the whole thing through because:  "Note
3313	 * that a block device *must* allow a single page to be
3314	 * added to an empty bio."
3315	 */
3316	rbd_assert(bvec->bv_len <= PAGE_SIZE);
3317	if (ret > (int) bvec->bv_len || !bmd->bi_size)
3318		ret = (int) bvec->bv_len;
3319
3320	return ret;
3321}
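
/*
 * A worked example of the math above, assuming the default 4 MiB
 * objects (obj_order 22, so sectors_per_obj is 8192): for a bio that
 * starts 8190 sectors into its object and already holds 512 bytes,
 * (8192 - 8190) << SECTOR_SHIFT - 512 = 512 bytes remain in the object,
 * so a 4096-byte bvec is trimmed to 512 bytes.  If the bio were still
 * empty (bi_size == 0), the full bvec would be allowed through, per the
 * rule quoted above.
 */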
3322
3323static void rbd_free_disk(struct rbd_device *rbd_dev)
3324{
3325	struct gendisk *disk = rbd_dev->disk;
3326
3327	if (!disk)
3328		return;
3329
3330	rbd_dev->disk = NULL;
3331	if (disk->flags & GENHD_FL_UP) {
3332		del_gendisk(disk);
3333		if (disk->queue)
3334			blk_cleanup_queue(disk->queue);
3335	}
3336	put_disk(disk);
3337}
3338
3339static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3340				const char *object_name,
3341				u64 offset, u64 length, void *buf)
3342
3343{
3344	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3345	struct rbd_obj_request *obj_request;
3346	struct page **pages = NULL;
3347	u32 page_count;
3348	size_t size;
3349	int ret;
3350
3351	page_count = (u32) calc_pages_for(offset, length);
3352	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3353	if (IS_ERR(pages))
3354		ret = PTR_ERR(pages);
3355		return PTR_ERR(pages);
3356	ret = -ENOMEM;
3357	obj_request = rbd_obj_request_create(object_name, offset, length,
3358							OBJ_REQUEST_PAGES);
3359	if (!obj_request)
3360		goto out;
3361
3362	obj_request->pages = pages;
3363	obj_request->page_count = page_count;
3364
3365	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 1,
3366						  obj_request);
3367	if (!obj_request->osd_req)
3368		goto out;
3369
3370	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3371					offset, length, 0, 0);
3372	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3373					obj_request->pages,
3374					obj_request->length,
3375					obj_request->offset & ~PAGE_MASK,
3376					false, false);
3377	rbd_osd_req_format_read(obj_request);
3378
3379	ret = rbd_obj_request_submit(osdc, obj_request);
3380	if (ret)
3381		goto out;
3382	ret = rbd_obj_request_wait(obj_request);
3383	if (ret)
3384		goto out;
3385
3386	ret = obj_request->result;
3387	if (ret < 0)
3388		goto out;
3389
3390	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3391	size = (size_t) obj_request->xferred;
3392	ceph_copy_from_page_vector(pages, buf, 0, size);
3393	rbd_assert(size <= (size_t)INT_MAX);
3394	ret = (int)size;
3395out:
3396	if (obj_request)
3397		rbd_obj_request_put(obj_request);
3398	else
3399		ceph_release_page_vector(pages, page_count);
3400
3401	return ret;
3402}
3403
3404/*
3405 * Read the complete header for the given rbd device.  On successful
3406 * return, the rbd_dev->header field will contain up-to-date
3407 * information about the image.
3408 */
3409static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3410{
3411	struct rbd_image_header_ondisk *ondisk = NULL;
3412	u32 snap_count = 0;
3413	u64 names_size = 0;
3414	u32 want_count;
3415	int ret;
3416
3417	/*
3418	 * The complete header will include an array of its 64-bit
3419	 * snapshot ids, followed by the names of those snapshots as
3420	 * a contiguous block of NUL-terminated strings.  Note that
3421	 * the number of snapshots could change by the time we read
3422	 * it in, in which case we re-read it.
3423	 */
3424	do {
3425		size_t size;
3426
3427		kfree(ondisk);
3428
3429		size = sizeof (*ondisk);
3430		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3431		size += names_size;
3432		ondisk = kmalloc(size, GFP_KERNEL);
3433		if (!ondisk)
3434			return -ENOMEM;
3435
3436		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3437				       0, size, ondisk);
3438		if (ret < 0)
3439			goto out;
3440		if ((size_t)ret < size) {
3441			ret = -ENXIO;
3442			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3443				size, ret);
3444			goto out;
3445		}
3446		if (!rbd_dev_ondisk_valid(ondisk)) {
3447			ret = -ENXIO;
3448			rbd_warn(rbd_dev, "invalid header");
3449			goto out;
3450		}
3451
3452		names_size = le64_to_cpu(ondisk->snap_names_len);
3453		want_count = snap_count;
3454		snap_count = le32_to_cpu(ondisk->snap_count);
3455	} while (snap_count != want_count);
3456
3457	ret = rbd_header_from_disk(rbd_dev, ondisk);
3458out:
3459	kfree(ondisk);
3460
3461	return ret;
3462}
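
/*
 * For example (sizes illustrative): the first pass allocates just
 * sizeof (*ondisk) bytes, since snap_count and names_size start out 0.
 * If that read reports 3 snapshots whose names total 25 bytes, the
 * buffer is freed and the header re-read into
 * sizeof (*ondisk) + 3 * sizeof (struct rbd_image_snap_ondisk) + 25
 * bytes, looping until the snapshot count read back matches the count
 * the buffer was sized for.
 */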
3463
3464/*
3465 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3466 * has disappeared from the (just updated) snapshot context.
3467 */
3468static void rbd_exists_validate(struct rbd_device *rbd_dev)
3469{
3470	u64 snap_id;
3471
3472	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3473		return;
3474
3475	snap_id = rbd_dev->spec->snap_id;
3476	if (snap_id == CEPH_NOSNAP)
3477		return;
3478
3479	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3480		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3481}
3482
3483static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3484{
3485	sector_t size;
3486	bool removing;
3487
3488	/*
3489	 * Don't hold the lock while doing disk operations,
3490	 * or lock ordering will conflict with the bdev mutex via:
3491	 * rbd_add() -> blkdev_get() -> rbd_open()
3492	 */
3493	spin_lock_irq(&rbd_dev->lock);
3494	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3495	spin_unlock_irq(&rbd_dev->lock);
3496	/*
3497	 * If the device is being removed, rbd_dev->disk has
3498	 * been destroyed, so don't try to update its size
3499	 */
3500	if (!removing) {
3501		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3502		dout("setting size to %llu sectors", (unsigned long long)size);
3503		set_capacity(rbd_dev->disk, size);
3504		revalidate_disk(rbd_dev->disk);
3505	}
3506}
3507
3508static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3509{
3510	u64 mapping_size;
3511	int ret;
3512
3513	down_write(&rbd_dev->header_rwsem);
3514	mapping_size = rbd_dev->mapping.size;
3515
3516	ret = rbd_dev_header_info(rbd_dev);
3517	if (ret)
3518		goto out;
3519
3520	/*
3521	 * If there is a parent, see if it has disappeared due to the
3522	 * mapped image getting flattened.
3523	 */
3524	if (rbd_dev->parent) {
3525		ret = rbd_dev_v2_parent_info(rbd_dev);
3526		if (ret)
3527			goto out;
3528	}
3529
3530	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3531		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3532			rbd_dev->mapping.size = rbd_dev->header.image_size;
3533	} else {
3534		/* validate mapped snapshot's EXISTS flag */
3535		rbd_exists_validate(rbd_dev);
3536	}
3537out:
3538	up_write(&rbd_dev->header_rwsem);
3539
3540	if (!ret && mapping_size != rbd_dev->mapping.size)
3541		rbd_dev_update_size(rbd_dev);
3542
3543	return ret;
3544}
3545
3546static int rbd_init_disk(struct rbd_device *rbd_dev)
3547{
3548	struct gendisk *disk;
3549	struct request_queue *q;
3550	u64 segment_size;
3551
3552	/* create gendisk info */
3553	disk = alloc_disk(single_major ?
3554			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3555			  RBD_MINORS_PER_MAJOR);
3556	if (!disk)
3557		return -ENOMEM;
3558
3559	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3560		 rbd_dev->dev_id);
3561	disk->major = rbd_dev->major;
3562	disk->first_minor = rbd_dev->minor;
3563	if (single_major)
3564		disk->flags |= GENHD_FL_EXT_DEVT;
3565	disk->fops = &rbd_bd_ops;
3566	disk->private_data = rbd_dev;
3567
3568	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3569	if (!q)
3570		goto out_disk;
3571
3572	/* We use the default size, but let's be explicit about it. */
3573	blk_queue_physical_block_size(q, SECTOR_SIZE);
3574
3575	/* set io sizes to object size */
3576	segment_size = rbd_obj_bytes(&rbd_dev->header);
3577	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3578	blk_queue_max_segment_size(q, segment_size);
3579	blk_queue_io_min(q, segment_size);
3580	blk_queue_io_opt(q, segment_size);
3581
3582	blk_queue_merge_bvec(q, rbd_merge_bvec);
3583	disk->queue = q;
3584
3585	q->queuedata = rbd_dev;
3586
3587	rbd_dev->disk = disk;
3588
3589	return 0;
3590out_disk:
3591	put_disk(disk);
3592
3593	return -ENOMEM;
3594}
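
/*
 * With the default 4 MiB object size this sets max_hw_sectors to 8192
 * (4 MiB / SECTOR_SIZE) and reports io_min and io_opt of 4 MiB, i.e.
 * the queue limits advertise object-sized, object-aligned I/O as the
 * preferred pattern.
 */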
3595
3596/*
3597  sysfs
3598*/
3599
3600static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3601{
3602	return container_of(dev, struct rbd_device, dev);
3603}
3604
3605static ssize_t rbd_size_show(struct device *dev,
3606			     struct device_attribute *attr, char *buf)
3607{
3608	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3609
3610	return sprintf(buf, "%llu\n",
3611		(unsigned long long)rbd_dev->mapping.size);
3612}
3613
3614/*
3615 * Note this shows the features for whatever's mapped, which is not
3616 * necessarily the base image.
3617 */
3618static ssize_t rbd_features_show(struct device *dev,
3619			     struct device_attribute *attr, char *buf)
3620{
3621	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3622
3623	return sprintf(buf, "0x%016llx\n",
3624			(unsigned long long)rbd_dev->mapping.features);
3625}
3626
3627static ssize_t rbd_major_show(struct device *dev,
3628			      struct device_attribute *attr, char *buf)
3629{
3630	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3631
3632	if (rbd_dev->major)
3633		return sprintf(buf, "%d\n", rbd_dev->major);
3634
3635	return sprintf(buf, "(none)\n");
3636}
3637
3638static ssize_t rbd_minor_show(struct device *dev,
3639			      struct device_attribute *attr, char *buf)
3640{
3641	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3642
3643	return sprintf(buf, "%d\n", rbd_dev->minor);
3644}
3645
3646static ssize_t rbd_client_id_show(struct device *dev,
3647				  struct device_attribute *attr, char *buf)
3648{
3649	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3650
3651	return sprintf(buf, "client%lld\n",
3652			ceph_client_id(rbd_dev->rbd_client->client));
3653}
3654
3655static ssize_t rbd_pool_show(struct device *dev,
3656			     struct device_attribute *attr, char *buf)
3657{
3658	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3659
3660	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3661}
3662
3663static ssize_t rbd_pool_id_show(struct device *dev,
3664			     struct device_attribute *attr, char *buf)
3665{
3666	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3667
3668	return sprintf(buf, "%llu\n",
3669			(unsigned long long) rbd_dev->spec->pool_id);
3670}
3671
3672static ssize_t rbd_name_show(struct device *dev,
3673			     struct device_attribute *attr, char *buf)
3674{
3675	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3676
3677	if (rbd_dev->spec->image_name)
3678		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3679
3680	return sprintf(buf, "(unknown)\n");
3681}
3682
3683static ssize_t rbd_image_id_show(struct device *dev,
3684			     struct device_attribute *attr, char *buf)
3685{
3686	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3687
3688	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3689}
3690
3691/*
3692 * Shows the name of the currently-mapped snapshot (or
3693 * RBD_SNAP_HEAD_NAME for the base image).
3694 */
3695static ssize_t rbd_snap_show(struct device *dev,
3696			     struct device_attribute *attr,
3697			     char *buf)
3698{
3699	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3700
3701	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3702}
3703
3704/*
3705 * For a v2 image, shows the chain of parent images, separated by empty
3706 * lines.  For v1 images or if there is no parent, shows "(no parent
3707 * image)".
3708 */
3709static ssize_t rbd_parent_show(struct device *dev,
3710			       struct device_attribute *attr,
3711			       char *buf)
3712{
3713	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3714	ssize_t count = 0;
3715
3716	if (!rbd_dev->parent)
3717		return sprintf(buf, "(no parent image)\n");
3718
3719	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3720		struct rbd_spec *spec = rbd_dev->parent_spec;
3721
3722		count += sprintf(&buf[count], "%s"
3723			    "pool_id %llu\npool_name %s\n"
3724			    "image_id %s\nimage_name %s\n"
3725			    "snap_id %llu\nsnap_name %s\n"
3726			    "overlap %llu\n",
3727			    !count ? "" : "\n", /* first? */
3728			    spec->pool_id, spec->pool_name,
3729			    spec->image_id, spec->image_name ?: "(unknown)",
3730			    spec->snap_id, spec->snap_name,
3731			    rbd_dev->parent_overlap);
3732	}
3733
3734	return count;
3735}
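/*
 * For illustration only (all values below are hypothetical), a mapping
 * with a single parent produces output of the following shape, one
 * record per ancestor and records separated by empty lines:
 *
 *   pool_id 2
 *   pool_name rbd
 *   image_id 1028b4567890
 *   image_name base-image
 *   snap_id 4
 *   snap_name base-snap
 *   overlap 10737418240
 */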
3736
3737static ssize_t rbd_image_refresh(struct device *dev,
3738				 struct device_attribute *attr,
3739				 const char *buf,
3740				 size_t size)
3741{
3742	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3743	int ret;
3744
3745	ret = rbd_dev_refresh(rbd_dev);
3746	if (ret)
3747		return ret;
3748
3749	return size;
3750}
3751
3752static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3753static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3754static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3755static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3756static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3757static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3758static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3759static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3760static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3761static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3762static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3763static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3764
3765static struct attribute *rbd_attrs[] = {
3766	&dev_attr_size.attr,
3767	&dev_attr_features.attr,
3768	&dev_attr_major.attr,
3769	&dev_attr_minor.attr,
3770	&dev_attr_client_id.attr,
3771	&dev_attr_pool.attr,
3772	&dev_attr_pool_id.attr,
3773	&dev_attr_name.attr,
3774	&dev_attr_image_id.attr,
3775	&dev_attr_current_snap.attr,
3776	&dev_attr_parent.attr,
3777	&dev_attr_refresh.attr,
3778	NULL
3779};
3780
3781static struct attribute_group rbd_attr_group = {
3782	.attrs = rbd_attrs,
3783};
3784
3785static const struct attribute_group *rbd_attr_groups[] = {
3786	&rbd_attr_group,
3787	NULL
3788};
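/*
 * Once rbd_bus_add_dev() (below) has registered a device, these
 * attributes are visible through sysfs.  As an illustrative example
 * (the device id 0 is hypothetical):
 *
 *   cat /sys/bus/rbd/devices/0/pool          shows the pool name
 *   echo 1 > /sys/bus/rbd/devices/0/refresh  re-reads the image header
 */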
3789
3790static void rbd_sysfs_dev_release(struct device *dev)
3791{
3792}
3793
3794static struct device_type rbd_device_type = {
3795	.name		= "rbd",
3796	.groups		= rbd_attr_groups,
3797	.release	= rbd_sysfs_dev_release,
3798};
3799
3800static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3801{
3802	kref_get(&spec->kref);
3803
3804	return spec;
3805}
3806
3807static void rbd_spec_free(struct kref *kref);
3808static void rbd_spec_put(struct rbd_spec *spec)
3809{
3810	if (spec)
3811		kref_put(&spec->kref, rbd_spec_free);
3812}
3813
3814static struct rbd_spec *rbd_spec_alloc(void)
3815{
3816	struct rbd_spec *spec;
3817
3818	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3819	if (!spec)
3820		return NULL;
3821
3822	spec->pool_id = CEPH_NOPOOL;
3823	spec->snap_id = CEPH_NOSNAP;
3824	kref_init(&spec->kref);
3825
3826	return spec;
3827}
3828
3829static void rbd_spec_free(struct kref *kref)
3830{
3831	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3832
3833	kfree(spec->pool_name);
3834	kfree(spec->image_id);
3835	kfree(spec->image_name);
3836	kfree(spec->snap_name);
3837	kfree(spec);
3838}
3839
3840static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3841				struct rbd_spec *spec)
3842{
3843	struct rbd_device *rbd_dev;
3844
3845	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3846	if (!rbd_dev)
3847		return NULL;
3848
3849	spin_lock_init(&rbd_dev->lock);
3850	rbd_dev->flags = 0;
3851	atomic_set(&rbd_dev->parent_ref, 0);
3852	INIT_LIST_HEAD(&rbd_dev->node);
3853	init_rwsem(&rbd_dev->header_rwsem);
3854
3855	rbd_dev->spec = spec;
3856	rbd_dev->rbd_client = rbdc;
3857
3858	/* Initialize the layout used for all rbd requests */
3859
3860	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3861	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3862	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3863	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3864
3865	return rbd_dev;
3866}
3867
3868static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3869{
3870	rbd_put_client(rbd_dev->rbd_client);
3871	rbd_spec_put(rbd_dev->spec);
3872	kfree(rbd_dev);
3873}
3874
3875/*
3876 * Get the size and object order for an image snapshot, or if
3877 * snap_id is CEPH_NOSNAP, gets this information for the base
3878 * image.
3879 */
3880static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3881				u8 *order, u64 *snap_size)
3882{
3883	__le64 snapid = cpu_to_le64(snap_id);
3884	int ret;
3885	struct {
3886		u8 order;
3887		__le64 size;
3888	} __attribute__ ((packed)) size_buf = { 0 };
3889
3890	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3891				"rbd", "get_size",
3892				&snapid, sizeof (snapid),
3893				&size_buf, sizeof (size_buf));
3894	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3895	if (ret < 0)
3896		return ret;
3897	if (ret < sizeof (size_buf))
3898		return -ERANGE;
3899
3900	if (order) {
3901		*order = size_buf.order;
3902		dout("  order %u", (unsigned int)*order);
3903	}
3904	*snap_size = le64_to_cpu(size_buf.size);
3905
3906	dout("  snap_id 0x%016llx snap_size = %llu\n",
3907		(unsigned long long)snap_id,
3908		(unsigned long long)*snap_size);
3909
3910	return 0;
3911}
3912
3913static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3914{
3915	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3916					&rbd_dev->header.obj_order,
3917					&rbd_dev->header.image_size);
3918}
3919
3920static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3921{
3922	void *reply_buf;
3923	int ret;
3924	void *p;
3925
3926	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3927	if (!reply_buf)
3928		return -ENOMEM;
3929
3930	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3931				"rbd", "get_object_prefix", NULL, 0,
3932				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
3933	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3934	if (ret < 0)
3935		goto out;
3936
3937	p = reply_buf;
3938	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3939						p + ret, NULL, GFP_NOIO);
3940	ret = 0;
3941
3942	if (IS_ERR(rbd_dev->header.object_prefix)) {
3943		ret = PTR_ERR(rbd_dev->header.object_prefix);
3944		rbd_dev->header.object_prefix = NULL;
3945	} else {
3946		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3947	}
3948out:
3949	kfree(reply_buf);
3950
3951	return ret;
3952}
3953
3954static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3955		u64 *snap_features)
3956{
3957	__le64 snapid = cpu_to_le64(snap_id);
3958	struct {
3959		__le64 features;
3960		__le64 incompat;
3961	} __attribute__ ((packed)) features_buf = { 0 };
3962	u64 incompat;
3963	int ret;
3964
3965	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3966				"rbd", "get_features",
3967				&snapid, sizeof (snapid),
3968				&features_buf, sizeof (features_buf));
3969	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3970	if (ret < 0)
3971		return ret;
3972	if (ret < sizeof (features_buf))
3973		return -ERANGE;
3974
3975	incompat = le64_to_cpu(features_buf.incompat);
3976	if (incompat & ~RBD_FEATURES_SUPPORTED)
3977		return -ENXIO;
3978
3979	*snap_features = le64_to_cpu(features_buf.features);
3980
3981	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3982		(unsigned long long)snap_id,
3983		(unsigned long long)*snap_features,
3984		(unsigned long long)le64_to_cpu(features_buf.incompat));
3985
3986	return 0;
3987}
3988
3989static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3990{
3991	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3992						&rbd_dev->header.features);
3993}
3994
3995static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3996{
3997	struct rbd_spec *parent_spec;
3998	size_t size;
3999	void *reply_buf = NULL;
4000	__le64 snapid;
4001	void *p;
4002	void *end;
4003	u64 pool_id;
4004	char *image_id;
4005	u64 snap_id;
4006	u64 overlap;
4007	int ret;
4008
4009	parent_spec = rbd_spec_alloc();
4010	if (!parent_spec)
4011		return -ENOMEM;
4012
4013	size = sizeof (__le64) +				/* pool_id */
4014		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
4015		sizeof (__le64) +				/* snap_id */
4016		sizeof (__le64);				/* overlap */
4017	reply_buf = kmalloc(size, GFP_KERNEL);
4018	if (!reply_buf) {
4019		ret = -ENOMEM;
4020		goto out_err;
4021	}
4022
4023	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4024	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4025				"rbd", "get_parent",
4026				&snapid, sizeof (snapid),
4027				reply_buf, size);
4028	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4029	if (ret < 0)
4030		goto out_err;
4031
4032	p = reply_buf;
4033	end = reply_buf + ret;
4034	ret = -ERANGE;
4035	ceph_decode_64_safe(&p, end, pool_id, out_err);
4036	if (pool_id == CEPH_NOPOOL) {
4037		/*
4038		 * Either the parent never existed, or we have a
4039		 * record of it but the image got flattened so it no
4040		 * longer has a parent.  When the parent of a
4041		 * layered image disappears we immediately set the
4042		 * overlap to 0.  The effect of this is that all new
4043		 * requests will be treated as if the image had no
4044		 * parent.
4045		 */
4046		if (rbd_dev->parent_overlap) {
4047			rbd_dev->parent_overlap = 0;
4048			smp_mb();
4049			rbd_dev_parent_put(rbd_dev);
4050			pr_info("%s: clone image has been flattened\n",
4051				rbd_dev->disk->disk_name);
4052		}
4053
4054		goto out;	/* No parent?  No problem. */
4055	}
4056
4057	/* The ceph file layout needs to fit pool id in 32 bits */
4058
4059	ret = -EIO;
4060	if (pool_id > (u64)U32_MAX) {
4061		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
4062			(unsigned long long)pool_id, U32_MAX);
4063		goto out_err;
4064	}
4065
4066	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4067	if (IS_ERR(image_id)) {
4068		ret = PTR_ERR(image_id);
4069		goto out_err;
4070	}
4071	ceph_decode_64_safe(&p, end, snap_id, out_err);
4072	ceph_decode_64_safe(&p, end, overlap, out_err);
4073
4074	/*
4075	 * The parent won't change (except when the clone is
4076	 * flattened, which was handled above).  So we only need to
4077	 * record the parent spec if we have not already done so.
4078	 */
4079	if (!rbd_dev->parent_spec) {
4080		parent_spec->pool_id = pool_id;
4081		parent_spec->image_id = image_id;
4082		parent_spec->snap_id = snap_id;
4083		rbd_dev->parent_spec = parent_spec;
4084		parent_spec = NULL;	/* rbd_dev now owns this */
4085	} else {
4086		kfree(image_id);
4087	}
4088
4089	/*
4090	 * We always update the parent overlap.  If it's zero we
4091	 * treat it specially.
4092	 */
4093	rbd_dev->parent_overlap = overlap;
4094	smp_mb();
4095	if (!overlap) {
4096
4097		/* A null parent_spec indicates it's the initial probe */
4098
4099		if (parent_spec) {
4100			/*
4101			 * The overlap has become zero, so the clone
4102			 * must have been resized down to 0 at some
4103			 * point.  Treat this the same as a flatten.
4104			 */
4105			rbd_dev_parent_put(rbd_dev);
4106			pr_info("%s: clone image now standalone\n",
4107				rbd_dev->disk->disk_name);
4108		} else {
4109			/*
4110			 * For the initial probe, if we find the
4111			 * overlap is zero we just pretend there was
4112			 * no parent image.
4113			 */
4114			rbd_warn(rbd_dev, "ignoring parent of "
4115						"clone with overlap 0\n");
4116		}
4117	}
4118out:
4119	ret = 0;
4120out_err:
4121	kfree(reply_buf);
4122	rbd_spec_put(parent_spec);
4123
4124	return ret;
4125}
4126
4127static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4128{
4129	struct {
4130		__le64 stripe_unit;
4131		__le64 stripe_count;
4132	} __attribute__ ((packed)) striping_info_buf = { 0 };
4133	size_t size = sizeof (striping_info_buf);
4134	void *p;
4135	u64 obj_size;
4136	u64 stripe_unit;
4137	u64 stripe_count;
4138	int ret;
4139
4140	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4141				"rbd", "get_stripe_unit_count", NULL, 0,
4142				(char *)&striping_info_buf, size);
4143	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4144	if (ret < 0)
4145		return ret;
4146	if (ret < size)
4147		return -ERANGE;
4148
4149	/*
4150	 * We don't actually support the "fancy striping" feature
4151	 * (STRIPINGV2) yet, but if the striping sizes are the
4152	 * defaults the behavior is the same as before.  So find
4153	 * out, and only fail if the image has non-default values.
4154	 */
4155	ret = -EINVAL;
4156	obj_size = (u64)1 << rbd_dev->header.obj_order;
4157	p = &striping_info_buf;
4158	stripe_unit = ceph_decode_64(&p);
4159	if (stripe_unit != obj_size) {
4160		rbd_warn(rbd_dev, "unsupported stripe unit "
4161				"(got %llu want %llu)",
4162				stripe_unit, obj_size);
4163		return -EINVAL;
4164	}
4165	stripe_count = ceph_decode_64(&p);
4166	if (stripe_count != 1) {
4167		rbd_warn(rbd_dev, "unsupported stripe count "
4168				"(got %llu want 1)", stripe_count);
4169		return -EINVAL;
4170	}
4171	rbd_dev->header.stripe_unit = stripe_unit;
4172	rbd_dev->header.stripe_count = stripe_count;
4173
4174	return 0;
4175}
4176
4177static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4178{
4179	size_t image_id_size;
4180	char *image_id;
4181	void *p;
4182	void *end;
4183	size_t size;
4184	void *reply_buf = NULL;
4185	size_t len = 0;
4186	char *image_name = NULL;
4187	int ret;
4188
4189	rbd_assert(!rbd_dev->spec->image_name);
4190
4191	len = strlen(rbd_dev->spec->image_id);
4192	image_id_size = sizeof (__le32) + len;
4193	image_id = kmalloc(image_id_size, GFP_KERNEL);
4194	if (!image_id)
4195		return NULL;
4196
4197	p = image_id;
4198	end = image_id + image_id_size;
4199	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4200
4201	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4202	reply_buf = kmalloc(size, GFP_KERNEL);
4203	if (!reply_buf)
4204		goto out;
4205
4206	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4207				"rbd", "dir_get_name",
4208				image_id, image_id_size,
4209				reply_buf, size);
4210	if (ret < 0)
4211		goto out;
4212	p = reply_buf;
4213	end = reply_buf + ret;
4214
4215	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4216	if (IS_ERR(image_name))
4217		image_name = NULL;
4218	else
4219		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4220out:
4221	kfree(reply_buf);
4222	kfree(image_id);
4223
4224	return image_name;
4225}
4226
4227static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4228{
4229	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4230	const char *snap_name;
4231	u32 which = 0;
4232
4233	/* Skip over names until we find the one we are looking for */
4234
4235	snap_name = rbd_dev->header.snap_names;
4236	while (which < snapc->num_snaps) {
4237		if (!strcmp(name, snap_name))
4238			return snapc->snaps[which];
4239		snap_name += strlen(snap_name) + 1;
4240		which++;
4241	}
4242	return CEPH_NOSNAP;
4243}
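/*
 * Illustrative example (names and ids are hypothetical): with
 * header.snap_names holding the packed strings "one\0two\0" and
 * header.snapc->snaps holding { 10, 11 }, a lookup of "two" walks
 * past "one" and returns the parallel entry 11; an unknown name
 * returns CEPH_NOSNAP.
 */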
4244
4245static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4246{
4247	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4248	u32 which;
4249	bool found = false;
4250	u64 snap_id;
4251
4252	for (which = 0; !found && which < snapc->num_snaps; which++) {
4253		const char *snap_name;
4254
4255		snap_id = snapc->snaps[which];
4256		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4257		if (IS_ERR(snap_name)) {
4258			/* ignore no-longer existing snapshots */
4259			if (PTR_ERR(snap_name) == -ENOENT)
4260				continue;
4261			else
4262				break;
4263		}
4264		found = !strcmp(name, snap_name);
4265		kfree(snap_name);
4266	}
4267	return found ? snap_id : CEPH_NOSNAP;
4268}
4269
4270/*
4271 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4272 * no snapshot by that name is found, or if an error occurs.
4273 */
4274static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4275{
4276	if (rbd_dev->image_format == 1)
4277		return rbd_v1_snap_id_by_name(rbd_dev, name);
4278
4279	return rbd_v2_snap_id_by_name(rbd_dev, name);
4280}
4281
4282/*
4283 * An image being mapped will have everything but the snap id.
4284 */
4285static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4286{
4287	struct rbd_spec *spec = rbd_dev->spec;
4288
4289	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4290	rbd_assert(spec->image_id && spec->image_name);
4291	rbd_assert(spec->snap_name);
4292
4293	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4294		u64 snap_id;
4295
4296		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4297		if (snap_id == CEPH_NOSNAP)
4298			return -ENOENT;
4299
4300		spec->snap_id = snap_id;
4301	} else {
4302		spec->snap_id = CEPH_NOSNAP;
4303	}
4304
4305	return 0;
4306}
4307
4308/*
4309 * A parent image will have all ids but none of the names.
4310 *
4311 * All names in an rbd spec are dynamically allocated.  It's OK if we
4312 * can't figure out the name for an image id.
4313 */
4314static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4315{
4316	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4317	struct rbd_spec *spec = rbd_dev->spec;
4318	const char *pool_name;
4319	const char *image_name;
4320	const char *snap_name;
4321	int ret;
4322
4323	rbd_assert(spec->pool_id != CEPH_NOPOOL);
4324	rbd_assert(spec->image_id);
4325	rbd_assert(spec->snap_id != CEPH_NOSNAP);
4326
4327	/* Get the pool name; we have to make our own copy of this */
4328
4329	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4330	if (!pool_name) {
4331		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4332		return -EIO;
4333	}
4334	pool_name = kstrdup(pool_name, GFP_KERNEL);
4335	if (!pool_name)
4336		return -ENOMEM;
4337
4338	/* Fetch the image name; tolerate failure here */
4339
4340	image_name = rbd_dev_image_name(rbd_dev);
4341	if (!image_name)
4342		rbd_warn(rbd_dev, "unable to get image name");
4343
4344	/* Fetch the snapshot name */
4345
4346	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4347	if (IS_ERR(snap_name)) {
4348		ret = PTR_ERR(snap_name);
4349		goto out_err;
4350	}
4351
4352	spec->pool_name = pool_name;
4353	spec->image_name = image_name;
4354	spec->snap_name = snap_name;
4355
4356	return 0;
4357
4358out_err:
4359	kfree(image_name);
4360	kfree(pool_name);
4361	return ret;
4362}
4363
4364static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4365{
4366	size_t size;
4367	int ret;
4368	void *reply_buf;
4369	void *p;
4370	void *end;
4371	u64 seq;
4372	u32 snap_count;
4373	struct ceph_snap_context *snapc;
4374	u32 i;
4375
4376	/*
4377	 * We'll need room for the seq value (maximum snapshot id),
4378	 * snapshot count, and array of that many snapshot ids.
4379	 * For now we have a fixed upper limit on the number we're
4380	 * prepared to receive.
4381	 */
4382	size = sizeof (__le64) + sizeof (__le32) +
4383			RBD_MAX_SNAP_COUNT * sizeof (__le64);
4384	reply_buf = kzalloc(size, GFP_KERNEL);
4385	if (!reply_buf)
4386		return -ENOMEM;
4387
4388	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4389				"rbd", "get_snapcontext", NULL, 0,
4390				reply_buf, size);
4391	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4392	if (ret < 0)
4393		goto out;
4394
4395	p = reply_buf;
4396	end = reply_buf + ret;
4397	ret = -ERANGE;
4398	ceph_decode_64_safe(&p, end, seq, out);
4399	ceph_decode_32_safe(&p, end, snap_count, out);
4400
4401	/*
4402	 * Make sure the reported number of snapshot ids wouldn't go
4403	 * beyond the end of our buffer.  But before checking that,
4404	 * make sure the computed size of the snapshot context we
4405	 * allocate is representable in a size_t.
4406	 */
4407	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4408				 / sizeof (u64)) {
4409		ret = -EINVAL;
4410		goto out;
4411	}
4412	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4413		goto out;
4414	ret = 0;
4415
4416	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4417	if (!snapc) {
4418		ret = -ENOMEM;
4419		goto out;
4420	}
4421	snapc->seq = seq;
4422	for (i = 0; i < snap_count; i++)
4423		snapc->snaps[i] = ceph_decode_64(&p);
4424
4425	ceph_put_snap_context(rbd_dev->header.snapc);
4426	rbd_dev->header.snapc = snapc;
4427
4428	dout("  snap context seq = %llu, snap_count = %u\n",
4429		(unsigned long long)seq, (unsigned int)snap_count);
4430out:
4431	kfree(reply_buf);
4432
4433	return ret;
4434}
4435
4436static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4437					u64 snap_id)
4438{
4439	size_t size;
4440	void *reply_buf;
4441	__le64 snapid;
4442	int ret;
4443	void *p;
4444	void *end;
4445	char *snap_name;
4446
4447	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4448	reply_buf = kmalloc(size, GFP_KERNEL);
4449	if (!reply_buf)
4450		return ERR_PTR(-ENOMEM);
4451
4452	snapid = cpu_to_le64(snap_id);
4453	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4454				"rbd", "get_snapshot_name",
4455				&snapid, sizeof (snapid),
4456				reply_buf, size);
4457	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4458	if (ret < 0) {
4459		snap_name = ERR_PTR(ret);
4460		goto out;
4461	}
4462
4463	p = reply_buf;
4464	end = reply_buf + ret;
4465	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4466	if (IS_ERR(snap_name))
4467		goto out;
4468
4469	dout("  snap_id 0x%016llx snap_name = %s\n",
4470		(unsigned long long)snap_id, snap_name);
4471out:
4472	kfree(reply_buf);
4473
4474	return snap_name;
4475}
4476
4477static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4478{
4479	bool first_time = rbd_dev->header.object_prefix == NULL;
4480	int ret;
4481
4482	ret = rbd_dev_v2_image_size(rbd_dev);
4483	if (ret)
4484		return ret;
4485
4486	if (first_time) {
4487		ret = rbd_dev_v2_header_onetime(rbd_dev);
4488		if (ret)
4489			return ret;
4490	}
4491
4492	ret = rbd_dev_v2_snap_context(rbd_dev);
4493	dout("rbd_dev_v2_snap_context returned %d\n", ret);
4494
4495	return ret;
4496}
4497
4498static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4499{
4500	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4501
4502	if (rbd_dev->image_format == 1)
4503		return rbd_dev_v1_header_info(rbd_dev);
4504
4505	return rbd_dev_v2_header_info(rbd_dev);
4506}
4507
4508static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4509{
4510	struct device *dev;
4511	int ret;
4512
4513	dev = &rbd_dev->dev;
4514	dev->bus = &rbd_bus_type;
4515	dev->type = &rbd_device_type;
4516	dev->parent = &rbd_root_dev;
4517	dev->release = rbd_dev_device_release;
4518	dev_set_name(dev, "%d", rbd_dev->dev_id);
4519	ret = device_register(dev);
4520
4521	return ret;
4522}
4523
4524static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4525{
4526	device_unregister(&rbd_dev->dev);
4527}
4528
4529/*
4530 * Get a unique rbd identifier for the given new rbd_dev, and add
4531 * the rbd_dev to the global list.
4532 */
4533static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4534{
4535	int new_dev_id;
4536
4537	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4538				    0, minor_to_rbd_dev_id(1 << MINORBITS),
4539				    GFP_KERNEL);
4540	if (new_dev_id < 0)
4541		return new_dev_id;
4542
4543	rbd_dev->dev_id = new_dev_id;
4544
4545	spin_lock(&rbd_dev_list_lock);
4546	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4547	spin_unlock(&rbd_dev_list_lock);
4548
4549	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4550
4551	return 0;
4552}
4553
4554/*
4555 * Remove an rbd_dev from the global list, and record that its
4556 * identifier is no longer in use.
4557 */
4558static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4559{
4560	spin_lock(&rbd_dev_list_lock);
4561	list_del_init(&rbd_dev->node);
4562	spin_unlock(&rbd_dev_list_lock);
4563
4564	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4565
4566	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4567}
4568
4569/*
4570 * Skips over white space at *buf, and updates *buf to point to the
4571 * first found non-space character (if any). Returns the length of
4572 * the token (string of non-white space characters) found.  Note
4573 * that *buf must be terminated with '\0'.
4574 */
4575static inline size_t next_token(const char **buf)
4576{
4577	/*
4578	 * These are the characters that produce nonzero for
4579	 * isspace() in the "C" and "POSIX" locales.
4580	 */
4581	const char *spaces = " \f\n\r\t\v";
4582
4583	*buf += strspn(*buf, spaces);	/* Find start of token */
4584
4585	return strcspn(*buf, spaces);	/* Return token length */
4586}
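/*
 * Illustrative example: if *buf points at "  rbd foo", next_token()
 * advances *buf past the leading spaces so it points at "rbd foo" and
 * returns 3, the length of the first token ("rbd").
 */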
4587
4588/*
4589 * Finds the next token in *buf, and if the provided token buffer is
4590 * big enough, copies the found token into it.  The result, if
4591 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4592 * must be terminated with '\0' on entry.
4593 *
4594 * Returns the length of the token found (not including the '\0').
4595 * Return value will be 0 if no token is found, and it will be >=
4596 * token_size if the token would not fit.
4597 *
4598 * The *buf pointer will be updated to point beyond the end of the
4599 * found token.  Note that this occurs even if the token buffer is
4600 * too small to hold it.
4601 */
4602static inline size_t copy_token(const char **buf,
4603				char *token,
4604				size_t token_size)
4605{
4606	size_t len;
4607
4608	len = next_token(buf);
4609	if (len < token_size) {
4610		memcpy(token, *buf, len);
4611		*(token + len) = '\0';
4612	}
4613	*buf += len;
4614
4615	return len;
4616}
4617
4618/*
4619 * Finds the next token in *buf, dynamically allocates a buffer big
4620 * enough to hold a copy of it, and copies the token into the new
4621 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4622 * that a duplicate buffer is created even for a zero-length token.
4623 *
4624 * Returns a pointer to the newly-allocated duplicate, or a null
4625 * pointer if memory for the duplicate was not available.  If
4626 * the lenp argument is a non-null pointer, the length of the token
4627 * (not including the '\0') is returned in *lenp.
4628 *
4629 * If successful, the *buf pointer will be updated to point beyond
4630 * the end of the found token.
4631 *
4632 * Note: uses GFP_KERNEL for allocation.
4633 */
4634static inline char *dup_token(const char **buf, size_t *lenp)
4635{
4636	char *dup;
4637	size_t len;
4638
4639	len = next_token(buf);
4640	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4641	if (!dup)
4642		return NULL;
4643	*(dup + len) = '\0';
4644	*buf += len;
4645
4646	if (lenp)
4647		*lenp = len;
4648
4649	return dup;
4650}
4651
4652/*
4653 * Parse the options provided for an "rbd add" (i.e., rbd image
4654 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4655 * and the data written is passed here via a NUL-terminated buffer.
4656 * Returns 0 if successful or an error code otherwise.
4657 *
4658 * The information extracted from these options is recorded in
4659 * the other parameters which return dynamically-allocated
4660 * structures:
4661 *  ceph_opts
4662 *      The address of a pointer that will refer to a ceph options
4663 *      structure.  Caller must release the returned pointer using
4664 *      ceph_destroy_options() when it is no longer needed.
4665 *  rbd_opts
4666 *	Address of an rbd options pointer.  Fully initialized by
4667 *	this function; caller must release with kfree().
4668 *  spec
4669 *	Address of an rbd image specification pointer.  Fully
4670 *	initialized by this function based on parsed options.
4671 *	Caller must release with rbd_spec_put().
4672 *
4673 * The options passed take this form:
4674 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4675 * where:
4676 *  <mon_addrs>
4677 *      A comma-separated list of one or more monitor addresses.
4678 *      A monitor address is an ip address, optionally followed
4679 *      by a port number (separated by a colon).
4680 *        I.e.:  ip1[:port1][,ip2[:port2]...]
4681 *  <options>
4682 *      A comma-separated list of ceph and/or rbd options.
4683 *  <pool_name>
4684 *      The name of the rados pool containing the rbd image.
4685 *  <image_name>
4686 *      The name of the image in that pool to map.
4687 *  <snap_name>
4688 *      An optional snapshot name.  If provided, the mapping will
4689 *      present data from the image at the time that snapshot was
4690 *      created.  The image head is used if no snapshot name is
4691 *      provided.  Snapshot mappings are always read-only.
4692 */
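/*
 * For illustration only (the monitor address, credentials and names
 * below are made up), a complete string following the layout above
 * might be:
 *
 *   1.2.3.4:6789 name=admin,secret=<key> rbd foo snap1
 *
 * i.e. one monitor, two ceph options, pool "rbd", image "foo" and
 * snapshot "snap1"; leaving off the final token maps the image head.
 */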
4693static int rbd_add_parse_args(const char *buf,
4694				struct ceph_options **ceph_opts,
4695				struct rbd_options **opts,
4696				struct rbd_spec **rbd_spec)
4697{
4698	size_t len;
4699	char *options;
4700	const char *mon_addrs;
4701	char *snap_name;
4702	size_t mon_addrs_size;
4703	struct rbd_spec *spec = NULL;
4704	struct rbd_options *rbd_opts = NULL;
4705	struct ceph_options *copts;
4706	int ret;
4707
4708	/* The first four tokens are required */
4709
4710	len = next_token(&buf);
4711	if (!len) {
4712		rbd_warn(NULL, "no monitor address(es) provided");
4713		return -EINVAL;
4714	}
4715	mon_addrs = buf;
4716	mon_addrs_size = len + 1;
4717	buf += len;
4718
4719	ret = -EINVAL;
4720	options = dup_token(&buf, NULL);
4721	if (!options)
4722		return -ENOMEM;
4723	if (!*options) {
4724		rbd_warn(NULL, "no options provided");
4725		goto out_err;
4726	}
4727
4728	spec = rbd_spec_alloc();
4729	if (!spec)
4730		goto out_mem;
4731
4732	spec->pool_name = dup_token(&buf, NULL);
4733	if (!spec->pool_name)
4734		goto out_mem;
4735	if (!*spec->pool_name) {
4736		rbd_warn(NULL, "no pool name provided");
4737		goto out_err;
4738	}
4739
4740	spec->image_name = dup_token(&buf, NULL);
4741	if (!spec->image_name)
4742		goto out_mem;
4743	if (!*spec->image_name) {
4744		rbd_warn(NULL, "no image name provided");
4745		goto out_err;
4746	}
4747
4748	/*
4749	 * Snapshot name is optional; default is to use "-"
4750	 * (indicating the head/no snapshot).
4751	 */
4752	len = next_token(&buf);
4753	if (!len) {
4754		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4755		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4756	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
4757		ret = -ENAMETOOLONG;
4758		goto out_err;
4759	}
4760	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4761	if (!snap_name)
4762		goto out_mem;
4763	*(snap_name + len) = '\0';
4764	spec->snap_name = snap_name;
4765
4766	/* Initialize all rbd options to the defaults */
4767
4768	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4769	if (!rbd_opts)
4770		goto out_mem;
4771
4772	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4773
4774	copts = ceph_parse_options(options, mon_addrs,
4775					mon_addrs + mon_addrs_size - 1,
4776					parse_rbd_opts_token, rbd_opts);
4777	if (IS_ERR(copts)) {
4778		ret = PTR_ERR(copts);
4779		goto out_err;
4780	}
4781	kfree(options);
4782
4783	*ceph_opts = copts;
4784	*opts = rbd_opts;
4785	*rbd_spec = spec;
4786
4787	return 0;
4788out_mem:
4789	ret = -ENOMEM;
4790out_err:
4791	kfree(rbd_opts);
4792	rbd_spec_put(spec);
4793	kfree(options);
4794
4795	return ret;
4796}
4797
4798/*
4799 * Return pool id (>= 0) or a negative error code.
4800 */
4801static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4802{
4803	u64 newest_epoch;
4804	unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4805	int tries = 0;
4806	int ret;
4807
4808again:
4809	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
4810	if (ret == -ENOENT && tries++ < 1) {
4811		ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
4812					       &newest_epoch);
4813		if (ret < 0)
4814			return ret;
4815
4816		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
4817			ceph_monc_request_next_osdmap(&rbdc->client->monc);
4818			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
4819						     newest_epoch, timeout);
4820			goto again;
4821		} else {
4822			/* the osdmap we have is new enough */
4823			return -ENOENT;
4824		}
4825	}
4826
4827	return ret;
4828}
4829
4830/*
4831 * An rbd format 2 image has a unique identifier, distinct from the
4832 * name given to it by the user.  Internally, that identifier is
4833 * what's used to specify the names of objects related to the image.
4834 *
4835 * A special "rbd id" object is used to map an rbd image name to its
4836 * id.  If that object doesn't exist, then there is no v2 rbd image
4837 * with the supplied name.
4838 *
4839 * This function will record the given rbd_dev's image_id field if
4840 * it can be determined, and in that case will return 0.  If any
4841 * errors occur a negative errno will be returned and the rbd_dev's
4842 * image_id field will be unchanged (and should be NULL).
4843 */
4844static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4845{
4846	int ret;
4847	size_t size;
4848	char *object_name;
4849	void *response;
4850	char *image_id;
4851
4852	/*
4853	 * When probing a parent image, the image id is already
4854	 * known (and the image name likely is not).  There's no
4855	 * need to fetch the image id again in this case.  We
4856	 * do still need to set the image format though.
4857	 */
4858	if (rbd_dev->spec->image_id) {
4859		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4860
4861		return 0;
4862	}
4863
4864	/*
4865	 * First, see if the format 2 image id file exists, and if
4866	 * so, get the image's persistent id from it.
4867	 */
4868	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4869	object_name = kmalloc(size, GFP_NOIO);
4870	if (!object_name)
4871		return -ENOMEM;
4872	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4873	dout("rbd id object name is %s\n", object_name);
4874
4875	/* Response will be an encoded string, which includes a length */
4876
4877	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4878	response = kzalloc(size, GFP_NOIO);
4879	if (!response) {
4880		ret = -ENOMEM;
4881		goto out;
4882	}
4883
4884	/* If it doesn't exist we'll assume it's a format 1 image */
4885
4886	ret = rbd_obj_method_sync(rbd_dev, object_name,
4887				"rbd", "get_id", NULL, 0,
4888				response, RBD_IMAGE_ID_LEN_MAX);
4889	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4890	if (ret == -ENOENT) {
4891		image_id = kstrdup("", GFP_KERNEL);
4892		ret = image_id ? 0 : -ENOMEM;
4893		if (!ret)
4894			rbd_dev->image_format = 1;
4895	} else if (ret > sizeof (__le32)) {
4896		void *p = response;
4897
4898		image_id = ceph_extract_encoded_string(&p, p + ret,
4899						NULL, GFP_NOIO);
4900		ret = PTR_ERR_OR_ZERO(image_id);
4901		if (!ret)
4902			rbd_dev->image_format = 2;
4903	} else {
4904		ret = -EINVAL;
4905	}
4906
4907	if (!ret) {
4908		rbd_dev->spec->image_id = image_id;
4909		dout("image_id is %s\n", image_id);
4910	}
4911out:
4912	kfree(response);
4913	kfree(object_name);
4914
4915	return ret;
4916}
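/*
 * As an illustration (the image name is hypothetical): probing an
 * image named "foo" makes this function read the rados object named
 * "<RBD_ID_PREFIX>foo" built by the sprintf() above.  If that object
 * does not exist the image is treated as format 1; otherwise the
 * encoded string stored in it becomes the format 2 image id.
 */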
4917
4918/*
4919 * Undo whatever state changes are made by v1 or v2 header info
4920 * call.
4921 */
4922static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4923{
4924	struct rbd_image_header	*header;
4925
4926	/* Drop parent reference unless it's already been done (or none) */
4927
4928	if (rbd_dev->parent_overlap)
4929		rbd_dev_parent_put(rbd_dev);
4930
4931	/* Free dynamic fields from the header, then zero it out */
4932
4933	header = &rbd_dev->header;
4934	ceph_put_snap_context(header->snapc);
4935	kfree(header->snap_sizes);
4936	kfree(header->snap_names);
4937	kfree(header->object_prefix);
4938	memset(header, 0, sizeof (*header));
4939}
4940
4941static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
4942{
4943	int ret;
4944
4945	ret = rbd_dev_v2_object_prefix(rbd_dev);
4946	if (ret)
4947		goto out_err;
4948
4949	/*
4950	 * Get and check the features for the image.  Currently the
4951	 * features are assumed to never change.
4952	 */
4953	ret = rbd_dev_v2_features(rbd_dev);
4954	if (ret)
4955		goto out_err;
4956
4957	/* If the image supports fancy striping, get its parameters */
4958
4959	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4960		ret = rbd_dev_v2_striping_info(rbd_dev);
4961		if (ret < 0)
4962			goto out_err;
4963	}
4964	/* No support for crypto and compression type format 2 images */
4965
4966	return 0;
4967out_err:
4968	rbd_dev->header.features = 0;
4969	kfree(rbd_dev->header.object_prefix);
4970	rbd_dev->header.object_prefix = NULL;
4971
4972	return ret;
4973}
4974
4975static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
4976{
4977	struct rbd_device *parent = NULL;
4978	struct rbd_spec *parent_spec;
4979	struct rbd_client *rbdc;
4980	int ret;
4981
4982	if (!rbd_dev->parent_spec)
4983		return 0;
4984	/*
4985	 * We need to pass a reference to the client and the parent
4986	 * spec when creating the parent rbd_dev.  Images related by
4987	 * parent/child relationships always share both.
4988	 */
4989	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4990	rbdc = __rbd_get_client(rbd_dev->rbd_client);
4991
4992	ret = -ENOMEM;
4993	parent = rbd_dev_create(rbdc, parent_spec);
4994	if (!parent)
4995		goto out_err;
4996
4997	ret = rbd_dev_image_probe(parent, false);
4998	if (ret < 0)
4999		goto out_err;
5000	rbd_dev->parent = parent;
5001	atomic_set(&rbd_dev->parent_ref, 1);
5002
5003	return 0;
5004out_err:
5005	if (parent) {
5006		rbd_dev_unparent(rbd_dev);
5007		kfree(rbd_dev->header_name);
5008		rbd_dev_destroy(parent);
5009	} else {
5010		rbd_put_client(rbdc);
5011		rbd_spec_put(parent_spec);
5012	}
5013
5014	return ret;
5015}
5016
5017static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5018{
5019	int ret;
5020
5021	/* Get an id and fill in device name. */
5022
5023	ret = rbd_dev_id_get(rbd_dev);
5024	if (ret)
5025		return ret;
5026
5027	BUILD_BUG_ON(DEV_NAME_LEN
5028			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5029	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5030
5031	/* Record our major and minor device numbers. */
5032
5033	if (!single_major) {
5034		ret = register_blkdev(0, rbd_dev->name);
5035		if (ret < 0)
5036			goto err_out_id;
5037
5038		rbd_dev->major = ret;
5039		rbd_dev->minor = 0;
5040	} else {
5041		rbd_dev->major = rbd_major;
5042		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5043	}
5044
5045	/* Set up the blkdev mapping. */
5046
5047	ret = rbd_init_disk(rbd_dev);
5048	if (ret)
5049		goto err_out_blkdev;
5050
5051	ret = rbd_dev_mapping_set(rbd_dev);
5052	if (ret)
5053		goto err_out_disk;
5054	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5055	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5056
5057	ret = rbd_bus_add_dev(rbd_dev);
5058	if (ret)
5059		goto err_out_mapping;
5060
5061	/* Everything's ready.  Announce the disk to the world. */
5062
5063	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5064	add_disk(rbd_dev->disk);
5065
5066	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5067		(unsigned long long) rbd_dev->mapping.size);
5068
5069	return ret;
5070
5071err_out_mapping:
5072	rbd_dev_mapping_clear(rbd_dev);
5073err_out_disk:
5074	rbd_free_disk(rbd_dev);
5075err_out_blkdev:
5076	if (!single_major)
5077		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5078err_out_id:
5079	rbd_dev_id_put(rbd_dev);
5080	rbd_dev_mapping_clear(rbd_dev);
5081
5082	return ret;
5083}
5084
5085static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5086{
5087	struct rbd_spec *spec = rbd_dev->spec;
5088	size_t size;
5089
5090	/* Record the header object name for this rbd image. */
5091
5092	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5093
5094	if (rbd_dev->image_format == 1)
5095		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5096	else
5097		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5098
5099	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5100	if (!rbd_dev->header_name)
5101		return -ENOMEM;
5102
5103	if (rbd_dev->image_format == 1)
5104		sprintf(rbd_dev->header_name, "%s%s",
5105			spec->image_name, RBD_SUFFIX);
5106	else
5107		sprintf(rbd_dev->header_name, "%s%s",
5108			RBD_HEADER_PREFIX, spec->image_id);
5109	return 0;
5110}
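/*
 * Illustrative examples (name and id are hypothetical): a format 1
 * image "foo" gets a header object named "foo" RBD_SUFFIX, while a
 * format 2 image with id "1028b4567890" gets one named
 * RBD_HEADER_PREFIX "1028b4567890", per the sprintf() calls above.
 */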
5111
5112static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5113{
5114	rbd_dev_unprobe(rbd_dev);
5115	kfree(rbd_dev->header_name);
5116	rbd_dev->header_name = NULL;
5117	rbd_dev->image_format = 0;
5118	kfree(rbd_dev->spec->image_id);
5119	rbd_dev->spec->image_id = NULL;
5120
5121	rbd_dev_destroy(rbd_dev);
5122}
5123
5124/*
5125 * Probe for the existence of the header object for the given rbd
5126 * device.  If this image is the one being mapped (i.e., not a
5127 * parent), initiate a watch on its header object before using that
5128 * object to get detailed information about the rbd image.
5129 */
5130static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5131{
5132	int ret;
5133
5134	/*
5135	 * Get the id from the image id object.  Unless there's an
5136	 * error, rbd_dev->spec->image_id will be filled in with
5137	 * a dynamically-allocated string, and rbd_dev->image_format
5138	 * will be set to either 1 or 2.
5139	 */
5140	ret = rbd_dev_image_id(rbd_dev);
5141	if (ret)
5142		return ret;
5143
5144	ret = rbd_dev_header_name(rbd_dev);
5145	if (ret)
5146		goto err_out_format;
5147
5148	if (mapping) {
5149		ret = rbd_dev_header_watch_sync(rbd_dev);
5150		if (ret)
5151			goto out_header_name;
5152	}
5153
5154	ret = rbd_dev_header_info(rbd_dev);
5155	if (ret)
5156		goto err_out_watch;
5157
5158	/*
5159	 * If this image is the one being mapped, we have pool name and
5160	 * id, image name and id, and snap name - need to fill snap id.
5161	 * Otherwise this is a parent image, identified by pool, image
5162	 * and snap ids - need to fill in names for those ids.
5163	 */
5164	if (mapping)
5165		ret = rbd_spec_fill_snap_id(rbd_dev);
5166	else
5167		ret = rbd_spec_fill_names(rbd_dev);
5168	if (ret)
5169		goto err_out_probe;
5170
5171	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5172		ret = rbd_dev_v2_parent_info(rbd_dev);
5173		if (ret)
5174			goto err_out_probe;
5175
5176		/*
5177		 * Need to warn users if this image is the one being
5178		 * mapped and has a parent.
5179		 */
5180		if (mapping && rbd_dev->parent_spec)
5181			rbd_warn(rbd_dev,
5182				 "WARNING: kernel layering is EXPERIMENTAL!");
5183	}
5184
5185	ret = rbd_dev_probe_parent(rbd_dev);
5186	if (ret)
5187		goto err_out_probe;
5188
5189	dout("discovered format %u image, header name is %s\n",
5190		rbd_dev->image_format, rbd_dev->header_name);
5191	return 0;
5192
5193err_out_probe:
5194	rbd_dev_unprobe(rbd_dev);
5195err_out_watch:
5196	if (mapping)
5197		rbd_dev_header_unwatch_sync(rbd_dev);
5198out_header_name:
5199	kfree(rbd_dev->header_name);
5200	rbd_dev->header_name = NULL;
5201err_out_format:
5202	rbd_dev->image_format = 0;
5203	kfree(rbd_dev->spec->image_id);
5204	rbd_dev->spec->image_id = NULL;
5205	return ret;
5206}
5207
5208static ssize_t do_rbd_add(struct bus_type *bus,
5209			  const char *buf,
5210			  size_t count)
5211{
5212	struct rbd_device *rbd_dev = NULL;
5213	struct ceph_options *ceph_opts = NULL;
5214	struct rbd_options *rbd_opts = NULL;
5215	struct rbd_spec *spec = NULL;
5216	struct rbd_client *rbdc;
5217	bool read_only;
5218	int rc = -ENOMEM;
5219
5220	if (!try_module_get(THIS_MODULE))
5221		return -ENODEV;
5222
5223	/* parse add command */
5224	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5225	if (rc < 0)
5226		goto err_out_module;
5227	read_only = rbd_opts->read_only;
5228	kfree(rbd_opts);
5229	rbd_opts = NULL;	/* done with this */
5230
5231	rbdc = rbd_get_client(ceph_opts);
5232	if (IS_ERR(rbdc)) {
5233		rc = PTR_ERR(rbdc);
5234		goto err_out_args;
5235	}
5236
5237	/* pick the pool */
5238	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5239	if (rc < 0)
5240		goto err_out_client;
5241	spec->pool_id = (u64)rc;
5242
5243	/* The ceph file layout needs to fit pool id in 32 bits */
5244
5245	if (spec->pool_id > (u64)U32_MAX) {
5246		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5247				(unsigned long long)spec->pool_id, U32_MAX);
5248		rc = -EIO;
5249		goto err_out_client;
5250	}
5251
5252	rbd_dev = rbd_dev_create(rbdc, spec);
5253	if (!rbd_dev)
5254		goto err_out_client;
5255	rbdc = NULL;		/* rbd_dev now owns this */
5256	spec = NULL;		/* rbd_dev now owns this */
5257
5258	rc = rbd_dev_image_probe(rbd_dev, true);
5259	if (rc < 0)
5260		goto err_out_rbd_dev;
5261
5262	/* If we are mapping a snapshot it must be marked read-only */
5263
5264	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5265		read_only = true;
5266	rbd_dev->mapping.read_only = read_only;
5267
5268	rc = rbd_dev_device_setup(rbd_dev);
5269	if (rc) {
5270		/*
5271		 * rbd_dev_header_unwatch_sync() can't be moved into
5272		 * rbd_dev_image_release() without refactoring, see
5273		 * commit 1f3ef78861ac.
5274		 */
5275		rbd_dev_header_unwatch_sync(rbd_dev);
5276		rbd_dev_image_release(rbd_dev);
5277		goto err_out_module;
5278	}
5279
5280	return count;
5281
5282err_out_rbd_dev:
5283	rbd_dev_destroy(rbd_dev);
5284err_out_client:
5285	rbd_put_client(rbdc);
5286err_out_args:
5287	rbd_spec_put(spec);
5288err_out_module:
5289	module_put(THIS_MODULE);
5290
5291	dout("Error adding device %s\n", buf);
5292
5293	return (ssize_t)rc;
5294}
5295
5296static ssize_t rbd_add(struct bus_type *bus,
5297		       const char *buf,
5298		       size_t count)
5299{
5300	if (single_major)
5301		return -EINVAL;
5302
5303	return do_rbd_add(bus, buf, count);
5304}
5305
5306static ssize_t rbd_add_single_major(struct bus_type *bus,
5307				    const char *buf,
5308				    size_t count)
5309{
5310	return do_rbd_add(bus, buf, count);
5311}
5312
5313static void rbd_dev_device_release(struct device *dev)
5314{
5315	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5316
5317	rbd_free_disk(rbd_dev);
5318	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5319	rbd_dev_mapping_clear(rbd_dev);
5320	if (!single_major)
5321		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5322	rbd_dev_id_put(rbd_dev);
5323	rbd_dev_mapping_clear(rbd_dev);
5324}
5325
5326static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5327{
5328	while (rbd_dev->parent) {
5329		struct rbd_device *first = rbd_dev;
5330		struct rbd_device *second = first->parent;
5331		struct rbd_device *third;
5332
5333		/*
5334		 * Follow to the parent with no grandparent and
5335		 * remove it.
5336		 */
5337		while (second && (third = second->parent)) {
5338			first = second;
5339			second = third;
5340		}
5341		rbd_assert(second);
5342		rbd_dev_image_release(second);
5343		first->parent = NULL;
5344		first->parent_overlap = 0;
5345
5346		rbd_assert(first->parent_spec);
5347		rbd_spec_put(first->parent_spec);
5348		first->parent_spec = NULL;
5349	}
5350}
5351
5352static ssize_t do_rbd_remove(struct bus_type *bus,
5353			     const char *buf,
5354			     size_t count)
5355{
5356	struct rbd_device *rbd_dev = NULL;
5357	struct list_head *tmp;
5358	int dev_id;
5359	unsigned long ul;
5360	bool already = false;
5361	int ret;
5362
5363	ret = kstrtoul(buf, 10, &ul);
5364	if (ret)
5365		return ret;
5366
5367	/* convert to int; abort if we lost anything in the conversion */
5368	dev_id = (int)ul;
5369	if (dev_id != ul)
5370		return -EINVAL;
5371
5372	ret = -ENOENT;
5373	spin_lock(&rbd_dev_list_lock);
5374	list_for_each(tmp, &rbd_dev_list) {
5375		rbd_dev = list_entry(tmp, struct rbd_device, node);
5376		if (rbd_dev->dev_id == dev_id) {
5377			ret = 0;
5378			break;
5379		}
5380	}
5381	if (!ret) {
5382		spin_lock_irq(&rbd_dev->lock);
5383		if (rbd_dev->open_count)
5384			ret = -EBUSY;
5385		else
5386			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5387							&rbd_dev->flags);
5388		spin_unlock_irq(&rbd_dev->lock);
5389	}
5390	spin_unlock(&rbd_dev_list_lock);
5391	if (ret < 0 || already)
5392		return ret;
5393
5394	rbd_dev_header_unwatch_sync(rbd_dev);
5395	/*
5396	 * flush remaining watch callbacks - these must be complete
5397	 * before the osd_client is shut down
5398	 */
5399	dout("%s: flushing notifies", __func__);
5400	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5401
5402	/*
5403	 * Don't free anything from rbd_dev->disk until after all
5404	 * notifies are completely processed. Otherwise
5405	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5406	 * in a potential use after free of rbd_dev->disk or rbd_dev.
5407	 */
5408	rbd_bus_del_dev(rbd_dev);
5409	rbd_dev_image_release(rbd_dev);
5410	module_put(THIS_MODULE);
5411
5412	return count;
5413}
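/*
 * For illustration (the id 2 is hypothetical), tearing down a mapped
 * device amounts to:
 *
 *   echo 2 > /sys/bus/rbd/remove
 *
 * where 2 is the dev_id assigned at map time; the write fails with
 * -EBUSY while the device is still open.
 */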
5414
5415static ssize_t rbd_remove(struct bus_type *bus,
5416			  const char *buf,
5417			  size_t count)
5418{
5419	if (single_major)
5420		return -EINVAL;
5421
5422	return do_rbd_remove(bus, buf, count);
5423}
5424
5425static ssize_t rbd_remove_single_major(struct bus_type *bus,
5426				       const char *buf,
5427				       size_t count)
5428{
5429	return do_rbd_remove(bus, buf, count);
5430}
5431
5432/*
5433 * create control files in sysfs
5434 * /sys/bus/rbd/...
5435 */
5436static int rbd_sysfs_init(void)
5437{
5438	int ret;
5439
5440	ret = device_register(&rbd_root_dev);
5441	if (ret < 0)
5442		return ret;
5443
5444	ret = bus_register(&rbd_bus_type);
5445	if (ret < 0)
5446		device_unregister(&rbd_root_dev);
5447
5448	return ret;
5449}
5450
5451static void rbd_sysfs_cleanup(void)
5452{
5453	bus_unregister(&rbd_bus_type);
5454	device_unregister(&rbd_root_dev);
5455}
5456
5457static int rbd_slab_init(void)
5458{
5459	rbd_assert(!rbd_img_request_cache);
5460	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5461					sizeof (struct rbd_img_request),
5462					__alignof__(struct rbd_img_request),
5463					0, NULL);
5464	if (!rbd_img_request_cache)
5465		return -ENOMEM;
5466
5467	rbd_assert(!rbd_obj_request_cache);
5468	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5469					sizeof (struct rbd_obj_request),
5470					__alignof__(struct rbd_obj_request),
5471					0, NULL);
5472	if (!rbd_obj_request_cache)
5473		goto out_err;
5474
5475	rbd_assert(!rbd_segment_name_cache);
5476	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5477					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5478	if (rbd_segment_name_cache)
5479		return 0;
5480out_err:
5481	if (rbd_obj_request_cache) {
5482		kmem_cache_destroy(rbd_obj_request_cache);
5483		rbd_obj_request_cache = NULL;
5484	}
5485
5486	kmem_cache_destroy(rbd_img_request_cache);
5487	rbd_img_request_cache = NULL;
5488
5489	return -ENOMEM;
5490}
5491
5492static void rbd_slab_exit(void)
5493{
5494	rbd_assert(rbd_segment_name_cache);
5495	kmem_cache_destroy(rbd_segment_name_cache);
5496	rbd_segment_name_cache = NULL;
5497
5498	rbd_assert(rbd_obj_request_cache);
5499	kmem_cache_destroy(rbd_obj_request_cache);
5500	rbd_obj_request_cache = NULL;
5501
5502	rbd_assert(rbd_img_request_cache);
5503	kmem_cache_destroy(rbd_img_request_cache);
5504	rbd_img_request_cache = NULL;
5505}
5506
5507static int __init rbd_init(void)
5508{
5509	int rc;
5510
5511	if (!libceph_compatible(NULL)) {
5512		rbd_warn(NULL, "libceph incompatibility (quitting)");
5513		return -EINVAL;
5514	}
5515
5516	rc = rbd_slab_init();
5517	if (rc)
5518		return rc;
5519
5520	if (single_major) {
5521		rbd_major = register_blkdev(0, RBD_DRV_NAME);
5522		if (rbd_major < 0) {
5523			rc = rbd_major;
5524			goto err_out_slab;
5525		}
5526	}
5527
5528	rc = rbd_sysfs_init();
5529	if (rc)
5530		goto err_out_blkdev;
5531
5532	if (single_major)
5533		pr_info("loaded (major %d)\n", rbd_major);
5534	else
5535		pr_info("loaded\n");
5536
5537	return 0;
5538
5539err_out_blkdev:
5540	if (single_major)
5541		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5542err_out_slab:
5543	rbd_slab_exit();
5544	return rc;
5545}
5546
5547static void __exit rbd_exit(void)
5548{
5549	ida_destroy(&rbd_dev_id_ida);
5550	rbd_sysfs_cleanup();
5551	if (single_major)
5552		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5553	rbd_slab_exit();
5554}
5555
5556module_init(rbd_init);
5557module_exit(rbd_exit);
5558
5559MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5560MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5561MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5562/* following authorship retained from original osdblk.c */
5563MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5564
5565MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5566MODULE_LICENSE("GPL");
5567