rbd.c revision c0cd10db4685a76397f32bed246e861705642576
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used to ensure DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
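/*
 * For example, with a four-byte int, (5 * 4) / 2 + 1 = 11 characters,
 * enough for the longest decimal rendering of an int including a sign
 * ("-2147483648"), so DEV_NAME_LEN comfortably holds "rbd" followed by
 * any device id.
 */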

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 stripe_unit;
	u64 stripe_count;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable, so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
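
/*
 * Example (illustrative): mapping image "foo" in pool "rbd" with no
 * snapshot yields a spec whose pool_name is "rbd", image_name is
 * "foo", and snap_name is RBD_SNAP_HEAD_NAME ("-") with snap_id
 * CEPH_NOSNAP; the pool_id and image_id are filled in when they are
 * looked up during discovery.
 */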

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
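
/*
 * Typical use of the iterators above (see rbd_img_request_complete()
 * below), summing the bytes transferred by each object request that
 * makes up an image request:
 *
 *	struct rbd_obj_request *obj_request;
 *	u64 xferred = 0;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		xferred += obj_request->xferred;
 */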

struct rbd_snap {
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

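/*
 * Parse a single token from the options string supplied when an image
 * is mapped.  For example (illustrative), "read_only" or its alternate
 * spelling "ro" sets rbd_opts->read_only, while "read_write"/"rw"
 * clears it; the default is RBD_READ_ONLY_DEFAULT (false).
 */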
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; create one
 * if it does not exist.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
					const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (!strcmp(snap_name, snap->name))
			return snap;

	return NULL;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
	} else {
		struct rbd_snap *snap;

		snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (!snap)
			return -ENOENT;
		rbd_dev->mapping.size = snap->size;
		rbd_dev->mapping.features = snap->features;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

	return 0;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
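
/*
 * For example (illustrative): with the format string above, segment 5
 * of an image whose object_prefix is "foo" maps to the object named
 * "foo.000000000005" (the segment number rendered as 12 hex digits).
 */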

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
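
/*
 * Worked example (illustrative), assuming obj_order is 22 (4 MiB
 * objects): an 8192-byte request starting at image offset 0x3ff000
 * has rbd_segment_offset() == 0x3ff000 and rbd_segment_length() ==
 * 4096, since the I/O is clipped at the 4 MiB segment boundary; the
 * remaining 4096 bytes fall into the next object.
 */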

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at a specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
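
/*
 * zero_bio_chain() is used by rbd_img_obj_request_read_callback()
 * below: a read that returns -ENOENT (a hole in the image) is
 * zero-filled from offset 0, and a short read is zero-filled from the
 * number of bytes actually transferred.
 */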

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
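
/*
 * Example caller (see rbd_img_request_fill() below): the image data
 * bio chain and a running byte offset are passed by reference, and
 * each object request receives a clone covering just its own segment:
 *
 *	obj_request->bio_list = bio_chain_clone_range(&bio_list,
 *						&bio_offset, clone_size,
 *						GFP_ATOMIC);
 */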

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * the transfer count to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops:
 * a copyup method call and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}


static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

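	/*
	 * The name is stored in the same allocation as the request
	 * structure itself, immediately following it (note the "+ size"
	 * in the kzalloc() above).
	 */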
	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}

	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kfree(img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, "  result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
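
/*
 * Example (illustrative) of the in-order completion above: for a
 * three-object image request, if object 2 finishes first it is held
 * back; when object 0 finishes it is ended immediately, and object
 * 1's completion then ends both 1 and 2 in order.
 */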
1878
1879/*
1880 * Split up an image request into one or more object requests, each
1881 * to a different object.  The "type" parameter indicates whether
1882 * "data_desc" is the pointer to the head of a list of bio
1883 * structures, or the base of a page array.  In either case this
1884 * function assumes data_desc describes memory sufficient to hold
1885 * all data described by the image request.
1886 */
1887static int rbd_img_request_fill(struct rbd_img_request *img_request,
1888					enum obj_request_type type,
1889					void *data_desc)
1890{
1891	struct rbd_device *rbd_dev = img_request->rbd_dev;
1892	struct rbd_obj_request *obj_request = NULL;
1893	struct rbd_obj_request *next_obj_request;
1894	bool write_request = img_request_write_test(img_request);
1895	struct bio *bio_list;
1896	unsigned int bio_offset = 0;
1897	struct page **pages;
1898	u64 img_offset;
1899	u64 resid;
1900	u16 opcode;
1901
1902	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1903		(int)type, data_desc);
1904
1905	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1906	img_offset = img_request->offset;
1907	resid = img_request->length;
1908	rbd_assert(resid > 0);
1909
1910	if (type == OBJ_REQUEST_BIO) {
1911		bio_list = data_desc;
1912		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1913	} else {
1914		rbd_assert(type == OBJ_REQUEST_PAGES);
1915		pages = data_desc;
1916	}
1917
1918	while (resid) {
1919		struct ceph_osd_request *osd_req;
1920		const char *object_name;
1921		u64 offset;
1922		u64 length;
1923
1924		object_name = rbd_segment_name(rbd_dev, img_offset);
1925		if (!object_name)
1926			goto out_unwind;
1927		offset = rbd_segment_offset(rbd_dev, img_offset);
1928		length = rbd_segment_length(rbd_dev, img_offset, resid);
1929		obj_request = rbd_obj_request_create(object_name,
1930						offset, length, type);
1931		kfree(object_name);	/* object request has its own copy */
1932		if (!obj_request)
1933			goto out_unwind;
1934
1935		if (type == OBJ_REQUEST_BIO) {
1936			unsigned int clone_size;
1937
1938			rbd_assert(length <= (u64)UINT_MAX);
1939			clone_size = (unsigned int)length;
1940			obj_request->bio_list =
1941					bio_chain_clone_range(&bio_list,
1942								&bio_offset,
1943								clone_size,
1944								GFP_ATOMIC);
1945			if (!obj_request->bio_list)
1946				goto out_partial;
1947		} else {
1948			unsigned int page_count;
1949
1950			obj_request->pages = pages;
1951			page_count = (u32)calc_pages_for(offset, length);
1952			obj_request->page_count = page_count;
1953			if ((offset + length) & ~PAGE_MASK)
1954				page_count--;	/* more on last page */
1955			pages += page_count;
1956		}
1957
1958		osd_req = rbd_osd_req_create(rbd_dev, write_request,
1959						obj_request);
1960		if (!osd_req)
1961			goto out_partial;
1962		obj_request->osd_req = osd_req;
1963		obj_request->callback = rbd_img_obj_callback;
1964
1965		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1966						0, 0);
1967		if (type == OBJ_REQUEST_BIO)
1968			osd_req_op_extent_osd_data_bio(osd_req, 0,
1969					obj_request->bio_list, length);
1970		else
1971			osd_req_op_extent_osd_data_pages(osd_req, 0,
1972					obj_request->pages, length,
1973					offset & ~PAGE_MASK, false, false);
1974
1975		if (write_request)
1976			rbd_osd_req_format_write(obj_request);
1977		else
1978			rbd_osd_req_format_read(obj_request);
1979
1980		obj_request->img_offset = img_offset;
1981		rbd_img_obj_request_add(img_request, obj_request);
1982
1983		img_offset += length;
1984		resid -= length;
1985	}
1986
1987	return 0;
1988
1989out_partial:
1990	rbd_obj_request_put(obj_request);
1991out_unwind:
1992	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1993		rbd_obj_request_put(obj_request);
1994
1995	return -ENOMEM;
1996}
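/*
 * Illustrative example (not driver code): with a hypothetical image
 * whose obj_order is 22 (4 MiB objects), a 10 MiB request starting at
 * image offset 3 MiB is split by the loop above roughly as follows:
 *
 *	object 0:  offset 3 MiB, length 1 MiB  (up to the object boundary)
 *	object 1:  offset 0,     length 4 MiB
 *	object 2:  offset 0,     length 4 MiB
 *	object 3:  offset 0,     length 1 MiB  (whatever resid remains)
 *
 * rbd_segment_offset() and rbd_segment_length() provide the per-object
 * offset and length; resid tracks how much of the request is left.
 */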
1997
1998static void
1999rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2000{
2001	struct rbd_img_request *img_request;
2002	struct rbd_device *rbd_dev;
2003	u64 length;
2004	u32 page_count;
2005
2006	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2007	rbd_assert(obj_request_img_data_test(obj_request));
2008	img_request = obj_request->img_request;
2009	rbd_assert(img_request);
2010
2011	rbd_dev = img_request->rbd_dev;
2012	rbd_assert(rbd_dev);
2013	length = (u64)1 << rbd_dev->header.obj_order;
2014	page_count = (u32)calc_pages_for(0, length);
2015
2016	rbd_assert(obj_request->copyup_pages);
2017	ceph_release_page_vector(obj_request->copyup_pages, page_count);
2018	obj_request->copyup_pages = NULL;
2019
2020	/*
2021	 * We want the transfer count to reflect the size of the
2022	 * original write request.  There is no such thing as a
2023	 * successful short write, so if the request was successful
2024	 * we can just set it to the originally-requested length.
2025	 */
2026	if (!obj_request->result)
2027		obj_request->xferred = obj_request->length;
2028
2029	/* Finish up with the normal image object callback */
2030
2031	rbd_img_obj_callback(obj_request);
2032}
2033
2034static void
2035rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2036{
2037	struct rbd_obj_request *orig_request;
2038	struct ceph_osd_request *osd_req;
2039	struct ceph_osd_client *osdc;
2040	struct rbd_device *rbd_dev;
2041	struct page **pages;
2042	int result;
2043	u64 obj_size;
2044	u64 xferred;
2045
2046	rbd_assert(img_request_child_test(img_request));
2047
2048	/* First get what we need from the image request */
2049
2050	pages = img_request->copyup_pages;
2051	rbd_assert(pages != NULL);
2052	img_request->copyup_pages = NULL;
2053
2054	orig_request = img_request->obj_request;
2055	rbd_assert(orig_request != NULL);
2056	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2057	result = img_request->result;
2058	obj_size = img_request->length;
2059	xferred = img_request->xferred;
2060
2061	rbd_dev = img_request->rbd_dev;
2062	rbd_assert(rbd_dev);
2063	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2064
2065	rbd_img_request_put(img_request);
2066
2067	if (result)
2068		goto out_err;
2069
2070	/* Allocate the new copyup osd request for the original request */
2071
2072	result = -ENOMEM;
2073	rbd_assert(!orig_request->osd_req);
2074	osd_req = rbd_osd_req_create_copyup(orig_request);
2075	if (!osd_req)
2076		goto out_err;
2077	orig_request->osd_req = osd_req;
2078	orig_request->copyup_pages = pages;
2079
2080	/* Initialize the copyup op */
2081
2082	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2083	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2084						false, false);
2085
2086	/* Then the original write request op */
2087
2088	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2089					orig_request->offset,
2090					orig_request->length, 0, 0);
2091	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2092					orig_request->length);
2093
2094	rbd_osd_req_format_write(orig_request);
2095
2096	/* All set, send it off. */
2097
2098	orig_request->callback = rbd_img_obj_copyup_callback;
2099	osdc = &rbd_dev->rbd_client->client->osdc;
2100	result = rbd_obj_request_submit(osdc, orig_request);
2101	if (!result)
2102		return;
2103out_err:
2104	/* Record the error code and complete the request */
2105
2106	orig_request->result = result;
2107	orig_request->xferred = 0;
2108	obj_request_done_set(orig_request);
2109	rbd_obj_request_complete(orig_request);
2110}
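/*
 * Informal summary of the request built above: op 0 is a class-method
 * CALL to rbd "copyup" carrying the full parent object data just read,
 * and op 1 replays the caller's original WRITE over it.  Both ops are
 * carried in a single osd request, so the target object ends up
 * populated with the parent data plus the new write.
 */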
2111
2112/*
2113 * Read from the parent image the range of data that covers the
2114 * entire target of the given object request.  This is used for
2115 * satisfying a layered image write request when the target of an
2116 * object request from the image request does not exist.
2117 *
2118 * A page array big enough to hold the returned data is allocated
2119 * and supplied to rbd_img_request_fill() as the "data descriptor."
2120 * When the read completes, this page array will be transferred to
2121 * the original object request for the copyup operation.
2122 *
2123 * If an error occurs, record it as the result of the original
2124 * object request and mark it done so it gets completed.
2125 */
2126static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2127{
2128	struct rbd_img_request *img_request = NULL;
2129	struct rbd_img_request *parent_request = NULL;
2130	struct rbd_device *rbd_dev;
2131	u64 img_offset;
2132	u64 length;
2133	struct page **pages = NULL;
2134	u32 page_count;
2135	int result;
2136
2137	rbd_assert(obj_request_img_data_test(obj_request));
2138	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2139
2140	img_request = obj_request->img_request;
2141	rbd_assert(img_request != NULL);
2142	rbd_dev = img_request->rbd_dev;
2143	rbd_assert(rbd_dev->parent != NULL);
2144
2145	/*
2146	 * First things first.  The original osd request is of no
2147	 * use to us any more; we'll need a new one that can hold
2148	 * the two ops in a copyup request.  We'll get that later,
2149	 * but for now we can release the old one.
2150	 */
2151	rbd_osd_req_destroy(obj_request->osd_req);
2152	obj_request->osd_req = NULL;
2153
2154	/*
2155	 * Determine the byte range covered by the object in the
2156	 * child image to which the original request was to be sent.
2157	 */
2158	img_offset = obj_request->img_offset - obj_request->offset;
2159	length = (u64)1 << rbd_dev->header.obj_order;
2160
2161	/*
2162	 * There is no defined parent data beyond the parent
2163	 * overlap, so limit what we read at that boundary if
2164	 * necessary.
2165	 */
2166	if (img_offset + length > rbd_dev->parent_overlap) {
2167		rbd_assert(img_offset < rbd_dev->parent_overlap);
2168		length = rbd_dev->parent_overlap - img_offset;
2169	}
2170
2171	/*
2172	 * Allocate a page array big enough to receive the data read
2173	 * from the parent.
2174	 */
2175	page_count = (u32)calc_pages_for(0, length);
2176	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2177	if (IS_ERR(pages)) {
2178		result = PTR_ERR(pages);
2179		pages = NULL;
2180		goto out_err;
2181	}
2182
2183	result = -ENOMEM;
2184	parent_request = rbd_img_request_create(rbd_dev->parent,
2185						img_offset, length,
2186						false, true);
2187	if (!parent_request)
2188		goto out_err;
2189	rbd_obj_request_get(obj_request);
2190	parent_request->obj_request = obj_request;
2191
2192	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2193	if (result)
2194		goto out_err;
2195	parent_request->copyup_pages = pages;
2196
2197	parent_request->callback = rbd_img_obj_parent_read_full_callback;
2198	result = rbd_img_request_submit(parent_request);
2199	if (!result)
2200		return 0;
2201
2202	parent_request->copyup_pages = NULL;
2203	parent_request->obj_request = NULL;
2204	rbd_obj_request_put(obj_request);
2205out_err:
2206	if (pages)
2207		ceph_release_page_vector(pages, page_count);
2208	if (parent_request)
2209		rbd_img_request_put(parent_request);
2210	obj_request->result = result;
2211	obj_request->xferred = 0;
2212	obj_request_done_set(obj_request);
2213
2214	return result;
2215}
2216
2217static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2218{
2219	struct rbd_obj_request *orig_request;
2220	int result;
2221
2222	rbd_assert(!obj_request_img_data_test(obj_request));
2223
2224	/*
2225	 * All we need from the object request is the original
2226	 * request and the result of the STAT op.  Grab those, then
2227	 * we're done with the request.
2228	 */
2229	orig_request = obj_request->obj_request;
2230	obj_request->obj_request = NULL;
2231	rbd_assert(orig_request);
2232	rbd_assert(orig_request->img_request);
2233
2234	result = obj_request->result;
2235	obj_request->result = 0;
2236
2237	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2238		obj_request, orig_request, result,
2239		obj_request->xferred, obj_request->length);
2240	rbd_obj_request_put(obj_request);
2241
2242	rbd_assert(orig_request);
2243	rbd_assert(orig_request->img_request);
2244
2245	/*
2246	 * Our only purpose here is to determine whether the object
2247	 * exists, and we don't want to treat the non-existence as
2248	 * an error.  If something else comes back, transfer the
2249	 * error to the original request and complete it now.
2250	 */
2251	if (!result) {
2252		obj_request_existence_set(orig_request, true);
2253	} else if (result == -ENOENT) {
2254		obj_request_existence_set(orig_request, false);
2255	} else if (result) {
2256		orig_request->result = result;
2257		goto out;
2258	}
2259
2260	/*
2261	 * Resubmit the original request now that we have recorded
2262	 * whether the target object exists.
2263	 */
2264	orig_request->result = rbd_img_obj_request_submit(orig_request);
2265out:
2266	if (orig_request->result)
2267		rbd_obj_request_complete(orig_request);
2268	rbd_obj_request_put(orig_request);
2269}
2270
2271static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2272{
2273	struct rbd_obj_request *stat_request;
2274	struct rbd_device *rbd_dev;
2275	struct ceph_osd_client *osdc;
2276	struct page **pages = NULL;
2277	u32 page_count;
2278	size_t size;
2279	int ret;
2280
2281	/*
2282	 * The response data for a STAT call consists of:
2283	 *     le64 length;
2284	 *     struct {
2285	 *         le32 tv_sec;
2286	 *         le32 tv_nsec;
2287	 *     } mtime;
2288	 */
2289	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2290	page_count = (u32)calc_pages_for(0, size);
2291	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2292	if (IS_ERR(pages))
2293		return PTR_ERR(pages);
2294
2295	ret = -ENOMEM;
2296	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2297							OBJ_REQUEST_PAGES);
2298	if (!stat_request)
2299		goto out;
2300
2301	rbd_obj_request_get(obj_request);
2302	stat_request->obj_request = obj_request;
2303	stat_request->pages = pages;
2304	stat_request->page_count = page_count;
2305
2306	rbd_assert(obj_request->img_request);
2307	rbd_dev = obj_request->img_request->rbd_dev;
2308	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2309						stat_request);
2310	if (!stat_request->osd_req)
2311		goto out;
2312	stat_request->callback = rbd_img_obj_exists_callback;
2313
2314	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2315	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2316					false, false);
2317	rbd_osd_req_format_read(stat_request);
2318
2319	osdc = &rbd_dev->rbd_client->client->osdc;
2320	ret = rbd_obj_request_submit(osdc, stat_request);
2321out:
2322	if (ret)
2323		rbd_obj_request_put(obj_request);
2324
2325	return ret;
2326}
2327
2328static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2329{
2330	struct rbd_img_request *img_request;
2331	struct rbd_device *rbd_dev;
2332	bool known;
2333
2334	rbd_assert(obj_request_img_data_test(obj_request));
2335
2336	img_request = obj_request->img_request;
2337	rbd_assert(img_request);
2338	rbd_dev = img_request->rbd_dev;
2339
2340	/*
2341	 * Only writes to layered images need special handling.
2342	 * Reads and non-layered writes are simple object requests.
2343	 * Layered writes that start beyond the end of the overlap
2344	 * with the parent have no parent data, so they too are
2345	 * simple object requests.  Finally, if the target object is
2346	 * known to already exist, its parent data has already been
2347	 * copied, so a write to the object can also be handled as a
2348	 * simple object request.
2349	 */
2350	if (!img_request_write_test(img_request) ||
2351		!img_request_layered_test(img_request) ||
2352		rbd_dev->parent_overlap <= obj_request->img_offset ||
2353		((known = obj_request_known_test(obj_request)) &&
2354			obj_request_exists_test(obj_request))) {
2355
2356		struct rbd_device *rbd_dev;
2357		struct ceph_osd_client *osdc;
2358
2359		rbd_dev = obj_request->img_request->rbd_dev;
2360		osdc = &rbd_dev->rbd_client->client->osdc;
2361
2362		return rbd_obj_request_submit(osdc, obj_request);
2363	}
2364
2365	/*
2366	 * It's a layered write.  The target object might exist but
2367	 * we may not know that yet.  If we know it doesn't exist,
2368	 * start by reading the data for the full target object from
2369	 * the parent so we can use it for a copyup to the target.
2370	 */
2371	if (known)
2372		return rbd_img_obj_parent_read_full(obj_request);
2373
2374	/* We don't know whether the target exists.  Go find out. */
2375
2376	return rbd_img_obj_exists_submit(obj_request);
2377}
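/*
 * Informal decision summary for rbd_img_obj_request_submit():
 *
 *   - reads, non-layered writes, writes starting beyond the parent
 *     overlap, and writes to objects known to exist are submitted
 *     directly;
 *   - a layered write whose target object's existence is unknown
 *     first issues a STAT via rbd_img_obj_exists_submit();
 *   - a layered write to an object known not to exist goes through
 *     rbd_img_obj_parent_read_full() for the parent-read/copyup path.
 */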
2378
2379static int rbd_img_request_submit(struct rbd_img_request *img_request)
2380{
2381	struct rbd_obj_request *obj_request;
2382	struct rbd_obj_request *next_obj_request;
2383
2384	dout("%s: img %p\n", __func__, img_request);
2385	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2386		int ret;
2387
2388		ret = rbd_img_obj_request_submit(obj_request);
2389		if (ret)
2390			return ret;
2391	}
2392
2393	return 0;
2394}
2395
2396static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2397{
2398	struct rbd_obj_request *obj_request;
2399	struct rbd_device *rbd_dev;
2400	u64 obj_end;
2401
2402	rbd_assert(img_request_child_test(img_request));
2403
2404	obj_request = img_request->obj_request;
2405	rbd_assert(obj_request);
2406	rbd_assert(obj_request->img_request);
2407
2408	obj_request->result = img_request->result;
2409	if (obj_request->result)
2410		goto out;
2411
2412	/*
2413	 * We need to zero anything beyond the parent overlap
2414	 * boundary.  Since rbd_img_obj_request_read_callback()
2415	 * will zero anything beyond the end of a short read, an
2416	 * easy way to do this is to pretend the data from the
2417	 * parent came up short--ending at the overlap boundary.
2418	 */
2419	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2420	obj_end = obj_request->img_offset + obj_request->length;
2421	rbd_dev = obj_request->img_request->rbd_dev;
2422	if (obj_end > rbd_dev->parent_overlap) {
2423		u64 xferred = 0;
2424
2425		if (obj_request->img_offset < rbd_dev->parent_overlap)
2426			xferred = rbd_dev->parent_overlap -
2427					obj_request->img_offset;
2428
2429		obj_request->xferred = min(img_request->xferred, xferred);
2430	} else {
2431		obj_request->xferred = img_request->xferred;
2432	}
2433out:
2434	rbd_img_obj_request_read_callback(obj_request);
2435	rbd_obj_request_complete(obj_request);
2436}
2437
2438static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2439{
2440	struct rbd_device *rbd_dev;
2441	struct rbd_img_request *img_request;
2442	int result;
2443
2444	rbd_assert(obj_request_img_data_test(obj_request));
2445	rbd_assert(obj_request->img_request != NULL);
2446	rbd_assert(obj_request->result == (s32) -ENOENT);
2447	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2448
2449	rbd_dev = obj_request->img_request->rbd_dev;
2450	rbd_assert(rbd_dev->parent != NULL);
2451	/* rbd_read_finish(obj_request, obj_request->length); */
2452	img_request = rbd_img_request_create(rbd_dev->parent,
2453						obj_request->img_offset,
2454						obj_request->length,
2455						false, true);
2456	result = -ENOMEM;
2457	if (!img_request)
2458		goto out_err;
2459
2460	rbd_obj_request_get(obj_request);
2461	img_request->obj_request = obj_request;
2462
2463	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2464					obj_request->bio_list);
2465	if (result)
2466		goto out_err;
2467
2468	img_request->callback = rbd_img_parent_read_callback;
2469	result = rbd_img_request_submit(img_request);
2470	if (result)
2471		goto out_err;
2472
2473	return;
2474out_err:
2475	if (img_request)
2476		rbd_img_request_put(img_request);
2477	obj_request->result = result;
2478	obj_request->xferred = 0;
2479	obj_request_done_set(obj_request);
2480}
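/*
 * Note (informal): rbd_img_parent_read() runs when a read against a
 * layered image finds that the target object does not exist
 * (-ENOENT).  The data is read from the parent image instead, and
 * rbd_img_parent_read_callback() above arranges for anything beyond
 * the parent overlap to be zeroed before the original object request
 * is completed.
 */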
2481
2482static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2483				   u64 ver, u64 notify_id)
2484{
2485	struct rbd_obj_request *obj_request;
2486	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2487	int ret;
2488
2489	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2490							OBJ_REQUEST_NODATA);
2491	if (!obj_request)
2492		return -ENOMEM;
2493
2494	ret = -ENOMEM;
2495	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2496	if (!obj_request->osd_req)
2497		goto out;
2498	obj_request->callback = rbd_obj_request_put;
2499
2500	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2501					notify_id, ver, 0);
2502	rbd_osd_req_format_read(obj_request);
2503
2504	ret = rbd_obj_request_submit(osdc, obj_request);
2505out:
2506	if (ret)
2507		rbd_obj_request_put(obj_request);
2508
2509	return ret;
2510}
2511
2512static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2513{
2514	struct rbd_device *rbd_dev = (struct rbd_device *)data;
2515	u64 hver;
2516
2517	if (!rbd_dev)
2518		return;
2519
2520	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2521		rbd_dev->header_name, (unsigned long long) notify_id,
2522		(unsigned int) opcode);
2523	(void)rbd_dev_refresh(rbd_dev, &hver);
2524
2525	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2526}
2527
2528/*
2529 * Request sync osd watch/unwatch.  The value of "start" determines
2530 * whether a watch request is being initiated or torn down.
2531 */
2532static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2533{
2534	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2535	struct rbd_obj_request *obj_request;
2536	int ret;
2537
2538	rbd_assert(start ^ !!rbd_dev->watch_event);
2539	rbd_assert(start ^ !!rbd_dev->watch_request);
2540
2541	if (start) {
2542		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2543						&rbd_dev->watch_event);
2544		if (ret < 0)
2545			return ret;
2546		rbd_assert(rbd_dev->watch_event != NULL);
2547	}
2548
2549	ret = -ENOMEM;
2550	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2551							OBJ_REQUEST_NODATA);
2552	if (!obj_request)
2553		goto out_cancel;
2554
2555	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2556	if (!obj_request->osd_req)
2557		goto out_cancel;
2558
2559	if (start)
2560		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2561	else
2562		ceph_osdc_unregister_linger_request(osdc,
2563					rbd_dev->watch_request->osd_req);
2564
2565	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2566				rbd_dev->watch_event->cookie,
2567				rbd_dev->header.obj_version, start);
2568	rbd_osd_req_format_write(obj_request);
2569
2570	ret = rbd_obj_request_submit(osdc, obj_request);
2571	if (ret)
2572		goto out_cancel;
2573	ret = rbd_obj_request_wait(obj_request);
2574	if (ret)
2575		goto out_cancel;
2576	ret = obj_request->result;
2577	if (ret)
2578		goto out_cancel;
2579
2580	/*
2581	 * A watch request is set to linger, so the underlying osd
2582	 * request won't go away until we unregister it.  We retain
2583	 * a pointer to the object request during that time (in
2584	 * rbd_dev->watch_request), so we'll keep a reference to
2585	 * it.  We'll drop that reference (below) after we've
2586	 * unregistered it.
2587	 */
2588	if (start) {
2589		rbd_dev->watch_request = obj_request;
2590
2591		return 0;
2592	}
2593
2594	/* We have successfully torn down the watch request */
2595
2596	rbd_obj_request_put(rbd_dev->watch_request);
2597	rbd_dev->watch_request = NULL;
2598out_cancel:
2599	/* Cancel the event if we're tearing down, or on error */
2600	ceph_osdc_cancel_event(rbd_dev->watch_event);
2601	rbd_dev->watch_event = NULL;
2602	if (obj_request)
2603		rbd_obj_request_put(obj_request);
2604
2605	return ret;
2606}
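/*
 * Illustrative usage (describing existing call sites, not adding new
 * ones): the probe path registers the watch with
 *
 *	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
 *
 * and teardown later passes 0, which unregisters the lingering osd
 * request and drops the reference held in rbd_dev->watch_request.
 * While registered, header-change notifications arrive through
 * rbd_watch_cb(), which refreshes the header and acknowledges the
 * notification via rbd_obj_notify_ack().
 */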
2607
2608/*
2609 * Synchronous osd object method call.  Returns the number of bytes
2610 * returned in the outbound buffer, or a negative error code.
2611 */
2612static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2613			     const char *object_name,
2614			     const char *class_name,
2615			     const char *method_name,
2616			     const void *outbound,
2617			     size_t outbound_size,
2618			     void *inbound,
2619			     size_t inbound_size,
2620			     u64 *version)
2621{
2622	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2623	struct rbd_obj_request *obj_request;
2624	struct page **pages;
2625	u32 page_count;
2626	int ret;
2627
2628	/*
2629	 * Method calls are ultimately read operations.  The result
2630	 * should be placed into the inbound buffer provided.  They
2631	 * also supply outbound data--parameters for the object
2632	 * method.  Currently if this is present it will be a
2633	 * snapshot id.
2634	 */
2635	page_count = (u32)calc_pages_for(0, inbound_size);
2636	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2637	if (IS_ERR(pages))
2638		return PTR_ERR(pages);
2639
2640	ret = -ENOMEM;
2641	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2642							OBJ_REQUEST_PAGES);
2643	if (!obj_request)
2644		goto out;
2645
2646	obj_request->pages = pages;
2647	obj_request->page_count = page_count;
2648
2649	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2650	if (!obj_request->osd_req)
2651		goto out;
2652
2653	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2654					class_name, method_name);
2655	if (outbound_size) {
2656		struct ceph_pagelist *pagelist;
2657
2658		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2659		if (!pagelist)
2660			goto out;
2661
2662		ceph_pagelist_init(pagelist);
2663		ceph_pagelist_append(pagelist, outbound, outbound_size);
2664		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2665						pagelist);
2666	}
2667	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2668					obj_request->pages, inbound_size,
2669					0, false, false);
2670	rbd_osd_req_format_read(obj_request);
2671
2672	ret = rbd_obj_request_submit(osdc, obj_request);
2673	if (ret)
2674		goto out;
2675	ret = rbd_obj_request_wait(obj_request);
2676	if (ret)
2677		goto out;
2678
2679	ret = obj_request->result;
2680	if (ret < 0)
2681		goto out;
2682
2683	rbd_assert(obj_request->xferred < (u64)INT_MAX);
2684	ret = (int)obj_request->xferred;
2685	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2686	if (version)
2687		*version = obj_request->version;
2688out:
2689	if (obj_request)
2690		rbd_obj_request_put(obj_request);
2691	else
2692		ceph_release_page_vector(pages, page_count);
2693
2694	return ret;
2695}
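/*
 * For example (mirroring the v2 metadata helpers later in this file),
 * fetching the image size for a given snapshot looks like:
 *
 *	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
 *				"rbd", "get_size",
 *				&snapid, sizeof (snapid),
 *				&size_buf, sizeof (size_buf), NULL);
 *
 * A non-negative return value is the number of bytes the method
 * placed into the inbound buffer.
 */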
2696
2697static void rbd_request_fn(struct request_queue *q)
2698		__releases(q->queue_lock) __acquires(q->queue_lock)
2699{
2700	struct rbd_device *rbd_dev = q->queuedata;
2701	bool read_only = rbd_dev->mapping.read_only;
2702	struct request *rq;
2703	int result;
2704
2705	while ((rq = blk_fetch_request(q))) {
2706		bool write_request = rq_data_dir(rq) == WRITE;
2707		struct rbd_img_request *img_request;
2708		u64 offset;
2709		u64 length;
2710
2711		/* Ignore any non-FS requests that filter through. */
2712
2713		if (rq->cmd_type != REQ_TYPE_FS) {
2714			dout("%s: non-fs request type %d\n", __func__,
2715				(int) rq->cmd_type);
2716			__blk_end_request_all(rq, 0);
2717			continue;
2718		}
2719
2720		/* Ignore/skip any zero-length requests */
2721
2722		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2723		length = (u64) blk_rq_bytes(rq);
2724
2725		if (!length) {
2726			dout("%s: zero-length request\n", __func__);
2727			__blk_end_request_all(rq, 0);
2728			continue;
2729		}
2730
2731		spin_unlock_irq(q->queue_lock);
2732
2733		/* Disallow writes to a read-only device */
2734
2735		if (write_request) {
2736			result = -EROFS;
2737			if (read_only)
2738				goto end_request;
2739			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2740		}
2741
2742		/*
2743		 * Quit early if the mapped snapshot no longer
2744		 * exists.  It's still possible the snapshot will
2745		 * have disappeared by the time our request arrives
2746		 * at the osd, but there's no sense in sending it if
2747		 * we already know.
2748		 */
2749		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2750			dout("request for non-existent snapshot");
2751			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2752			result = -ENXIO;
2753			goto end_request;
2754		}
2755
2756		result = -EINVAL;
2757		if (offset && length > U64_MAX - offset + 1) {
2758			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2759				offset, length);
2760			goto end_request;	/* Shouldn't happen */
2761		}
2762
2763		result = -ENOMEM;
2764		img_request = rbd_img_request_create(rbd_dev, offset, length,
2765							write_request, false);
2766		if (!img_request)
2767			goto end_request;
2768
2769		img_request->rq = rq;
2770
2771		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2772						rq->bio);
2773		if (!result)
2774			result = rbd_img_request_submit(img_request);
2775		if (result)
2776			rbd_img_request_put(img_request);
2777end_request:
2778		spin_lock_irq(q->queue_lock);
2779		if (result < 0) {
2780			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2781				write_request ? "write" : "read",
2782				length, offset, result);
2783
2784			__blk_end_request_all(rq, result);
2785		}
2786	}
2787}
2788
2789/*
2790 * a queue callback. Makes sure that we don't create a bio that spans across
2791 * multiple osd objects. One exception would be with a single page bios,
2792 * which we handle later at bio_chain_clone_range()
2793 */
2794static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2795			  struct bio_vec *bvec)
2796{
2797	struct rbd_device *rbd_dev = q->queuedata;
2798	sector_t sector_offset;
2799	sector_t sectors_per_obj;
2800	sector_t obj_sector_offset;
2801	int ret;
2802
2803	/*
2804	 * Find how far into its rbd object the partition-relative
2805	 * Find how far into its rbd object the bio's starting sector
2806	 * falls, after converting the partition-relative start sector
2807	 * into an offset relative to the enclosing device.
2808	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2809	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2810	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2811
2812	/*
2813	 * Compute the number of bytes from that offset to the end
2814	 * of the object.  Account for what's already used by the bio.
2815	 */
2816	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2817	if (ret > bmd->bi_size)
2818		ret -= bmd->bi_size;
2819	else
2820		ret = 0;
2821
2822	/*
2823	 * Don't send back more than was asked for.  And if the bio
2824	 * was empty, let the whole thing through because:  "Note
2825	 * that a block device *must* allow a single page to be
2826	 * added to an empty bio."
2827	 */
2828	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2829	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2830		ret = (int) bvec->bv_len;
2831
2832	return ret;
2833}
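/*
 * Worked example (informal): with obj_order 22 (4 MiB objects),
 * sectors_per_obj is 8192.  A bio whose device-relative start sector
 * is 8100 has obj_sector_offset 8100, leaving
 * (8192 - 8100) << SECTOR_SHIFT = 47104 bytes before the object
 * boundary; after subtracting what the bio already holds, that is the
 * most this callback will allow the new bio_vec to add.
 */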
2834
2835static void rbd_free_disk(struct rbd_device *rbd_dev)
2836{
2837	struct gendisk *disk = rbd_dev->disk;
2838
2839	if (!disk)
2840		return;
2841
2842	rbd_dev->disk = NULL;
2843	if (disk->flags & GENHD_FL_UP) {
2844		del_gendisk(disk);
2845		if (disk->queue)
2846			blk_cleanup_queue(disk->queue);
2847	}
2848	put_disk(disk);
2849}
2850
2851static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2852				const char *object_name,
2853				u64 offset, u64 length,
2854				void *buf, u64 *version)
2855
2856{
2857	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2858	struct rbd_obj_request *obj_request;
2859	struct page **pages = NULL;
2860	u32 page_count;
2861	size_t size;
2862	int ret;
2863
2864	page_count = (u32) calc_pages_for(offset, length);
2865	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2866	if (IS_ERR(pages))
2867		return PTR_ERR(pages);
2868
2869	ret = -ENOMEM;
2870	obj_request = rbd_obj_request_create(object_name, offset, length,
2871							OBJ_REQUEST_PAGES);
2872	if (!obj_request)
2873		goto out;
2874
2875	obj_request->pages = pages;
2876	obj_request->page_count = page_count;
2877
2878	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2879	if (!obj_request->osd_req)
2880		goto out;
2881
2882	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2883					offset, length, 0, 0);
2884	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2885					obj_request->pages,
2886					obj_request->length,
2887					obj_request->offset & ~PAGE_MASK,
2888					false, false);
2889	rbd_osd_req_format_read(obj_request);
2890
2891	ret = rbd_obj_request_submit(osdc, obj_request);
2892	if (ret)
2893		goto out;
2894	ret = rbd_obj_request_wait(obj_request);
2895	if (ret)
2896		goto out;
2897
2898	ret = obj_request->result;
2899	if (ret < 0)
2900		goto out;
2901
2902	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2903	size = (size_t) obj_request->xferred;
2904	ceph_copy_from_page_vector(pages, buf, 0, size);
2905	rbd_assert(size <= (size_t) INT_MAX);
2906	ret = (int) size;
2907	if (version)
2908		*version = obj_request->version;
2909out:
2910	if (obj_request)
2911		rbd_obj_request_put(obj_request);
2912	else
2913		ceph_release_page_vector(pages, page_count);
2914
2915	return ret;
2916}
2917
2918/*
2919 * Read the complete header for the given rbd device.
2920 *
2921 * Returns a pointer to a dynamically-allocated buffer containing
2922 * the complete and validated header.  Caller can pass the address
2923 * of a variable that will be filled in with the version of the
2924 * header object at the time it was read.
2925 *
2926 * Returns a pointer-coded errno if a failure occurs.
2927 */
2928static struct rbd_image_header_ondisk *
2929rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2930{
2931	struct rbd_image_header_ondisk *ondisk = NULL;
2932	u32 snap_count = 0;
2933	u64 names_size = 0;
2934	u32 want_count;
2935	int ret;
2936
2937	/*
2938	 * The complete header will include an array of its 64-bit
2939	 * snapshot ids, followed by the names of those snapshots as
2940	 * a contiguous block of NUL-terminated strings.  Note that
2941	 * the number of snapshots could change by the time we read
2942	 * it in, in which case we re-read it.
2943	 */
2944	do {
2945		size_t size;
2946
2947		kfree(ondisk);
2948
2949		size = sizeof (*ondisk);
2950		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2951		size += names_size;
2952		ondisk = kmalloc(size, GFP_KERNEL);
2953		if (!ondisk)
2954			return ERR_PTR(-ENOMEM);
2955
2956		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2957				       0, size, ondisk, version);
2958		if (ret < 0)
2959			goto out_err;
2960		if ((size_t)ret < size) {
2961			ret = -ENXIO;
2962			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2963				size, ret);
2964			goto out_err;
2965		}
2966		if (!rbd_dev_ondisk_valid(ondisk)) {
2967			ret = -ENXIO;
2968			rbd_warn(rbd_dev, "invalid header");
2969			goto out_err;
2970		}
2971
2972		names_size = le64_to_cpu(ondisk->snap_names_len);
2973		want_count = snap_count;
2974		snap_count = le32_to_cpu(ondisk->snap_count);
2975	} while (snap_count != want_count);
2976
2977	return ondisk;
2978
2979out_err:
2980	kfree(ondisk);
2981
2982	return ERR_PTR(ret);
2983}
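/*
 * Layout assumed by the read loop above (informal): a fixed-size
 * rbd_image_header_ondisk, then snap_count rbd_image_snap_ondisk
 * entries, then snap_names_len bytes of NUL-terminated snapshot
 * names.  Because the snapshot count can change between reads, the
 * loop re-reads whenever the count it got back differs from the one
 * the buffer was sized for.
 */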
2984
2985/*
2986 * reload the on-disk header
2987 */
2988static int rbd_read_header(struct rbd_device *rbd_dev,
2989			   struct rbd_image_header *header)
2990{
2991	struct rbd_image_header_ondisk *ondisk;
2992	u64 ver = 0;
2993	int ret;
2994
2995	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2996	if (IS_ERR(ondisk))
2997		return PTR_ERR(ondisk);
2998	ret = rbd_header_from_disk(header, ondisk);
2999	if (ret >= 0)
3000		header->obj_version = ver;
3001	kfree(ondisk);
3002
3003	return ret;
3004}
3005
3006static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3007{
3008	struct rbd_snap *snap;
3009	struct rbd_snap *next;
3010
3011	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
3012		list_del(&snap->node);
3013		rbd_snap_destroy(snap);
3014	}
3015}
3016
3017static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3018{
3019	sector_t size;
3020
3021	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3022		return;
3023
3024	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3025	dout("setting size to %llu sectors", (unsigned long long) size);
3026	rbd_dev->mapping.size = (u64) size;
3027	set_capacity(rbd_dev->disk, size);
3028}
3029
3030/*
3031 * only read the first part of the on-disk header, without the snapshot info
3032 */
3033static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3034{
3035	int ret;
3036	struct rbd_image_header h;
3037
3038	ret = rbd_read_header(rbd_dev, &h);
3039	if (ret < 0)
3040		return ret;
3041
3042	down_write(&rbd_dev->header_rwsem);
3043
3044	/* Update image size, and check for resize of mapped image */
3045	rbd_dev->header.image_size = h.image_size;
3046	rbd_update_mapping_size(rbd_dev);
3047
3048	/* rbd_dev->header.object_prefix shouldn't change */
3049	kfree(rbd_dev->header.snap_sizes);
3050	kfree(rbd_dev->header.snap_names);
3051	/* osd requests may still refer to snapc */
3052	ceph_put_snap_context(rbd_dev->header.snapc);
3053
3054	if (hver)
3055		*hver = h.obj_version;
3056	rbd_dev->header.obj_version = h.obj_version;
3057	rbd_dev->header.image_size = h.image_size;
3058	rbd_dev->header.snapc = h.snapc;
3059	rbd_dev->header.snap_names = h.snap_names;
3060	rbd_dev->header.snap_sizes = h.snap_sizes;
3061	/* Free the extra copy of the object prefix */
3062	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3063		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3064	kfree(h.object_prefix);
3065
3066	ret = rbd_dev_snaps_update(rbd_dev);
3067
3068	up_write(&rbd_dev->header_rwsem);
3069
3070	return ret;
3071}
3072
3073static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3074{
3075	int ret;
3076
3077	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3078	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3079	if (rbd_dev->image_format == 1)
3080		ret = rbd_dev_v1_refresh(rbd_dev, hver);
3081	else
3082		ret = rbd_dev_v2_refresh(rbd_dev, hver);
3083	mutex_unlock(&ctl_mutex);
3084	revalidate_disk(rbd_dev->disk);
3085	if (ret)
3086		rbd_warn(rbd_dev, "got notification but failed to "
3087			   "update snaps: %d\n", ret);
3088
3089	return ret;
3090}
3091
3092static int rbd_init_disk(struct rbd_device *rbd_dev)
3093{
3094	struct gendisk *disk;
3095	struct request_queue *q;
3096	u64 segment_size;
3097
3098	/* create gendisk info */
3099	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3100	if (!disk)
3101		return -ENOMEM;
3102
3103	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3104		 rbd_dev->dev_id);
3105	disk->major = rbd_dev->major;
3106	disk->first_minor = 0;
3107	disk->fops = &rbd_bd_ops;
3108	disk->private_data = rbd_dev;
3109
3110	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3111	if (!q)
3112		goto out_disk;
3113
3114	/* We use the default size, but let's be explicit about it. */
3115	blk_queue_physical_block_size(q, SECTOR_SIZE);
3116
3117	/* set io sizes to object size */
3118	segment_size = rbd_obj_bytes(&rbd_dev->header);
3119	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3120	blk_queue_max_segment_size(q, segment_size);
3121	blk_queue_io_min(q, segment_size);
3122	blk_queue_io_opt(q, segment_size);
3123
3124	blk_queue_merge_bvec(q, rbd_merge_bvec);
3125	disk->queue = q;
3126
3127	q->queuedata = rbd_dev;
3128
3129	rbd_dev->disk = disk;
3130
3131	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3132
3133	return 0;
3134out_disk:
3135	put_disk(disk);
3136
3137	return -ENOMEM;
3138}
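/*
 * Note (informal): the queue limits above are all derived from the
 * object size, and the rbd_merge_bvec() callback installed on the
 * queue keeps a bio from growing across an object boundary, so each
 * block request splits into only a small number of object requests in
 * rbd_img_request_fill().
 */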
3139
3140/*
3141  sysfs
3142*/
3143
3144static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3145{
3146	return container_of(dev, struct rbd_device, dev);
3147}
3148
3149static ssize_t rbd_size_show(struct device *dev,
3150			     struct device_attribute *attr, char *buf)
3151{
3152	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3153	sector_t size;
3154
3155	down_read(&rbd_dev->header_rwsem);
3156	size = get_capacity(rbd_dev->disk);
3157	up_read(&rbd_dev->header_rwsem);
3158
3159	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3160}
3161
3162/*
3163 * Note this shows the features for whatever's mapped, which is not
3164 * necessarily the base image.
3165 */
3166static ssize_t rbd_features_show(struct device *dev,
3167			     struct device_attribute *attr, char *buf)
3168{
3169	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3170
3171	return sprintf(buf, "0x%016llx\n",
3172			(unsigned long long) rbd_dev->mapping.features);
3173}
3174
3175static ssize_t rbd_major_show(struct device *dev,
3176			      struct device_attribute *attr, char *buf)
3177{
3178	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3179
3180	return sprintf(buf, "%d\n", rbd_dev->major);
3181}
3182
3183static ssize_t rbd_client_id_show(struct device *dev,
3184				  struct device_attribute *attr, char *buf)
3185{
3186	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3187
3188	return sprintf(buf, "client%lld\n",
3189			ceph_client_id(rbd_dev->rbd_client->client));
3190}
3191
3192static ssize_t rbd_pool_show(struct device *dev,
3193			     struct device_attribute *attr, char *buf)
3194{
3195	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3196
3197	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3198}
3199
3200static ssize_t rbd_pool_id_show(struct device *dev,
3201			     struct device_attribute *attr, char *buf)
3202{
3203	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
3205	return sprintf(buf, "%llu\n",
3206		(unsigned long long) rbd_dev->spec->pool_id);
3207}
3208
3209static ssize_t rbd_name_show(struct device *dev,
3210			     struct device_attribute *attr, char *buf)
3211{
3212	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3213
3214	if (rbd_dev->spec->image_name)
3215		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3216
3217	return sprintf(buf, "(unknown)\n");
3218}
3219
3220static ssize_t rbd_image_id_show(struct device *dev,
3221			     struct device_attribute *attr, char *buf)
3222{
3223	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3224
3225	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3226}
3227
3228/*
3229 * Shows the name of the currently-mapped snapshot (or
3230 * RBD_SNAP_HEAD_NAME for the base image).
3231 */
3232static ssize_t rbd_snap_show(struct device *dev,
3233			     struct device_attribute *attr,
3234			     char *buf)
3235{
3236	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3237
3238	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3239}
3240
3241/*
3242 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3243 * for the parent image.  If there is no parent, simply shows
3244 * "(no parent image)".
3245 */
3246static ssize_t rbd_parent_show(struct device *dev,
3247			     struct device_attribute *attr,
3248			     char *buf)
3249{
3250	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3251	struct rbd_spec *spec = rbd_dev->parent_spec;
3252	int count;
3253	char *bufp = buf;
3254
3255	if (!spec)
3256		return sprintf(buf, "(no parent image)\n");
3257
3258	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3259			(unsigned long long) spec->pool_id, spec->pool_name);
3260	if (count < 0)
3261		return count;
3262	bufp += count;
3263
3264	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3265			spec->image_name ? spec->image_name : "(unknown)");
3266	if (count < 0)
3267		return count;
3268	bufp += count;
3269
3270	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3271			(unsigned long long) spec->snap_id, spec->snap_name);
3272	if (count < 0)
3273		return count;
3274	bufp += count;
3275
3276	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3277	if (count < 0)
3278		return count;
3279	bufp += count;
3280
3281	return (ssize_t) (bufp - buf);
3282}
3283
3284static ssize_t rbd_image_refresh(struct device *dev,
3285				 struct device_attribute *attr,
3286				 const char *buf,
3287				 size_t size)
3288{
3289	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3290	int ret;
3291
3292	ret = rbd_dev_refresh(rbd_dev, NULL);
3293
3294	return ret < 0 ? ret : size;
3295}
3296
3297static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
3298static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3299static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3300static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3301static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
3302static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
3303static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
3304static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
3305static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3306static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
3307static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
3308
3309static struct attribute *rbd_attrs[] = {
3310	&dev_attr_size.attr,
3311	&dev_attr_features.attr,
3312	&dev_attr_major.attr,
3313	&dev_attr_client_id.attr,
3314	&dev_attr_pool.attr,
3315	&dev_attr_pool_id.attr,
3316	&dev_attr_name.attr,
3317	&dev_attr_image_id.attr,
3318	&dev_attr_current_snap.attr,
3319	&dev_attr_parent.attr,
3320	&dev_attr_refresh.attr,
3321	NULL
3322};
3323
3324static struct attribute_group rbd_attr_group = {
3325	.attrs = rbd_attrs,
3326};
3327
3328static const struct attribute_group *rbd_attr_groups[] = {
3329	&rbd_attr_group,
3330	NULL
3331};
3332
3333static void rbd_sysfs_dev_release(struct device *dev)
3334{
3335}
3336
3337static struct device_type rbd_device_type = {
3338	.name		= "rbd",
3339	.groups		= rbd_attr_groups,
3340	.release	= rbd_sysfs_dev_release,
3341};
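/*
 * Note (illustrative): these attributes are exposed under
 * /sys/bus/rbd/devices/<id>/.  For example, reading "size" or
 * "current_snap" reports the mapped image's state, and writing
 * anything to "refresh" (S_IWUSR) forces a header re-read through
 * rbd_dev_refresh().
 */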
3342
3343static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3344{
3345	kref_get(&spec->kref);
3346
3347	return spec;
3348}
3349
3350static void rbd_spec_free(struct kref *kref);
3351static void rbd_spec_put(struct rbd_spec *spec)
3352{
3353	if (spec)
3354		kref_put(&spec->kref, rbd_spec_free);
3355}
3356
3357static struct rbd_spec *rbd_spec_alloc(void)
3358{
3359	struct rbd_spec *spec;
3360
3361	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3362	if (!spec)
3363		return NULL;
3364	kref_init(&spec->kref);
3365
3366	return spec;
3367}
3368
3369static void rbd_spec_free(struct kref *kref)
3370{
3371	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3372
3373	kfree(spec->pool_name);
3374	kfree(spec->image_id);
3375	kfree(spec->image_name);
3376	kfree(spec->snap_name);
3377	kfree(spec);
3378}
3379
3380static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3381				struct rbd_spec *spec)
3382{
3383	struct rbd_device *rbd_dev;
3384
3385	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3386	if (!rbd_dev)
3387		return NULL;
3388
3389	spin_lock_init(&rbd_dev->lock);
3390	rbd_dev->flags = 0;
3391	INIT_LIST_HEAD(&rbd_dev->node);
3392	INIT_LIST_HEAD(&rbd_dev->snaps);
3393	init_rwsem(&rbd_dev->header_rwsem);
3394
3395	rbd_dev->spec = spec;
3396	rbd_dev->rbd_client = rbdc;
3397
3398	/* Initialize the layout used for all rbd requests */
3399
3400	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3401	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3402	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3403	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3404
3405	return rbd_dev;
3406}
3407
3408static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3409{
3410	rbd_spec_put(rbd_dev->parent_spec);
3411	kfree(rbd_dev->header_name);
3412	rbd_put_client(rbd_dev->rbd_client);
3413	rbd_spec_put(rbd_dev->spec);
3414	kfree(rbd_dev);
3415}
3416
3417static void rbd_snap_destroy(struct rbd_snap *snap)
3418{
3419	kfree(snap->name);
3420	kfree(snap);
3421}
3422
3423static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
3424						const char *snap_name,
3425						u64 snap_id, u64 snap_size,
3426						u64 snap_features)
3427{
3428	struct rbd_snap *snap;
3429
3430	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3431	if (!snap)
3432		return ERR_PTR(-ENOMEM);
3433
3434	snap->name = snap_name;
3435	snap->id = snap_id;
3436	snap->size = snap_size;
3437	snap->features = snap_features;
3438
3439	return snap;
3440}
3441
3442/*
3443 * Returns a dynamically-allocated snapshot name if successful, or a
3444 * pointer-coded error otherwise.
3445 */
3446static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3447		u64 *snap_size, u64 *snap_features)
3448{
3449	char *snap_name;
3450	int i;
3451
3452	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3453
3454	/* Skip over names until we find the one we are looking for */
3455
3456	snap_name = rbd_dev->header.snap_names;
3457	for (i = 0; i < which; i++)
3458		snap_name += strlen(snap_name) + 1;
3459
3460	snap_name = kstrdup(snap_name, GFP_KERNEL);
3461	if (!snap_name)
3462		return ERR_PTR(-ENOMEM);
3463
3464	*snap_size = rbd_dev->header.snap_sizes[which];
3465	*snap_features = 0;	/* No features for v1 */
3466
3467	return snap_name;
3468}
3469
3470/*
3471 * Get the size and object order for an image snapshot, or if
3472 * snap_id is CEPH_NOSNAP, gets this information for the base
3473 * image.
3474 */
3475static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3476				u8 *order, u64 *snap_size)
3477{
3478	__le64 snapid = cpu_to_le64(snap_id);
3479	int ret;
3480	struct {
3481		u8 order;
3482		__le64 size;
3483	} __attribute__ ((packed)) size_buf = { 0 };
3484
3485	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3486				"rbd", "get_size",
3487				&snapid, sizeof (snapid),
3488				&size_buf, sizeof (size_buf), NULL);
3489	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3490	if (ret < 0)
3491		return ret;
3492	if (ret < sizeof (size_buf))
3493		return -ERANGE;
3494
3495	if (order)
3496		*order = size_buf.order;
3497	*snap_size = le64_to_cpu(size_buf.size);
3498
3499	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3500		(unsigned long long)snap_id, (unsigned int)*order,
3501		(unsigned long long)*snap_size);
3502
3503	return 0;
3504}
3505
3506static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3507{
3508	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3509					&rbd_dev->header.obj_order,
3510					&rbd_dev->header.image_size);
3511}
3512
3513static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3514{
3515	void *reply_buf;
3516	int ret;
3517	void *p;
3518
3519	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3520	if (!reply_buf)
3521		return -ENOMEM;
3522
3523	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3524				"rbd", "get_object_prefix", NULL, 0,
3525				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3526	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3527	if (ret < 0)
3528		goto out;
3529
3530	p = reply_buf;
3531	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3532						p + ret, NULL, GFP_NOIO);
3533	ret = 0;
3534
3535	if (IS_ERR(rbd_dev->header.object_prefix)) {
3536		ret = PTR_ERR(rbd_dev->header.object_prefix);
3537		rbd_dev->header.object_prefix = NULL;
3538	} else {
3539		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3540	}
3541out:
3542	kfree(reply_buf);
3543
3544	return ret;
3545}
3546
3547static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3548		u64 *snap_features)
3549{
3550	__le64 snapid = cpu_to_le64(snap_id);
3551	struct {
3552		__le64 features;
3553		__le64 incompat;
3554	} __attribute__ ((packed)) features_buf = { 0 };
3555	u64 incompat;
3556	int ret;
3557
3558	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3559				"rbd", "get_features",
3560				&snapid, sizeof (snapid),
3561				&features_buf, sizeof (features_buf), NULL);
3562	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3563	if (ret < 0)
3564		return ret;
3565	if (ret < sizeof (features_buf))
3566		return -ERANGE;
3567
3568	incompat = le64_to_cpu(features_buf.incompat);
3569	if (incompat & ~RBD_FEATURES_SUPPORTED)
3570		return -ENXIO;
3571
3572	*snap_features = le64_to_cpu(features_buf.features);
3573
3574	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3575		(unsigned long long)snap_id,
3576		(unsigned long long)*snap_features,
3577		(unsigned long long)le64_to_cpu(features_buf.incompat));
3578
3579	return 0;
3580}
3581
3582static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3583{
3584	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3585						&rbd_dev->header.features);
3586}
3587
3588static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3589{
3590	struct rbd_spec *parent_spec;
3591	size_t size;
3592	void *reply_buf = NULL;
3593	__le64 snapid;
3594	void *p;
3595	void *end;
3596	char *image_id;
3597	u64 overlap;
3598	int ret;
3599
3600	parent_spec = rbd_spec_alloc();
3601	if (!parent_spec)
3602		return -ENOMEM;
3603
3604	size = sizeof (__le64) +				/* pool_id */
3605		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
3606		sizeof (__le64) +				/* snap_id */
3607		sizeof (__le64);				/* overlap */
3608	reply_buf = kmalloc(size, GFP_KERNEL);
3609	if (!reply_buf) {
3610		ret = -ENOMEM;
3611		goto out_err;
3612	}
3613
3614	snapid = cpu_to_le64(CEPH_NOSNAP);
3615	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3616				"rbd", "get_parent",
3617				&snapid, sizeof (snapid),
3618				reply_buf, size, NULL);
3619	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3620	if (ret < 0)
3621		goto out_err;
3622
3623	p = reply_buf;
3624	end = reply_buf + ret;
3625	ret = -ERANGE;
3626	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3627	if (parent_spec->pool_id == CEPH_NOPOOL)
3628		goto out;	/* No parent?  No problem. */
3629
3630	/* The ceph file layout needs to fit pool id in 32 bits */
3631
3632	ret = -EIO;
3633	if (parent_spec->pool_id > (u64)U32_MAX) {
3634		rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3635			(unsigned long long)parent_spec->pool_id, U32_MAX);
3636		goto out_err;
3637	}
3638
3639	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3640	if (IS_ERR(image_id)) {
3641		ret = PTR_ERR(image_id);
3642		goto out_err;
3643	}
3644	parent_spec->image_id = image_id;
3645	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3646	ceph_decode_64_safe(&p, end, overlap, out_err);
3647
3648	rbd_dev->parent_overlap = overlap;
3649	rbd_dev->parent_spec = parent_spec;
3650	parent_spec = NULL;	/* rbd_dev now owns this */
3651out:
3652	ret = 0;
3653out_err:
3654	kfree(reply_buf);
3655	rbd_spec_put(parent_spec);
3656
3657	return ret;
3658}
3659
3660static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3661{
3662	struct {
3663		__le64 stripe_unit;
3664		__le64 stripe_count;
3665	} __attribute__ ((packed)) striping_info_buf = { 0 };
3666	size_t size = sizeof (striping_info_buf);
3667	void *p;
3668	u64 obj_size;
3669	u64 stripe_unit;
3670	u64 stripe_count;
3671	int ret;
3672
3673	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3674				"rbd", "get_stripe_unit_count", NULL, 0,
3675				(char *)&striping_info_buf, size, NULL);
3676	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3677	if (ret < 0)
3678		return ret;
3679	if (ret < size)
3680		return -ERANGE;
3681
3682	/*
3683	 * We don't actually support the "fancy striping" feature
3684	 * (STRIPINGV2) yet, but if the striping sizes are the
3685	 * defaults the behavior is the same as before.  So find
3686	 * out, and only fail if the image has non-default values.
3687	 */
3688	ret = -EINVAL;
3689	obj_size = (u64)1 << rbd_dev->header.obj_order;
3690	p = &striping_info_buf;
3691	stripe_unit = ceph_decode_64(&p);
3692	if (stripe_unit != obj_size) {
3693		rbd_warn(rbd_dev, "unsupported stripe unit "
3694				"(got %llu want %llu)",
3695				stripe_unit, obj_size);
3696		return -EINVAL;
3697	}
3698	stripe_count = ceph_decode_64(&p);
3699	if (stripe_count != 1) {
3700		rbd_warn(rbd_dev, "unsupported stripe count "
3701				"(got %llu want 1)", stripe_count);
3702		return -EINVAL;
3703	}
3704	rbd_dev->header.stripe_unit = stripe_unit;
3705	rbd_dev->header.stripe_count = stripe_count;
3706
3707	return 0;
3708}
3709
3710static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3711{
3712	size_t image_id_size;
3713	char *image_id;
3714	void *p;
3715	void *end;
3716	size_t size;
3717	void *reply_buf = NULL;
3718	size_t len = 0;
3719	char *image_name = NULL;
3720	int ret;
3721
3722	rbd_assert(!rbd_dev->spec->image_name);
3723
3724	len = strlen(rbd_dev->spec->image_id);
3725	image_id_size = sizeof (__le32) + len;
3726	image_id = kmalloc(image_id_size, GFP_KERNEL);
3727	if (!image_id)
3728		return NULL;
3729
3730	p = image_id;
3731	end = image_id + image_id_size;
3732	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3733
3734	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3735	reply_buf = kmalloc(size, GFP_KERNEL);
3736	if (!reply_buf)
3737		goto out;
3738
3739	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3740				"rbd", "dir_get_name",
3741				image_id, image_id_size,
3742				reply_buf, size, NULL);
3743	if (ret < 0)
3744		goto out;
3745	p = reply_buf;
3746	end = reply_buf + ret;
3747
3748	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3749	if (IS_ERR(image_name))
3750		image_name = NULL;
3751	else
3752		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3753out:
3754	kfree(reply_buf);
3755	kfree(image_id);
3756
3757	return image_name;
3758}
3759
3760/*
3761 * When a parent image gets probed, we only have the pool, image,
3762 * and snapshot ids but not the names of any of them.  This call
3763 * is made later to fill in those names.  It has to be done after
3764 * rbd_dev_snaps_update() has completed because some of the
3765 * information (in particular, snapshot name) is not available
3766 * until then.
3767 *
3768 * When an image being mapped (not a parent) is probed, we have the
3769 * pool name and pool id, image name and image id, and the snapshot
3770 * name.  The only thing we're missing is the snapshot id.
3771 */
3772static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3773{
3774	struct ceph_osd_client *osdc;
3775	const char *name;
3777	int ret;
3778
3779	/*
3780	 * An image being mapped will have the pool name (etc.), but
3781	 * we need to look up the snapshot id.
3782	 */
3783	if (rbd_dev->spec->pool_name) {
3784		if (strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) {
3785			struct rbd_snap *snap;
3786
3787			snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
3788			if (!snap)
3789				return -ENOENT;
3790			rbd_dev->spec->snap_id = snap->id;
3791		} else {
3792			rbd_dev->spec->snap_id = CEPH_NOSNAP;
3793		}
3794
3795		return 0;
3796	}
3797
3798	/* Look up the pool name */
3799
3800	osdc = &rbd_dev->rbd_client->client->osdc;
3801	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3802	if (!name) {
3803		rbd_warn(rbd_dev, "there is no pool with id %llu",
3804			rbd_dev->spec->pool_id);	/* Really a BUG() */
3805		return -EIO;
3806	}
3807
3808	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3809	if (!rbd_dev->spec->pool_name)
3810		return -ENOMEM;
3811
3812	/* Fetch the image name; tolerate failure here */
3813
3814	name = rbd_dev_image_name(rbd_dev);
3815	if (name)
3816		rbd_dev->spec->image_name = (char *)name;
3817	else
3818		rbd_warn(rbd_dev, "unable to get image name");
3819
3820	/* Look up the snapshot name. */
3821
3822	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3823	if (!name) {
3824		rbd_warn(rbd_dev, "no snapshot with id %llu",
3825			rbd_dev->spec->snap_id);	/* Really a BUG() */
3826		ret = -EIO;
3827		goto out_err;
3828	}
3829	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3830	if (!rbd_dev->spec->snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}
3832
3833	return 0;
3834out_err:
3836	kfree(rbd_dev->spec->pool_name);
3837	rbd_dev->spec->pool_name = NULL;
3838
3839	return ret;
3840}
3841
3842static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3843{
3844	size_t size;
3845	int ret;
3846	void *reply_buf;
3847	void *p;
3848	void *end;
3849	u64 seq;
3850	u32 snap_count;
3851	struct ceph_snap_context *snapc;
3852	u32 i;
3853
3854	/*
3855	 * We'll need room for the seq value (maximum snapshot id),
3856	 * snapshot count, and array of that many snapshot ids.
3857	 * For now we have a fixed upper limit on the number we're
3858	 * prepared to receive.
3859	 */
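	/*
	 * With RBD_MAX_SNAP_COUNT snapshots (510) this works out to
	 * 8 + 4 + 510 * 8 = 4092 bytes, so the reply buffer fits in a
	 * single 4 KB page.
	 */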
3860	size = sizeof (__le64) + sizeof (__le32) +
3861			RBD_MAX_SNAP_COUNT * sizeof (__le64);
3862	reply_buf = kzalloc(size, GFP_KERNEL);
3863	if (!reply_buf)
3864		return -ENOMEM;
3865
3866	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3867				"rbd", "get_snapcontext", NULL, 0,
3868				reply_buf, size, ver);
3869	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3870	if (ret < 0)
3871		goto out;
3872
3873	p = reply_buf;
3874	end = reply_buf + ret;
3875	ret = -ERANGE;
3876	ceph_decode_64_safe(&p, end, seq, out);
3877	ceph_decode_32_safe(&p, end, snap_count, out);
3878
3879	/*
3880	 * Make sure the reported number of snapshot ids wouldn't go
3881	 * beyond the end of our buffer.  But before checking that,
3882	 * make sure the computed size of the snapshot context we
3883	 * allocate is representable in a size_t.
3884	 */
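	/*
	 * Without that first check, the size computation used for the
	 * allocation below could overflow a 32-bit size_t and make a
	 * too-small buffer appear to be big enough.
	 */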
3885	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3886				 / sizeof (u64)) {
3887		ret = -EINVAL;
3888		goto out;
3889	}
3890	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3891		goto out;
3892
3893	size = sizeof (struct ceph_snap_context) +
3894				snap_count * sizeof (snapc->snaps[0]);
3895	snapc = kmalloc(size, GFP_KERNEL);
3896	if (!snapc) {
3897		ret = -ENOMEM;
3898		goto out;
3899	}
3900	ret = 0;
3901
3902	atomic_set(&snapc->nref, 1);
3903	snapc->seq = seq;
3904	snapc->num_snaps = snap_count;
3905	for (i = 0; i < snap_count; i++)
3906		snapc->snaps[i] = ceph_decode_64(&p);
3907
3908	rbd_dev->header.snapc = snapc;
3909
3910	dout("  snap context seq = %llu, snap_count = %u\n",
3911		(unsigned long long)seq, (unsigned int)snap_count);
3912out:
3913	kfree(reply_buf);
3914
3915	return ret;
3916}
3917
3918static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3919{
3920	size_t size;
3921	void *reply_buf;
3922	__le64 snap_id;
3923	int ret;
3924	void *p;
3925	void *end;
3926	char *snap_name;
3927
3928	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3929	reply_buf = kmalloc(size, GFP_KERNEL);
3930	if (!reply_buf)
3931		return ERR_PTR(-ENOMEM);
3932
3933	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3934	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3935	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3936				"rbd", "get_snapshot_name",
3937				&snap_id, sizeof (snap_id),
3938				reply_buf, size, NULL);
3939	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3940	if (ret < 0) {
3941		snap_name = ERR_PTR(ret);
3942		goto out;
3943	}
3944
3945	p = reply_buf;
3946	end = reply_buf + ret;
3947	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3948	if (IS_ERR(snap_name))
3949		goto out;
3950
3951	dout("  snap_id 0x%016llx snap_name = %s\n",
3952		(unsigned long long)le64_to_cpu(snap_id), snap_name);
3953out:
3954	kfree(reply_buf);
3955
3956	return snap_name;
3957}
3958
3959static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3960		u64 *snap_size, u64 *snap_features)
3961{
3962	u64 snap_id;
3963	u64 size;
3964	u64 features;
3965	char *snap_name;
3966	int ret;
3967
3968	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3969	snap_id = rbd_dev->header.snapc->snaps[which];
3970	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
3971	if (ret)
3972		goto out_err;
3973
3974	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
3975	if (ret)
3976		goto out_err;
3977
3978	snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
3979	if (!IS_ERR(snap_name)) {
3980		*snap_size = size;
3981		*snap_features = features;
3982	}
3983
3984	return snap_name;
3985out_err:
3986	return ERR_PTR(ret);
3987}
3988
3989static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3990		u64 *snap_size, u64 *snap_features)
3991{
3992	if (rbd_dev->image_format == 1)
3993		return rbd_dev_v1_snap_info(rbd_dev, which,
3994					snap_size, snap_features);
3995	if (rbd_dev->image_format == 2)
3996		return rbd_dev_v2_snap_info(rbd_dev, which,
3997					snap_size, snap_features);
3998	return ERR_PTR(-EINVAL);
3999}
4000
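/*
 * Refresh the in-memory metadata for a format 2 image: re-read the
 * image size (updating the mapping size) and the snapshot context,
 * then bring the snapshot list up to date.  The object order is not
 * expected to change once an image exists, so a change in that value
 * is reported as an error.
 */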
4001static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4002{
4003	int ret;
4004	__u8 obj_order;
4005
4006	down_write(&rbd_dev->header_rwsem);
4007
4008	/* Grab old order first, to see if it changes */
4009
4010	obj_order = rbd_dev->header.obj_order;
4011	ret = rbd_dev_v2_image_size(rbd_dev);
4012	if (ret)
4013		goto out;
4014	if (rbd_dev->header.obj_order != obj_order) {
4015		ret = -EIO;
4016		goto out;
4017	}
4018	rbd_update_mapping_size(rbd_dev);
4019
4020	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4021	dout("rbd_dev_v2_snap_context returned %d\n", ret);
4022	if (ret)
4023		goto out;
4024	ret = rbd_dev_snaps_update(rbd_dev);
4025	dout("rbd_dev_snaps_update returned %d\n", ret);
4026	if (ret)
4027		goto out;
4028out:
4029	up_write(&rbd_dev->header_rwsem);
4030
4031	return ret;
4032}
4033
4034/*
4035 * Scan the rbd device's current snapshot list and compare it to the
4036 * newly-received snapshot context.  Remove any existing snapshots
4037 * not present in the new snapshot context.  Add a new snapshot for
4038 * any snapshots in the snapshot context not in the current list.
4039 * And verify there are no changes to snapshots we already know
4040 * about.
4041 *
4042 * Assumes the snapshots in the snapshot context are sorted by
4043 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4044 * are also maintained in that order.)
4045 *
4046 * Note that any error that occurs while updating the snapshot list
4047 * aborts the update, and the entire list is cleared.  The snapshot
4048 * list becomes inconsistent at that point anyway, so it might as
4049 * well be empty.
4050 */
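/*
 * For example, if the current list holds snapshot ids { 8, 5, 2 } and
 * the new snapshot context holds { 8, 6, 2 }, then 6 is added, 5 is
 * removed, and 8 and 2 are verified to be unchanged.
 */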
4051static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4052{
4053	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4054	const u32 snap_count = snapc->num_snaps;
4055	struct list_head *head = &rbd_dev->snaps;
4056	struct list_head *links = head->next;
4057	u32 index = 0;
4058	int ret = 0;
4059
4060	dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
4061	while (index < snap_count || links != head) {
4062		u64 snap_id;
4063		struct rbd_snap *snap;
4064		char *snap_name;
4065		u64 snap_size = 0;
4066		u64 snap_features = 0;
4067
4068		snap_id = index < snap_count ? snapc->snaps[index]
4069					     : CEPH_NOSNAP;
4070		snap = links != head ? list_entry(links, struct rbd_snap, node)
4071				     : NULL;
4072		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4073
4074		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4075			struct list_head *next = links->next;
4076
4077			/*
4078			 * A previously-existing snapshot is not in
4079			 * the new snap context.
4080			 *
4081			 * If the now-missing snapshot is the one
4082			 * the image represents, clear its existence
4083			 * flag so we can avoid sending any more
4084			 * requests to it.
4085			 */
4086			if (rbd_dev->spec->snap_id == snap->id)
4087				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4088			dout("removing %ssnap id %llu\n",
4089				rbd_dev->spec->snap_id == snap->id ?
4090							"mapped " : "",
4091				(unsigned long long)snap->id);
4092
4093			list_del(&snap->node);
4094			rbd_snap_destroy(snap);
4095
4096			/* Done with this list entry; advance */
4097
4098			links = next;
4099			continue;
4100		}
4101
4102		snap_name = rbd_dev_snap_info(rbd_dev, index,
4103					&snap_size, &snap_features);
4104		if (IS_ERR(snap_name)) {
4105			ret = PTR_ERR(snap_name);
4106			dout("failed to get snap info, error %d\n", ret);
4107			goto out_err;
4108		}
4109
4110		dout("entry %u: snap_id = %llu\n", (unsigned int)index,
4111			(unsigned long long)snap_id);
4112		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4113			struct rbd_snap *new_snap;
4114
4115			/* We haven't seen this snapshot before */
4116
4117			new_snap = rbd_snap_create(rbd_dev, snap_name,
4118					snap_id, snap_size, snap_features);
4119			if (IS_ERR(new_snap)) {
4120				ret = PTR_ERR(new_snap);
4121				dout("  failed to add dev, error %d\n", ret);
4122				goto out_err;
4123			}
4124
4125			/* New goes before existing, or at end of list */
4126
4127			dout("  added dev%s\n", snap ? "" : " at end");
4128			if (snap)
4129				list_add_tail(&new_snap->node, &snap->node);
4130			else
4131				list_add_tail(&new_snap->node, head);
4132		} else {
4133			/* Already have this one */
4134
4135			dout("  already present\n");
4136
4137			rbd_assert(snap->size == snap_size);
4138			rbd_assert(!strcmp(snap->name, snap_name));
4139			rbd_assert(snap->features == snap_features);
4140
4141			/* Done with this list entry; advance */
4142
4143			links = links->next;
4144		}
4145
4146		/* Advance to the next entry in the snapshot context */
4147
4148		index++;
4149	}
4150	dout("%s: done\n", __func__);
4151
4152	return 0;
4153out_err:
4154	rbd_remove_all_snaps(rbd_dev);
4155
4156	return ret;
4157}
4158
4159static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4160{
4161	struct device *dev;
4162	int ret;
4163
4164	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4165
4166	dev = &rbd_dev->dev;
4167	dev->bus = &rbd_bus_type;
4168	dev->type = &rbd_device_type;
4169	dev->parent = &rbd_root_dev;
4170	dev->release = rbd_dev_release;
4171	dev_set_name(dev, "%d", rbd_dev->dev_id);
4172	ret = device_register(dev);
4173
4174	mutex_unlock(&ctl_mutex);
4175
4176	return ret;
4177}
4178
4179static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4180{
4181	device_unregister(&rbd_dev->dev);
4182}
4183
4184static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4185
4186/*
4187 * Get a unique rbd identifier for the given new rbd_dev, and add
4188 * the rbd_dev to the global list.  The minimum rbd id is 1.
4189 */
4190static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4191{
4192	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4193
4194	spin_lock(&rbd_dev_list_lock);
4195	list_add_tail(&rbd_dev->node, &rbd_dev_list);
4196	spin_unlock(&rbd_dev_list_lock);
4197	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4198		(unsigned long long) rbd_dev->dev_id);
4199}
4200
4201/*
4202 * Remove an rbd_dev from the global list, and record that its
4203 * identifier is no longer in use.
4204 */
4205static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4206{
4207	struct list_head *tmp;
4208	int rbd_id = rbd_dev->dev_id;
4209	int max_id;
4210
4211	rbd_assert(rbd_id > 0);
4212
4213	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4214		(unsigned long long) rbd_dev->dev_id);
4215	spin_lock(&rbd_dev_list_lock);
4216	list_del_init(&rbd_dev->node);
4217
4218	/*
4219	 * If the id being "put" is not the current maximum, there
4220	 * is nothing special we need to do.
4221	 */
4222	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4223		spin_unlock(&rbd_dev_list_lock);
4224		return;
4225	}
4226
4227	/*
4228	 * We need to update the current maximum id.  Search the
4229	 * list to find out what it is.  We're more likely to find
4230	 * the maximum at the end, so search the list backward.
4231	 */
4232	max_id = 0;
4233	list_for_each_prev(tmp, &rbd_dev_list) {
4234		struct rbd_device *rbd_dev;
4235
4236		rbd_dev = list_entry(tmp, struct rbd_device, node);
4237		if (rbd_dev->dev_id > max_id)
4238			max_id = rbd_dev->dev_id;
4239	}
4240	spin_unlock(&rbd_dev_list_lock);
4241
4242	/*
4243	 * The max id could have been updated by rbd_dev_id_get(), in
4244	 * which case it now accurately reflects the new maximum.
4245	 * Be careful not to overwrite the maximum value in that
4246	 * case.
4247	 */
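	/*
	 * For example, if the id being released was the maximum at the
	 * check above, but another device has since claimed a higher
	 * id, the cmpxchg() fails and the newer maximum is preserved.
	 */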
4248	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4249	dout("  max dev id has been reset\n");
4250}
4251
4252/*
4253 * Skips over white space at *buf, and updates *buf to point to the
4254 * first found non-space character (if any). Returns the length of
4255 * the token (string of non-white space characters) found.  Note
4256 * that *buf must be terminated with '\0'.
4257 */
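/*
 * For example, given "  rbd foo", *buf is advanced to point at the
 * 'r' and 3 (the length of "rbd") is returned.
 */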
4258static inline size_t next_token(const char **buf)
4259{
4260	/*
4261	 * These are the characters that produce nonzero for
4262	 * isspace() in the "C" and "POSIX" locales.
4263	 */
4264	const char *spaces = " \f\n\r\t\v";
4265
4266	*buf += strspn(*buf, spaces);	/* Find start of token */
4267
4268	return strcspn(*buf, spaces);   /* Return token length */
4269}
4270
4271/*
4272 * Finds the next token in *buf, and if the provided token buffer is
4273 * big enough, copies the found token into it.  The result, if
4274 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
4275 * must be terminated with '\0' on entry.
4276 *
4277 * Returns the length of the token found (not including the '\0').
4278 * Return value will be 0 if no token is found, and it will be >=
4279 * token_size if the token would not fit.
4280 *
4281 * The *buf pointer will be updated to point beyond the end of the
4282 * found token.  Note that this occurs even if the token buffer is
4283 * too small to hold it.
4284 */
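/*
 * For example, copying from "rbd foo" with a token_size of 8 places
 * "rbd" (NUL-terminated) into the token buffer, returns 3, and leaves
 * *buf pointing at " foo".
 */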
4285static inline size_t copy_token(const char **buf,
4286				char *token,
4287				size_t token_size)
4288{
4289	size_t len;
4290
4291	len = next_token(buf);
4292	if (len < token_size) {
4293		memcpy(token, *buf, len);
4294		*(token + len) = '\0';
4295	}
4296	*buf += len;
4297
4298	return len;
4299}
4300
4301/*
4302 * Finds the next token in *buf, dynamically allocates a buffer big
4303 * enough to hold a copy of it, and copies the token into the new
4304 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4305 * that a duplicate buffer is created even for a zero-length token.
4306 *
4307 * Returns a pointer to the newly-allocated duplicate, or a null
4308 * pointer if memory for the duplicate was not available.  If
4309 * the lenp argument is a non-null pointer, the length of the token
4310 * (not including the '\0') is returned in *lenp.
4311 *
4312 * If successful, the *buf pointer will be updated to point beyond
4313 * the end of the found token.
4314 *
4315 * Note: uses GFP_KERNEL for allocation.
4316 */
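/*
 * For example, dup_token() on "  pool rest" returns a newly-allocated
 * copy of "pool", stores 4 in *lenp (if lenp is non-null), and leaves
 * *buf pointing at " rest".
 */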
4317static inline char *dup_token(const char **buf, size_t *lenp)
4318{
4319	char *dup;
4320	size_t len;
4321
4322	len = next_token(buf);
4323	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4324	if (!dup)
4325		return NULL;
4326	*(dup + len) = '\0';
4327	*buf += len;
4328
4329	if (lenp)
4330		*lenp = len;
4331
4332	return dup;
4333}
4334
4335/*
4336 * Parse the options provided for an "rbd add" (i.e., rbd image
4337 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4338 * and the data written is passed here via a NUL-terminated buffer.
4339 * Returns 0 if successful or an error code otherwise.
4340 *
4341 * The information extracted from these options is recorded in
4342 * the other parameters which return dynamically-allocated
4343 * structures:
4344 *  ceph_opts
4345 *      The address of a pointer that will refer to a ceph options
4346 *      structure.  Caller must release the returned pointer using
4347 *      ceph_destroy_options() when it is no longer needed.
4348 *  rbd_opts
4349 *	Address of an rbd options pointer.  Fully initialized by
4350 *	this function; caller must release with kfree().
4351 *  spec
4352 *	Address of an rbd image specification pointer.  Fully
4353 *	initialized by this function based on parsed options.
4354 *	Caller must release with rbd_spec_put().
4355 *
4356 * The options passed take this form:
4357 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4358 * where:
4359 *  <mon_addrs>
4360 *      A comma-separated list of one or more monitor addresses.
4361 *      A monitor address is an ip address, optionally followed
4362 *      by a port number (separated by a colon).
4363 *        I.e.:  ip1[:port1][,ip2[:port2]...]
4364 *  <options>
4365 *      A comma-separated list of ceph and/or rbd options.
4366 *  <pool_name>
4367 *      The name of the rados pool containing the rbd image.
4368 *  <image_name>
4369 *      The name of the image in that pool to map.
4370 *  <snap_name>
4371 *      An optional snapshot name.  If provided, the mapping will
4372 *      present data from the image at the time that snapshot was
4373 *      created.  The image head is used if no snapshot name is
4374 *      provided.  Snapshot mappings are always read-only.
4375 */
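/*
 * An illustrative request (all values made up) might be:
 *
 *   # echo "1.2.3.4:6789 name=admin rbd foo mysnap" > /sys/bus/rbd/add
 *
 * which maps the "mysnap" snapshot of image "foo" in pool "rbd",
 * using the monitor at 1.2.3.4:6789 and the "admin" user.
 */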
4376static int rbd_add_parse_args(const char *buf,
4377				struct ceph_options **ceph_opts,
4378				struct rbd_options **opts,
4379				struct rbd_spec **rbd_spec)
4380{
4381	size_t len;
4382	char *options;
4383	const char *mon_addrs;
4384	char *snap_name;
4385	size_t mon_addrs_size;
4386	struct rbd_spec *spec = NULL;
4387	struct rbd_options *rbd_opts = NULL;
4388	struct ceph_options *copts;
4389	int ret;
4390
4391	/* The first four tokens are required */
4392
4393	len = next_token(&buf);
4394	if (!len) {
4395		rbd_warn(NULL, "no monitor address(es) provided");
4396		return -EINVAL;
4397	}
4398	mon_addrs = buf;
4399	mon_addrs_size = len + 1;
4400	buf += len;
4401
4402	ret = -EINVAL;
4403	options = dup_token(&buf, NULL);
4404	if (!options)
4405		return -ENOMEM;
4406	if (!*options) {
4407		rbd_warn(NULL, "no options provided");
4408		goto out_err;
4409	}
4410
4411	spec = rbd_spec_alloc();
4412	if (!spec)
4413		goto out_mem;
4414
4415	spec->pool_name = dup_token(&buf, NULL);
4416	if (!spec->pool_name)
4417		goto out_mem;
4418	if (!*spec->pool_name) {
4419		rbd_warn(NULL, "no pool name provided");
4420		goto out_err;
4421	}
4422
4423	spec->image_name = dup_token(&buf, NULL);
4424	if (!spec->image_name)
4425		goto out_mem;
4426	if (!*spec->image_name) {
4427		rbd_warn(NULL, "no image name provided");
4428		goto out_err;
4429	}
4430
4431	/*
4432	 * Snapshot name is optional; default is to use "-"
4433	 * (indicating the head/no snapshot).
4434	 */
4435	len = next_token(&buf);
4436	if (!len) {
4437		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4438		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4439	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
4440		ret = -ENAMETOOLONG;
4441		goto out_err;
4442	}
4443	snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4444	if (!snap_name)
4445		goto out_mem;
4446	*(snap_name + len) = '\0';
4447	spec->snap_name = snap_name;
4448
4449	/* Initialize all rbd options to the defaults */
4450
4451	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4452	if (!rbd_opts)
4453		goto out_mem;
4454
4455	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4456
4457	copts = ceph_parse_options(options, mon_addrs,
4458					mon_addrs + mon_addrs_size - 1,
4459					parse_rbd_opts_token, rbd_opts);
4460	if (IS_ERR(copts)) {
4461		ret = PTR_ERR(copts);
4462		goto out_err;
4463	}
4464	kfree(options);
4465
4466	*ceph_opts = copts;
4467	*opts = rbd_opts;
4468	*rbd_spec = spec;
4469
4470	return 0;
4471out_mem:
4472	ret = -ENOMEM;
4473out_err:
4474	kfree(rbd_opts);
4475	rbd_spec_put(spec);
4476	kfree(options);
4477
4478	return ret;
4479}
4480
4481/*
4482 * An rbd format 2 image has a unique identifier, distinct from the
4483 * name given to it by the user.  Internally, that identifier is
4484 * what's used to specify the names of objects related to the image.
4485 *
4486 * A special "rbd id" object is used to map an rbd image name to its
4487 * id.  If that object doesn't exist, then there is no v2 rbd image
4488 * with the supplied name.
4489 *
4490 * This function will record the given rbd_dev's image_id field if
4491 * it can be determined, and in that case will return 0.  If any
4492 * errors occur a negative errno will be returned and the rbd_dev's
4493 * image_id field will be unchanged (and should be NULL).
4494 */
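/*
 * For example, the id of an image named "foo" is stored in an object
 * whose name is RBD_ID_PREFIX followed by "foo"; see the sprintf()
 * below that builds object_name.
 */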
4495static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4496{
4497	int ret;
4498	size_t size;
4499	char *object_name;
4500	void *response;
4501	char *image_id;
4502
4503	/*
4504	 * When probing a parent image, the image id is already
4505	 * known (and the image name likely is not).  There's no
4506	 * need to fetch the image id again in this case.  We
4507	 * do still need to set the image format though.
4508	 */
4509	if (rbd_dev->spec->image_id) {
4510		rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4511
4512		return 0;
4513	}
4514
4515	/*
4516	 * First, see if the format 2 image id file exists, and if
4517	 * so, get the image's persistent id from it.
4518	 */
4519	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4520	object_name = kmalloc(size, GFP_NOIO);
4521	if (!object_name)
4522		return -ENOMEM;
4523	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4524	dout("rbd id object name is %s\n", object_name);
4525
4526	/* Response will be an encoded string, which includes a length */
4527
4528	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4529	response = kzalloc(size, GFP_NOIO);
4530	if (!response) {
4531		ret = -ENOMEM;
4532		goto out;
4533	}
4534
4535	/* If it doesn't exist we'll assume it's a format 1 image */
4536
4537	ret = rbd_obj_method_sync(rbd_dev, object_name,
4538				"rbd", "get_id", NULL, 0,
4539				response, RBD_IMAGE_ID_LEN_MAX, NULL);
4540	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4541	if (ret == -ENOENT) {
4542		image_id = kstrdup("", GFP_KERNEL);
4543		ret = image_id ? 0 : -ENOMEM;
4544		if (!ret)
4545			rbd_dev->image_format = 1;
4546	} else if (ret > (int) sizeof (__le32)) {
4547		void *p = response;
4548
4549		image_id = ceph_extract_encoded_string(&p, p + ret,
4550						NULL, GFP_NOIO);
4551		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4552		if (!ret)
4553			rbd_dev->image_format = 2;
4554	} else {
4555		ret = -EINVAL;
4556	}
4557
4558	if (!ret) {
4559		rbd_dev->spec->image_id = image_id;
4560		dout("image_id is %s\n", image_id);
4561	}
4562out:
4563	kfree(response);
4564	kfree(object_name);
4565
4566	return ret;
4567}
4568
4569static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4570{
4571	int ret;
4572	size_t size;
4573
4574	/* Record the header object name for this rbd image. */
4575
4576	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4577	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4578	if (!rbd_dev->header_name) {
4579		ret = -ENOMEM;
4580		goto out_err;
4581	}
4582	sprintf(rbd_dev->header_name, "%s%s",
4583		rbd_dev->spec->image_name, RBD_SUFFIX);
4584
4585	/* Populate rbd image metadata */
4586
4587	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4588	if (ret < 0)
4589		goto out_err;
4590
4591	/* Version 1 images have no parent (no layering) */
4592
4593	rbd_dev->parent_spec = NULL;
4594	rbd_dev->parent_overlap = 0;
4595
4596	dout("discovered version 1 image, header name is %s\n",
4597		rbd_dev->header_name);
4598
4599	return 0;
4600
4601out_err:
4602	kfree(rbd_dev->header_name);
4603	rbd_dev->header_name = NULL;
4604	kfree(rbd_dev->spec->image_id);
4605	rbd_dev->spec->image_id = NULL;
4606
4607	return ret;
4608}
4609
4610static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4611{
4612	size_t size;
4613	int ret;
4614	u64 ver = 0;
4615
4616	/*
4617	 * Image id was filled in by the caller.  Record the header
4618	 * object name for this rbd image.
4619	 */
4620	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4621	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4622	if (!rbd_dev->header_name)
4623		return -ENOMEM;
4624	sprintf(rbd_dev->header_name, "%s%s",
4625			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4626
4627	/* Get the size and object order for the image */
4628	ret = rbd_dev_v2_image_size(rbd_dev);
4629	if (ret)
4630		goto out_err;
4631
4632	/* Get the object prefix (a.k.a. block_name) for the image */
4633
4634	ret = rbd_dev_v2_object_prefix(rbd_dev);
4635	if (ret)
4636		goto out_err;
4637
4638	/* Get and check the features for the image */
4639
4640	ret = rbd_dev_v2_features(rbd_dev);
4641	if (ret)
4642		goto out_err;
4643
4644	/* If the image supports layering, get the parent info */
4645
4646	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4647		ret = rbd_dev_v2_parent_info(rbd_dev);
4648		if (ret)
4649			goto out_err;
4650		rbd_warn(rbd_dev, "WARNING: kernel support for "
4651					"layered rbd images is EXPERIMENTAL!");
4652	}
4653
4654	/* If the image supports fancy striping, get its parameters */
4655
4656	if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4657		ret = rbd_dev_v2_striping_info(rbd_dev);
4658		if (ret < 0)
4659			goto out_err;
4660	}
4661
4662	/* crypto and compression type aren't (yet) supported for v2 images */
4663
4664	rbd_dev->header.crypt_type = 0;
4665	rbd_dev->header.comp_type = 0;
4666
4667	/* Get the snapshot context, plus the header version */
4668
4669	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4670	if (ret)
4671		goto out_err;
4672	rbd_dev->header.obj_version = ver;
4673
4674	dout("discovered version 2 image, header name is %s\n",
4675		rbd_dev->header_name);
4676
4677	return 0;
4678out_err:
4679	rbd_dev->parent_overlap = 0;
4680	rbd_spec_put(rbd_dev->parent_spec);
4681	rbd_dev->parent_spec = NULL;
4682	kfree(rbd_dev->header_name);
4683	rbd_dev->header_name = NULL;
4684	kfree(rbd_dev->header.object_prefix);
4685	rbd_dev->header.object_prefix = NULL;
4686
4687	return ret;
4688}
4689
4690static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4691{
4692	struct rbd_device *parent = NULL;
4693	struct rbd_spec *parent_spec = NULL;
4694	struct rbd_client *rbdc = NULL;
4695	int ret;
4696
4697	/* no need to lock here, as rbd_dev is not registered yet */
4698	ret = rbd_dev_snaps_update(rbd_dev);
4699	if (ret)
4700		return ret;
4701
4702	ret = rbd_dev_probe_update_spec(rbd_dev);
4703	if (ret)
4704		goto err_out_snaps;
4705
4706	ret = rbd_dev_set_mapping(rbd_dev);
4707	if (ret)
4708		goto err_out_snaps;
4709
4710	/* generate unique id: find highest unique id, add one */
4711	rbd_dev_id_get(rbd_dev);
4712
4713	/* Fill in the device name, now that we have its id. */
4714	BUILD_BUG_ON(DEV_NAME_LEN
4715			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4716	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4717
4718	/* Get our block major device number. */
4719
4720	ret = register_blkdev(0, rbd_dev->name);
4721	if (ret < 0)
4722		goto err_out_id;
4723	rbd_dev->major = ret;
4724
4725	/* Set up the blkdev mapping. */
4726
4727	ret = rbd_init_disk(rbd_dev);
4728	if (ret)
4729		goto err_out_blkdev;
4730
4731	ret = rbd_bus_add_dev(rbd_dev);
4732	if (ret)
4733		goto err_out_disk;
4734
4735	/*
4736	 * At this point cleanup in the event of an error is the job
4737	 * of the sysfs code (initiated by rbd_bus_del_dev()).
4738	 */
4739	/* Probe the parent if there is one */
4740
4741	if (rbd_dev->parent_spec) {
4742		/*
4743		 * We need to pass a reference to the client and the
4744		 * parent spec when creating the parent rbd_dev.
4745		 * Images related by parent/child relationships
4746		 * always share both.
4747		 */
4748		parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4749		rbdc = __rbd_get_client(rbd_dev->rbd_client);
4750
4751		parent = rbd_dev_create(rbdc, parent_spec);
4752		if (!parent) {
4753			ret = -ENOMEM;
4754			goto err_out_spec;
4755		}
4756		rbdc = NULL;		/* parent now owns reference */
4757		parent_spec = NULL;	/* parent now owns reference */
4758		ret = rbd_dev_probe(parent);
4759		if (ret < 0)
4760			goto err_out_parent;
4761		rbd_dev->parent = parent;
4762	}
4763
4764	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4765	if (ret)
4766		goto err_out_bus;
4767
4768	/* Everything's ready.  Announce the disk to the world. */
4769
4770	add_disk(rbd_dev->disk);
4771
4772	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4773		(unsigned long long) rbd_dev->mapping.size);
4774
4775	return ret;
4776
4777err_out_parent:
4778	rbd_dev_destroy(parent);
4779err_out_spec:
4780	rbd_spec_put(parent_spec);
4781	rbd_put_client(rbdc);
4782err_out_bus:
4783	/* this will also clean up rest of rbd_dev stuff */
4784
4785	rbd_bus_del_dev(rbd_dev);
4786
4787	return ret;
4788err_out_disk:
4789	rbd_free_disk(rbd_dev);
4790err_out_blkdev:
4791	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4792err_out_id:
4793	rbd_dev_id_put(rbd_dev);
4794err_out_snaps:
4795	rbd_remove_all_snaps(rbd_dev);
4796
4797	return ret;
4798}
4799
4800/*
4801 * Probe for the existence of the header object for the given rbd
4802 * device.  For format 2 images this includes determining the image
4803 * id.
4804 */
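/*
 * For example, a format 1 image named "foo" has a header object named
 * "foo" followed by RBD_SUFFIX, while a format 2 image's header object
 * is RBD_HEADER_PREFIX followed by the image id; see rbd_dev_v1_probe()
 * and rbd_dev_v2_probe() above.
 */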
4805static int rbd_dev_probe(struct rbd_device *rbd_dev)
4806{
4807	int ret;
4808
4809	/*
4810	 * Get the id from the image id object.  If it's not a
4811	 * format 2 image, we'll get ENOENT back, and we'll assume
4812	 * it's a format 1 image.
4813	 */
4814	ret = rbd_dev_image_id(rbd_dev);
4815	if (ret)
4816		return ret;
4817	rbd_assert(rbd_dev->spec->image_id);
4818	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4819
4820	if (rbd_dev->image_format == 1)
4821		ret = rbd_dev_v1_probe(rbd_dev);
4822	else
4823		ret = rbd_dev_v2_probe(rbd_dev);
4824	if (ret)
4825		goto out_err;
4826
4827	ret = rbd_dev_probe_finish(rbd_dev);
4828	if (ret)
4829		rbd_header_free(&rbd_dev->header);
4830
4831	return ret;
4832out_err:
4833	kfree(rbd_dev->spec->image_id);
4834	rbd_dev->spec->image_id = NULL;
4835
4836	dout("probe failed, returning %d\n", ret);
4837
4838	return ret;
4839}
4840
4841static ssize_t rbd_add(struct bus_type *bus,
4842		       const char *buf,
4843		       size_t count)
4844{
4845	struct rbd_device *rbd_dev = NULL;
4846	struct ceph_options *ceph_opts = NULL;
4847	struct rbd_options *rbd_opts = NULL;
4848	struct rbd_spec *spec = NULL;
4849	struct rbd_client *rbdc;
4850	struct ceph_osd_client *osdc;
4851	int rc = -ENOMEM;
4852
4853	if (!try_module_get(THIS_MODULE))
4854		return -ENODEV;
4855
4856	/* parse add command */
4857	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4858	if (rc < 0)
4859		goto err_out_module;
4860
4861	rbdc = rbd_get_client(ceph_opts);
4862	if (IS_ERR(rbdc)) {
4863		rc = PTR_ERR(rbdc);
4864		goto err_out_args;
4865	}
4866	ceph_opts = NULL;	/* rbd_dev client now owns this */
4867
4868	/* pick the pool */
4869	osdc = &rbdc->client->osdc;
4870	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4871	if (rc < 0)
4872		goto err_out_client;
4873	spec->pool_id = (u64)rc;
4874
4875	/* The ceph file layout needs to fit pool id in 32 bits */
4876
4877	if (spec->pool_id > (u64)U32_MAX) {
4878		rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4879				(unsigned long long)spec->pool_id, U32_MAX);
4880		rc = -EIO;
4881		goto err_out_client;
4882	}
4883
4884	rbd_dev = rbd_dev_create(rbdc, spec);
4885	if (!rbd_dev)
4886		goto err_out_client;
4887	rbdc = NULL;		/* rbd_dev now owns this */
4888	spec = NULL;		/* rbd_dev now owns this */
4889
4890	rbd_dev->mapping.read_only = rbd_opts->read_only;
4891	kfree(rbd_opts);
4892	rbd_opts = NULL;	/* done with this */
4893
4894	rc = rbd_dev_probe(rbd_dev);
4895	if (rc < 0)
4896		goto err_out_rbd_dev;
4897
4898	return count;
4899err_out_rbd_dev:
4900	rbd_dev_destroy(rbd_dev);
4901err_out_client:
4902	rbd_put_client(rbdc);
4903err_out_args:
4904	if (ceph_opts)
4905		ceph_destroy_options(ceph_opts);
4906	kfree(rbd_opts);
4907	rbd_spec_put(spec);
4908err_out_module:
4909	module_put(THIS_MODULE);
4910
4911	dout("Error adding device %s\n", buf);
4912
4913	return (ssize_t)rc;
4914}
4915
4916static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4917{
4918	struct list_head *tmp;
4919	struct rbd_device *rbd_dev;
4920
4921	spin_lock(&rbd_dev_list_lock);
4922	list_for_each(tmp, &rbd_dev_list) {
4923		rbd_dev = list_entry(tmp, struct rbd_device, node);
4924		if (rbd_dev->dev_id == dev_id) {
4925			spin_unlock(&rbd_dev_list_lock);
4926			return rbd_dev;
4927		}
4928	}
4929	spin_unlock(&rbd_dev_list_lock);
4930	return NULL;
4931}
4932
4933static void rbd_dev_release(struct device *dev)
4934{
4935	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4936
4937	if (rbd_dev->watch_event)
4938		rbd_dev_header_watch_sync(rbd_dev, 0);
4939
4940	/* clean up and free blkdev */
4941	rbd_free_disk(rbd_dev);
4942	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4943
4944	/* release allocated disk header fields */
4945	rbd_header_free(&rbd_dev->header);
4946
4947	/* done with the id, and with the rbd_dev */
4948	rbd_dev_id_put(rbd_dev);
4949	rbd_assert(rbd_dev->rbd_client != NULL);
4950	rbd_dev_destroy(rbd_dev);
4951
4952	/* release module ref */
4953	module_put(THIS_MODULE);
4954}
4955
4956static void __rbd_remove(struct rbd_device *rbd_dev)
4957{
4958	rbd_remove_all_snaps(rbd_dev);
4959	rbd_bus_del_dev(rbd_dev);
4960}
4961
4962static ssize_t rbd_remove(struct bus_type *bus,
4963			  const char *buf,
4964			  size_t count)
4965{
4966	struct rbd_device *rbd_dev = NULL;
4967	int target_id, rc;
4968	unsigned long ul;
4969	int ret = count;
4970
4971	rc = strict_strtoul(buf, 10, &ul);
4972	if (rc)
4973		return rc;
4974
4975	/* convert to int; abort if we lost anything in the conversion */
4976	target_id = (int) ul;
4977	if (target_id != ul)
4978		return -EINVAL;
4979
4980	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4981
4982	rbd_dev = __rbd_get_dev(target_id);
4983	if (!rbd_dev) {
4984		ret = -ENOENT;
4985		goto done;
4986	}
4987
4988	spin_lock_irq(&rbd_dev->lock);
4989	if (rbd_dev->open_count)
4990		ret = -EBUSY;
4991	else
4992		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4993	spin_unlock_irq(&rbd_dev->lock);
4994	if (ret < 0)
4995		goto done;
4996
4997	while (rbd_dev->parent_spec) {
4998		struct rbd_device *first = rbd_dev;
4999		struct rbd_device *second = first->parent;
5000		struct rbd_device *third;
5001
5002		/*
5003		 * Follow to the parent with no grandparent and
5004		 * remove it.
5005		 */
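		/*
		 * For example, for a chain dev -> p1 -> p2, p2 is
		 * removed on the first pass and p1 on the second;
		 * dev itself is removed after the loop.
		 */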
5006		while (second && (third = second->parent)) {
5007			first = second;
5008			second = third;
5009		}
5010		__rbd_remove(second);
5011		rbd_spec_put(first->parent_spec);
5012		first->parent_spec = NULL;
5013		first->parent_overlap = 0;
5014		first->parent = NULL;
5015	}
5016	__rbd_remove(rbd_dev);
5017
5018done:
5019	mutex_unlock(&ctl_mutex);
5020
5021	return ret;
5022}
5023
5024/*
5025 * create control files in sysfs
5026 * /sys/bus/rbd/...
5027 */
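/*
 * Writes to the bus's "add" and "remove" files are handled by
 * rbd_add() and rbd_remove() above.
 */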
5028static int rbd_sysfs_init(void)
5029{
5030	int ret;
5031
5032	ret = device_register(&rbd_root_dev);
5033	if (ret < 0)
5034		return ret;
5035
5036	ret = bus_register(&rbd_bus_type);
5037	if (ret < 0)
5038		device_unregister(&rbd_root_dev);
5039
5040	return ret;
5041}
5042
5043static void rbd_sysfs_cleanup(void)
5044{
5045	bus_unregister(&rbd_bus_type);
5046	device_unregister(&rbd_root_dev);
5047}
5048
5049static int __init rbd_init(void)
5050{
5051	int rc;
5052
5053	if (!libceph_compatible(NULL)) {
5054		rbd_warn(NULL, "libceph incompatibility (quitting)");
5055
5056		return -EINVAL;
5057	}
5058	rc = rbd_sysfs_init();
5059	if (rc)
5060		return rc;
5061	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5062	return 0;
5063}
5064
5065static void __exit rbd_exit(void)
5066{
5067	rbd_sysfs_cleanup();
5068}
5069
5070module_init(rbd_init);
5071module_exit(rbd_exit);
5072
5073MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5074MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5075MODULE_DESCRIPTION("rados block device");
5076
5077/* following authorship retained from original osdblk.c */
5078MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5079
5080MODULE_LICENSE("GPL");
5081