1
2/*
3   rbd.c -- Export ceph rados objects as a Linux block device
4
5
6   based on drivers/block/osdblk.c:
7
8   Copyright 2009 Red Hat, Inc.
9
10   This program is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation.
13
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License for more details.
18
19   You should have received a copy of the GNU General Public License
20   along with this program; see the file COPYING.  If not, write to
21   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
22
23
24
25   For usage instructions, please refer to:
26
27                 Documentation/ABI/testing/sysfs-bus-rbd
28
29 */
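
/*
 * As a rough illustration only (the ABI document above is the
 * authoritative reference), an image is typically mapped and unmapped
 * through sysfs along these lines, with the monitor address,
 * credentials, pool name and image name being placeholders:
 *
 *   # echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage" \
 *         > /sys/bus/rbd/add
 *   # echo 0 > /sys/bus/rbd/remove
 */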
30
31#include <linux/ceph/libceph.h>
32#include <linux/ceph/osd_client.h>
33#include <linux/ceph/mon_client.h>
34#include <linux/ceph/decode.h>
35#include <linux/parser.h>
36#include <linux/bsearch.h>
37
38#include <linux/kernel.h>
39#include <linux/device.h>
40#include <linux/module.h>
41#include <linux/fs.h>
42#include <linux/blkdev.h>
43#include <linux/slab.h>
44#include <linux/idr.h>
45#include <linux/workqueue.h>
46
47#include "rbd_types.h"
48
49#define RBD_DEBUG	/* Activate rbd_assert() calls */
50
51/*
52 * The basic unit of block I/O is a sector.  It is interpreted in a
53 * number of contexts in Linux (blk, bio, genhd), but the default is
54 * universally 512 bytes.  These symbols are just slightly more
55 * meaningful than the bare numbers they represent.
56 */
57#define	SECTOR_SHIFT	9
58#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
59
60/*
61 * Increment the given counter and return its updated value.
62 * If the counter is already 0 it will not be incremented.
63 * If the counter is already at its maximum value, -EINVAL is
64 * returned without updating it.
65 */
66static int atomic_inc_return_safe(atomic_t *v)
67{
68	unsigned int counter;
69
70	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
71	if (counter <= (unsigned int)INT_MAX)
72		return (int)counter;
73
74	atomic_dec(v);
75
76	return -EINVAL;
77}
78
79/* Decrement the counter.  Return the resulting value, or -EINVAL */
80static int atomic_dec_return_safe(atomic_t *v)
81{
82	int counter;
83
84	counter = atomic_dec_return(v);
85	if (counter >= 0)
86		return counter;
87
88	atomic_inc(v);
89
90	return -EINVAL;
91}
92
93#define RBD_DRV_NAME "rbd"
94
95#define RBD_MINORS_PER_MAJOR		256
96#define RBD_SINGLE_MAJOR_PART_SHIFT	4
97
98#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
99#define RBD_MAX_SNAP_NAME_LEN	\
100			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
101
102#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
103
104#define RBD_SNAP_HEAD_NAME	"-"
105
106#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */
107
108/* This allows a single page to hold an image name sent by OSD */
109#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
110#define RBD_IMAGE_ID_LEN_MAX	64
111
112#define RBD_OBJ_PREFIX_LEN_MAX	64
113
114/* Feature bits */
115
116#define RBD_FEATURE_LAYERING	(1<<0)
117#define RBD_FEATURE_STRIPINGV2	(1<<1)
118#define RBD_FEATURES_ALL \
119	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
120
121/* Features supported by this (client software) implementation. */
122
123#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)
124
125/*
126 * An RBD device name will be "rbd#", where the "rbd" comes from
127 * RBD_DRV_NAME above, and # is a unique integer identifier.
128 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
129 * enough to hold all possible device names.
130 */
131#define DEV_NAME_LEN		32
132#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
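
/*
 * A quick check of the arithmetic above: with a 4-byte int,
 * MAX_INT_FORMAT_WIDTH works out to (5 * 4) / 2 + 1 = 11 characters,
 * enough even for the signed worst case "-2147483648", so
 * DEV_NAME_LEN (32) comfortably holds "rbd" plus any decimal id.
 */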
133
134/*
135 * block device image metadata (in-memory version)
136 */
137struct rbd_image_header {
138	/* These six fields never change for a given rbd image */
139	char *object_prefix;
140	__u8 obj_order;
141	__u8 crypt_type;
142	__u8 comp_type;
143	u64 stripe_unit;
144	u64 stripe_count;
145	u64 features;		/* Might be changeable someday? */
146
147	/* The remaining fields need to be updated occasionally */
148	u64 image_size;
149	struct ceph_snap_context *snapc;
150	char *snap_names;	/* format 1 only */
151	u64 *snap_sizes;	/* format 1 only */
152};
153
154/*
155 * An rbd image specification.
156 *
157 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
158 * identify an image.  Each rbd_dev structure includes a pointer to
159 * an rbd_spec structure that encapsulates this identity.
160 *
161 * Each of the id's in an rbd_spec has an associated name.  For a
162 * user-mapped image, the names are supplied and the id's associated
163 * with them are looked up.  For a layered image, a parent image is
164 * defined by the tuple, and the names are looked up.
165 *
166 * An rbd_dev structure contains a parent_spec pointer which is
167 * non-null if the image it represents is a child in a layered
168 * image.  This pointer will refer to the rbd_spec structure used
169 * by the parent rbd_dev for its own identity (i.e., the structure
170 * is shared between the parent and child).
171 *
172 * Since these structures are populated once, during the discovery
173 * phase of image construction, they are effectively immutable so
174 * we make no effort to synchronize access to them.
175 *
176 * Note that code herein does not assume the image name is known (it
177 * could be a null pointer).
178 */
179struct rbd_spec {
180	u64		pool_id;
181	const char	*pool_name;
182
183	const char	*image_id;
184	const char	*image_name;
185
186	u64		snap_id;
187	const char	*snap_name;
188
189	struct kref	kref;
190};
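
/*
 * For illustration only (all values made up): a mapping of snapshot
 * "snap1" of image "foo" in pool "rbd" might carry pool_id 2, image_id
 * "101574b0dc51" and the numeric id of that snapshot, while a mapping
 * of the image head instead uses snap_id CEPH_NOSNAP and snap_name
 * RBD_SNAP_HEAD_NAME ("-").
 */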
191
192/*
193 * an instance of the client.  multiple devices may share an rbd client.
194 */
195struct rbd_client {
196	struct ceph_client	*client;
197	struct kref		kref;
198	struct list_head	node;
199};
200
201struct rbd_img_request;
202typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
203
204#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
205
206struct rbd_obj_request;
207typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
208
209enum obj_request_type {
210	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
211};
212
213enum obj_operation_type {
214	OBJ_OP_WRITE,
215	OBJ_OP_READ,
216	OBJ_OP_DISCARD,
217};
218
219enum obj_req_flags {
220	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
221	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
222	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
223	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
224};
225
226struct rbd_obj_request {
227	const char		*object_name;
228	u64			offset;		/* object start byte */
229	u64			length;		/* bytes from offset */
230	unsigned long		flags;
231
232	/*
233	 * An object request associated with an image will have its
234	 * img_data flag set; a standalone object request will not.
235	 *
236	 * A standalone object request will have which == BAD_WHICH
237	 * and a null obj_request pointer.
238	 *
239	 * An object request initiated in support of a layered image
240	 * object (to check for its existence before a write) will
241	 * have which == BAD_WHICH and a non-null obj_request pointer.
242	 *
243	 * Finally, an object request for rbd image data will have
244	 * which != BAD_WHICH, and will have a non-null img_request
245	 * pointer.  The value of which will be in the range
246	 * 0..(img_request->obj_request_count-1).
247	 */
248	union {
249		struct rbd_obj_request	*obj_request;	/* STAT op */
250		struct {
251			struct rbd_img_request	*img_request;
252			u64			img_offset;
253			/* links for img_request->obj_requests list */
254			struct list_head	links;
255		};
256	};
257	u32			which;		/* posn image request list */
258
259	enum obj_request_type	type;
260	union {
261		struct bio	*bio_list;
262		struct {
263			struct page	**pages;
264			u32		page_count;
265		};
266	};
267	struct page		**copyup_pages;
268	u32			copyup_page_count;
269
270	struct ceph_osd_request	*osd_req;
271
272	u64			xferred;	/* bytes transferred */
273	int			result;
274
275	rbd_obj_callback_t	callback;
276	struct completion	completion;
277
278	struct kref		kref;
279};
280
281enum img_req_flags {
282	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
283	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
284	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
285	IMG_REQ_DISCARD,	/* discard: normal = 0, discard request = 1 */
286};
287
288struct rbd_img_request {
289	struct rbd_device	*rbd_dev;
290	u64			offset;	/* starting image byte offset */
291	u64			length;	/* byte count from offset */
292	unsigned long		flags;
293	union {
294		u64			snap_id;	/* for reads */
295		struct ceph_snap_context *snapc;	/* for writes */
296	};
297	union {
298		struct request		*rq;		/* block request */
299		struct rbd_obj_request	*obj_request;	/* obj req initiator */
300	};
301	struct page		**copyup_pages;
302	u32			copyup_page_count;
303	spinlock_t		completion_lock;/* protects next_completion */
304	u32			next_completion;
305	rbd_img_callback_t	callback;
306	u64			xferred;/* aggregate bytes transferred */
307	int			result;	/* first nonzero obj_request result */
308
309	u32			obj_request_count;
310	struct list_head	obj_requests;	/* rbd_obj_request structs */
311
312	struct kref		kref;
313};
314
315#define for_each_obj_request(ireq, oreq) \
316	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
317#define for_each_obj_request_from(ireq, oreq) \
318	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
319#define for_each_obj_request_safe(ireq, oreq, n) \
320	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
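
/*
 * Note that the "safe" variant above walks the object request list in
 * reverse, so callers that use it tear requests down in the opposite
 * order from which they were added.
 */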
321
322struct rbd_mapping {
323	u64                     size;
324	u64                     features;
325	bool			read_only;
326};
327
328/*
329 * a single device
330 */
331struct rbd_device {
332	int			dev_id;		/* blkdev unique id */
333
334	int			major;		/* blkdev assigned major */
335	int			minor;
336	struct gendisk		*disk;		/* blkdev's gendisk and rq */
337
338	u32			image_format;	/* Either 1 or 2 */
339	struct rbd_client	*rbd_client;
340
341	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
342
343	struct list_head	rq_queue;	/* incoming rq queue */
344	spinlock_t		lock;		/* queue, flags, open_count */
345	struct work_struct	rq_work;
346
347	struct rbd_image_header	header;
348	unsigned long		flags;		/* possibly lock protected */
349	struct rbd_spec		*spec;
350
351	char			*header_name;
352
353	struct ceph_file_layout	layout;
354
355	struct ceph_osd_event   *watch_event;
356	struct rbd_obj_request	*watch_request;
357
358	struct rbd_spec		*parent_spec;
359	u64			parent_overlap;
360	atomic_t		parent_ref;
361	struct rbd_device	*parent;
362
363	/* protects updating the header */
364	struct rw_semaphore     header_rwsem;
365
366	struct rbd_mapping	mapping;
367
368	struct list_head	node;
369
370	/* sysfs related */
371	struct device		dev;
372	unsigned long		open_count;	/* protected by lock */
373};
374
375/*
376 * Flag bits for rbd_dev->flags.  If atomicity is required,
377 * rbd_dev->lock is used to protect access.
378 *
379 * Currently, only the "removing" flag (which is coupled with the
380 * "open_count" field) requires atomic access.
381 */
382enum rbd_dev_flags {
383	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
384	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
385};
386
387static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */
388
389static LIST_HEAD(rbd_dev_list);    /* devices */
390static DEFINE_SPINLOCK(rbd_dev_list_lock);
391
392static LIST_HEAD(rbd_client_list);		/* clients */
393static DEFINE_SPINLOCK(rbd_client_list_lock);
394
395/* Slab caches for frequently-allocated structures */
396
397static struct kmem_cache	*rbd_img_request_cache;
398static struct kmem_cache	*rbd_obj_request_cache;
399static struct kmem_cache	*rbd_segment_name_cache;
400
401static int rbd_major;
402static DEFINE_IDA(rbd_dev_id_ida);
403
404static struct workqueue_struct *rbd_wq;
405
406/*
407 * Default to false for now, as single-major requires version 0.75 or
408 * later of the userspace rbd utility.
409 */
410static bool single_major = false;
411module_param(single_major, bool, S_IRUGO);
412MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
413
414static int rbd_img_request_submit(struct rbd_img_request *img_request);
415
416static void rbd_dev_device_release(struct device *dev);
417
418static ssize_t rbd_add(struct bus_type *bus, const char *buf,
419		       size_t count);
420static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
421			  size_t count);
422static ssize_t rbd_add_single_major(struct bus_type *bus, const char *buf,
423				    size_t count);
424static ssize_t rbd_remove_single_major(struct bus_type *bus, const char *buf,
425				       size_t count);
426static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
427static void rbd_spec_put(struct rbd_spec *spec);
428
429static int rbd_dev_id_to_minor(int dev_id)
430{
431	return dev_id << RBD_SINGLE_MAJOR_PART_SHIFT;
432}
433
434static int minor_to_rbd_dev_id(int minor)
435{
436	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
437}
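
/*
 * A worked example of the mapping above: with
 * RBD_SINGLE_MAJOR_PART_SHIFT of 4, each device owns 16 minors in
 * single-major mode, so dev_id 3 corresponds to minor 48 and minors
 * 48..63 cover /dev/rbd3 and its partitions; minor_to_rbd_dev_id()
 * simply inverts that shift.
 */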
438
439static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
440static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
441static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
442static BUS_ATTR(remove_single_major, S_IWUSR, NULL, rbd_remove_single_major);
443
444static struct attribute *rbd_bus_attrs[] = {
445	&bus_attr_add.attr,
446	&bus_attr_remove.attr,
447	&bus_attr_add_single_major.attr,
448	&bus_attr_remove_single_major.attr,
449	NULL,
450};
451
452static umode_t rbd_bus_is_visible(struct kobject *kobj,
453				  struct attribute *attr, int index)
454{
455	if (!single_major &&
456	    (attr == &bus_attr_add_single_major.attr ||
457	     attr == &bus_attr_remove_single_major.attr))
458		return 0;
459
460	return attr->mode;
461}
462
463static const struct attribute_group rbd_bus_group = {
464	.attrs = rbd_bus_attrs,
465	.is_visible = rbd_bus_is_visible,
466};
467__ATTRIBUTE_GROUPS(rbd_bus);
468
469static struct bus_type rbd_bus_type = {
470	.name		= "rbd",
471	.bus_groups	= rbd_bus_groups,
472};
473
474static void rbd_root_dev_release(struct device *dev)
475{
476}
477
478static struct device rbd_root_dev = {
479	.init_name =    "rbd",
480	.release =      rbd_root_dev_release,
481};
482
483static __printf(2, 3)
484void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
485{
486	struct va_format vaf;
487	va_list args;
488
489	va_start(args, fmt);
490	vaf.fmt = fmt;
491	vaf.va = &args;
492
493	if (!rbd_dev)
494		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
495	else if (rbd_dev->disk)
496		printk(KERN_WARNING "%s: %s: %pV\n",
497			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
498	else if (rbd_dev->spec && rbd_dev->spec->image_name)
499		printk(KERN_WARNING "%s: image %s: %pV\n",
500			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
501	else if (rbd_dev->spec && rbd_dev->spec->image_id)
502		printk(KERN_WARNING "%s: id %s: %pV\n",
503			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
504	else	/* punt */
505		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
506			RBD_DRV_NAME, rbd_dev, &vaf);
507	va_end(args);
508}
509
510#ifdef RBD_DEBUG
511#define rbd_assert(expr)						\
512		if (unlikely(!(expr))) {				\
513			printk(KERN_ERR "\nAssertion failure in %s() "	\
514						"at line %d:\n\n"	\
515					"\trbd_assert(%s);\n\n",	\
516					__func__, __LINE__, #expr);	\
517			BUG();						\
518		}
519#else /* !RBD_DEBUG */
520#  define rbd_assert(expr)	((void) 0)
521#endif /* !RBD_DEBUG */
522
523static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
524static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
525static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);
526
527static int rbd_dev_refresh(struct rbd_device *rbd_dev);
528static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
529static int rbd_dev_header_info(struct rbd_device *rbd_dev);
530static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev);
531static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
532					u64 snap_id);
533static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
534				u8 *order, u64 *snap_size);
535static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
536		u64 *snap_features);
537static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);
538
539static int rbd_open(struct block_device *bdev, fmode_t mode)
540{
541	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
542	bool removing = false;
543
544	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
545		return -EROFS;
546
547	spin_lock_irq(&rbd_dev->lock);
548	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
549		removing = true;
550	else
551		rbd_dev->open_count++;
552	spin_unlock_irq(&rbd_dev->lock);
553	if (removing)
554		return -ENOENT;
555
556	(void) get_device(&rbd_dev->dev);
557
558	return 0;
559}
560
561static void rbd_release(struct gendisk *disk, fmode_t mode)
562{
563	struct rbd_device *rbd_dev = disk->private_data;
564	unsigned long open_count_before;
565
566	spin_lock_irq(&rbd_dev->lock);
567	open_count_before = rbd_dev->open_count--;
568	spin_unlock_irq(&rbd_dev->lock);
569	rbd_assert(open_count_before > 0);
570
571	put_device(&rbd_dev->dev);
572}
573
574static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
575{
576	int ret = 0;
577	int val;
578	bool ro;
579	bool ro_changed = false;
580
581	/* get_user() may sleep, so call it before taking rbd_dev->lock */
582	if (get_user(val, (int __user *)(arg)))
583		return -EFAULT;
584
585	ro = val ? true : false;
586	/* Snapshots don't allow writes */
587	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
588		return -EROFS;
589
590	spin_lock_irq(&rbd_dev->lock);
591	/* prevent others from opening this device */
592	if (rbd_dev->open_count > 1) {
593		ret = -EBUSY;
594		goto out;
595	}
596
597	if (rbd_dev->mapping.read_only != ro) {
598		rbd_dev->mapping.read_only = ro;
599		ro_changed = true;
600	}
601
602out:
603	spin_unlock_irq(&rbd_dev->lock);
604	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
605	if (ret == 0 && ro_changed)
606		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
607
608	return ret;
609}
610
611static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
612			unsigned int cmd, unsigned long arg)
613{
614	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
615	int ret = 0;
616
617	switch (cmd) {
618	case BLKROSET:
619		ret = rbd_ioctl_set_ro(rbd_dev, arg);
620		break;
621	default:
622		ret = -ENOTTY;
623	}
624
625	return ret;
626}
627
628#ifdef CONFIG_COMPAT
629static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
630				unsigned int cmd, unsigned long arg)
631{
632	return rbd_ioctl(bdev, mode, cmd, arg);
633}
634#endif /* CONFIG_COMPAT */
635
636static const struct block_device_operations rbd_bd_ops = {
637	.owner			= THIS_MODULE,
638	.open			= rbd_open,
639	.release		= rbd_release,
640	.ioctl			= rbd_ioctl,
641#ifdef CONFIG_COMPAT
642	.compat_ioctl		= rbd_compat_ioctl,
643#endif
644};
645
646/*
647 * Initialize an rbd client instance.  Success or not, this function
648 * consumes ceph_opts.  Caller holds client_mutex.
649 */
650static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
651{
652	struct rbd_client *rbdc;
653	int ret = -ENOMEM;
654
655	dout("%s:\n", __func__);
656	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
657	if (!rbdc)
658		goto out_opt;
659
660	kref_init(&rbdc->kref);
661	INIT_LIST_HEAD(&rbdc->node);
662
663	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
664	if (IS_ERR(rbdc->client))
665		goto out_rbdc;
666	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
667
668	ret = ceph_open_session(rbdc->client);
669	if (ret < 0)
670		goto out_client;
671
672	spin_lock(&rbd_client_list_lock);
673	list_add_tail(&rbdc->node, &rbd_client_list);
674	spin_unlock(&rbd_client_list_lock);
675
676	dout("%s: rbdc %p\n", __func__, rbdc);
677
678	return rbdc;
679out_client:
680	ceph_destroy_client(rbdc->client);
681out_rbdc:
682	kfree(rbdc);
683out_opt:
684	if (ceph_opts)
685		ceph_destroy_options(ceph_opts);
686	dout("%s: error %d\n", __func__, ret);
687
688	return ERR_PTR(ret);
689}
690
691static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
692{
693	kref_get(&rbdc->kref);
694
695	return rbdc;
696}
697
698/*
699 * Find a ceph client with specific addr and configuration.  If
700 * found, bump its reference count.
701 */
702static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
703{
704	struct rbd_client *client_node;
705	bool found = false;
706
707	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
708		return NULL;
709
710	spin_lock(&rbd_client_list_lock);
711	list_for_each_entry(client_node, &rbd_client_list, node) {
712		if (!ceph_compare_options(ceph_opts, client_node->client)) {
713			__rbd_get_client(client_node);
714
715			found = true;
716			break;
717		}
718	}
719	spin_unlock(&rbd_client_list_lock);
720
721	return found ? client_node : NULL;
722}
723
724/*
725 * mount options
726 */
727enum {
728	Opt_last_int,
729	/* int args above */
730	Opt_last_string,
731	/* string args above */
732	Opt_read_only,
733	Opt_read_write,
734	/* Boolean args above */
735	Opt_last_bool,
736};
737
738static match_table_t rbd_opts_tokens = {
739	/* int args above */
740	/* string args above */
741	{Opt_read_only, "read_only"},
742	{Opt_read_only, "ro"},		/* Alternate spelling */
743	{Opt_read_write, "read_write"},
744	{Opt_read_write, "rw"},		/* Alternate spelling */
745	/* Boolean args above */
746	{-1, NULL}
747};
748
749struct rbd_options {
750	bool	read_only;
751};
752
753#define RBD_READ_ONLY_DEFAULT	false
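
/*
 * These tokens come from the options portion of the string written to
 * the sysfs "add" file.  As a rough example (exact syntax per the ABI
 * document), appending "ro" or "read_only" to the ceph options, e.g.
 * "1.2.3.4:6789 name=admin,ro rbd myimage", requests a read-only
 * mapping; otherwise RBD_READ_ONLY_DEFAULT applies.
 */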
754
755static int parse_rbd_opts_token(char *c, void *private)
756{
757	struct rbd_options *rbd_opts = private;
758	substring_t argstr[MAX_OPT_ARGS];
759	int token, intval, ret;
760
761	token = match_token(c, rbd_opts_tokens, argstr);
762	if (token < 0)
763		return -EINVAL;
764
765	if (token < Opt_last_int) {
766		ret = match_int(&argstr[0], &intval);
767		if (ret < 0) {
768			pr_err("bad mount option arg (not int) "
769			       "at '%s'\n", c);
770			return ret;
771		}
772		dout("got int token %d val %d\n", token, intval);
773	} else if (token > Opt_last_int && token < Opt_last_string) {
774		dout("got string token %d val %s\n", token,
775		     argstr[0].from);
776	} else if (token > Opt_last_string && token < Opt_last_bool) {
777		dout("got Boolean token %d\n", token);
778	} else {
779		dout("got token %d\n", token);
780	}
781
782	switch (token) {
783	case Opt_read_only:
784		rbd_opts->read_only = true;
785		break;
786	case Opt_read_write:
787		rbd_opts->read_only = false;
788		break;
789	default:
790		rbd_assert(false);
791		break;
792	}
793	return 0;
794}
795
796static char* obj_op_name(enum obj_operation_type op_type)
797{
798	switch (op_type) {
799	case OBJ_OP_READ:
800		return "read";
801	case OBJ_OP_WRITE:
802		return "write";
803	case OBJ_OP_DISCARD:
804		return "discard";
805	default:
806		return "???";
807	}
808}
809
810/*
811 * Get a ceph client with a specific addr and configuration, creating
812 * one if it does not exist.  Either way, ceph_opts is consumed by this
813 * function.
814 */
815static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
816{
817	struct rbd_client *rbdc;
818
819	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
820	rbdc = rbd_client_find(ceph_opts);
821	if (rbdc)	/* using an existing client */
822		ceph_destroy_options(ceph_opts);
823	else
824		rbdc = rbd_client_create(ceph_opts);
825	mutex_unlock(&client_mutex);
826
827	return rbdc;
828}
829
830/*
831 * Destroy ceph client
832 *
833 * Caller must hold rbd_client_list_lock.
834 */
835static void rbd_client_release(struct kref *kref)
836{
837	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
838
839	dout("%s: rbdc %p\n", __func__, rbdc);
840	spin_lock(&rbd_client_list_lock);
841	list_del(&rbdc->node);
842	spin_unlock(&rbd_client_list_lock);
843
844	ceph_destroy_client(rbdc->client);
845	kfree(rbdc);
846}
847
848/*
849 * Drop reference to ceph client node. If it's not referenced anymore, release
850 * it.
851 */
852static void rbd_put_client(struct rbd_client *rbdc)
853{
854	if (rbdc)
855		kref_put(&rbdc->kref, rbd_client_release);
856}
857
858static bool rbd_image_format_valid(u32 image_format)
859{
860	return image_format == 1 || image_format == 2;
861}
862
863static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
864{
865	size_t size;
866	u32 snap_count;
867
868	/* The header has to start with the magic rbd header text */
869	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
870		return false;
871
872	/* The bio layer requires at least sector-sized I/O */
873
874	if (ondisk->options.order < SECTOR_SHIFT)
875		return false;
876
877	/* If we use u64 in a few spots we may be able to loosen this */
878
879	if (ondisk->options.order > 8 * sizeof (int) - 1)
880		return false;
881
882	/*
883	 * The size of a snapshot header has to fit in a size_t, and
884	 * that limits the number of snapshots.
885	 */
886	snap_count = le32_to_cpu(ondisk->snap_count);
887	size = SIZE_MAX - sizeof (struct ceph_snap_context);
888	if (snap_count > size / sizeof (__le64))
889		return false;
890
891	/*
892	 * Not only that, but the size of the entire snapshot
893	 * header must also be representable in a size_t.
894	 */
895	size -= snap_count * sizeof (__le64);
896	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
897		return false;
898
899	return true;
900}
901
902/*
903 * Fill an rbd image header with information from the given format 1
904 * on-disk header.
905 */
906static int rbd_header_from_disk(struct rbd_device *rbd_dev,
907				 struct rbd_image_header_ondisk *ondisk)
908{
909	struct rbd_image_header *header = &rbd_dev->header;
910	bool first_time = header->object_prefix == NULL;
911	struct ceph_snap_context *snapc;
912	char *object_prefix = NULL;
913	char *snap_names = NULL;
914	u64 *snap_sizes = NULL;
915	u32 snap_count;
916	size_t size;
917	int ret = -ENOMEM;
918	u32 i;
919
920	/* Allocate this now to avoid having to handle failure below */
921
922	if (first_time) {
923		size_t len;
924
925		len = strnlen(ondisk->object_prefix,
926				sizeof (ondisk->object_prefix));
927		object_prefix = kmalloc(len + 1, GFP_KERNEL);
928		if (!object_prefix)
929			return -ENOMEM;
930		memcpy(object_prefix, ondisk->object_prefix, len);
931		object_prefix[len] = '\0';
932	}
933
934	/* Allocate the snapshot context and fill it in */
935
936	snap_count = le32_to_cpu(ondisk->snap_count);
937	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
938	if (!snapc)
939		goto out_err;
940	snapc->seq = le64_to_cpu(ondisk->snap_seq);
941	if (snap_count) {
942		struct rbd_image_snap_ondisk *snaps;
943		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
944
945		/* We'll keep a copy of the snapshot names... */
946
947		if (snap_names_len > (u64)SIZE_MAX)
948			goto out_2big;
949		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
950		if (!snap_names)
951			goto out_err;
952
953		/* ...as well as the array of their sizes. */
954
955		size = snap_count * sizeof (*header->snap_sizes);
956		snap_sizes = kmalloc(size, GFP_KERNEL);
957		if (!snap_sizes)
958			goto out_err;
959
960		/*
961		 * Copy the names, and fill in each snapshot's id
962		 * and size.
963		 *
964		 * Note that rbd_dev_v1_header_info() guarantees the
965		 * ondisk buffer we're working with has
966		 * snap_names_len bytes beyond the end of the
967		 * snapshot id array, so this memcpy() is safe.
968		 */
969		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
970		snaps = ondisk->snaps;
971		for (i = 0; i < snap_count; i++) {
972			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
973			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
974		}
975	}
976
977	/* We won't fail any more, fill in the header */
978
979	if (first_time) {
980		header->object_prefix = object_prefix;
981		header->obj_order = ondisk->options.order;
982		header->crypt_type = ondisk->options.crypt_type;
983		header->comp_type = ondisk->options.comp_type;
984		/* The rest aren't used for format 1 images */
985		header->stripe_unit = 0;
986		header->stripe_count = 0;
987		header->features = 0;
988	} else {
989		ceph_put_snap_context(header->snapc);
990		kfree(header->snap_names);
991		kfree(header->snap_sizes);
992	}
993
994	/* The remaining fields always get updated (when we refresh) */
995
996	header->image_size = le64_to_cpu(ondisk->image_size);
997	header->snapc = snapc;
998	header->snap_names = snap_names;
999	header->snap_sizes = snap_sizes;
1000
1001	return 0;
1002out_2big:
1003	ret = -EIO;
1004out_err:
1005	kfree(snap_sizes);
1006	kfree(snap_names);
1007	ceph_put_snap_context(snapc);
1008	kfree(object_prefix);
1009
1010	return ret;
1011}
1012
1013static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
1014{
1015	const char *snap_name;
1016
1017	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
1018
1019	/* Skip over names until we find the one we are looking for */
1020
1021	snap_name = rbd_dev->header.snap_names;
1022	while (which--)
1023		snap_name += strlen(snap_name) + 1;
1024
1025	return kstrdup(snap_name, GFP_KERNEL);
1026}
1027
1028/*
1029 * Snapshot id comparison function for use with qsort()/bsearch().
1030 * Note that result is for snapshots in *descending* order.
1031 */
1032static int snapid_compare_reverse(const void *s1, const void *s2)
1033{
1034	u64 snap_id1 = *(u64 *)s1;
1035	u64 snap_id2 = *(u64 *)s2;
1036
1037	if (snap_id1 < snap_id2)
1038		return 1;
1039	return snap_id1 == snap_id2 ? 0 : -1;
1040}
1041
1042/*
1043 * Search a snapshot context to see if the given snapshot id is
1044 * present.
1045 *
1046 * Returns the position of the snapshot id in the array if it's found,
1047 * or BAD_SNAP_INDEX otherwise.
1048 *
1049 * Note: The snapshot array is kept sorted (by the osd) in
1050 * reverse order, highest snapshot id first.
1051 */
1052static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
1053{
1054	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
1055	u64 *found;
1056
1057	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
1058				sizeof (snap_id), snapid_compare_reverse);
1059
1060	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
1061}
1062
1063static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
1064					u64 snap_id)
1065{
1066	u32 which;
1067	const char *snap_name;
1068
1069	which = rbd_dev_snap_index(rbd_dev, snap_id);
1070	if (which == BAD_SNAP_INDEX)
1071		return ERR_PTR(-ENOENT);
1072
1073	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
1074	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
1075}
1076
1077static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
1078{
1079	if (snap_id == CEPH_NOSNAP)
1080		return RBD_SNAP_HEAD_NAME;
1081
1082	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1083	if (rbd_dev->image_format == 1)
1084		return rbd_dev_v1_snap_name(rbd_dev, snap_id);
1085
1086	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
1087}
1088
1089static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
1090				u64 *snap_size)
1091{
1092	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1093	if (snap_id == CEPH_NOSNAP) {
1094		*snap_size = rbd_dev->header.image_size;
1095	} else if (rbd_dev->image_format == 1) {
1096		u32 which;
1097
1098		which = rbd_dev_snap_index(rbd_dev, snap_id);
1099		if (which == BAD_SNAP_INDEX)
1100			return -ENOENT;
1101
1102		*snap_size = rbd_dev->header.snap_sizes[which];
1103	} else {
1104		u64 size = 0;
1105		int ret;
1106
1107		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
1108		if (ret)
1109			return ret;
1110
1111		*snap_size = size;
1112	}
1113	return 0;
1114}
1115
1116static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
1117			u64 *snap_features)
1118{
1119	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1120	if (snap_id == CEPH_NOSNAP) {
1121		*snap_features = rbd_dev->header.features;
1122	} else if (rbd_dev->image_format == 1) {
1123		*snap_features = 0;	/* No features for format 1 */
1124	} else {
1125		u64 features = 0;
1126		int ret;
1127
1128		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
1129		if (ret)
1130			return ret;
1131
1132		*snap_features = features;
1133	}
1134	return 0;
1135}
1136
1137static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
1138{
1139	u64 snap_id = rbd_dev->spec->snap_id;
1140	u64 size = 0;
1141	u64 features = 0;
1142	int ret;
1143
1144	ret = rbd_snap_size(rbd_dev, snap_id, &size);
1145	if (ret)
1146		return ret;
1147	ret = rbd_snap_features(rbd_dev, snap_id, &features);
1148	if (ret)
1149		return ret;
1150
1151	rbd_dev->mapping.size = size;
1152	rbd_dev->mapping.features = features;
1153
1154	return 0;
1155}
1156
1157static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
1158{
1159	rbd_dev->mapping.size = 0;
1160	rbd_dev->mapping.features = 0;
1161}
1162
1163static void rbd_segment_name_free(const char *name)
1164{
1165	/* The explicit cast here is needed to drop the const qualifier */
1166
1167	kmem_cache_free(rbd_segment_name_cache, (void *)name);
1168}
1169
1170static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
1171{
1172	char *name;
1173	u64 segment;
1174	int ret;
1175	char *name_format;
1176
1177	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
1178	if (!name)
1179		return NULL;
1180	segment = offset >> rbd_dev->header.obj_order;
1181	name_format = "%s.%012llx";
1182	if (rbd_dev->image_format == 2)
1183		name_format = "%s.%016llx";
1184	ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format,
1185			rbd_dev->header.object_prefix, segment);
1186	if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) {
1187		pr_err("error formatting segment name for #%llu (%d)\n",
1188			segment, ret);
1189		rbd_segment_name_free(name);
1190		name = NULL;
1191	}
1192
1193	return name;
1194}
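
/*
 * For illustration (prefix values made up): with a format 1 object
 * prefix of "rb.0.1234.6b8b4567", segment 5 is named
 * "rb.0.1234.6b8b4567.000000000005", while a format 2 prefix such as
 * "rbd_data.101574b0dc51" yields "rbd_data.101574b0dc51.0000000000000005".
 */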
1195
1196static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
1197{
1198	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1199
1200	return offset & (segment_size - 1);
1201}
1202
1203static u64 rbd_segment_length(struct rbd_device *rbd_dev,
1204				u64 offset, u64 length)
1205{
1206	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
1207
1208	offset &= segment_size - 1;
1209
1210	rbd_assert(length <= U64_MAX - offset);
1211	if (offset + length > segment_size)
1212		length = segment_size - offset;
1213
1214	return length;
1215}
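
/*
 * A worked example, assuming the default object order of 22 (4 MiB
 * objects): a request at image offset 0x500000 for 0x380000 bytes
 * begins 0x100000 bytes into segment 1, so rbd_segment_length() clips
 * the first object request to 0x300000 bytes and the remaining
 * 0x80000 bytes fall into segment 2.
 */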
1216
1217/*
1218 * returns the size of an object in the image
1219 */
1220static u64 rbd_obj_bytes(struct rbd_image_header *header)
1221{
1222	return 1 << header->obj_order;
1223}
1224
1225/*
1226 * bio helpers
1227 */
1228
1229static void bio_chain_put(struct bio *chain)
1230{
1231	struct bio *tmp;
1232
1233	while (chain) {
1234		tmp = chain;
1235		chain = chain->bi_next;
1236		bio_put(tmp);
1237	}
1238}
1239
1240/*
1241 * zeros a bio chain, starting at specific offset
1242 */
1243static void zero_bio_chain(struct bio *chain, int start_ofs)
1244{
1245	struct bio_vec bv;
1246	struct bvec_iter iter;
1247	unsigned long flags;
1248	void *buf;
1249	int pos = 0;
1250
1251	while (chain) {
1252		bio_for_each_segment(bv, chain, iter) {
1253			if (pos + bv.bv_len > start_ofs) {
1254				int remainder = max(start_ofs - pos, 0);
1255				buf = bvec_kmap_irq(&bv, &flags);
1256				memset(buf + remainder, 0,
1257				       bv.bv_len - remainder);
1258				flush_dcache_page(bv.bv_page);
1259				bvec_kunmap_irq(buf, &flags);
1260			}
1261			pos += bv.bv_len;
1262		}
1263
1264		chain = chain->bi_next;
1265	}
1266}
1267
1268/*
1269 * similar to zero_bio_chain(), zeros data defined by a page array,
1270 * starting at the given byte offset from the start of the array and
1271 * continuing up to the given end offset.  The pages array is
1272 * assumed to be big enough to hold all bytes up to the end.
1273 */
1274static void zero_pages(struct page **pages, u64 offset, u64 end)
1275{
1276	struct page **page = &pages[offset >> PAGE_SHIFT];
1277
1278	rbd_assert(end > offset);
1279	rbd_assert(end - offset <= (u64)SIZE_MAX);
1280	while (offset < end) {
1281		size_t page_offset;
1282		size_t length;
1283		unsigned long flags;
1284		void *kaddr;
1285
1286		page_offset = offset & ~PAGE_MASK;
1287		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
1288		local_irq_save(flags);
1289		kaddr = kmap_atomic(*page);
1290		memset(kaddr + page_offset, 0, length);
1291		flush_dcache_page(*page);
1292		kunmap_atomic(kaddr);
1293		local_irq_restore(flags);
1294
1295		offset += length;
1296		page++;
1297	}
1298}
1299
1300/*
1301 * Clone a portion of a bio, starting at the given byte offset
1302 * and continuing for the number of bytes indicated.
1303 */
1304static struct bio *bio_clone_range(struct bio *bio_src,
1305					unsigned int offset,
1306					unsigned int len,
1307					gfp_t gfpmask)
1308{
1309	struct bio *bio;
1310
1311	bio = bio_clone(bio_src, gfpmask);
1312	if (!bio)
1313		return NULL;	/* ENOMEM */
1314
1315	bio_advance(bio, offset);
1316	bio->bi_iter.bi_size = len;
1317
1318	return bio;
1319}
1320
1321/*
1322 * Clone a portion of a bio chain, starting at the given byte offset
1323 * into the first bio in the source chain and continuing for the
1324 * number of bytes indicated.  The result is another bio chain of
1325 * exactly the given length, or a null pointer on error.
1326 *
1327 * The bio_src and offset parameters are both in-out.  On entry they
1328 * refer to the first source bio and the offset into that bio where
1329 * the start of data to be cloned is located.
1330 *
1331 * On return, bio_src is updated to refer to the bio in the source
1332 * chain that contains first un-cloned byte, and *offset will
1333 * contain the offset of that byte within that bio.
1334 */
1335static struct bio *bio_chain_clone_range(struct bio **bio_src,
1336					unsigned int *offset,
1337					unsigned int len,
1338					gfp_t gfpmask)
1339{
1340	struct bio *bi = *bio_src;
1341	unsigned int off = *offset;
1342	struct bio *chain = NULL;
1343	struct bio **end;
1344
1345	/* Build up a chain of clone bios up to the limit */
1346
1347	if (!bi || off >= bi->bi_iter.bi_size || !len)
1348		return NULL;		/* Nothing to clone */
1349
1350	end = &chain;
1351	while (len) {
1352		unsigned int bi_size;
1353		struct bio *bio;
1354
1355		if (!bi) {
1356			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1357			goto out_err;	/* EINVAL; ran out of bio's */
1358		}
1359		bi_size = min_t(unsigned int, bi->bi_iter.bi_size - off, len);
1360		bio = bio_clone_range(bi, off, bi_size, gfpmask);
1361		if (!bio)
1362			goto out_err;	/* ENOMEM */
1363
1364		*end = bio;
1365		end = &bio->bi_next;
1366
1367		off += bi_size;
1368		if (off == bi->bi_iter.bi_size) {
1369			bi = bi->bi_next;
1370			off = 0;
1371		}
1372		len -= bi_size;
1373	}
1374	*bio_src = bi;
1375	*offset = off;
1376
1377	return chain;
1378out_err:
1379	bio_chain_put(chain);
1380
1381	return NULL;
1382}
1383
1384/*
1385 * The default/initial value for all object request flags is 0.  For
1386 * each flag, once its value is set to 1 it is never reset to 0
1387 * again.
1388 */
1389static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1390{
1391	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1392		struct rbd_device *rbd_dev;
1393
1394		rbd_dev = obj_request->img_request->rbd_dev;
1395		rbd_warn(rbd_dev, "obj_request %p already marked img_data",
1396			obj_request);
1397	}
1398}
1399
1400static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1401{
1402	smp_mb();
1403	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1404}
1405
1406static void obj_request_done_set(struct rbd_obj_request *obj_request)
1407{
1408	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1409		struct rbd_device *rbd_dev = NULL;
1410
1411		if (obj_request_img_data_test(obj_request))
1412			rbd_dev = obj_request->img_request->rbd_dev;
1413		rbd_warn(rbd_dev, "obj_request %p already marked done",
1414			obj_request);
1415	}
1416}
1417
1418static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1419{
1420	smp_mb();
1421	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1422}
1423
1424/*
1425 * This sets the KNOWN flag after (possibly) setting the EXISTS
1426 * flag.  The latter is set based on the "exists" value provided.
1427 *
1428 * Note that for our purposes once an object exists it never goes
1429 * away again.  It's possible that the responses from two existence
1430 * checks are separated by the creation of the target object, and
1431 * the first ("doesn't exist") response arrives *after* the second
1432 * ("does exist").  In that case we ignore the second one.
1433 */
1434static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1435				bool exists)
1436{
1437	if (exists)
1438		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1439	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1440	smp_mb();
1441}
1442
1443static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1444{
1445	smp_mb();
1446	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1447}
1448
1449static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1450{
1451	smp_mb();
1452	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1453}
1454
1455static bool obj_request_overlaps_parent(struct rbd_obj_request *obj_request)
1456{
1457	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
1458
1459	return obj_request->img_offset <
1460	    round_up(rbd_dev->parent_overlap, rbd_obj_bytes(&rbd_dev->header));
1461}
1462
1463static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1464{
1465	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1466		atomic_read(&obj_request->kref.refcount));
1467	kref_get(&obj_request->kref);
1468}
1469
1470static void rbd_obj_request_destroy(struct kref *kref);
1471static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1472{
1473	rbd_assert(obj_request != NULL);
1474	dout("%s: obj %p (was %d)\n", __func__, obj_request,
1475		atomic_read(&obj_request->kref.refcount));
1476	kref_put(&obj_request->kref, rbd_obj_request_destroy);
1477}
1478
1479static void rbd_img_request_get(struct rbd_img_request *img_request)
1480{
1481	dout("%s: img %p (was %d)\n", __func__, img_request,
1482	     atomic_read(&img_request->kref.refcount));
1483	kref_get(&img_request->kref);
1484}
1485
1486static bool img_request_child_test(struct rbd_img_request *img_request);
1487static void rbd_parent_request_destroy(struct kref *kref);
1488static void rbd_img_request_destroy(struct kref *kref);
1489static void rbd_img_request_put(struct rbd_img_request *img_request)
1490{
1491	rbd_assert(img_request != NULL);
1492	dout("%s: img %p (was %d)\n", __func__, img_request,
1493		atomic_read(&img_request->kref.refcount));
1494	if (img_request_child_test(img_request))
1495		kref_put(&img_request->kref, rbd_parent_request_destroy);
1496	else
1497		kref_put(&img_request->kref, rbd_img_request_destroy);
1498}
1499
1500static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1501					struct rbd_obj_request *obj_request)
1502{
1503	rbd_assert(obj_request->img_request == NULL);
1504
1505	/* Image request now owns object's original reference */
1506	obj_request->img_request = img_request;
1507	obj_request->which = img_request->obj_request_count;
1508	rbd_assert(!obj_request_img_data_test(obj_request));
1509	obj_request_img_data_set(obj_request);
1510	rbd_assert(obj_request->which != BAD_WHICH);
1511	img_request->obj_request_count++;
1512	list_add_tail(&obj_request->links, &img_request->obj_requests);
1513	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1514		obj_request->which);
1515}
1516
1517static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1518					struct rbd_obj_request *obj_request)
1519{
1520	rbd_assert(obj_request->which != BAD_WHICH);
1521
1522	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1523		obj_request->which);
1524	list_del(&obj_request->links);
1525	rbd_assert(img_request->obj_request_count > 0);
1526	img_request->obj_request_count--;
1527	rbd_assert(obj_request->which == img_request->obj_request_count);
1528	obj_request->which = BAD_WHICH;
1529	rbd_assert(obj_request_img_data_test(obj_request));
1530	rbd_assert(obj_request->img_request == img_request);
1531	obj_request->img_request = NULL;
1532	obj_request->callback = NULL;
1533	rbd_obj_request_put(obj_request);
1534}
1535
1536static bool obj_request_type_valid(enum obj_request_type type)
1537{
1538	switch (type) {
1539	case OBJ_REQUEST_NODATA:
1540	case OBJ_REQUEST_BIO:
1541	case OBJ_REQUEST_PAGES:
1542		return true;
1543	default:
1544		return false;
1545	}
1546}
1547
1548static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1549				struct rbd_obj_request *obj_request)
1550{
1551	dout("%s %p\n", __func__, obj_request);
1552	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1553}
1554
1555static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
1556{
1557	dout("%s %p\n", __func__, obj_request);
1558	ceph_osdc_cancel_request(obj_request->osd_req);
1559}
1560
1561/*
1562 * Wait for an object request to complete.  If interrupted, cancel the
1563 * underlying osd request.
1564 */
1565static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1566{
1567	int ret;
1568
1569	dout("%s %p\n", __func__, obj_request);
1570
1571	ret = wait_for_completion_interruptible(&obj_request->completion);
1572	if (ret < 0) {
1573		dout("%s %p interrupted\n", __func__, obj_request);
1574		rbd_obj_request_end(obj_request);
1575		return ret;
1576	}
1577
1578	dout("%s %p done\n", __func__, obj_request);
1579	return 0;
1580}
1581
1582static void rbd_img_request_complete(struct rbd_img_request *img_request)
1583{
1584
1585	dout("%s: img %p\n", __func__, img_request);
1586
1587	/*
1588	 * If no error occurred, compute the aggregate transfer
1589	 * count for the image request.  We could instead use
1590	 * atomic64_cmpxchg() to update it as each object request
1591	 * completes; it's not clear offhand which way is better.
1592	 */
1593	if (!img_request->result) {
1594		struct rbd_obj_request *obj_request;
1595		u64 xferred = 0;
1596
1597		for_each_obj_request(img_request, obj_request)
1598			xferred += obj_request->xferred;
1599		img_request->xferred = xferred;
1600	}
1601
1602	if (img_request->callback)
1603		img_request->callback(img_request);
1604	else
1605		rbd_img_request_put(img_request);
1606}
1607
1608/*
1609 * The default/initial value for all image request flags is 0.  Each
1610 * is conditionally set to 1 at image request initialization time
1611 * and currently never changes thereafter.
1612 */
1613static void img_request_write_set(struct rbd_img_request *img_request)
1614{
1615	set_bit(IMG_REQ_WRITE, &img_request->flags);
1616	smp_mb();
1617}
1618
1619static bool img_request_write_test(struct rbd_img_request *img_request)
1620{
1621	smp_mb();
1622	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1623}
1624
1625/*
1626 * Set the discard flag when the img_request is a discard request
1627 */
1628static void img_request_discard_set(struct rbd_img_request *img_request)
1629{
1630	set_bit(IMG_REQ_DISCARD, &img_request->flags);
1631	smp_mb();
1632}
1633
1634static bool img_request_discard_test(struct rbd_img_request *img_request)
1635{
1636	smp_mb();
1637	return test_bit(IMG_REQ_DISCARD, &img_request->flags) != 0;
1638}
1639
1640static void img_request_child_set(struct rbd_img_request *img_request)
1641{
1642	set_bit(IMG_REQ_CHILD, &img_request->flags);
1643	smp_mb();
1644}
1645
1646static void img_request_child_clear(struct rbd_img_request *img_request)
1647{
1648	clear_bit(IMG_REQ_CHILD, &img_request->flags);
1649	smp_mb();
1650}
1651
1652static bool img_request_child_test(struct rbd_img_request *img_request)
1653{
1654	smp_mb();
1655	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1656}
1657
1658static void img_request_layered_set(struct rbd_img_request *img_request)
1659{
1660	set_bit(IMG_REQ_LAYERED, &img_request->flags);
1661	smp_mb();
1662}
1663
1664static void img_request_layered_clear(struct rbd_img_request *img_request)
1665{
1666	clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1667	smp_mb();
1668}
1669
1670static bool img_request_layered_test(struct rbd_img_request *img_request)
1671{
1672	smp_mb();
1673	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1674}
1675
1676static enum obj_operation_type
1677rbd_img_request_op_type(struct rbd_img_request *img_request)
1678{
1679	if (img_request_write_test(img_request))
1680		return OBJ_OP_WRITE;
1681	else if (img_request_discard_test(img_request))
1682		return OBJ_OP_DISCARD;
1683	else
1684		return OBJ_OP_READ;
1685}
1686
1687static void
1688rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1689{
1690	u64 xferred = obj_request->xferred;
1691	u64 length = obj_request->length;
1692
1693	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1694		obj_request, obj_request->img_request, obj_request->result,
1695		xferred, length);
1696	/*
1697	 * ENOENT means a hole in the image.  We zero-fill the entire
1698	 * length of the request.  A short read also implies zero-fill
1699	 * to the end of the request.  An error requires the whole
1700	 * length of the request to be reported finished with an error
1701	 * to the block layer.  In each case we update the xferred
1702	 * count to indicate the whole request was satisfied.
1703	 */
1704	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1705	if (obj_request->result == -ENOENT) {
1706		if (obj_request->type == OBJ_REQUEST_BIO)
1707			zero_bio_chain(obj_request->bio_list, 0);
1708		else
1709			zero_pages(obj_request->pages, 0, length);
1710		obj_request->result = 0;
1711	} else if (xferred < length && !obj_request->result) {
1712		if (obj_request->type == OBJ_REQUEST_BIO)
1713			zero_bio_chain(obj_request->bio_list, xferred);
1714		else
1715			zero_pages(obj_request->pages, xferred, length);
1716	}
1717	obj_request->xferred = length;
1718	obj_request_done_set(obj_request);
1719}
1720
1721static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1722{
1723	dout("%s: obj %p cb %p\n", __func__, obj_request,
1724		obj_request->callback);
1725	if (obj_request->callback)
1726		obj_request->callback(obj_request);
1727	else
1728		complete_all(&obj_request->completion);
1729}
1730
1731static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
1732{
1733	dout("%s: obj %p\n", __func__, obj_request);
1734	obj_request_done_set(obj_request);
1735}
1736
1737static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1738{
1739	struct rbd_img_request *img_request = NULL;
1740	struct rbd_device *rbd_dev = NULL;
1741	bool layered = false;
1742
1743	if (obj_request_img_data_test(obj_request)) {
1744		img_request = obj_request->img_request;
1745		layered = img_request && img_request_layered_test(img_request);
1746		rbd_dev = img_request->rbd_dev;
1747	}
1748
1749	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1750		obj_request, img_request, obj_request->result,
1751		obj_request->xferred, obj_request->length);
1752	if (layered && obj_request->result == -ENOENT &&
1753			obj_request->img_offset < rbd_dev->parent_overlap)
1754		rbd_img_parent_read(obj_request);
1755	else if (img_request)
1756		rbd_img_obj_request_read_callback(obj_request);
1757	else
1758		obj_request_done_set(obj_request);
1759}
1760
1761static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1762{
1763	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1764		obj_request->result, obj_request->length);
1765	/*
1766	 * There is no such thing as a successful short write.  Set
1767	 * it to our originally-requested length.
1768	 */
1769	obj_request->xferred = obj_request->length;
1770	obj_request_done_set(obj_request);
1771}
1772
1773static void rbd_osd_discard_callback(struct rbd_obj_request *obj_request)
1774{
1775	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1776		obj_request->result, obj_request->length);
1777	/*
1778	 * There is no such thing as a successful short discard.  Set
1779	 * it to our originally-requested length.
1780	 */
1781	obj_request->xferred = obj_request->length;
1782	/* discarding a non-existent object is not a problem */
1783	if (obj_request->result == -ENOENT)
1784		obj_request->result = 0;
1785	obj_request_done_set(obj_request);
1786}
1787
1788/*
1789 * For a simple stat call there's nothing to do.  We'll do more if
1790 * this is part of a write sequence for a layered image.
1791 */
1792static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
1793{
1794	dout("%s: obj %p\n", __func__, obj_request);
1795	obj_request_done_set(obj_request);
1796}
1797
1798static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1799				struct ceph_msg *msg)
1800{
1801	struct rbd_obj_request *obj_request = osd_req->r_priv;
1802	u16 opcode;
1803
1804	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1805	rbd_assert(osd_req == obj_request->osd_req);
1806	if (obj_request_img_data_test(obj_request)) {
1807		rbd_assert(obj_request->img_request);
1808		rbd_assert(obj_request->which != BAD_WHICH);
1809	} else {
1810		rbd_assert(obj_request->which == BAD_WHICH);
1811	}
1812
1813	if (osd_req->r_result < 0)
1814		obj_request->result = osd_req->r_result;
1815
1816	rbd_assert(osd_req->r_num_ops <= CEPH_OSD_MAX_OP);
1817
1818	/*
1819	 * We support a 64-bit length, but ultimately it has to be
1820	 * passed to blk_end_request(), which takes an unsigned int.
1821	 */
1822	obj_request->xferred = osd_req->r_reply_op_len[0];
1823	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1824
1825	opcode = osd_req->r_ops[0].op;
1826	switch (opcode) {
1827	case CEPH_OSD_OP_READ:
1828		rbd_osd_read_callback(obj_request);
1829		break;
1830	case CEPH_OSD_OP_SETALLOCHINT:
1831		rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE);
1832		/* fall through */
1833	case CEPH_OSD_OP_WRITE:
1834		rbd_osd_write_callback(obj_request);
1835		break;
1836	case CEPH_OSD_OP_STAT:
1837		rbd_osd_stat_callback(obj_request);
1838		break;
1839	case CEPH_OSD_OP_DELETE:
1840	case CEPH_OSD_OP_TRUNCATE:
1841	case CEPH_OSD_OP_ZERO:
1842		rbd_osd_discard_callback(obj_request);
1843		break;
1844	case CEPH_OSD_OP_CALL:
1845	case CEPH_OSD_OP_NOTIFY_ACK:
1846	case CEPH_OSD_OP_WATCH:
1847		rbd_osd_trivial_callback(obj_request);
1848		break;
1849	default:
1850		rbd_warn(NULL, "%s: unsupported op %hu",
1851			obj_request->object_name, (unsigned short) opcode);
1852		break;
1853	}
1854
1855	if (obj_request_done_test(obj_request))
1856		rbd_obj_request_complete(obj_request);
1857}
1858
1859static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1860{
1861	struct rbd_img_request *img_request = obj_request->img_request;
1862	struct ceph_osd_request *osd_req = obj_request->osd_req;
1863	u64 snap_id;
1864
1865	rbd_assert(osd_req != NULL);
1866
1867	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1868	ceph_osdc_build_request(osd_req, obj_request->offset,
1869			NULL, snap_id, NULL);
1870}
1871
1872static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1873{
1874	struct rbd_img_request *img_request = obj_request->img_request;
1875	struct ceph_osd_request *osd_req = obj_request->osd_req;
1876	struct ceph_snap_context *snapc;
1877	struct timespec mtime = CURRENT_TIME;
1878
1879	rbd_assert(osd_req != NULL);
1880
1881	snapc = img_request ? img_request->snapc : NULL;
1882	ceph_osdc_build_request(osd_req, obj_request->offset,
1883			snapc, CEPH_NOSNAP, &mtime);
1884}
1885
1886/*
1887 * Create an osd request.  A read request has one osd op (read).
1888 * A write request has either one (watch) or two (hint+write) osd ops.
1889 * (All rbd data writes are prefixed with an allocation hint op, but
1890 * technically osd watch is a write request, hence this distinction.)
1891 */
1892static struct ceph_osd_request *rbd_osd_req_create(
1893					struct rbd_device *rbd_dev,
1894					enum obj_operation_type op_type,
1895					unsigned int num_ops,
1896					struct rbd_obj_request *obj_request)
1897{
1898	struct ceph_snap_context *snapc = NULL;
1899	struct ceph_osd_client *osdc;
1900	struct ceph_osd_request *osd_req;
1901
1902	if (obj_request_img_data_test(obj_request) &&
1903		(op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) {
1904		struct rbd_img_request *img_request = obj_request->img_request;
1905		if (op_type == OBJ_OP_WRITE) {
1906			rbd_assert(img_request_write_test(img_request));
1907		} else {
1908			rbd_assert(img_request_discard_test(img_request));
1909		}
1910		snapc = img_request->snapc;
1911	}
1912
1913	rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
1914
1915	/* Allocate and initialize the request, for the num_ops ops */
1916
1917	osdc = &rbd_dev->rbd_client->client->osdc;
1918	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false,
1919					  GFP_ATOMIC);
1920	if (!osd_req)
1921		return NULL;	/* ENOMEM */
1922
1923	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
1924		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1925	else
1926		osd_req->r_flags = CEPH_OSD_FLAG_READ;
1927
1928	osd_req->r_callback = rbd_osd_req_callback;
1929	osd_req->r_priv = obj_request;
1930
1931	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1932	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1933
1934	return osd_req;
1935}
1936
1937/*
1938 * Create a copyup osd request based on the information in the object
1939 * request supplied.  A copyup request has two or three osd ops, a
1940 * copyup method call, potentially a hint op, and a write or truncate
1941 * or zero op.
1942 */
1943static struct ceph_osd_request *
1944rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1945{
1946	struct rbd_img_request *img_request;
1947	struct ceph_snap_context *snapc;
1948	struct rbd_device *rbd_dev;
1949	struct ceph_osd_client *osdc;
1950	struct ceph_osd_request *osd_req;
1951	int num_osd_ops = 3;
1952
1953	rbd_assert(obj_request_img_data_test(obj_request));
1954	img_request = obj_request->img_request;
1955	rbd_assert(img_request);
1956	rbd_assert(img_request_write_test(img_request) ||
1957			img_request_discard_test(img_request));
1958
1959	if (img_request_discard_test(img_request))
1960		num_osd_ops = 2;
1961
1962	/* Allocate and initialize the request, for all the ops */
1963
1964	snapc = img_request->snapc;
1965	rbd_dev = img_request->rbd_dev;
1966	osdc = &rbd_dev->rbd_client->client->osdc;
1967	osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops,
1968						false, GFP_ATOMIC);
1969	if (!osd_req)
1970		return NULL;	/* ENOMEM */
1971
1972	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1973	osd_req->r_callback = rbd_osd_req_callback;
1974	osd_req->r_priv = obj_request;
1975
1976	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
1977	ceph_oid_set_name(&osd_req->r_base_oid, obj_request->object_name);
1978
1979	return osd_req;
1980}
1981
1982
1983static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1984{
1985	ceph_osdc_put_request(osd_req);
1986}
1987
1988/* object_name is assumed to be a non-null pointer and NUL-terminated */
1989
1990static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1991						u64 offset, u64 length,
1992						enum obj_request_type type)
1993{
1994	struct rbd_obj_request *obj_request;
1995	size_t size;
1996	char *name;
1997
1998	rbd_assert(obj_request_type_valid(type));
1999
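	/*
	 * Make a NUL-terminated copy of the object name; it is freed
	 * in rbd_obj_request_destroy().
	 */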
2000	size = strlen(object_name) + 1;
2001	name = kmalloc(size, GFP_KERNEL);
2002	if (!name)
2003		return NULL;
2004
2005	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
2006	if (!obj_request) {
2007		kfree(name);
2008		return NULL;
2009	}
2010
2011	obj_request->object_name = memcpy(name, object_name, size);
2012	obj_request->offset = offset;
2013	obj_request->length = length;
2014	obj_request->flags = 0;
2015	obj_request->which = BAD_WHICH;
2016	obj_request->type = type;
2017	INIT_LIST_HEAD(&obj_request->links);
2018	init_completion(&obj_request->completion);
2019	kref_init(&obj_request->kref);
2020
2021	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
2022		offset, length, (int)type, obj_request);
2023
2024	return obj_request;
2025}
2026
2027static void rbd_obj_request_destroy(struct kref *kref)
2028{
2029	struct rbd_obj_request *obj_request;
2030
2031	obj_request = container_of(kref, struct rbd_obj_request, kref);
2032
2033	dout("%s: obj %p\n", __func__, obj_request);
2034
2035	rbd_assert(obj_request->img_request == NULL);
2036	rbd_assert(obj_request->which == BAD_WHICH);
2037
2038	if (obj_request->osd_req)
2039		rbd_osd_req_destroy(obj_request->osd_req);
2040
2041	rbd_assert(obj_request_type_valid(obj_request->type));
2042	switch (obj_request->type) {
2043	case OBJ_REQUEST_NODATA:
2044		break;		/* Nothing to do */
2045	case OBJ_REQUEST_BIO:
2046		if (obj_request->bio_list)
2047			bio_chain_put(obj_request->bio_list);
2048		break;
2049	case OBJ_REQUEST_PAGES:
2050		if (obj_request->pages)
2051			ceph_release_page_vector(obj_request->pages,
2052						obj_request->page_count);
2053		break;
2054	}
2055
2056	kfree(obj_request->object_name);
2057	obj_request->object_name = NULL;
2058	kmem_cache_free(rbd_obj_request_cache, obj_request);
2059}
2060
2061/* It's OK to call this for a device with no parent */
2062
2063static void rbd_spec_put(struct rbd_spec *spec);
2064static void rbd_dev_unparent(struct rbd_device *rbd_dev)
2065{
2066	rbd_dev_remove_parent(rbd_dev);
2067	rbd_spec_put(rbd_dev->parent_spec);
2068	rbd_dev->parent_spec = NULL;
2069	rbd_dev->parent_overlap = 0;
2070}
2071
2072/*
2073 * Parent image reference counting is used to determine when an
2074 * image's parent fields can be safely torn down--after there are no
2075 * more in-flight requests to the parent image.  When the last
2076 * reference is dropped, cleaning them up is safe.
2077 */
2078static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
2079{
2080	int counter;
2081
2082	if (!rbd_dev->parent_spec)
2083		return;
2084
2085	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
2086	if (counter > 0)
2087		return;
2088
2089	/* Last reference; clean up parent data structures */
2090
2091	if (!counter)
2092		rbd_dev_unparent(rbd_dev);
2093	else
2094		rbd_warn(rbd_dev, "parent reference underflow");
2095}
2096
2097/*
2098 * If an image has a non-zero parent overlap, get a reference to its
2099 * parent.
2100 *
2101 * We must get the reference before checking for the overlap to
2102 * coordinate properly with zeroing the parent overlap in
2103 * rbd_dev_v2_parent_info() when an image gets flattened.  We
2104 * drop it again if there is no overlap.
2105 *
2106 * Returns true if the rbd device has a parent with a non-zero
2107 * overlap and a reference for it was successfully taken, or
2108 * false otherwise.
2109 */
2110static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
2111{
2112	int counter;
2113
2114	if (!rbd_dev->parent_spec)
2115		return false;
2116
2117	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
2118	if (counter > 0 && rbd_dev->parent_overlap)
2119		return true;
2120
2121	/* Image was flattened, but parent is not yet torn down */
2122
2123	if (counter < 0)
2124		rbd_warn(rbd_dev, "parent reference overflow");
2125
2126	return false;
2127}
2128
2129/*
2130 * Caller is responsible for filling in the list of object requests
2131 * that comprises the image request, and the Linux request pointer
2132 * (if there is one).
2133 */
2134static struct rbd_img_request *rbd_img_request_create(
2135					struct rbd_device *rbd_dev,
2136					u64 offset, u64 length,
2137					enum obj_operation_type op_type,
2138					struct ceph_snap_context *snapc)
2139{
2140	struct rbd_img_request *img_request;
2141
2142	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_NOIO);
2143	if (!img_request)
2144		return NULL;
2145
2146	img_request->rq = NULL;
2147	img_request->rbd_dev = rbd_dev;
2148	img_request->offset = offset;
2149	img_request->length = length;
2150	img_request->flags = 0;
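	/*
	 * Writes and discards are issued against a snapshot context;
	 * reads are issued against the mapped snapshot id.  The
	 * context reference is dropped in rbd_img_request_destroy().
	 */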
2151	if (op_type == OBJ_OP_DISCARD) {
2152		img_request_discard_set(img_request);
2153		img_request->snapc = snapc;
2154	} else if (op_type == OBJ_OP_WRITE) {
2155		img_request_write_set(img_request);
2156		img_request->snapc = snapc;
2157	} else {
2158		img_request->snap_id = rbd_dev->spec->snap_id;
2159	}
2160	if (rbd_dev_parent_get(rbd_dev))
2161		img_request_layered_set(img_request);
2162	spin_lock_init(&img_request->completion_lock);
2163	img_request->next_completion = 0;
2164	img_request->callback = NULL;
2165	img_request->result = 0;
2166	img_request->obj_request_count = 0;
2167	INIT_LIST_HEAD(&img_request->obj_requests);
2168	kref_init(&img_request->kref);
2169
2170	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
2171		obj_op_name(op_type), offset, length, img_request);
2172
2173	return img_request;
2174}
2175
2176static void rbd_img_request_destroy(struct kref *kref)
2177{
2178	struct rbd_img_request *img_request;
2179	struct rbd_obj_request *obj_request;
2180	struct rbd_obj_request *next_obj_request;
2181
2182	img_request = container_of(kref, struct rbd_img_request, kref);
2183
2184	dout("%s: img %p\n", __func__, img_request);
2185
2186	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2187		rbd_img_obj_request_del(img_request, obj_request);
2188	rbd_assert(img_request->obj_request_count == 0);
2189
2190	if (img_request_layered_test(img_request)) {
2191		img_request_layered_clear(img_request);
2192		rbd_dev_parent_put(img_request->rbd_dev);
2193	}
2194
2195	if (img_request_write_test(img_request) ||
2196		img_request_discard_test(img_request))
2197		ceph_put_snap_context(img_request->snapc);
2198
2199	kmem_cache_free(rbd_img_request_cache, img_request);
2200}
2201
2202static struct rbd_img_request *rbd_parent_request_create(
2203					struct rbd_obj_request *obj_request,
2204					u64 img_offset, u64 length)
2205{
2206	struct rbd_img_request *parent_request;
2207	struct rbd_device *rbd_dev;
2208
2209	rbd_assert(obj_request->img_request);
2210	rbd_dev = obj_request->img_request->rbd_dev;
2211
2212	parent_request = rbd_img_request_create(rbd_dev->parent, img_offset,
2213						length, OBJ_OP_READ, NULL);
2214	if (!parent_request)
2215		return NULL;
2216
2217	img_request_child_set(parent_request);
2218	rbd_obj_request_get(obj_request);
2219	parent_request->obj_request = obj_request;
2220
2221	return parent_request;
2222}
2223
2224static void rbd_parent_request_destroy(struct kref *kref)
2225{
2226	struct rbd_img_request *parent_request;
2227	struct rbd_obj_request *orig_request;
2228
2229	parent_request = container_of(kref, struct rbd_img_request, kref);
2230	orig_request = parent_request->obj_request;
2231
2232	parent_request->obj_request = NULL;
2233	rbd_obj_request_put(orig_request);
2234	img_request_child_clear(parent_request);
2235
2236	rbd_img_request_destroy(kref);
2237}
2238
2239static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2240{
2241	struct rbd_img_request *img_request;
2242	unsigned int xferred;
2243	int result;
2244	bool more;
2245
2246	rbd_assert(obj_request_img_data_test(obj_request));
2247	img_request = obj_request->img_request;
2248
2249	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2250	xferred = (unsigned int)obj_request->xferred;
2251	result = obj_request->result;
2252	if (result) {
2253		struct rbd_device *rbd_dev = img_request->rbd_dev;
2254		enum obj_operation_type op_type;
2255
2256		if (img_request_discard_test(img_request))
2257			op_type = OBJ_OP_DISCARD;
2258		else if (img_request_write_test(img_request))
2259			op_type = OBJ_OP_WRITE;
2260		else
2261			op_type = OBJ_OP_READ;
2262
2263		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)",
2264			obj_op_name(op_type), obj_request->length,
2265			obj_request->img_offset, obj_request->offset);
2266		rbd_warn(rbd_dev, "  result %d xferred %x",
2267			result, xferred);
2268		if (!img_request->result)
2269			img_request->result = result;
2270	}
2271
2272	/* Image object requests don't own their page array */
2273
2274	if (obj_request->type == OBJ_REQUEST_PAGES) {
2275		obj_request->pages = NULL;
2276		obj_request->page_count = 0;
2277	}
2278
2279	if (img_request_child_test(img_request)) {
2280		rbd_assert(img_request->obj_request != NULL);
2281		more = obj_request->which < img_request->obj_request_count - 1;
2282	} else {
2283		rbd_assert(img_request->rq != NULL);
2284		more = blk_end_request(img_request->rq, result, xferred);
2285	}
2286
2287	return more;
2288}
2289
2290static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
2291{
2292	struct rbd_img_request *img_request;
2293	u32 which = obj_request->which;
2294	bool more = true;
2295
2296	rbd_assert(obj_request_img_data_test(obj_request));
2297	img_request = obj_request->img_request;
2298
2299	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
2300	rbd_assert(img_request != NULL);
2301	rbd_assert(img_request->obj_request_count > 0);
2302	rbd_assert(which != BAD_WHICH);
2303	rbd_assert(which < img_request->obj_request_count);
2304
2305	spin_lock_irq(&img_request->completion_lock);
2306	if (which != img_request->next_completion)
2307		goto out;
2308
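	/*
	 * Object requests complete in submission order.  Walk forward
	 * from the next expected completion, ending requests until we
	 * reach one that hasn't finished yet.
	 */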
2309	for_each_obj_request_from(img_request, obj_request) {
2310		rbd_assert(more);
2311		rbd_assert(which < img_request->obj_request_count);
2312
2313		if (!obj_request_done_test(obj_request))
2314			break;
2315		more = rbd_img_obj_end_request(obj_request);
2316		which++;
2317	}
2318
2319	rbd_assert(more ^ (which == img_request->obj_request_count));
2320	img_request->next_completion = which;
2321out:
2322	spin_unlock_irq(&img_request->completion_lock);
2323	rbd_img_request_put(img_request);
2324
2325	if (!more)
2326		rbd_img_request_complete(img_request);
2327}
2328
2329/*
 * Add individual osd ops to the given ceph_osd_request and prepare
 * them for submission.  num_ops is the number of osd operations
 * already added to the osd request.
2333 */
2334static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
2335				struct ceph_osd_request *osd_request,
2336				enum obj_operation_type op_type,
2337				unsigned int num_ops)
2338{
2339	struct rbd_img_request *img_request = obj_request->img_request;
2340	struct rbd_device *rbd_dev = img_request->rbd_dev;
2341	u64 object_size = rbd_obj_bytes(&rbd_dev->header);
2342	u64 offset = obj_request->offset;
2343	u64 length = obj_request->length;
2344	u64 img_end;
2345	u16 opcode;
2346
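	/*
	 * For a discard, choose the cheapest osd op covering the
	 * range: delete a whole object, truncate a tail that runs to
	 * the end of the object (or of the image), or zero an
	 * interior range.
	 */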
2347	if (op_type == OBJ_OP_DISCARD) {
2348		if (!offset && length == object_size &&
2349		    (!img_request_layered_test(img_request) ||
2350		     !obj_request_overlaps_parent(obj_request))) {
2351			opcode = CEPH_OSD_OP_DELETE;
2352		} else if ((offset + length == object_size)) {
2353			opcode = CEPH_OSD_OP_TRUNCATE;
2354		} else {
2355			down_read(&rbd_dev->header_rwsem);
2356			img_end = rbd_dev->header.image_size;
2357			up_read(&rbd_dev->header_rwsem);
2358
2359			if (obj_request->img_offset + length == img_end)
2360				opcode = CEPH_OSD_OP_TRUNCATE;
2361			else
2362				opcode = CEPH_OSD_OP_ZERO;
2363		}
2364	} else if (op_type == OBJ_OP_WRITE) {
2365		opcode = CEPH_OSD_OP_WRITE;
2366		osd_req_op_alloc_hint_init(osd_request, num_ops,
2367					object_size, object_size);
2368		num_ops++;
2369	} else {
2370		opcode = CEPH_OSD_OP_READ;
2371	}
2372
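	/* Set up the extent op and attach the bio or page data, if any */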
2373	osd_req_op_extent_init(osd_request, num_ops, opcode, offset, length,
2374				0, 0);
2375	if (obj_request->type == OBJ_REQUEST_BIO)
2376		osd_req_op_extent_osd_data_bio(osd_request, num_ops,
2377					obj_request->bio_list, length);
2378	else if (obj_request->type == OBJ_REQUEST_PAGES)
2379		osd_req_op_extent_osd_data_pages(osd_request, num_ops,
2380					obj_request->pages, length,
2381					offset & ~PAGE_MASK, false, false);
2382
2383	/* Discards are also writes */
2384	if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD)
2385		rbd_osd_req_format_write(obj_request);
2386	else
2387		rbd_osd_req_format_read(obj_request);
2388}
2389
2390/*
2391 * Split up an image request into one or more object requests, each
2392 * to a different object.  The "type" parameter indicates whether
2393 * "data_desc" is the pointer to the head of a list of bio
2394 * structures, or the base of a page array.  In either case this
2395 * function assumes data_desc describes memory sufficient to hold
2396 * all data described by the image request.
2397 */
2398static int rbd_img_request_fill(struct rbd_img_request *img_request,
2399					enum obj_request_type type,
2400					void *data_desc)
2401{
2402	struct rbd_device *rbd_dev = img_request->rbd_dev;
2403	struct rbd_obj_request *obj_request = NULL;
2404	struct rbd_obj_request *next_obj_request;
2405	struct bio *bio_list = NULL;
2406	unsigned int bio_offset = 0;
2407	struct page **pages = NULL;
2408	enum obj_operation_type op_type;
2409	u64 img_offset;
2410	u64 resid;
2411
2412	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2413		(int)type, data_desc);
2414
2415	img_offset = img_request->offset;
2416	resid = img_request->length;
2417	rbd_assert(resid > 0);
2418	op_type = rbd_img_request_op_type(img_request);
2419
2420	if (type == OBJ_REQUEST_BIO) {
2421		bio_list = data_desc;
2422		rbd_assert(img_offset ==
2423			   bio_list->bi_iter.bi_sector << SECTOR_SHIFT);
2424	} else if (type == OBJ_REQUEST_PAGES) {
2425		pages = data_desc;
2426	}
2427
2428	while (resid) {
2429		struct ceph_osd_request *osd_req;
2430		const char *object_name;
2431		u64 offset;
2432		u64 length;
2433
2434		object_name = rbd_segment_name(rbd_dev, img_offset);
2435		if (!object_name)
2436			goto out_unwind;
2437		offset = rbd_segment_offset(rbd_dev, img_offset);
2438		length = rbd_segment_length(rbd_dev, img_offset, resid);
2439		obj_request = rbd_obj_request_create(object_name,
2440						offset, length, type);
2441		/* object request has its own copy of the object name */
2442		rbd_segment_name_free(object_name);
2443		if (!obj_request)
2444			goto out_unwind;
2445
2446		/*
2447		 * set obj_request->img_request before creating the
2448		 * osd_request so that it gets the right snapc
2449		 */
2450		rbd_img_obj_request_add(img_request, obj_request);
2451
2452		if (type == OBJ_REQUEST_BIO) {
2453			unsigned int clone_size;
2454
2455			rbd_assert(length <= (u64)UINT_MAX);
2456			clone_size = (unsigned int)length;
2457			obj_request->bio_list =
2458					bio_chain_clone_range(&bio_list,
2459								&bio_offset,
2460								clone_size,
2461								GFP_ATOMIC);
2462			if (!obj_request->bio_list)
2463				goto out_unwind;
2464		} else if (type == OBJ_REQUEST_PAGES) {
2465			unsigned int page_count;
2466
2467			obj_request->pages = pages;
2468			page_count = (u32)calc_pages_for(offset, length);
2469			obj_request->page_count = page_count;
2470			if ((offset + length) & ~PAGE_MASK)
2471				page_count--;	/* more on last page */
2472			pages += page_count;
2473		}
2474
2475		osd_req = rbd_osd_req_create(rbd_dev, op_type,
2476					(op_type == OBJ_OP_WRITE) ? 2 : 1,
2477					obj_request);
2478		if (!osd_req)
2479			goto out_unwind;
2480
2481		obj_request->osd_req = osd_req;
2482		obj_request->callback = rbd_img_obj_callback;
2483		obj_request->img_offset = img_offset;
2484
2485		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
2486
2487		rbd_img_request_get(img_request);
2488
2489		img_offset += length;
2490		resid -= length;
2491	}
2492
2493	return 0;
2494
2495out_unwind:
2496	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2497		rbd_img_obj_request_del(img_request, obj_request);
2498
2499	return -ENOMEM;
2500}
2501
2502static void
2503rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2504{
2505	struct rbd_img_request *img_request;
2506	struct rbd_device *rbd_dev;
2507	struct page **pages;
2508	u32 page_count;
2509
2510	rbd_assert(obj_request->type == OBJ_REQUEST_BIO ||
2511		obj_request->type == OBJ_REQUEST_NODATA);
2512	rbd_assert(obj_request_img_data_test(obj_request));
2513	img_request = obj_request->img_request;
2514	rbd_assert(img_request);
2515
2516	rbd_dev = img_request->rbd_dev;
2517	rbd_assert(rbd_dev);
2518
2519	pages = obj_request->copyup_pages;
2520	rbd_assert(pages != NULL);
2521	obj_request->copyup_pages = NULL;
2522	page_count = obj_request->copyup_page_count;
2523	rbd_assert(page_count);
2524	obj_request->copyup_page_count = 0;
2525	ceph_release_page_vector(pages, page_count);
2526
2527	/*
2528	 * We want the transfer count to reflect the size of the
2529	 * original write request.  There is no such thing as a
2530	 * successful short write, so if the request was successful
2531	 * we can just set it to the originally-requested length.
2532	 */
2533	if (!obj_request->result)
2534		obj_request->xferred = obj_request->length;
2535
2536	/* Finish up with the normal image object callback */
2537
2538	rbd_img_obj_callback(obj_request);
2539}
2540
2541static void
2542rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2543{
2544	struct rbd_obj_request *orig_request;
2545	struct ceph_osd_request *osd_req;
2546	struct ceph_osd_client *osdc;
2547	struct rbd_device *rbd_dev;
2548	struct page **pages;
2549	enum obj_operation_type op_type;
2550	u32 page_count;
2551	int img_result;
2552	u64 parent_length;
2553
2554	rbd_assert(img_request_child_test(img_request));
2555
2556	/* First get what we need from the image request */
2557
2558	pages = img_request->copyup_pages;
2559	rbd_assert(pages != NULL);
2560	img_request->copyup_pages = NULL;
2561	page_count = img_request->copyup_page_count;
2562	rbd_assert(page_count);
2563	img_request->copyup_page_count = 0;
2564
2565	orig_request = img_request->obj_request;
2566	rbd_assert(orig_request != NULL);
2567	rbd_assert(obj_request_type_valid(orig_request->type));
2568	img_result = img_request->result;
2569	parent_length = img_request->length;
2570	rbd_assert(parent_length == img_request->xferred);
2571	rbd_img_request_put(img_request);
2572
2573	rbd_assert(orig_request->img_request);
2574	rbd_dev = orig_request->img_request->rbd_dev;
2575	rbd_assert(rbd_dev);
2576
2577	/*
2578	 * If the overlap has become 0 (most likely because the
2579	 * image has been flattened) we need to free the pages
2580	 * and re-submit the original write request.
2581	 */
2582	if (!rbd_dev->parent_overlap) {
2583		struct ceph_osd_client *osdc;
2584
2585		ceph_release_page_vector(pages, page_count);
2586		osdc = &rbd_dev->rbd_client->client->osdc;
2587		img_result = rbd_obj_request_submit(osdc, orig_request);
2588		if (!img_result)
2589			return;
2590	}
2591
2592	if (img_result)
2593		goto out_err;
2594
2595	/*
	 * The original osd request is of no use to us any more.
	 * We need a new one that can hold the two or three ops in a
	 * copyup request.  Allocate the new copyup osd request for the
2599	 * original request, and release the old one.
2600	 */
2601	img_result = -ENOMEM;
2602	osd_req = rbd_osd_req_create_copyup(orig_request);
2603	if (!osd_req)
2604		goto out_err;
2605	rbd_osd_req_destroy(orig_request->osd_req);
2606	orig_request->osd_req = osd_req;
2607	orig_request->copyup_pages = pages;
2608	orig_request->copyup_page_count = page_count;
2609
2610	/* Initialize the copyup op */
2611
2612	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2613	osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
2614						false, false);
2615
2616	/* Add the other op(s) */
2617
2618	op_type = rbd_img_request_op_type(orig_request->img_request);
2619	rbd_img_obj_request_fill(orig_request, osd_req, op_type, 1);
2620
2621	/* All set, send it off. */
2622
2623	orig_request->callback = rbd_img_obj_copyup_callback;
2624	osdc = &rbd_dev->rbd_client->client->osdc;
2625	img_result = rbd_obj_request_submit(osdc, orig_request);
2626	if (!img_result)
2627		return;
2628out_err:
2629	/* Record the error code and complete the request */
2630
2631	orig_request->result = img_result;
2632	orig_request->xferred = 0;
2633	obj_request_done_set(orig_request);
2634	rbd_obj_request_complete(orig_request);
2635}
2636
2637/*
2638 * Read from the parent image the range of data that covers the
2639 * entire target of the given object request.  This is used for
2640 * satisfying a layered image write request when the target of an
2641 * object request from the image request does not exist.
2642 *
2643 * A page array big enough to hold the returned data is allocated
2644 * and supplied to rbd_img_request_fill() as the "data descriptor."
2645 * When the read completes, this page array will be transferred to
2646 * the original object request for the copyup operation.
2647 *
2648 * If an error occurs, record it as the result of the original
2649 * object request and mark it done so it gets completed.
2650 */
2651static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2652{
2653	struct rbd_img_request *img_request = NULL;
2654	struct rbd_img_request *parent_request = NULL;
2655	struct rbd_device *rbd_dev;
2656	u64 img_offset;
2657	u64 length;
2658	struct page **pages = NULL;
2659	u32 page_count;
2660	int result;
2661
2662	rbd_assert(obj_request_img_data_test(obj_request));
2663	rbd_assert(obj_request_type_valid(obj_request->type));
2664
2665	img_request = obj_request->img_request;
2666	rbd_assert(img_request != NULL);
2667	rbd_dev = img_request->rbd_dev;
2668	rbd_assert(rbd_dev->parent != NULL);
2669
2670	/*
2671	 * Determine the byte range covered by the object in the
2672	 * child image to which the original request was to be sent.
2673	 */
2674	img_offset = obj_request->img_offset - obj_request->offset;
2675	length = (u64)1 << rbd_dev->header.obj_order;
2676
2677	/*
2678	 * There is no defined parent data beyond the parent
2679	 * overlap, so limit what we read at that boundary if
2680	 * necessary.
2681	 */
2682	if (img_offset + length > rbd_dev->parent_overlap) {
2683		rbd_assert(img_offset < rbd_dev->parent_overlap);
2684		length = rbd_dev->parent_overlap - img_offset;
2685	}
2686
2687	/*
2688	 * Allocate a page array big enough to receive the data read
2689	 * from the parent.
2690	 */
2691	page_count = (u32)calc_pages_for(0, length);
2692	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2693	if (IS_ERR(pages)) {
2694		result = PTR_ERR(pages);
2695		pages = NULL;
2696		goto out_err;
2697	}
2698
2699	result = -ENOMEM;
2700	parent_request = rbd_parent_request_create(obj_request,
2701						img_offset, length);
2702	if (!parent_request)
2703		goto out_err;
2704
2705	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2706	if (result)
2707		goto out_err;
2708	parent_request->copyup_pages = pages;
2709	parent_request->copyup_page_count = page_count;
2710
2711	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2712	result = rbd_img_request_submit(parent_request);
2713	if (!result)
2714		return 0;
2715
2716	parent_request->copyup_pages = NULL;
2717	parent_request->copyup_page_count = 0;
2718	parent_request->obj_request = NULL;
2719	rbd_obj_request_put(obj_request);
2720out_err:
2721	if (pages)
2722		ceph_release_page_vector(pages, page_count);
2723	if (parent_request)
2724		rbd_img_request_put(parent_request);
2725	obj_request->result = result;
2726	obj_request->xferred = 0;
2727	obj_request_done_set(obj_request);
2728
2729	return result;
2730}
2731
2732static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2733{
2734	struct rbd_obj_request *orig_request;
2735	struct rbd_device *rbd_dev;
2736	int result;
2737
2738	rbd_assert(!obj_request_img_data_test(obj_request));
2739
2740	/*
2741	 * All we need from the object request is the original
2742	 * request and the result of the STAT op.  Grab those, then
2743	 * we're done with the request.
2744	 */
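	/*
	 * The reference dropped below is the one taken in
	 * rbd_img_obj_exists_submit(); the image request still holds
	 * its own reference to the original object request.
	 */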
2745	orig_request = obj_request->obj_request;
2746	obj_request->obj_request = NULL;
2747	rbd_obj_request_put(orig_request);
2748	rbd_assert(orig_request);
2749	rbd_assert(orig_request->img_request);
2750
2751	result = obj_request->result;
2752	obj_request->result = 0;
2753
2754	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2755		obj_request, orig_request, result,
2756		obj_request->xferred, obj_request->length);
2757	rbd_obj_request_put(obj_request);
2758
2759	/*
2760	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
2763	 */
2764	rbd_dev = orig_request->img_request->rbd_dev;
2765	if (!rbd_dev->parent_overlap) {
2766		struct ceph_osd_client *osdc;
2767
2768		osdc = &rbd_dev->rbd_client->client->osdc;
2769		result = rbd_obj_request_submit(osdc, orig_request);
2770		if (!result)
2771			return;
2772	}
2773
2774	/*
2775	 * Our only purpose here is to determine whether the object
2776	 * exists, and we don't want to treat the non-existence as
2777	 * an error.  If something else comes back, transfer the
2778	 * error to the original request and complete it now.
2779	 */
2780	if (!result) {
2781		obj_request_existence_set(orig_request, true);
2782	} else if (result == -ENOENT) {
2783		obj_request_existence_set(orig_request, false);
2784	} else if (result) {
2785		orig_request->result = result;
2786		goto out;
2787	}
2788
2789	/*
2790	 * Resubmit the original request now that we have recorded
2791	 * whether the target object exists.
2792	 */
2793	orig_request->result = rbd_img_obj_request_submit(orig_request);
2794out:
2795	if (orig_request->result)
2796		rbd_obj_request_complete(orig_request);
2797}
2798
2799static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2800{
2801	struct rbd_obj_request *stat_request;
2802	struct rbd_device *rbd_dev;
2803	struct ceph_osd_client *osdc;
2804	struct page **pages = NULL;
2805	u32 page_count;
2806	size_t size;
2807	int ret;
2808
2809	/*
2810	 * The response data for a STAT call consists of:
2811	 *     le64 length;
2812	 *     struct {
2813	 *         le32 tv_sec;
2814	 *         le32 tv_nsec;
2815	 *     } mtime;
2816	 */
2817	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2818	page_count = (u32)calc_pages_for(0, size);
2819	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2820	if (IS_ERR(pages))
2821		return PTR_ERR(pages);
2822
2823	ret = -ENOMEM;
2824	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2825							OBJ_REQUEST_PAGES);
2826	if (!stat_request)
2827		goto out;
2828
2829	rbd_obj_request_get(obj_request);
2830	stat_request->obj_request = obj_request;
2831	stat_request->pages = pages;
2832	stat_request->page_count = page_count;
2833
2834	rbd_assert(obj_request->img_request);
2835	rbd_dev = obj_request->img_request->rbd_dev;
2836	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
2837						   stat_request);
2838	if (!stat_request->osd_req)
2839		goto out;
2840	stat_request->callback = rbd_img_obj_exists_callback;
2841
2842	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2843	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2844					false, false);
2845	rbd_osd_req_format_read(stat_request);
2846
2847	osdc = &rbd_dev->rbd_client->client->osdc;
2848	ret = rbd_obj_request_submit(osdc, stat_request);
2849out:
2850	if (ret)
2851		rbd_obj_request_put(obj_request);
2852
2853	return ret;
2854}
2855
2856static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
2857{
2858	struct rbd_img_request *img_request;
2859	struct rbd_device *rbd_dev;
2860
2861	rbd_assert(obj_request_img_data_test(obj_request));
2862
2863	img_request = obj_request->img_request;
2864	rbd_assert(img_request);
2865	rbd_dev = img_request->rbd_dev;
2866
2867	/* Reads */
2868	if (!img_request_write_test(img_request) &&
2869	    !img_request_discard_test(img_request))
2870		return true;
2871
2872	/* Non-layered writes */
2873	if (!img_request_layered_test(img_request))
2874		return true;
2875
2876	/*
2877	 * Layered writes outside of the parent overlap range don't
2878	 * share any data with the parent.
2879	 */
2880	if (!obj_request_overlaps_parent(obj_request))
2881		return true;
2882
2883	/*
2884	 * Entire-object layered writes - we will overwrite whatever
2885	 * parent data there is anyway.
2886	 */
2887	if (!obj_request->offset &&
2888	    obj_request->length == rbd_obj_bytes(&rbd_dev->header))
2889		return true;
2890
2891	/*
2892	 * If the object is known to already exist, its parent data has
2893	 * already been copied.
2894	 */
2895	if (obj_request_known_test(obj_request) &&
2896	    obj_request_exists_test(obj_request))
2897		return true;
2898
2899	return false;
2900}
2901
2902static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2903{
2904	if (img_obj_request_simple(obj_request)) {
2905		struct rbd_device *rbd_dev;
2906		struct ceph_osd_client *osdc;
2907
2908		rbd_dev = obj_request->img_request->rbd_dev;
2909		osdc = &rbd_dev->rbd_client->client->osdc;
2910
2911		return rbd_obj_request_submit(osdc, obj_request);
2912	}
2913
2914	/*
2915	 * It's a layered write.  The target object might exist but
2916	 * we may not know that yet.  If we know it doesn't exist,
2917	 * start by reading the data for the full target object from
2918	 * the parent so we can use it for a copyup to the target.
2919	 */
2920	if (obj_request_known_test(obj_request))
2921		return rbd_img_obj_parent_read_full(obj_request);
2922
2923	/* We don't know whether the target exists.  Go find out. */
2924
2925	return rbd_img_obj_exists_submit(obj_request);
2926}
2927
2928static int rbd_img_request_submit(struct rbd_img_request *img_request)
2929{
2930	struct rbd_obj_request *obj_request;
2931	struct rbd_obj_request *next_obj_request;
2932
2933	dout("%s: img %p\n", __func__, img_request);
2934	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2935		int ret;
2936
2937		ret = rbd_img_obj_request_submit(obj_request);
2938		if (ret)
2939			return ret;
2940	}
2941
2942	return 0;
2943}
2944
2945static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2946{
2947	struct rbd_obj_request *obj_request;
2948	struct rbd_device *rbd_dev;
2949	u64 obj_end;
2950	u64 img_xferred;
2951	int img_result;
2952
2953	rbd_assert(img_request_child_test(img_request));
2954
2955	/* First get what we need from the image request and release it */
2956
2957	obj_request = img_request->obj_request;
2958	img_xferred = img_request->xferred;
2959	img_result = img_request->result;
2960	rbd_img_request_put(img_request);
2961
2962	/*
2963	 * If the overlap has become 0 (most likely because the
2964	 * image has been flattened) we need to re-submit the
2965	 * original request.
2966	 */
2967	rbd_assert(obj_request);
2968	rbd_assert(obj_request->img_request);
2969	rbd_dev = obj_request->img_request->rbd_dev;
2970	if (!rbd_dev->parent_overlap) {
2971		struct ceph_osd_client *osdc;
2972
2973		osdc = &rbd_dev->rbd_client->client->osdc;
2974		img_result = rbd_obj_request_submit(osdc, obj_request);
2975		if (!img_result)
2976			return;
2977	}
2978
2979	obj_request->result = img_result;
2980	if (obj_request->result)
2981		goto out;
2982
2983	/*
2984	 * We need to zero anything beyond the parent overlap
2985	 * boundary.  Since rbd_img_obj_request_read_callback()
2986	 * will zero anything beyond the end of a short read, an
2987	 * easy way to do this is to pretend the data from the
2988	 * parent came up short--ending at the overlap boundary.
2989	 */
2990	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2991	obj_end = obj_request->img_offset + obj_request->length;
2992	if (obj_end > rbd_dev->parent_overlap) {
2993		u64 xferred = 0;
2994
2995		if (obj_request->img_offset < rbd_dev->parent_overlap)
2996			xferred = rbd_dev->parent_overlap -
2997					obj_request->img_offset;
2998
2999		obj_request->xferred = min(img_xferred, xferred);
3000	} else {
3001		obj_request->xferred = img_xferred;
3002	}
3003out:
3004	rbd_img_obj_request_read_callback(obj_request);
3005	rbd_obj_request_complete(obj_request);
3006}
3007
3008static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
3009{
3010	struct rbd_img_request *img_request;
3011	int result;
3012
3013	rbd_assert(obj_request_img_data_test(obj_request));
3014	rbd_assert(obj_request->img_request != NULL);
3015	rbd_assert(obj_request->result == (s32) -ENOENT);
3016	rbd_assert(obj_request_type_valid(obj_request->type));
3017
3018	/* rbd_read_finish(obj_request, obj_request->length); */
3019	img_request = rbd_parent_request_create(obj_request,
3020						obj_request->img_offset,
3021						obj_request->length);
3022	result = -ENOMEM;
3023	if (!img_request)
3024		goto out_err;
3025
3026	if (obj_request->type == OBJ_REQUEST_BIO)
3027		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3028						obj_request->bio_list);
3029	else
3030		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
3031						obj_request->pages);
3032	if (result)
3033		goto out_err;
3034
3035	img_request->callback = rbd_img_parent_read_callback;
3036	result = rbd_img_request_submit(img_request);
3037	if (result)
3038		goto out_err;
3039
3040	return;
3041out_err:
3042	if (img_request)
3043		rbd_img_request_put(img_request);
3044	obj_request->result = result;
3045	obj_request->xferred = 0;
3046	obj_request_done_set(obj_request);
3047}
3048
3049static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
3050{
3051	struct rbd_obj_request *obj_request;
3052	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3053	int ret;
3054
3055	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3056							OBJ_REQUEST_NODATA);
3057	if (!obj_request)
3058		return -ENOMEM;
3059
3060	ret = -ENOMEM;
3061	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3062						  obj_request);
3063	if (!obj_request->osd_req)
3064		goto out;
3065
3066	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
3067					notify_id, 0, 0);
3068	rbd_osd_req_format_read(obj_request);
3069
3070	ret = rbd_obj_request_submit(osdc, obj_request);
3071	if (ret)
3072		goto out;
3073	ret = rbd_obj_request_wait(obj_request);
3074out:
3075	rbd_obj_request_put(obj_request);
3076
3077	return ret;
3078}
3079
3080static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
3081{
3082	struct rbd_device *rbd_dev = (struct rbd_device *)data;
3083	int ret;
3084
3085	if (!rbd_dev)
3086		return;
3087
3088	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
3089		rbd_dev->header_name, (unsigned long long)notify_id,
3090		(unsigned int)opcode);
3091
3092	/*
3093	 * Until adequate refresh error handling is in place, there is
3094	 * not much we can do here, except warn.
3095	 *
3096	 * See http://tracker.ceph.com/issues/5040
3097	 */
3098	ret = rbd_dev_refresh(rbd_dev);
3099	if (ret)
3100		rbd_warn(rbd_dev, "refresh failed: %d", ret);
3101
3102	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
3103	if (ret)
3104		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
3105}
3106
3107/*
 * Send a (un)watch request and wait for the ack.  On success return
 * the object request with a reference held; on error return an ERR_PTR.
3110 */
3111static struct rbd_obj_request *rbd_obj_watch_request_helper(
3112						struct rbd_device *rbd_dev,
3113						bool watch)
3114{
3115	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3116	struct rbd_obj_request *obj_request;
3117	int ret;
3118
3119	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
3120					     OBJ_REQUEST_NODATA);
3121	if (!obj_request)
3122		return ERR_PTR(-ENOMEM);
3123
3124	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
3125						  obj_request);
3126	if (!obj_request->osd_req) {
3127		ret = -ENOMEM;
3128		goto out;
3129	}
3130
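	/* The final argument selects between registering and unregistering */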
3131	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
3132			      rbd_dev->watch_event->cookie, 0, watch);
3133	rbd_osd_req_format_write(obj_request);
3134
3135	if (watch)
3136		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
3137
3138	ret = rbd_obj_request_submit(osdc, obj_request);
3139	if (ret)
3140		goto out;
3141
3142	ret = rbd_obj_request_wait(obj_request);
3143	if (ret)
3144		goto out;
3145
3146	ret = obj_request->result;
3147	if (ret) {
3148		if (watch)
3149			rbd_obj_request_end(obj_request);
3150		goto out;
3151	}
3152
3153	return obj_request;
3154
3155out:
3156	rbd_obj_request_put(obj_request);
3157	return ERR_PTR(ret);
3158}
3159
3160/*
3161 * Initiate a watch request, synchronously.
3162 */
3163static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
3164{
3165	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3166	struct rbd_obj_request *obj_request;
3167	int ret;
3168
3169	rbd_assert(!rbd_dev->watch_event);
3170	rbd_assert(!rbd_dev->watch_request);
3171
3172	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
3173				     &rbd_dev->watch_event);
3174	if (ret < 0)
3175		return ret;
3176
3177	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
3178	if (IS_ERR(obj_request)) {
3179		ceph_osdc_cancel_event(rbd_dev->watch_event);
3180		rbd_dev->watch_event = NULL;
3181		return PTR_ERR(obj_request);
3182	}
3183
3184	/*
3185	 * A watch request is set to linger, so the underlying osd
3186	 * request won't go away until we unregister it.  We retain
3187	 * a pointer to the object request during that time (in
3188	 * rbd_dev->watch_request), so we'll keep a reference to it.
3189	 * We'll drop that reference after we've unregistered it in
3190	 * rbd_dev_header_unwatch_sync().
3191	 */
3192	rbd_dev->watch_request = obj_request;
3193
3194	return 0;
3195}
3196
3197/*
3198 * Tear down a watch request, synchronously.
3199 */
3200static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
3201{
3202	struct rbd_obj_request *obj_request;
3203
3204	rbd_assert(rbd_dev->watch_event);
3205	rbd_assert(rbd_dev->watch_request);
3206
3207	rbd_obj_request_end(rbd_dev->watch_request);
3208	rbd_obj_request_put(rbd_dev->watch_request);
3209	rbd_dev->watch_request = NULL;
3210
3211	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
3212	if (!IS_ERR(obj_request))
3213		rbd_obj_request_put(obj_request);
3214	else
3215		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
3216			 PTR_ERR(obj_request));
3217
3218	ceph_osdc_cancel_event(rbd_dev->watch_event);
3219	rbd_dev->watch_event = NULL;
3220}
3221
3222/*
3223 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
3225 */
3226static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
3227			     const char *object_name,
3228			     const char *class_name,
3229			     const char *method_name,
3230			     const void *outbound,
3231			     size_t outbound_size,
3232			     void *inbound,
3233			     size_t inbound_size)
3234{
3235	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3236	struct rbd_obj_request *obj_request;
3237	struct page **pages;
3238	u32 page_count;
3239	int ret;
3240
3241	/*
3242	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
3244	 * also supply outbound data--parameters for the object
3245	 * method.  Currently if this is present it will be a
3246	 * snapshot id.
3247	 */
3248	page_count = (u32)calc_pages_for(0, inbound_size);
3249	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3250	if (IS_ERR(pages))
3251		return PTR_ERR(pages);
3252
3253	ret = -ENOMEM;
3254	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
3255							OBJ_REQUEST_PAGES);
3256	if (!obj_request)
3257		goto out;
3258
3259	obj_request->pages = pages;
3260	obj_request->page_count = page_count;
3261
3262	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3263						  obj_request);
3264	if (!obj_request->osd_req)
3265		goto out;
3266
3267	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
3268					class_name, method_name);
3269	if (outbound_size) {
3270		struct ceph_pagelist *pagelist;
3271
3272		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
3273		if (!pagelist)
3274			goto out;
3275
3276		ceph_pagelist_init(pagelist);
3277		ceph_pagelist_append(pagelist, outbound, outbound_size);
3278		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
3279						pagelist);
3280	}
3281	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
3282					obj_request->pages, inbound_size,
3283					0, false, false);
3284	rbd_osd_req_format_read(obj_request);
3285
3286	ret = rbd_obj_request_submit(osdc, obj_request);
3287	if (ret)
3288		goto out;
3289	ret = rbd_obj_request_wait(obj_request);
3290	if (ret)
3291		goto out;
3292
3293	ret = obj_request->result;
3294	if (ret < 0)
3295		goto out;
3296
3297	rbd_assert(obj_request->xferred < (u64)INT_MAX);
3298	ret = (int)obj_request->xferred;
3299	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
3300out:
3301	if (obj_request)
3302		rbd_obj_request_put(obj_request);
3303	else
3304		ceph_release_page_vector(pages, page_count);
3305
3306	return ret;
3307}
3308
3309static void rbd_handle_request(struct rbd_device *rbd_dev, struct request *rq)
3310{
3311	struct rbd_img_request *img_request;
3312	struct ceph_snap_context *snapc = NULL;
3313	u64 offset = (u64)blk_rq_pos(rq) << SECTOR_SHIFT;
3314	u64 length = blk_rq_bytes(rq);
3315	enum obj_operation_type op_type;
3316	u64 mapping_size;
3317	int result;
3318
3319	if (rq->cmd_flags & REQ_DISCARD)
3320		op_type = OBJ_OP_DISCARD;
3321	else if (rq->cmd_flags & REQ_WRITE)
3322		op_type = OBJ_OP_WRITE;
3323	else
3324		op_type = OBJ_OP_READ;
3325
3326	/* Ignore/skip any zero-length requests */
3327
3328	if (!length) {
3329		dout("%s: zero-length request\n", __func__);
3330		result = 0;
3331		goto err_rq;
3332	}
3333
3334	/* Only reads are allowed to a read-only device */
3335
3336	if (op_type != OBJ_OP_READ) {
3337		if (rbd_dev->mapping.read_only) {
3338			result = -EROFS;
3339			goto err_rq;
3340		}
3341		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
3342	}
3343
3344	/*
3345	 * Quit early if the mapped snapshot no longer exists.  It's
3346	 * still possible the snapshot will have disappeared by the
3347	 * time our request arrives at the osd, but there's no sense in
3348	 * sending it if we already know.
3349	 */
3350	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
3351		dout("request for non-existent snapshot");
3352		rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
3353		result = -ENXIO;
3354		goto err_rq;
3355	}
3356
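	/* Reject ranges whose last byte would lie beyond U64_MAX */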
3357	if (offset && length > U64_MAX - offset + 1) {
3358		rbd_warn(rbd_dev, "bad request range (%llu~%llu)", offset,
3359			 length);
3360		result = -EINVAL;
3361		goto err_rq;	/* Shouldn't happen */
3362	}
3363
3364	down_read(&rbd_dev->header_rwsem);
3365	mapping_size = rbd_dev->mapping.size;
3366	if (op_type != OBJ_OP_READ) {
3367		snapc = rbd_dev->header.snapc;
3368		ceph_get_snap_context(snapc);
3369	}
3370	up_read(&rbd_dev->header_rwsem);
3371
3372	if (offset + length > mapping_size) {
3373		rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)", offset,
3374			 length, mapping_size);
3375		result = -EIO;
3376		goto err_rq;
3377	}
3378
3379	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
3380					     snapc);
3381	if (!img_request) {
3382		result = -ENOMEM;
3383		goto err_rq;
3384	}
3385	img_request->rq = rq;
3386
3387	if (op_type == OBJ_OP_DISCARD)
3388		result = rbd_img_request_fill(img_request, OBJ_REQUEST_NODATA,
3389					      NULL);
3390	else
3391		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
3392					      rq->bio);
3393	if (result)
3394		goto err_img_request;
3395
3396	result = rbd_img_request_submit(img_request);
3397	if (result)
3398		goto err_img_request;
3399
3400	return;
3401
3402err_img_request:
3403	rbd_img_request_put(img_request);
3404err_rq:
3405	if (result)
3406		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
3407			 obj_op_name(op_type), length, offset, result);
3408	if (snapc)
3409		ceph_put_snap_context(snapc);
3410	blk_end_request_all(rq, result);
3411}
3412
3413static void rbd_request_workfn(struct work_struct *work)
3414{
3415	struct rbd_device *rbd_dev =
3416	    container_of(work, struct rbd_device, rq_work);
3417	struct request *rq, *next;
3418	LIST_HEAD(requests);
3419
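	/*
	 * Move the queued requests off the shared list under the lock,
	 * then handle them without holding it (rbd_handle_request()
	 * may sleep).
	 */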
3420	spin_lock_irq(&rbd_dev->lock); /* rq->q->queue_lock */
3421	list_splice_init(&rbd_dev->rq_queue, &requests);
3422	spin_unlock_irq(&rbd_dev->lock);
3423
3424	list_for_each_entry_safe(rq, next, &requests, queuelist) {
3425		list_del_init(&rq->queuelist);
3426		rbd_handle_request(rbd_dev, rq);
3427	}
3428}
3429
3430/*
3431 * Called with q->queue_lock held and interrupts disabled, possibly on
3432 * the way to schedule().  Do not sleep here!
3433 */
3434static void rbd_request_fn(struct request_queue *q)
3435{
3436	struct rbd_device *rbd_dev = q->queuedata;
3437	struct request *rq;
3438	int queued = 0;
3439
3440	rbd_assert(rbd_dev);
3441
3442	while ((rq = blk_fetch_request(q))) {
3443		/* Ignore any non-FS requests that filter through. */
3444		if (rq->cmd_type != REQ_TYPE_FS) {
3445			dout("%s: non-fs request type %d\n", __func__,
3446				(int) rq->cmd_type);
3447			__blk_end_request_all(rq, 0);
3448			continue;
3449		}
3450
3451		list_add_tail(&rq->queuelist, &rbd_dev->rq_queue);
3452		queued++;
3453	}
3454
3455	if (queued)
3456		queue_work(rbd_wq, &rbd_dev->rq_work);
3457}
3458
3459/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be single-page bios,
 * which we handle later in bio_chain_clone_range().
3463 */
3464static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
3465			  struct bio_vec *bvec)
3466{
3467	struct rbd_device *rbd_dev = q->queuedata;
3468	sector_t sector_offset;
3469	sector_t sectors_per_obj;
3470	sector_t obj_sector_offset;
3471	int ret;
3472
3473	/*
	 * Convert the partition-relative bio start sector into an
	 * offset relative to the enclosing device, then find how far
	 * into its rbd object that sector falls.
3477	 */
3478	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
3479	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
3480	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
3481
3482	/*
3483	 * Compute the number of bytes from that offset to the end
3484	 * of the object.  Account for what's already used by the bio.
3485	 */
3486	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
3487	if (ret > bmd->bi_size)
3488		ret -= bmd->bi_size;
3489	else
3490		ret = 0;
3491
3492	/*
3493	 * Don't send back more than was asked for.  And if the bio
3494	 * was empty, let the whole thing through because:  "Note
3495	 * that a block device *must* allow a single page to be
3496	 * added to an empty bio."
3497	 */
3498	rbd_assert(bvec->bv_len <= PAGE_SIZE);
3499	if (ret > (int) bvec->bv_len || !bmd->bi_size)
3500		ret = (int) bvec->bv_len;
3501
3502	return ret;
3503}
3504
3505static void rbd_free_disk(struct rbd_device *rbd_dev)
3506{
3507	struct gendisk *disk = rbd_dev->disk;
3508
3509	if (!disk)
3510		return;
3511
3512	rbd_dev->disk = NULL;
3513	if (disk->flags & GENHD_FL_UP) {
3514		del_gendisk(disk);
3515		if (disk->queue)
3516			blk_cleanup_queue(disk->queue);
3517	}
3518	put_disk(disk);
3519}
3520
3521static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
3522				const char *object_name,
3523				u64 offset, u64 length, void *buf)
3524
3525{
3526	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3527	struct rbd_obj_request *obj_request;
3528	struct page **pages = NULL;
3529	u32 page_count;
3530	size_t size;
3531	int ret;
3532
3533	page_count = (u32) calc_pages_for(offset, length);
3534	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
3535	if (IS_ERR(pages))
3536		return PTR_ERR(pages);
3537
3538	ret = -ENOMEM;
3539	obj_request = rbd_obj_request_create(object_name, offset, length,
3540							OBJ_REQUEST_PAGES);
3541	if (!obj_request)
3542		goto out;
3543
3544	obj_request->pages = pages;
3545	obj_request->page_count = page_count;
3546
3547	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
3548						  obj_request);
3549	if (!obj_request->osd_req)
3550		goto out;
3551
3552	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
3553					offset, length, 0, 0);
3554	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
3555					obj_request->pages,
3556					obj_request->length,
3557					obj_request->offset & ~PAGE_MASK,
3558					false, false);
3559	rbd_osd_req_format_read(obj_request);
3560
3561	ret = rbd_obj_request_submit(osdc, obj_request);
3562	if (ret)
3563		goto out;
3564	ret = rbd_obj_request_wait(obj_request);
3565	if (ret)
3566		goto out;
3567
3568	ret = obj_request->result;
3569	if (ret < 0)
3570		goto out;
3571
3572	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3573	size = (size_t) obj_request->xferred;
3574	ceph_copy_from_page_vector(pages, buf, 0, size);
3575	rbd_assert(size <= (size_t)INT_MAX);
3576	ret = (int)size;
3577out:
3578	if (obj_request)
3579		rbd_obj_request_put(obj_request);
3580	else
3581		ceph_release_page_vector(pages, page_count);
3582
3583	return ret;
3584}
3585
3586/*
3587 * Read the complete header for the given rbd device.  On successful
3588 * return, the rbd_dev->header field will contain up-to-date
3589 * information about the image.
3590 */
3591static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
3592{
3593	struct rbd_image_header_ondisk *ondisk = NULL;
3594	u32 snap_count = 0;
3595	u64 names_size = 0;
3596	u32 want_count;
3597	int ret;
3598
3599	/*
3600	 * The complete header will include an array of its 64-bit
3601	 * snapshot ids, followed by the names of those snapshots as
3602	 * a contiguous block of NUL-terminated strings.  Note that
3603	 * the number of snapshots could change by the time we read
3604	 * it in, in which case we re-read it.
3605	 */
3606	do {
3607		size_t size;
3608
3609		kfree(ondisk);
3610
3611		size = sizeof (*ondisk);
3612		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3613		size += names_size;
3614		ondisk = kmalloc(size, GFP_KERNEL);
3615		if (!ondisk)
3616			return -ENOMEM;
3617
3618		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
3619				       0, size, ondisk);
3620		if (ret < 0)
3621			goto out;
3622		if ((size_t)ret < size) {
3623			ret = -ENXIO;
3624			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3625				size, ret);
3626			goto out;
3627		}
3628		if (!rbd_dev_ondisk_valid(ondisk)) {
3629			ret = -ENXIO;
3630			rbd_warn(rbd_dev, "invalid header");
3631			goto out;
3632		}
3633
3634		names_size = le64_to_cpu(ondisk->snap_names_len);
3635		want_count = snap_count;
3636		snap_count = le32_to_cpu(ondisk->snap_count);
3637	} while (snap_count != want_count);
3638
3639	ret = rbd_header_from_disk(rbd_dev, ondisk);
3640out:
3641	kfree(ondisk);
3642
3643	return ret;
3644}
3645
3646/*
3647 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3648 * has disappeared from the (just updated) snapshot context.
3649 */
3650static void rbd_exists_validate(struct rbd_device *rbd_dev)
3651{
3652	u64 snap_id;
3653
3654	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3655		return;
3656
3657	snap_id = rbd_dev->spec->snap_id;
3658	if (snap_id == CEPH_NOSNAP)
3659		return;
3660
3661	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3662		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3663}
3664
3665static void rbd_dev_update_size(struct rbd_device *rbd_dev)
3666{
3667	sector_t size;
3668	bool removing;
3669
3670	/*
3671	 * Don't hold the lock while doing disk operations,
3672	 * or lock ordering will conflict with the bdev mutex via:
3673	 * rbd_add() -> blkdev_get() -> rbd_open()
3674	 */
3675	spin_lock_irq(&rbd_dev->lock);
3676	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
3677	spin_unlock_irq(&rbd_dev->lock);
3678	/*
3679	 * If the device is being removed, rbd_dev->disk has
3680	 * been destroyed, so don't try to update its size
3681	 */
3682	if (!removing) {
3683		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3684		dout("setting size to %llu sectors", (unsigned long long)size);
3685		set_capacity(rbd_dev->disk, size);
3686		revalidate_disk(rbd_dev->disk);
3687	}
3688}
3689
3690static int rbd_dev_refresh(struct rbd_device *rbd_dev)
3691{
3692	u64 mapping_size;
3693	int ret;
3694
3695	down_write(&rbd_dev->header_rwsem);
3696	mapping_size = rbd_dev->mapping.size;
3697
3698	ret = rbd_dev_header_info(rbd_dev);
3699	if (ret)
		goto out;
3701
3702	/*
3703	 * If there is a parent, see if it has disappeared due to the
3704	 * mapped image getting flattened.
3705	 */
3706	if (rbd_dev->parent) {
3707		ret = rbd_dev_v2_parent_info(rbd_dev);
3708		if (ret)
			goto out;
3710	}
3711
3712	if (rbd_dev->spec->snap_id == CEPH_NOSNAP) {
3713		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3714			rbd_dev->mapping.size = rbd_dev->header.image_size;
3715	} else {
3716		/* validate mapped snapshot's EXISTS flag */
3717		rbd_exists_validate(rbd_dev);
3718	}

out:
	up_write(&rbd_dev->header_rwsem);

	if (!ret && mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
3726}
3727
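/*
 * Allocate and initialize the gendisk and request queue for an rbd
 * device, sizing the I/O limits (and discard granularity) to the
 * image's object size.
 */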
3728static int rbd_init_disk(struct rbd_device *rbd_dev)
3729{
3730	struct gendisk *disk;
3731	struct request_queue *q;
3732	u64 segment_size;
3733
3734	/* create gendisk info */
3735	disk = alloc_disk(single_major ?
3736			  (1 << RBD_SINGLE_MAJOR_PART_SHIFT) :
3737			  RBD_MINORS_PER_MAJOR);
3738	if (!disk)
3739		return -ENOMEM;
3740
3741	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3742		 rbd_dev->dev_id);
3743	disk->major = rbd_dev->major;
3744	disk->first_minor = rbd_dev->minor;
3745	if (single_major)
3746		disk->flags |= GENHD_FL_EXT_DEVT;
3747	disk->fops = &rbd_bd_ops;
3748	disk->private_data = rbd_dev;
3749
3750	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3751	if (!q)
3752		goto out_disk;
3753
3754	/* We use the default size, but let's be explicit about it. */
3755	blk_queue_physical_block_size(q, SECTOR_SIZE);
3756
3757	/* set io sizes to object size */
3758	segment_size = rbd_obj_bytes(&rbd_dev->header);
3759	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3760	blk_queue_max_segment_size(q, segment_size);
3761	blk_queue_io_min(q, segment_size);
3762	blk_queue_io_opt(q, segment_size);
3763
3764	/* enable the discard support */
3765	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
3766	q->limits.discard_granularity = segment_size;
3767	q->limits.discard_alignment = segment_size;
3768	q->limits.max_discard_sectors = segment_size / SECTOR_SIZE;
3769	q->limits.discard_zeroes_data = 1;
3770
3771	blk_queue_merge_bvec(q, rbd_merge_bvec);
3772	disk->queue = q;
3773
3774	q->queuedata = rbd_dev;
3775
3776	rbd_dev->disk = disk;
3777
3778	return 0;
3779out_disk:
3780	put_disk(disk);
3781
3782	return -ENOMEM;
3783}
3784
3785/*
3786  sysfs
3787*/
3788
3789static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3790{
3791	return container_of(dev, struct rbd_device, dev);
3792}
3793
3794static ssize_t rbd_size_show(struct device *dev,
3795			     struct device_attribute *attr, char *buf)
3796{
3797	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3798
3799	return sprintf(buf, "%llu\n",
3800		(unsigned long long)rbd_dev->mapping.size);
3801}
3802
3803/*
3804 * Note this shows the features for whatever's mapped, which is not
3805 * necessarily the base image.
3806 */
3807static ssize_t rbd_features_show(struct device *dev,
3808			     struct device_attribute *attr, char *buf)
3809{
3810	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3811
3812	return sprintf(buf, "0x%016llx\n",
3813			(unsigned long long)rbd_dev->mapping.features);
3814}
3815
3816static ssize_t rbd_major_show(struct device *dev,
3817			      struct device_attribute *attr, char *buf)
3818{
3819	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3820
3821	if (rbd_dev->major)
3822		return sprintf(buf, "%d\n", rbd_dev->major);
3823
3824	return sprintf(buf, "(none)\n");
3825}
3826
3827static ssize_t rbd_minor_show(struct device *dev,
3828			      struct device_attribute *attr, char *buf)
3829{
3830	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3831
3832	return sprintf(buf, "%d\n", rbd_dev->minor);
3833}
3834
3835static ssize_t rbd_client_id_show(struct device *dev,
3836				  struct device_attribute *attr, char *buf)
3837{
3838	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3839
3840	return sprintf(buf, "client%lld\n",
3841			ceph_client_id(rbd_dev->rbd_client->client));
3842}
3843
3844static ssize_t rbd_pool_show(struct device *dev,
3845			     struct device_attribute *attr, char *buf)
3846{
3847	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3848
3849	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3850}
3851
3852static ssize_t rbd_pool_id_show(struct device *dev,
3853			     struct device_attribute *attr, char *buf)
3854{
3855	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3856
3857	return sprintf(buf, "%llu\n",
3858			(unsigned long long) rbd_dev->spec->pool_id);
3859}
3860
3861static ssize_t rbd_name_show(struct device *dev,
3862			     struct device_attribute *attr, char *buf)
3863{
3864	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3865
3866	if (rbd_dev->spec->image_name)
3867		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3868
3869	return sprintf(buf, "(unknown)\n");
3870}
3871
3872static ssize_t rbd_image_id_show(struct device *dev,
3873			     struct device_attribute *attr, char *buf)
3874{
3875	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3876
3877	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3878}
3879
3880/*
3881 * Shows the name of the currently-mapped snapshot (or
3882 * RBD_SNAP_HEAD_NAME for the base image).
3883 */
3884static ssize_t rbd_snap_show(struct device *dev,
3885			     struct device_attribute *attr,
3886			     char *buf)
3887{
3888	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3889
3890	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3891}
3892
3893/*
3894 * For a v2 image, shows the chain of parent images, separated by empty
3895 * lines.  For v1 images or if there is no parent, shows "(no parent
3896 * image)".
3897 */
3898static ssize_t rbd_parent_show(struct device *dev,
3899			       struct device_attribute *attr,
3900			       char *buf)
3901{
3902	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3903	ssize_t count = 0;
3904
3905	if (!rbd_dev->parent)
3906		return sprintf(buf, "(no parent image)\n");
3907
3908	for ( ; rbd_dev->parent; rbd_dev = rbd_dev->parent) {
3909		struct rbd_spec *spec = rbd_dev->parent_spec;
3910
3911		count += sprintf(&buf[count], "%s"
3912			    "pool_id %llu\npool_name %s\n"
3913			    "image_id %s\nimage_name %s\n"
3914			    "snap_id %llu\nsnap_name %s\n"
3915			    "overlap %llu\n",
3916			    !count ? "" : "\n", /* first? */
3917			    spec->pool_id, spec->pool_name,
3918			    spec->image_id, spec->image_name ?: "(unknown)",
3919			    spec->snap_id, spec->snap_name,
3920			    rbd_dev->parent_overlap);
3921	}
3922
3923	return count;
3924}
3925
3926static ssize_t rbd_image_refresh(struct device *dev,
3927				 struct device_attribute *attr,
3928				 const char *buf,
3929				 size_t size)
3930{
3931	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3932	int ret;
3933
3934	ret = rbd_dev_refresh(rbd_dev);
3935	if (ret)
3936		return ret;
3937
3938	return size;
3939}
3940
3941static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3942static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3943static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3944static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
3945static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3946static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3947static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3948static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3949static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3950static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3951static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3952static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3953
3954static struct attribute *rbd_attrs[] = {
3955	&dev_attr_size.attr,
3956	&dev_attr_features.attr,
3957	&dev_attr_major.attr,
3958	&dev_attr_minor.attr,
3959	&dev_attr_client_id.attr,
3960	&dev_attr_pool.attr,
3961	&dev_attr_pool_id.attr,
3962	&dev_attr_name.attr,
3963	&dev_attr_image_id.attr,
3964	&dev_attr_current_snap.attr,
3965	&dev_attr_parent.attr,
3966	&dev_attr_refresh.attr,
3967	NULL
3968};
3969
3970static struct attribute_group rbd_attr_group = {
3971	.attrs = rbd_attrs,
3972};
3973
3974static const struct attribute_group *rbd_attr_groups[] = {
3975	&rbd_attr_group,
3976	NULL
3977};
3978
3979static void rbd_sysfs_dev_release(struct device *dev)
3980{
3981}
3982
3983static struct device_type rbd_device_type = {
3984	.name		= "rbd",
3985	.groups		= rbd_attr_groups,
3986	.release	= rbd_sysfs_dev_release,
3987};
3988
3989static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3990{
3991	kref_get(&spec->kref);
3992
3993	return spec;
3994}
3995
3996static void rbd_spec_free(struct kref *kref);
3997static void rbd_spec_put(struct rbd_spec *spec)
3998{
3999	if (spec)
4000		kref_put(&spec->kref, rbd_spec_free);
4001}
4002
4003static struct rbd_spec *rbd_spec_alloc(void)
4004{
4005	struct rbd_spec *spec;
4006
4007	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
4008	if (!spec)
4009		return NULL;
4010
4011	spec->pool_id = CEPH_NOPOOL;
4012	spec->snap_id = CEPH_NOSNAP;
4013	kref_init(&spec->kref);
4014
4015	return spec;
4016}
4017
4018static void rbd_spec_free(struct kref *kref)
4019{
4020	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
4021
4022	kfree(spec->pool_name);
4023	kfree(spec->image_id);
4024	kfree(spec->image_name);
4025	kfree(spec->snap_name);
4026	kfree(spec);
4027}
4028
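/*
 * Allocate and initialize a new rbd_device.  On success the device
 * owns the caller's references to the client and image spec (they
 * are dropped again in rbd_dev_destroy()).  The default file layout
 * used for all requests on the image is also set up here.
 */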
4029static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
4030				struct rbd_spec *spec)
4031{
4032	struct rbd_device *rbd_dev;
4033
4034	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
4035	if (!rbd_dev)
4036		return NULL;
4037
4038	spin_lock_init(&rbd_dev->lock);
4039	INIT_LIST_HEAD(&rbd_dev->rq_queue);
4040	INIT_WORK(&rbd_dev->rq_work, rbd_request_workfn);
4041	rbd_dev->flags = 0;
4042	atomic_set(&rbd_dev->parent_ref, 0);
4043	INIT_LIST_HEAD(&rbd_dev->node);
4044	init_rwsem(&rbd_dev->header_rwsem);
4045
4046	rbd_dev->spec = spec;
4047	rbd_dev->rbd_client = rbdc;
4048
4049	/* Initialize the layout used for all rbd requests */
4050
4051	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4052	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
4053	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
4054	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
4055
4056	return rbd_dev;
4057}
4058
4059static void rbd_dev_destroy(struct rbd_device *rbd_dev)
4060{
4061	rbd_put_client(rbd_dev->rbd_client);
4062	rbd_spec_put(rbd_dev->spec);
4063	kfree(rbd_dev);
4064}
4065
4066/*
4067 * Get the size and object order for an image snapshot, or if
4068 * snap_id is CEPH_NOSNAP, gets this information for the base
4069 * image.
4070 */
4071static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
4072				u8 *order, u64 *snap_size)
4073{
4074	__le64 snapid = cpu_to_le64(snap_id);
4075	int ret;
4076	struct {
4077		u8 order;
4078		__le64 size;
4079	} __attribute__ ((packed)) size_buf = { 0 };
4080
4081	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4082				"rbd", "get_size",
4083				&snapid, sizeof (snapid),
4084				&size_buf, sizeof (size_buf));
4085	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4086	if (ret < 0)
4087		return ret;
4088	if (ret < sizeof (size_buf))
4089		return -ERANGE;
4090
4091	if (order) {
4092		*order = size_buf.order;
4093		dout("  order %u", (unsigned int)*order);
4094	}
4095	*snap_size = le64_to_cpu(size_buf.size);
4096
4097	dout("  snap_id 0x%016llx snap_size = %llu\n",
4098		(unsigned long long)snap_id,
4099		(unsigned long long)*snap_size);
4100
4101	return 0;
4102}
4103
4104static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
4105{
4106	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
4107					&rbd_dev->header.obj_order,
4108					&rbd_dev->header.image_size);
4109}
4110
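/*
 * Fetch the object prefix (the common prefix of the image's data
 * object names) for a format 2 image using the "get_object_prefix"
 * method of the header object.
 */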
4111static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
4112{
4113	void *reply_buf;
4114	int ret;
4115	void *p;
4116
4117	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
4118	if (!reply_buf)
4119		return -ENOMEM;
4120
4121	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4122				"rbd", "get_object_prefix", NULL, 0,
4123				reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
4124	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4125	if (ret < 0)
4126		goto out;
4127
4128	p = reply_buf;
4129	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
4130						p + ret, NULL, GFP_NOIO);
4131	ret = 0;
4132
4133	if (IS_ERR(rbd_dev->header.object_prefix)) {
4134		ret = PTR_ERR(rbd_dev->header.object_prefix);
4135		rbd_dev->header.object_prefix = NULL;
4136	} else {
4137		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
4138	}
4139out:
4140	kfree(reply_buf);
4141
4142	return ret;
4143}
4144
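/*
 * Get the feature bits for the given snapshot, or for the base image
 * if snap_id is CEPH_NOSNAP.  Returns -ENXIO if the image has
 * incompatible features this implementation does not support.
 */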
4145static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
4146		u64 *snap_features)
4147{
4148	__le64 snapid = cpu_to_le64(snap_id);
4149	struct {
4150		__le64 features;
4151		__le64 incompat;
4152	} __attribute__ ((packed)) features_buf = { 0 };
4153	u64 incompat;
4154	int ret;
4155
4156	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157				"rbd", "get_features",
4158				&snapid, sizeof (snapid),
4159				&features_buf, sizeof (features_buf));
4160	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4161	if (ret < 0)
4162		return ret;
4163	if (ret < sizeof (features_buf))
4164		return -ERANGE;
4165
4166	incompat = le64_to_cpu(features_buf.incompat);
4167	if (incompat & ~RBD_FEATURES_SUPPORTED)
4168		return -ENXIO;
4169
4170	*snap_features = le64_to_cpu(features_buf.features);
4171
4172	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
4173		(unsigned long long)snap_id,
4174		(unsigned long long)*snap_features,
4175		(unsigned long long)le64_to_cpu(features_buf.incompat));
4176
4177	return 0;
4178}
4179
4180static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
4181{
4182	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
4183						&rbd_dev->header.features);
4184}
4185
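/*
 * Query a format 2 image's parent (pool, image, snapshot and overlap)
 * using the "get_parent" method, and update the device's parent spec
 * and overlap to match.  Also handles the image having been flattened
 * or the overlap having dropped to zero since the last call.
 */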
4186static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
4187{
4188	struct rbd_spec *parent_spec;
4189	size_t size;
4190	void *reply_buf = NULL;
4191	__le64 snapid;
4192	void *p;
4193	void *end;
4194	u64 pool_id;
4195	char *image_id;
4196	u64 snap_id;
4197	u64 overlap;
4198	int ret;
4199
4200	parent_spec = rbd_spec_alloc();
4201	if (!parent_spec)
4202		return -ENOMEM;
4203
4204	size = sizeof (__le64) +				/* pool_id */
4205		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
4206		sizeof (__le64) +				/* snap_id */
4207		sizeof (__le64);				/* overlap */
4208	reply_buf = kmalloc(size, GFP_KERNEL);
4209	if (!reply_buf) {
4210		ret = -ENOMEM;
4211		goto out_err;
4212	}
4213
4214	snapid = cpu_to_le64(rbd_dev->spec->snap_id);
4215	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4216				"rbd", "get_parent",
4217				&snapid, sizeof (snapid),
4218				reply_buf, size);
4219	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4220	if (ret < 0)
4221		goto out_err;
4222
4223	p = reply_buf;
4224	end = reply_buf + ret;
4225	ret = -ERANGE;
4226	ceph_decode_64_safe(&p, end, pool_id, out_err);
4227	if (pool_id == CEPH_NOPOOL) {
4228		/*
		 * Either the parent never existed, or we have a
		 * record of it but the image got flattened, so it no
		 * longer has a parent.  When the parent of a
		 * layered image disappears we immediately set the
		 * overlap to 0.  The effect of this is that all new
		 * requests will be treated as if the image had no
		 * parent.
4236		 */
4237		if (rbd_dev->parent_overlap) {
4238			rbd_dev->parent_overlap = 0;
4239			smp_mb();
4240			rbd_dev_parent_put(rbd_dev);
4241			pr_info("%s: clone image has been flattened\n",
4242				rbd_dev->disk->disk_name);
4243		}
4244
4245		goto out;	/* No parent?  No problem. */
4246	}
4247
4248	/* The ceph file layout needs to fit pool id in 32 bits */
4249
4250	ret = -EIO;
4251	if (pool_id > (u64)U32_MAX) {
4252		rbd_warn(NULL, "parent pool id too large (%llu > %u)",
4253			(unsigned long long)pool_id, U32_MAX);
4254		goto out_err;
4255	}
4256
4257	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4258	if (IS_ERR(image_id)) {
4259		ret = PTR_ERR(image_id);
4260		goto out_err;
4261	}
4262	ceph_decode_64_safe(&p, end, snap_id, out_err);
4263	ceph_decode_64_safe(&p, end, overlap, out_err);
4264
	/*
	 * The parent won't change (except when the clone is
	 * flattened, which is handled above).  So we only need to
	 * record the parent spec if we haven't already done so.
	 */
4270	if (!rbd_dev->parent_spec) {
4271		parent_spec->pool_id = pool_id;
4272		parent_spec->image_id = image_id;
4273		parent_spec->snap_id = snap_id;
4274		rbd_dev->parent_spec = parent_spec;
4275		parent_spec = NULL;	/* rbd_dev now owns this */
4276	} else {
4277		kfree(image_id);
4278	}
4279
4280	/*
4281	 * We always update the parent overlap.  If it's zero we
4282	 * treat it specially.
4283	 */
4284	rbd_dev->parent_overlap = overlap;
4285	smp_mb();
4286	if (!overlap) {
4287
4288		/* A null parent_spec indicates it's the initial probe */
4289
4290		if (parent_spec) {
4291			/*
4292			 * The overlap has become zero, so the clone
4293			 * must have been resized down to 0 at some
4294			 * point.  Treat this the same as a flatten.
4295			 */
4296			rbd_dev_parent_put(rbd_dev);
4297			pr_info("%s: clone image now standalone\n",
4298				rbd_dev->disk->disk_name);
4299		} else {
4300			/*
4301			 * For the initial probe, if we find the
4302			 * overlap is zero we just pretend there was
4303			 * no parent image.
4304			 */
4305			rbd_warn(rbd_dev, "ignoring parent with overlap 0");
4306		}
4307	}
4308out:
4309	ret = 0;
4310out_err:
4311	kfree(reply_buf);
4312	rbd_spec_put(parent_spec);
4313
4314	return ret;
4315}
4316
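/*
 * Fetch the stripe unit and stripe count for an image with the
 * STRIPINGV2 feature.  Fancy striping isn't actually supported, so
 * only the default values (stripe unit equal to the object size and
 * a stripe count of 1) are accepted.
 */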
4317static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
4318{
4319	struct {
4320		__le64 stripe_unit;
4321		__le64 stripe_count;
4322	} __attribute__ ((packed)) striping_info_buf = { 0 };
4323	size_t size = sizeof (striping_info_buf);
4324	void *p;
4325	u64 obj_size;
4326	u64 stripe_unit;
4327	u64 stripe_count;
4328	int ret;
4329
4330	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4331				"rbd", "get_stripe_unit_count", NULL, 0,
4332				(char *)&striping_info_buf, size);
4333	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4334	if (ret < 0)
4335		return ret;
4336	if (ret < size)
4337		return -ERANGE;
4338
4339	/*
4340	 * We don't actually support the "fancy striping" feature
4341	 * (STRIPINGV2) yet, but if the striping sizes are the
4342	 * defaults the behavior is the same as before.  So find
4343	 * out, and only fail if the image has non-default values.
4344	 */
4345	ret = -EINVAL;
4346	obj_size = (u64)1 << rbd_dev->header.obj_order;
4347	p = &striping_info_buf;
4348	stripe_unit = ceph_decode_64(&p);
4349	if (stripe_unit != obj_size) {
4350		rbd_warn(rbd_dev, "unsupported stripe unit "
4351				"(got %llu want %llu)",
4352				stripe_unit, obj_size);
4353		return -EINVAL;
4354	}
4355	stripe_count = ceph_decode_64(&p);
4356	if (stripe_count != 1) {
4357		rbd_warn(rbd_dev, "unsupported stripe count "
4358				"(got %llu want 1)", stripe_count);
4359		return -EINVAL;
4360	}
4361	rbd_dev->header.stripe_unit = stripe_unit;
4362	rbd_dev->header.stripe_count = stripe_count;
4363
4364	return 0;
4365}
4366
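/*
 * Look up the image name corresponding to this device's image id in
 * the pool's rbd directory object.  Returns a dynamically-allocated
 * copy of the name, or a null pointer on any failure.
 */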
4367static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4368{
4369	size_t image_id_size;
4370	char *image_id;
4371	void *p;
4372	void *end;
4373	size_t size;
4374	void *reply_buf = NULL;
4375	size_t len = 0;
4376	char *image_name = NULL;
4377	int ret;
4378
4379	rbd_assert(!rbd_dev->spec->image_name);
4380
4381	len = strlen(rbd_dev->spec->image_id);
4382	image_id_size = sizeof (__le32) + len;
4383	image_id = kmalloc(image_id_size, GFP_KERNEL);
4384	if (!image_id)
4385		return NULL;
4386
4387	p = image_id;
4388	end = image_id + image_id_size;
4389	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
4390
4391	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4392	reply_buf = kmalloc(size, GFP_KERNEL);
4393	if (!reply_buf)
4394		goto out;
4395
4396	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
4397				"rbd", "dir_get_name",
4398				image_id, image_id_size,
4399				reply_buf, size);
4400	if (ret < 0)
4401		goto out;
4402	p = reply_buf;
4403	end = reply_buf + ret;
4404
4405	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4406	if (IS_ERR(image_name))
4407		image_name = NULL;
4408	else
4409		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4410out:
4411	kfree(reply_buf);
4412	kfree(image_id);
4413
4414	return image_name;
4415}
4416
4417static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4418{
4419	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4420	const char *snap_name;
4421	u32 which = 0;
4422
4423	/* Skip over names until we find the one we are looking for */
4424
4425	snap_name = rbd_dev->header.snap_names;
4426	while (which < snapc->num_snaps) {
4427		if (!strcmp(name, snap_name))
4428			return snapc->snaps[which];
4429		snap_name += strlen(snap_name) + 1;
4430		which++;
4431	}
4432	return CEPH_NOSNAP;
4433}
4434
4435static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4436{
4437	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4438	u32 which;
4439	bool found = false;
4440	u64 snap_id;
4441
4442	for (which = 0; !found && which < snapc->num_snaps; which++) {
4443		const char *snap_name;
4444
4445		snap_id = snapc->snaps[which];
4446		snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
4447		if (IS_ERR(snap_name)) {
4448			/* ignore no-longer existing snapshots */
4449			if (PTR_ERR(snap_name) == -ENOENT)
4450				continue;
4451			else
4452				break;
4453		}
4454		found = !strcmp(name, snap_name);
4455		kfree(snap_name);
4456	}
4457	return found ? snap_id : CEPH_NOSNAP;
4458}
4459
4460/*
4461 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4462 * no snapshot by that name is found, or if an error occurs.
4463 */
4464static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4465{
4466	if (rbd_dev->image_format == 1)
4467		return rbd_v1_snap_id_by_name(rbd_dev, name);
4468
4469	return rbd_v2_snap_id_by_name(rbd_dev, name);
4470}
4471
4472/*
4473 * An image being mapped will have everything but the snap id.
4474 */
4475static int rbd_spec_fill_snap_id(struct rbd_device *rbd_dev)
4476{
4477	struct rbd_spec *spec = rbd_dev->spec;
4478
4479	rbd_assert(spec->pool_id != CEPH_NOPOOL && spec->pool_name);
4480	rbd_assert(spec->image_id && spec->image_name);
4481	rbd_assert(spec->snap_name);
4482
4483	if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
4484		u64 snap_id;
4485
4486		snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4487		if (snap_id == CEPH_NOSNAP)
4488			return -ENOENT;
4489
4490		spec->snap_id = snap_id;
4491	} else {
4492		spec->snap_id = CEPH_NOSNAP;
4493	}
4494
4495	return 0;
4496}
4497
4498/*
4499 * A parent image will have all ids but none of the names.
4500 *
4501 * All names in an rbd spec are dynamically allocated.  It's OK if we
4502 * can't figure out the name for an image id.
4503 */
4504static int rbd_spec_fill_names(struct rbd_device *rbd_dev)
4505{
4506	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4507	struct rbd_spec *spec = rbd_dev->spec;
4508	const char *pool_name;
4509	const char *image_name;
4510	const char *snap_name;
4511	int ret;
4512
4513	rbd_assert(spec->pool_id != CEPH_NOPOOL);
4514	rbd_assert(spec->image_id);
4515	rbd_assert(spec->snap_id != CEPH_NOSNAP);
4516
4517	/* Get the pool name; we have to make our own copy of this */
4518
4519	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4520	if (!pool_name) {
4521		rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
4522		return -EIO;
4523	}
4524	pool_name = kstrdup(pool_name, GFP_KERNEL);
4525	if (!pool_name)
4526		return -ENOMEM;
4527
4528	/* Fetch the image name; tolerate failure here */
4529
4530	image_name = rbd_dev_image_name(rbd_dev);
4531	if (!image_name)
4532		rbd_warn(rbd_dev, "unable to get image name");
4533
4534	/* Fetch the snapshot name */
4535
4536	snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
4537	if (IS_ERR(snap_name)) {
4538		ret = PTR_ERR(snap_name);
4539		goto out_err;
4540	}
4541
4542	spec->pool_name = pool_name;
4543	spec->image_name = image_name;
4544	spec->snap_name = snap_name;
4545
4546	return 0;
4547
4548out_err:
4549	kfree(image_name);
4550	kfree(pool_name);
4551	return ret;
4552}
4553
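/*
 * Fetch the snapshot context (highest snapshot id plus the array of
 * snapshot ids) for a format 2 image and install it in the in-core
 * image header, replacing the previous one.
 */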
4554static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
4555{
4556	size_t size;
4557	int ret;
4558	void *reply_buf;
4559	void *p;
4560	void *end;
4561	u64 seq;
4562	u32 snap_count;
4563	struct ceph_snap_context *snapc;
4564	u32 i;
4565
4566	/*
4567	 * We'll need room for the seq value (maximum snapshot id),
4568	 * snapshot count, and array of that many snapshot ids.
4569	 * For now we have a fixed upper limit on the number we're
4570	 * prepared to receive.
4571	 */
4572	size = sizeof (__le64) + sizeof (__le32) +
4573			RBD_MAX_SNAP_COUNT * sizeof (__le64);
4574	reply_buf = kzalloc(size, GFP_KERNEL);
4575	if (!reply_buf)
4576		return -ENOMEM;
4577
4578	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4579				"rbd", "get_snapcontext", NULL, 0,
4580				reply_buf, size);
4581	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4582	if (ret < 0)
4583		goto out;
4584
4585	p = reply_buf;
4586	end = reply_buf + ret;
4587	ret = -ERANGE;
4588	ceph_decode_64_safe(&p, end, seq, out);
4589	ceph_decode_32_safe(&p, end, snap_count, out);
4590
4591	/*
4592	 * Make sure the reported number of snapshot ids wouldn't go
4593	 * beyond the end of our buffer.  But before checking that,
4594	 * make sure the computed size of the snapshot context we
4595	 * allocate is representable in a size_t.
4596	 */
4597	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4598				 / sizeof (u64)) {
4599		ret = -EINVAL;
4600		goto out;
4601	}
4602	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4603		goto out;
4604	ret = 0;
4605
4606	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
4607	if (!snapc) {
4608		ret = -ENOMEM;
4609		goto out;
4610	}
4611	snapc->seq = seq;
4612	for (i = 0; i < snap_count; i++)
4613		snapc->snaps[i] = ceph_decode_64(&p);
4614
4615	ceph_put_snap_context(rbd_dev->header.snapc);
4616	rbd_dev->header.snapc = snapc;
4617
4618	dout("  snap context seq = %llu, snap_count = %u\n",
4619		(unsigned long long)seq, (unsigned int)snap_count);
4620out:
4621	kfree(reply_buf);
4622
4623	return ret;
4624}
4625
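/*
 * Return a dynamically-allocated copy of the name of the snapshot
 * with the given id for a format 2 image, or an ERR_PTR() value on
 * failure.
 */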
4626static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4627					u64 snap_id)
4628{
4629	size_t size;
4630	void *reply_buf;
4631	__le64 snapid;
4632	int ret;
4633	void *p;
4634	void *end;
4635	char *snap_name;
4636
4637	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4638	reply_buf = kmalloc(size, GFP_KERNEL);
4639	if (!reply_buf)
4640		return ERR_PTR(-ENOMEM);
4641
4642	snapid = cpu_to_le64(snap_id);
4643	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4644				"rbd", "get_snapshot_name",
4645				&snapid, sizeof (snapid),
4646				reply_buf, size);
4647	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4648	if (ret < 0) {
4649		snap_name = ERR_PTR(ret);
4650		goto out;
4651	}
4652
4653	p = reply_buf;
4654	end = reply_buf + ret;
4655	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
4656	if (IS_ERR(snap_name))
4657		goto out;
4658
4659	dout("  snap_id 0x%016llx snap_name = %s\n",
4660		(unsigned long long)snap_id, snap_name);
4661out:
4662	kfree(reply_buf);
4663
4664	return snap_name;
4665}
4666
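/*
 * Read the current header information for a format 2 image: the
 * image size, the fields that never change (on the first call only),
 * and the current snapshot context.
 */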
4667static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
4668{
4669	bool first_time = rbd_dev->header.object_prefix == NULL;
4670	int ret;
4671
4672	ret = rbd_dev_v2_image_size(rbd_dev);
4673	if (ret)
4674		return ret;
4675
4676	if (first_time) {
4677		ret = rbd_dev_v2_header_onetime(rbd_dev);
4678		if (ret)
4679			return ret;
4680	}
4681
4682	ret = rbd_dev_v2_snap_context(rbd_dev);
4683	dout("rbd_dev_v2_snap_context returned %d\n", ret);
4684
4685	return ret;
4686}
4687
4688static int rbd_dev_header_info(struct rbd_device *rbd_dev)
4689{
4690	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4691
4692	if (rbd_dev->image_format == 1)
4693		return rbd_dev_v1_header_info(rbd_dev);
4694
4695	return rbd_dev_v2_header_info(rbd_dev);
4696}
4697
4698static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4699{
4700	struct device *dev;
4701	int ret;
4702
4703	dev = &rbd_dev->dev;
4704	dev->bus = &rbd_bus_type;
4705	dev->type = &rbd_device_type;
4706	dev->parent = &rbd_root_dev;
4707	dev->release = rbd_dev_device_release;
4708	dev_set_name(dev, "%d", rbd_dev->dev_id);
4709	ret = device_register(dev);
4710
4711	return ret;
4712}
4713
4714static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4715{
4716	device_unregister(&rbd_dev->dev);
4717}
4718
4719/*
4720 * Get a unique rbd identifier for the given new rbd_dev, and add
4721 * the rbd_dev to the global list.
4722 */
4723static int rbd_dev_id_get(struct rbd_device *rbd_dev)
4724{
4725	int new_dev_id;
4726
4727	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
4728				    0, minor_to_rbd_dev_id(1 << MINORBITS),
4729				    GFP_KERNEL);
4730	if (new_dev_id < 0)
4731		return new_dev_id;
4732
4733	rbd_dev->dev_id = new_dev_id;
4734
4735	spin_lock(&rbd_dev_list_lock);
4736	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4737	spin_unlock(&rbd_dev_list_lock);
4738
4739	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
4740
4741	return 0;
4742}
4743
4744/*
4745 * Remove an rbd_dev from the global list, and record that its
4746 * identifier is no longer in use.
4747 */
4748static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4749{
4750	spin_lock(&rbd_dev_list_lock);
4751	list_del_init(&rbd_dev->node);
4752	spin_unlock(&rbd_dev_list_lock);
4753
4754	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4755
4756	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
4757}
4758
4759/*
4760 * Skips over white space at *buf, and updates *buf to point to the
4761 * first found non-space character (if any). Returns the length of
4762 * the token (string of non-white space characters) found.  Note
4763 * that *buf must be terminated with '\0'.
4764 */
4765static inline size_t next_token(const char **buf)
4766{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
4776}
4777
4778/*
4779 * Finds the next token in *buf, and if the provided token buffer is
4780 * big enough, copies the found token into it.  The result, if
4781 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4782 * must be terminated with '\0' on entry.
4783 *
4784 * Returns the length of the token found (not including the '\0').
4785 * Return value will be 0 if no token is found, and it will be >=
4786 * token_size if the token would not fit.
4787 *
4788 * The *buf pointer will be updated to point beyond the end of the
4789 * found token.  Note that this occurs even if the token buffer is
4790 * too small to hold it.
4791 */
4792static inline size_t copy_token(const char **buf,
4793				char *token,
4794				size_t token_size)
4795{
	size_t len;
4797
4798	len = next_token(buf);
4799	if (len < token_size) {
4800		memcpy(token, *buf, len);
4801		*(token + len) = '\0';
4802	}
4803	*buf += len;
4804
	return len;
4806}
4807
4808/*
4809 * Finds the next token in *buf, dynamically allocates a buffer big
4810 * enough to hold a copy of it, and copies the token into the new
4811 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4812 * that a duplicate buffer is created even for a zero-length token.
4813 *
4814 * Returns a pointer to the newly-allocated duplicate, or a null
4815 * pointer if memory for the duplicate was not available.  If
4816 * the lenp argument is a non-null pointer, the length of the token
4817 * (not including the '\0') is returned in *lenp.
4818 *
4819 * If successful, the *buf pointer will be updated to point beyond
4820 * the end of the found token.
4821 *
4822 * Note: uses GFP_KERNEL for allocation.
4823 */
4824static inline char *dup_token(const char **buf, size_t *lenp)
4825{
4826	char *dup;
4827	size_t len;
4828
4829	len = next_token(buf);
4830	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4831	if (!dup)
4832		return NULL;
4833	*(dup + len) = '\0';
4834	*buf += len;
4835
4836	if (lenp)
4837		*lenp = len;
4838
4839	return dup;
4840}
4841
4842/*
4843 * Parse the options provided for an "rbd add" (i.e., rbd image
4844 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4845 * and the data written is passed here via a NUL-terminated buffer.
4846 * Returns 0 if successful or an error code otherwise.
4847 *
4848 * The information extracted from these options is recorded in
4849 * the other parameters which return dynamically-allocated
4850 * structures:
4851 *  ceph_opts
4852 *      The address of a pointer that will refer to a ceph options
4853 *      structure.  Caller must release the returned pointer using
4854 *      ceph_destroy_options() when it is no longer needed.
4855 *  rbd_opts
4856 *	Address of an rbd options pointer.  Fully initialized by
4857 *	this function; caller must release with kfree().
4858 *  spec
4859 *	Address of an rbd image specification pointer.  Fully
4860 *	initialized by this function based on parsed options.
4861 *	Caller must release with rbd_spec_put().
4862 *
4863 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4865 * where:
4866 *  <mon_addrs>
4867 *      A comma-separated list of one or more monitor addresses.
4868 *      A monitor address is an ip address, optionally followed
4869 *      by a port number (separated by a colon).
4870 *        I.e.:  ip1[:port1][,ip2[:port2]...]
4871 *  <options>
4872 *      A comma-separated list of ceph and/or rbd options.
4873 *  <pool_name>
4874 *      The name of the rados pool containing the rbd image.
4875 *  <image_name>
4876 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
4882 */
4883static int rbd_add_parse_args(const char *buf,
4884				struct ceph_options **ceph_opts,
4885				struct rbd_options **opts,
4886				struct rbd_spec **rbd_spec)
4887{
4888	size_t len;
4889	char *options;
4890	const char *mon_addrs;
4891	char *snap_name;
4892	size_t mon_addrs_size;
4893	struct rbd_spec *spec = NULL;
4894	struct rbd_options *rbd_opts = NULL;
4895	struct ceph_options *copts;
4896	int ret;
4897
4898	/* The first four tokens are required */
4899
4900	len = next_token(&buf);
4901	if (!len) {
4902		rbd_warn(NULL, "no monitor address(es) provided");
4903		return -EINVAL;
4904	}
4905	mon_addrs = buf;
4906	mon_addrs_size = len + 1;
4907	buf += len;
4908
4909	ret = -EINVAL;
4910	options = dup_token(&buf, NULL);
4911	if (!options)
4912		return -ENOMEM;
4913	if (!*options) {
4914		rbd_warn(NULL, "no options provided");
4915		goto out_err;
4916	}
4917
4918	spec = rbd_spec_alloc();
4919	if (!spec)
4920		goto out_mem;
4921
4922	spec->pool_name = dup_token(&buf, NULL);
4923	if (!spec->pool_name)
4924		goto out_mem;
4925	if (!*spec->pool_name) {
4926		rbd_warn(NULL, "no pool name provided");
4927		goto out_err;
4928	}
4929
4930	spec->image_name = dup_token(&buf, NULL);
4931	if (!spec->image_name)
4932		goto out_mem;
4933	if (!*spec->image_name) {
4934		rbd_warn(NULL, "no image name provided");
4935		goto out_err;
4936	}
4937
4938	/*
4939	 * Snapshot name is optional; default is to use "-"
4940	 * (indicating the head/no snapshot).
4941	 */
4942	len = next_token(&buf);
4943	if (!len) {
4944		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4945		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4946	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
4947		ret = -ENAMETOOLONG;
4948		goto out_err;
4949	}
4950	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4951	if (!snap_name)
4952		goto out_mem;
4953	*(snap_name + len) = '\0';
4954	spec->snap_name = snap_name;
4955
4956	/* Initialize all rbd options to the defaults */
4957
4958	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4959	if (!rbd_opts)
4960		goto out_mem;
4961
4962	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4963
4964	copts = ceph_parse_options(options, mon_addrs,
4965					mon_addrs + mon_addrs_size - 1,
4966					parse_rbd_opts_token, rbd_opts);
4967	if (IS_ERR(copts)) {
4968		ret = PTR_ERR(copts);
4969		goto out_err;
4970	}
4971	kfree(options);
4972
4973	*ceph_opts = copts;
4974	*opts = rbd_opts;
4975	*rbd_spec = spec;
4976
4977	return 0;
4978out_mem:
4979	ret = -ENOMEM;
4980out_err:
4981	kfree(rbd_opts);
4982	rbd_spec_put(spec);
4983	kfree(options);
4984
4985	return ret;
4986}
4987
4988/*
4989 * Return pool id (>= 0) or a negative error code.
4990 */
4991static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
4992{
4993	u64 newest_epoch;
4994	unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
4995	int tries = 0;
4996	int ret;
4997
4998again:
4999	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
5000	if (ret == -ENOENT && tries++ < 1) {
5001		ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
5002					       &newest_epoch);
5003		if (ret < 0)
5004			return ret;
5005
5006		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
5007			ceph_monc_request_next_osdmap(&rbdc->client->monc);
5008			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
5009						     newest_epoch, timeout);
5010			goto again;
5011		} else {
5012			/* the osdmap we have is new enough */
5013			return -ENOENT;
5014		}
5015	}
5016
5017	return ret;
5018}
5019
5020/*
5021 * An rbd format 2 image has a unique identifier, distinct from the
5022 * name given to it by the user.  Internally, that identifier is
5023 * what's used to specify the names of objects related to the image.
5024 *
5025 * A special "rbd id" object is used to map an rbd image name to its
5026 * id.  If that object doesn't exist, then there is no v2 rbd image
5027 * with the supplied name.
5028 *
5029 * This function will record the given rbd_dev's image_id field if
5030 * it can be determined, and in that case will return 0.  If any
5031 * errors occur a negative errno will be returned and the rbd_dev's
5032 * image_id field will be unchanged (and should be NULL).
5033 */
5034static int rbd_dev_image_id(struct rbd_device *rbd_dev)
5035{
5036	int ret;
5037	size_t size;
5038	char *object_name;
5039	void *response;
5040	char *image_id;
5041
5042	/*
5043	 * When probing a parent image, the image id is already
5044	 * known (and the image name likely is not).  There's no
5045	 * need to fetch the image id again in this case.  We
5046	 * do still need to set the image format though.
5047	 */
5048	if (rbd_dev->spec->image_id) {
5049		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
5050
5051		return 0;
5052	}
5053
5054	/*
5055	 * First, see if the format 2 image id file exists, and if
5056	 * so, get the image's persistent id from it.
5057	 */
5058	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
5059	object_name = kmalloc(size, GFP_NOIO);
5060	if (!object_name)
5061		return -ENOMEM;
5062	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
5063	dout("rbd id object name is %s\n", object_name);
5064
5065	/* Response will be an encoded string, which includes a length */
5066
5067	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
5068	response = kzalloc(size, GFP_NOIO);
5069	if (!response) {
5070		ret = -ENOMEM;
5071		goto out;
5072	}
5073
5074	/* If it doesn't exist we'll assume it's a format 1 image */
5075
5076	ret = rbd_obj_method_sync(rbd_dev, object_name,
5077				"rbd", "get_id", NULL, 0,
5078				response, RBD_IMAGE_ID_LEN_MAX);
5079	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
5080	if (ret == -ENOENT) {
5081		image_id = kstrdup("", GFP_KERNEL);
5082		ret = image_id ? 0 : -ENOMEM;
5083		if (!ret)
5084			rbd_dev->image_format = 1;
5085	} else if (ret >= 0) {
5086		void *p = response;
5087
5088		image_id = ceph_extract_encoded_string(&p, p + ret,
5089						NULL, GFP_NOIO);
5090		ret = PTR_ERR_OR_ZERO(image_id);
5091		if (!ret)
5092			rbd_dev->image_format = 2;
5093	}
5094
5095	if (!ret) {
5096		rbd_dev->spec->image_id = image_id;
5097		dout("image_id is %s\n", image_id);
5098	}
5099out:
5100	kfree(response);
5101	kfree(object_name);
5102
5103	return ret;
5104}
5105
5106/*
5107 * Undo whatever state changes are made by v1 or v2 header info
5108 * call.
5109 */
5110static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
5111{
5112	struct rbd_image_header	*header;
5113
5114	/* Drop parent reference unless it's already been done (or none) */
5115
5116	if (rbd_dev->parent_overlap)
5117		rbd_dev_parent_put(rbd_dev);
5118
5119	/* Free dynamic fields from the header, then zero it out */
5120
5121	header = &rbd_dev->header;
5122	ceph_put_snap_context(header->snapc);
5123	kfree(header->snap_sizes);
5124	kfree(header->snap_names);
5125	kfree(header->object_prefix);
5126	memset(header, 0, sizeof (*header));
5127}
5128
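/*
 * Fetch the format 2 header fields that are fixed for the life of
 * the image: the object prefix, the feature bits and, if the image
 * uses fancy striping, the striping parameters.
 */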
5129static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
5130{
5131	int ret;
5132
5133	ret = rbd_dev_v2_object_prefix(rbd_dev);
5134	if (ret)
5135		goto out_err;
5136
	/*
	 * Get and check the features for the image.  Currently the
	 * features are assumed to never change.
	 */
5141	ret = rbd_dev_v2_features(rbd_dev);
5142	if (ret)
5143		goto out_err;
5144
5145	/* If the image supports fancy striping, get its parameters */
5146
5147	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
5148		ret = rbd_dev_v2_striping_info(rbd_dev);
5149		if (ret < 0)
5150			goto out_err;
5151	}
	/* Crypto and compression types in format 2 images are not supported */
5153
5154	return 0;
5155out_err:
5156	rbd_dev->header.features = 0;
5157	kfree(rbd_dev->header.object_prefix);
5158	rbd_dev->header.object_prefix = NULL;
5159
5160	return ret;
5161}
5162
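/*
 * If the image has a parent, create an rbd_device for it and probe
 * it (which in turn probes any further ancestors).  The parent
 * device shares this device's client and parent spec references.
 */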
5163static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
5164{
5165	struct rbd_device *parent = NULL;
5166	struct rbd_spec *parent_spec;
5167	struct rbd_client *rbdc;
5168	int ret;
5169
5170	if (!rbd_dev->parent_spec)
5171		return 0;
5172	/*
5173	 * We need to pass a reference to the client and the parent
5174	 * spec when creating the parent rbd_dev.  Images related by
5175	 * parent/child relationships always share both.
5176	 */
5177	parent_spec = rbd_spec_get(rbd_dev->parent_spec);
5178	rbdc = __rbd_get_client(rbd_dev->rbd_client);
5179
5180	ret = -ENOMEM;
5181	parent = rbd_dev_create(rbdc, parent_spec);
5182	if (!parent)
5183		goto out_err;
5184
5185	ret = rbd_dev_image_probe(parent, false);
5186	if (ret < 0)
5187		goto out_err;
5188	rbd_dev->parent = parent;
5189	atomic_set(&rbd_dev->parent_ref, 1);
5190
5191	return 0;
5192out_err:
5193	if (parent) {
5194		rbd_dev_unparent(rbd_dev);
5195		kfree(rbd_dev->header_name);
5196		rbd_dev_destroy(parent);
5197	} else {
5198		rbd_put_client(rbdc);
5199		rbd_spec_put(parent_spec);
5200	}
5201
5202	return ret;
5203}
5204
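/*
 * Set up the Linux block device for a probed image: allocate a
 * device id, get major/minor numbers, create the gendisk and request
 * queue, register with the rbd bus and finally announce the disk.
 */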
5205static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
5206{
5207	int ret;
5208
5209	/* Get an id and fill in device name. */
5210
5211	ret = rbd_dev_id_get(rbd_dev);
5212	if (ret)
5213		return ret;
5214
5215	BUILD_BUG_ON(DEV_NAME_LEN
5216			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
5217	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
5218
5219	/* Record our major and minor device numbers. */
5220
5221	if (!single_major) {
5222		ret = register_blkdev(0, rbd_dev->name);
5223		if (ret < 0)
5224			goto err_out_id;
5225
5226		rbd_dev->major = ret;
5227		rbd_dev->minor = 0;
5228	} else {
5229		rbd_dev->major = rbd_major;
5230		rbd_dev->minor = rbd_dev_id_to_minor(rbd_dev->dev_id);
5231	}
5232
5233	/* Set up the blkdev mapping. */
5234
5235	ret = rbd_init_disk(rbd_dev);
5236	if (ret)
5237		goto err_out_blkdev;
5238
5239	ret = rbd_dev_mapping_set(rbd_dev);
5240	if (ret)
5241		goto err_out_disk;
5242
5243	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
5244	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
5245
5246	ret = rbd_bus_add_dev(rbd_dev);
5247	if (ret)
5248		goto err_out_mapping;
5249
5250	/* Everything's ready.  Announce the disk to the world. */
5251
5252	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5253	add_disk(rbd_dev->disk);
5254
5255	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
5256		(unsigned long long) rbd_dev->mapping.size);
5257
5258	return ret;
5259
5260err_out_mapping:
5261	rbd_dev_mapping_clear(rbd_dev);
5262err_out_disk:
5263	rbd_free_disk(rbd_dev);
5264err_out_blkdev:
5265	if (!single_major)
5266		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5267err_out_id:
5268	rbd_dev_id_put(rbd_dev);
5270
5271	return ret;
5272}
5273
5274static int rbd_dev_header_name(struct rbd_device *rbd_dev)
5275{
5276	struct rbd_spec *spec = rbd_dev->spec;
5277	size_t size;
5278
5279	/* Record the header object name for this rbd image. */
5280
5281	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
5282
5283	if (rbd_dev->image_format == 1)
5284		size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
5285	else
5286		size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
5287
5288	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
5289	if (!rbd_dev->header_name)
5290		return -ENOMEM;
5291
5292	if (rbd_dev->image_format == 1)
5293		sprintf(rbd_dev->header_name, "%s%s",
5294			spec->image_name, RBD_SUFFIX);
5295	else
5296		sprintf(rbd_dev->header_name, "%s%s",
5297			RBD_HEADER_PREFIX, spec->image_id);
5298	return 0;
5299}
5300
5301static void rbd_dev_image_release(struct rbd_device *rbd_dev)
5302{
5303	rbd_dev_unprobe(rbd_dev);
5304	kfree(rbd_dev->header_name);
5305	rbd_dev->header_name = NULL;
5306	rbd_dev->image_format = 0;
5307	kfree(rbd_dev->spec->image_id);
5308	rbd_dev->spec->image_id = NULL;
5309
5310	rbd_dev_destroy(rbd_dev);
5311}
5312
5313/*
5314 * Probe for the existence of the header object for the given rbd
5315 * device.  If this image is the one being mapped (i.e., not a
5316 * parent), initiate a watch on its header object before using that
5317 * object to get detailed information about the rbd image.
5318 */
5319static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
5320{
5321	int ret;
5322
5323	/*
5324	 * Get the id from the image id object.  Unless there's an
5325	 * error, rbd_dev->spec->image_id will be filled in with
5326	 * a dynamically-allocated string, and rbd_dev->image_format
5327	 * will be set to either 1 or 2.
5328	 */
5329	ret = rbd_dev_image_id(rbd_dev);
5330	if (ret)
5331		return ret;
5332
5333	ret = rbd_dev_header_name(rbd_dev);
5334	if (ret)
5335		goto err_out_format;
5336
5337	if (mapping) {
5338		ret = rbd_dev_header_watch_sync(rbd_dev);
5339		if (ret)
5340			goto out_header_name;
5341	}
5342
5343	ret = rbd_dev_header_info(rbd_dev);
5344	if (ret)
5345		goto err_out_watch;
5346
5347	/*
5348	 * If this image is the one being mapped, we have pool name and
5349	 * id, image name and id, and snap name - need to fill snap id.
5350	 * Otherwise this is a parent image, identified by pool, image
5351	 * and snap ids - need to fill in names for those ids.
5352	 */
5353	if (mapping)
5354		ret = rbd_spec_fill_snap_id(rbd_dev);
5355	else
5356		ret = rbd_spec_fill_names(rbd_dev);
5357	if (ret)
5358		goto err_out_probe;
5359
5360	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
5361		ret = rbd_dev_v2_parent_info(rbd_dev);
5362		if (ret)
5363			goto err_out_probe;
5364
5365		/*
5366		 * Need to warn users if this image is the one being
5367		 * mapped and has a parent.
5368		 */
5369		if (mapping && rbd_dev->parent_spec)
5370			rbd_warn(rbd_dev,
5371				 "WARNING: kernel layering is EXPERIMENTAL!");
5372	}
5373
5374	ret = rbd_dev_probe_parent(rbd_dev);
5375	if (ret)
5376		goto err_out_probe;
5377
5378	dout("discovered format %u image, header name is %s\n",
5379		rbd_dev->image_format, rbd_dev->header_name);
5380	return 0;
5381
5382err_out_probe:
5383	rbd_dev_unprobe(rbd_dev);
5384err_out_watch:
5385	if (mapping)
5386		rbd_dev_header_unwatch_sync(rbd_dev);
5387out_header_name:
5388	kfree(rbd_dev->header_name);
5389	rbd_dev->header_name = NULL;
5390err_out_format:
5391	rbd_dev->image_format = 0;
5392	kfree(rbd_dev->spec->image_id);
5393	rbd_dev->spec->image_id = NULL;
5394	return ret;
5395}
5396
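/*
 * Handle a write to the sysfs "add" (or "add_single_major") file:
 * parse the mapping options, get a ceph client, resolve the pool,
 * probe the image and set up its block device.  Returns the number
 * of bytes consumed on success.
 */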
5397static ssize_t do_rbd_add(struct bus_type *bus,
5398			  const char *buf,
5399			  size_t count)
5400{
5401	struct rbd_device *rbd_dev = NULL;
5402	struct ceph_options *ceph_opts = NULL;
5403	struct rbd_options *rbd_opts = NULL;
5404	struct rbd_spec *spec = NULL;
5405	struct rbd_client *rbdc;
5406	bool read_only;
5407	int rc = -ENOMEM;
5408
5409	if (!try_module_get(THIS_MODULE))
5410		return -ENODEV;
5411
5412	/* parse add command */
5413	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
5414	if (rc < 0)
5415		goto err_out_module;
5416	read_only = rbd_opts->read_only;
5417	kfree(rbd_opts);
5418	rbd_opts = NULL;	/* done with this */
5419
5420	rbdc = rbd_get_client(ceph_opts);
5421	if (IS_ERR(rbdc)) {
5422		rc = PTR_ERR(rbdc);
5423		goto err_out_args;
5424	}
5425
5426	/* pick the pool */
5427	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
5428	if (rc < 0)
5429		goto err_out_client;
5430	spec->pool_id = (u64)rc;
5431
5432	/* The ceph file layout needs to fit pool id in 32 bits */
5433
5434	if (spec->pool_id > (u64)U32_MAX) {
5435		rbd_warn(NULL, "pool id too large (%llu > %u)",
5436				(unsigned long long)spec->pool_id, U32_MAX);
5437		rc = -EIO;
5438		goto err_out_client;
5439	}
5440
5441	rbd_dev = rbd_dev_create(rbdc, spec);
5442	if (!rbd_dev)
5443		goto err_out_client;
5444	rbdc = NULL;		/* rbd_dev now owns this */
5445	spec = NULL;		/* rbd_dev now owns this */
5446
5447	rc = rbd_dev_image_probe(rbd_dev, true);
5448	if (rc < 0)
5449		goto err_out_rbd_dev;
5450
5451	/* If we are mapping a snapshot it must be marked read-only */
5452
5453	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5454		read_only = true;
5455	rbd_dev->mapping.read_only = read_only;
5456
5457	rc = rbd_dev_device_setup(rbd_dev);
5458	if (rc) {
5459		/*
5460		 * rbd_dev_header_unwatch_sync() can't be moved into
5461		 * rbd_dev_image_release() without refactoring, see
5462		 * commit 1f3ef78861ac.
5463		 */
5464		rbd_dev_header_unwatch_sync(rbd_dev);
5465		rbd_dev_image_release(rbd_dev);
5466		goto err_out_module;
5467	}
5468
5469	return count;
5470
5471err_out_rbd_dev:
5472	rbd_dev_destroy(rbd_dev);
5473err_out_client:
5474	rbd_put_client(rbdc);
5475err_out_args:
5476	rbd_spec_put(spec);
5477err_out_module:
5478	module_put(THIS_MODULE);
5479
5480	dout("Error adding device %s\n", buf);
5481
5482	return (ssize_t)rc;
5483}
5484
5485static ssize_t rbd_add(struct bus_type *bus,
5486		       const char *buf,
5487		       size_t count)
5488{
5489	if (single_major)
5490		return -EINVAL;
5491
5492	return do_rbd_add(bus, buf, count);
5493}
5494
5495static ssize_t rbd_add_single_major(struct bus_type *bus,
5496				    const char *buf,
5497				    size_t count)
5498{
5499	return do_rbd_add(bus, buf, count);
5500}
5501
5502static void rbd_dev_device_release(struct device *dev)
5503{
5504	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
5505
5506	rbd_free_disk(rbd_dev);
5507	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
5508	rbd_dev_mapping_clear(rbd_dev);
5509	if (!single_major)
5510		unregister_blkdev(rbd_dev->major, rbd_dev->name);
5511	rbd_dev_id_put(rbd_dev);
5513}
5514
5515static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5516{
5517	while (rbd_dev->parent) {
5518		struct rbd_device *first = rbd_dev;
5519		struct rbd_device *second = first->parent;
5520		struct rbd_device *third;
5521
5522		/*
5523		 * Follow to the parent with no grandparent and
5524		 * remove it.
5525		 */
5526		while (second && (third = second->parent)) {
5527			first = second;
5528			second = third;
5529		}
5530		rbd_assert(second);
5531		rbd_dev_image_release(second);
5532		first->parent = NULL;
5533		first->parent_overlap = 0;
5534
5535		rbd_assert(first->parent_spec);
5536		rbd_spec_put(first->parent_spec);
5537		first->parent_spec = NULL;
5538	}
5539}
5540
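/*
 * Handle a write to the sysfs "remove" (or "remove_single_major")
 * file: look the device up by id, refuse if it is still open, then
 * tear down the watch, the block device and the image.
 */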
5541static ssize_t do_rbd_remove(struct bus_type *bus,
5542			     const char *buf,
5543			     size_t count)
5544{
5545	struct rbd_device *rbd_dev = NULL;
5546	struct list_head *tmp;
5547	int dev_id;
5548	unsigned long ul;
5549	bool already = false;
5550	int ret;
5551
5552	ret = kstrtoul(buf, 10, &ul);
5553	if (ret)
5554		return ret;
5555
5556	/* convert to int; abort if we lost anything in the conversion */
5557	dev_id = (int)ul;
5558	if (dev_id != ul)
5559		return -EINVAL;
5560
5561	ret = -ENOENT;
5562	spin_lock(&rbd_dev_list_lock);
5563	list_for_each(tmp, &rbd_dev_list) {
5564		rbd_dev = list_entry(tmp, struct rbd_device, node);
5565		if (rbd_dev->dev_id == dev_id) {
5566			ret = 0;
5567			break;
5568		}
5569	}
5570	if (!ret) {
5571		spin_lock_irq(&rbd_dev->lock);
5572		if (rbd_dev->open_count)
5573			ret = -EBUSY;
5574		else
5575			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5576							&rbd_dev->flags);
5577		spin_unlock_irq(&rbd_dev->lock);
5578	}
5579	spin_unlock(&rbd_dev_list_lock);
5580	if (ret < 0 || already)
5581		return ret;
5582
5583	rbd_dev_header_unwatch_sync(rbd_dev);
5584	/*
5585	 * flush remaining watch callbacks - these must be complete
5586	 * before the osd_client is shutdown
5587	 */
5588	dout("%s: flushing notifies", __func__);
5589	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
5590
5591	/*
5592	 * Don't free anything from rbd_dev->disk until after all
5593	 * notifies are completely processed. Otherwise
5594	 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5595	 * in a potential use after free of rbd_dev->disk or rbd_dev.
5596	 */
5597	rbd_bus_del_dev(rbd_dev);
5598	rbd_dev_image_release(rbd_dev);
5599	module_put(THIS_MODULE);
5600
5601	return count;
5602}
5603
5604static ssize_t rbd_remove(struct bus_type *bus,
5605			  const char *buf,
5606			  size_t count)
5607{
5608	if (single_major)
5609		return -EINVAL;
5610
5611	return do_rbd_remove(bus, buf, count);
5612}
5613
5614static ssize_t rbd_remove_single_major(struct bus_type *bus,
5615				       const char *buf,
5616				       size_t count)
5617{
5618	return do_rbd_remove(bus, buf, count);
5619}
5620
5621/*
5622 * create control files in sysfs
5623 * /sys/bus/rbd/...
5624 */
5625static int rbd_sysfs_init(void)
5626{
5627	int ret;
5628
5629	ret = device_register(&rbd_root_dev);
5630	if (ret < 0)
5631		return ret;
5632
5633	ret = bus_register(&rbd_bus_type);
5634	if (ret < 0)
5635		device_unregister(&rbd_root_dev);
5636
5637	return ret;
5638}
5639
5640static void rbd_sysfs_cleanup(void)
5641{
5642	bus_unregister(&rbd_bus_type);
5643	device_unregister(&rbd_root_dev);
5644}
5645
5646static int rbd_slab_init(void)
5647{
5648	rbd_assert(!rbd_img_request_cache);
5649	rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5650					sizeof (struct rbd_img_request),
5651					__alignof__(struct rbd_img_request),
5652					0, NULL);
5653	if (!rbd_img_request_cache)
5654		return -ENOMEM;
5655
5656	rbd_assert(!rbd_obj_request_cache);
5657	rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5658					sizeof (struct rbd_obj_request),
5659					__alignof__(struct rbd_obj_request),
5660					0, NULL);
5661	if (!rbd_obj_request_cache)
5662		goto out_err;
5663
5664	rbd_assert(!rbd_segment_name_cache);
5665	rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5666					CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL);
5667	if (rbd_segment_name_cache)
5668		return 0;
5669out_err:
5670	if (rbd_obj_request_cache) {
5671		kmem_cache_destroy(rbd_obj_request_cache);
5672		rbd_obj_request_cache = NULL;
5673	}
5674
5675	kmem_cache_destroy(rbd_img_request_cache);
5676	rbd_img_request_cache = NULL;
5677
5678	return -ENOMEM;
5679}
5680
5681static void rbd_slab_exit(void)
5682{
5683	rbd_assert(rbd_segment_name_cache);
5684	kmem_cache_destroy(rbd_segment_name_cache);
5685	rbd_segment_name_cache = NULL;
5686
5687	rbd_assert(rbd_obj_request_cache);
5688	kmem_cache_destroy(rbd_obj_request_cache);
5689	rbd_obj_request_cache = NULL;
5690
5691	rbd_assert(rbd_img_request_cache);
5692	kmem_cache_destroy(rbd_img_request_cache);
5693	rbd_img_request_cache = NULL;
5694}
5695
5696static int __init rbd_init(void)
5697{
5698	int rc;
5699
5700	if (!libceph_compatible(NULL)) {
5701		rbd_warn(NULL, "libceph incompatibility (quitting)");
5702		return -EINVAL;
5703	}
5704
5705	rc = rbd_slab_init();
5706	if (rc)
5707		return rc;
5708
5709	/*
5710	 * The number of active work items is limited by the number of
5711	 * rbd devices, so leave @max_active at default.
5712	 */
5713	rbd_wq = alloc_workqueue(RBD_DRV_NAME, WQ_MEM_RECLAIM, 0);
5714	if (!rbd_wq) {
5715		rc = -ENOMEM;
5716		goto err_out_slab;
5717	}
5718
5719	if (single_major) {
5720		rbd_major = register_blkdev(0, RBD_DRV_NAME);
5721		if (rbd_major < 0) {
5722			rc = rbd_major;
5723			goto err_out_wq;
5724		}
5725	}
5726
5727	rc = rbd_sysfs_init();
5728	if (rc)
5729		goto err_out_blkdev;
5730
5731	if (single_major)
5732		pr_info("loaded (major %d)\n", rbd_major);
5733	else
5734		pr_info("loaded\n");
5735
5736	return 0;
5737
5738err_out_blkdev:
5739	if (single_major)
5740		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5741err_out_wq:
5742	destroy_workqueue(rbd_wq);
5743err_out_slab:
5744	rbd_slab_exit();
5745	return rc;
5746}
5747
5748static void __exit rbd_exit(void)
5749{
5750	ida_destroy(&rbd_dev_id_ida);
5751	rbd_sysfs_cleanup();
5752	if (single_major)
5753		unregister_blkdev(rbd_major, RBD_DRV_NAME);
5754	destroy_workqueue(rbd_wq);
5755	rbd_slab_exit();
5756}
5757
5758module_init(rbd_init);
5759module_exit(rbd_exit);
5760
5761MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
5762MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5763MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5764/* following authorship retained from original osdblk.c */
5765MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5766
5767MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
5768MODULE_LICENSE("GPL");
5769