rbd.c revision c47f9371545abe2510ac3b66c3fc180921816f65
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8)	(~0U))
#define	U16_MAX	((u16)	(~0U))
#define	U32_MAX	((u32)	(~0U))
#define	U64_MAX	((u64)	(~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by an OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)
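
/*
 * Note: with RBD_FEATURES_ALL at 0, this implementation advertises
 * support for no optional features, so an image whose header records
 * an incompatible feature bit (e.g. RBD_FEATURE_LAYERING) is refused
 * when the image is probed, until support is added here.
 */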

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
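
/*
 * The expression above bounds the number of decimal digits in an int:
 * each byte contributes at most 2.5 digits (2^8 = 256 < 10^2.5), so
 * (5 * sizeof (int)) / 2 digits suffice, plus one character for a
 * sign.  For a 32-bit int that gives 10 + 1 = 11 characters.
 */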

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	s32			result;
	atomic_t		done;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;		/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
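
/*
 * Note that for_each_obj_request_safe() walks the list in reverse.
 * rbd_img_obj_request_del() asserts that the request being removed is
 * the last one in the image request (its "which" value must match
 * obj_request_count after the decrement), so teardown has to proceed
 * from the tail back toward the head.
 */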

struct rbd_snap {
	struct	device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
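
/*
 * For example (hypothetical names), mapping an image with
 *
 *   echo "1.2.3.4:6789 name=admin,read_only mypool myimage" \
 *       > /sys/bus/rbd/add
 *
 * passes the comma-separated option blob to libceph; options libceph
 * does not itself recognize are handed one at a time to
 * parse_rbd_opts_token(), so "read_only" above would set
 * rbd_opts->read_only for this mapping.
 */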

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock to remove the
 * client from the client list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;
	int ret = -ENOMEM;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			goto out_2big;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_2big:
	ret = -EIO;
out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return ret;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);

done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
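
/*
 * Example of the segment arithmetic above, assuming the default
 * object order of 22 (4 MiB objects): a 16 KiB request at image
 * offset 0x3ff000 maps to segment 0 (name "<prefix>.000000000000"),
 * segment offset 0x3ff000, and a clipped length of 4 KiB; the caller
 * then issues the remaining 12 KiB against segment 1.
 */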

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
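
/*
 * zero_bio_chain() backs the read-path semantics in
 * rbd_osd_read_callback() below: an object that doesn't exist
 * (-ENOENT) or a short read is reported to the block layer as a
 * successful read of zeros beyond the bytes actually transferred,
 * since unwritten rbd objects are sparse and logically zero-filled.
 */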

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
	kfree(op);
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);
	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
	atomic_set(&obj_request->done, 0);
	smp_wmb();
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	int done;

	done = atomic_inc_return(&obj_request->done);
	if (done > 1) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "obj_request %p was already done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return atomic_read(&obj_request->done) != 0;
}
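
/*
 * The "done" flag protocol used by the three helpers above: the
 * smp_wmb() in _init orders the cleared flag against later visibility
 * of the request, atomic_inc_return() in _set implies a full memory
 * barrier and makes a double completion detectable (it warns loudly),
 * and _test issues smp_mb() before reading the flag.  A request's
 * result and xferred fields are filled in before _set is called.
 */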

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);
	if (obj_request->result == (s32) -ENOENT) {
		zero_bio_chain(obj_request->bio_list, 0);
		obj_request->result = 0;
	} else if (obj_request->xferred < obj_request->length &&
			!obj_request->result) {
		zero_bio_chain(obj_request->bio_list, obj_request->xferred);
		obj_request->xferred = obj_request->length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
		obj_request->result, obj_request->xferred, obj_request->length);

	/* A short write really shouldn't occur.  Warn if we see one */

	if (obj_request->xferred != obj_request->length) {
		struct rbd_img_request *img_request = obj_request->img_request;
		struct rbd_device *rbd_dev;

		rbd_dev = img_request ? img_request->rbd_dev : NULL;
		rbd_warn(rbd_dev, "wrote %llu want %llu\n",
			obj_request->xferred, obj_request->length);
	}

	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	struct ceph_osd_reply_head *reply_head;
	struct ceph_osd_op *op;
	u32 num_ops;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	rbd_assert(!!obj_request->img_request ^
				(obj_request->which == BAD_WHICH));

	reply_head = msg->front.iov_base;
	obj_request->result = (s32) le32_to_cpu(reply_head->result);
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	num_ops = le32_to_cpu(reply_head->num_ops);
	WARN_ON(num_ops != 1);	/* For now */

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	op = &reply_head->ops[0];
	obj_request->xferred = le64_to_cpu(op->extent.length);
	rbd_assert(obj_request->xferred < (u64) UINT_MAX);

	opcode = le16_to_cpu(op->op);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request,
					struct ceph_osd_req_op *op)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;
	struct timespec now;
	struct timespec *mtime;
	u64 snap_id = CEPH_NOSNAP;
	u64 offset = obj_request->offset;
	u64 length = obj_request->length;

	if (img_request) {
		rbd_assert(img_request->write_request == write_request);
		if (img_request->write_request)
			snapc = img_request->snapc;
		else
			snap_id = img_request->snap_id;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		rbd_assert(obj_request->bio_list != NULL);
		osd_req->r_bio = obj_request->bio_list;
		break;
	case OBJ_REQUEST_PAGES:
		osd_req->r_pages = obj_request->pages;
		osd_req->r_num_pages = obj_request->page_count;
		osd_req->r_page_alignment = offset & ~PAGE_MASK;
		break;
	}

	if (write_request) {
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
		now = CURRENT_TIME;
		mtime = &now;
	} else {
		osd_req->r_flags = CEPH_OSD_FLAG_READ;
		mtime = NULL;	/* not needed for reads */
		offset = 0;	/* These are not used... */
		length = 0;	/* ...for osd read requests */
	}

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	/* osd_req will get its own reference to snapc (if non-null) */

	ceph_osdc_build_request(osd_req, offset, length, 1, op,
				snapc, snap_id, mtime);

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	obj_request_done_init(obj_request);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					      : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	obj_request->callback = rbd_obj_request_put;
	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   "update snaps: %d\n", rc);

	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (!op)
		goto out_cancel;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
1890
1891/*
1892 * Synchronous osd object method call
1893 */
1894static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
1895			     const char *object_name,
1896			     const char *class_name,
1897			     const char *method_name,
1898			     const char *outbound,
1899			     size_t outbound_size,
1900			     char *inbound,
1901			     size_t inbound_size,
1902			     u64 *version)
1903{
1904	struct rbd_obj_request *obj_request;
1905	struct ceph_osd_client *osdc;
1906	struct ceph_osd_req_op *op;
1907	struct page **pages;
1908	u32 page_count;
1909	int ret;
1910
1911	/*
1912	 * Method calls are ultimately read operations but they
1913	 * don't involve object data (so no offset or length).
1914	 * The result should placed into the inbound buffer
1915	 * provided.  They also supply outbound data--parameters for
1916	 * the object method.  Currently if this is present it will
1917	 * be a snapshot id.
1918	 */
1919	page_count = (u32) calc_pages_for(0, inbound_size);
1920	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
1921	if (IS_ERR(pages))
1922		return PTR_ERR(pages);
1923
1924	ret = -ENOMEM;
1925	obj_request = rbd_obj_request_create(object_name, 0, 0,
1926							OBJ_REQUEST_PAGES);
1927	if (!obj_request)
1928		goto out;
1929
1930	obj_request->pages = pages;
1931	obj_request->page_count = page_count;
1932
1933	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
1934					method_name, outbound, outbound_size);
1935	if (!op)
1936		goto out;
1937	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
1938						obj_request, op);
1939	rbd_osd_req_op_destroy(op);
1940	if (!obj_request->osd_req)
1941		goto out;
1942
1943	osdc = &rbd_dev->rbd_client->client->osdc;
1944	ret = rbd_obj_request_submit(osdc, obj_request);
1945	if (ret)
1946		goto out;
1947	ret = rbd_obj_request_wait(obj_request);
1948	if (ret)
1949		goto out;
1950
1951	ret = obj_request->result;
1952	if (ret < 0)
1953		goto out;
1954	ret = 0;
1955	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
1956	if (version)
1957		*version = obj_request->version;
1958out:
1959	if (obj_request)
1960		rbd_obj_request_put(obj_request);
1961	else
1962		ceph_release_page_vector(pages, page_count);
1963
1964	return ret;
1965}
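
/*
 * [Editorial sketch]  A typical rbd_obj_method_sync() caller packs
 * its outbound arguments in little-endian wire order and decodes the
 * inbound reply the same way.  The function below is illustrative
 * only (it mirrors the real "get_size" caller later in this file)
 * and is compiled out.
 */
#if 0
static int example_get_head_size(struct rbd_device *rbd_dev, u64 *size)
{
	__le64 snapid = cpu_to_le64(CEPH_NOSNAP);
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	if (ret < 0)
		return ret;

	*size = le64_to_cpu(size_buf.size);

	return 0;
}
#endif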
1966
1967static void rbd_request_fn(struct request_queue *q)
1968		__releases(q->queue_lock) __acquires(q->queue_lock)
1969{
1970	struct rbd_device *rbd_dev = q->queuedata;
1971	bool read_only = rbd_dev->mapping.read_only;
1972	struct request *rq;
1973	int result;
1974
1975	while ((rq = blk_fetch_request(q))) {
1976		bool write_request = rq_data_dir(rq) == WRITE;
1977		struct rbd_img_request *img_request;
1978		u64 offset;
1979		u64 length;
1980
1981		/* Ignore any non-FS requests that filter through. */
1982
1983		if (rq->cmd_type != REQ_TYPE_FS) {
1984			dout("%s: non-fs request type %d\n", __func__,
1985				(int) rq->cmd_type);
1986			__blk_end_request_all(rq, 0);
1987			continue;
1988		}
1989
1990		/* Ignore/skip any zero-length requests */
1991
1992		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
1993		length = (u64) blk_rq_bytes(rq);
1994
1995		if (!length) {
1996			dout("%s: zero-length request\n", __func__);
1997			__blk_end_request_all(rq, 0);
1998			continue;
1999		}
2000
2001		spin_unlock_irq(q->queue_lock);
2002
2003		/* Disallow writes to a read-only device */
2004
2005		if (write_request) {
2006			result = -EROFS;
2007			if (read_only)
2008				goto end_request;
2009			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2010		}
2011
2012		/*
2013		 * Quit early if the mapped snapshot no longer
2014		 * exists.  It's still possible the snapshot will
2015		 * have disappeared by the time our request arrives
2016		 * at the osd, but there's no sense in sending it if
2017		 * we already know.
2018		 */
2019		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2020			dout("request for non-existent snapshot\n");
2021			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2022			result = -ENXIO;
2023			goto end_request;
2024		}
2025
2026		result = -EINVAL;
2027		if (WARN_ON(offset && length > U64_MAX - offset + 1))
2028			goto end_request;	/* Shouldn't happen */
2029
2030		result = -ENOMEM;
2031		img_request = rbd_img_request_create(rbd_dev, offset, length,
2032							write_request);
2033		if (!img_request)
2034			goto end_request;
2035
2036		img_request->rq = rq;
2037
2038		result = rbd_img_request_fill_bio(img_request, rq->bio);
2039		if (!result)
2040			result = rbd_img_request_submit(img_request);
2041		if (result)
2042			rbd_img_request_put(img_request);
2043end_request:
2044		spin_lock_irq(q->queue_lock);
2045		if (result < 0) {
2046			rbd_warn(rbd_dev, "obj_request %s result %d",
2047				write_request ? "write" : "read", result);
2048			__blk_end_request_all(rq, result);
2049		}
2050	}
2051}
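
/*
 * [Editorial example]  blk_rq_pos() counts 512-byte sectors, so a
 * request starting at sector 100 and covering 8 sectors becomes
 * offset = 100 << SECTOR_SHIFT = 51200 and length = 4096 bytes above.
 */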
2052
2053/*
2054 * a queue callback. Makes sure that we don't create a bio that spans across
2055 * multiple osd objects.  One exception is a single-page bio,
2056 * which we handle later at bio_chain_clone_range()
2057 */
2058static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2059			  struct bio_vec *bvec)
2060{
2061	struct rbd_device *rbd_dev = q->queuedata;
2062	sector_t sector_offset;
2063	sector_t sectors_per_obj;
2064	sector_t obj_sector_offset;
2065	int ret;
2066
2067	/*
2068	 * Find how far into its rbd object the bio's starting sector
2069	 * falls.  The partition-relative start sector must first be
2070	 * made relative to the enclosing (whole) device.
2071	 */
2072	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2073	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2074	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2075
2076	/*
2077	 * Compute the number of bytes from that offset to the end
2078	 * of the object.  Account for what's already used by the bio.
2079	 */
2080	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2081	if (ret > bmd->bi_size)
2082		ret -= bmd->bi_size;
2083	else
2084		ret = 0;
2085
2086	/*
2087	 * Don't send back more than was asked for.  And if the bio
2088	 * was empty, let the whole thing through because:  "Note
2089	 * that a block device *must* allow a single page to be
2090	 * added to an empty bio."
2091	 */
2092	rbd_assert(bvec->bv_len <= PAGE_SIZE);
2093	if (ret > (int) bvec->bv_len || !bmd->bi_size)
2094		ret = (int) bvec->bv_len;
2095
2096	return ret;
2097}
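
/*
 * [Editorial example]  With the default 4 MB objects (obj_order 22),
 * sectors_per_obj is 8192.  A bio starting at device sector 8000 is
 * 192 sectors (98304 bytes) short of its object boundary, so an
 * empty bio (bi_size == 0) could merge up to 98304 bytes here; a
 * single 4096-byte page fits easily and bv_len is returned intact.
 */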
2098
2099static void rbd_free_disk(struct rbd_device *rbd_dev)
2100{
2101	struct gendisk *disk = rbd_dev->disk;
2102
2103	if (!disk)
2104		return;
2105
2106	if (disk->flags & GENHD_FL_UP)
2107		del_gendisk(disk);
2108	if (disk->queue)
2109		blk_cleanup_queue(disk->queue);
2110	put_disk(disk);
2111}
2112
2113static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2114				const char *object_name,
2115				u64 offset, u64 length,
2116				char *buf, u64 *version)
2117
2118{
2119	struct ceph_osd_req_op *op;
2120	struct rbd_obj_request *obj_request;
2121	struct ceph_osd_client *osdc;
2122	struct page **pages = NULL;
2123	u32 page_count;
2124	size_t size;
2125	int ret;
2126
2127	page_count = (u32) calc_pages_for(offset, length);
2128	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2129	if (IS_ERR(pages))
2130		ret = PTR_ERR(pages);
2131
2132	ret = -ENOMEM;
2133	obj_request = rbd_obj_request_create(object_name, offset, length,
2134							OBJ_REQUEST_PAGES);
2135	if (!obj_request)
2136		goto out;
2137
2138	obj_request->pages = pages;
2139	obj_request->page_count = page_count;
2140
2141	op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length);
2142	if (!op)
2143		goto out;
2144	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2145						obj_request, op);
2146	rbd_osd_req_op_destroy(op);
2147	if (!obj_request->osd_req)
2148		goto out;
2149
2150	osdc = &rbd_dev->rbd_client->client->osdc;
2151	ret = rbd_obj_request_submit(osdc, obj_request);
2152	if (ret)
2153		goto out;
2154	ret = rbd_obj_request_wait(obj_request);
2155	if (ret)
2156		goto out;
2157
2158	ret = obj_request->result;
2159	if (ret < 0)
2160		goto out;
2161
2162	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2163	size = (size_t) obj_request->xferred;
2164	ceph_copy_from_page_vector(pages, buf, 0, size);
2165	rbd_assert(size <= (size_t) INT_MAX);
2166	ret = (int) size;
2167	if (version)
2168		*version = obj_request->version;
2169out:
2170	if (obj_request)
2171		rbd_obj_request_put(obj_request);
2172	else
2173		ceph_release_page_vector(pages, page_count);
2174
2175	return ret;
2176}
2177
2178/*
2179 * Read the complete header for the given rbd device.
2180 *
2181 * Returns a pointer to a dynamically-allocated buffer containing
2182 * the complete and validated header.  Caller can pass the address
2183 * of a variable that will be filled in with the version of the
2184 * header object at the time it was read.
2185 *
2186 * Returns a pointer-coded errno if a failure occurs.
2187 */
2188static struct rbd_image_header_ondisk *
2189rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2190{
2191	struct rbd_image_header_ondisk *ondisk = NULL;
2192	u32 snap_count = 0;
2193	u64 names_size = 0;
2194	u32 want_count;
2195	int ret;
2196
2197	/*
2198	 * The complete header will include an array of its 64-bit
2199	 * snapshot ids, followed by the names of those snapshots as
2200	 * a contiguous block of NUL-terminated strings.  Note that
2201	 * the number of snapshots could change by the time we read
2202	 * it in, in which case we re-read it.
2203	 */
2204	do {
2205		size_t size;
2206
2207		kfree(ondisk);
2208
2209		size = sizeof (*ondisk);
2210		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2211		size += names_size;
2212		ondisk = kmalloc(size, GFP_KERNEL);
2213		if (!ondisk)
2214			return ERR_PTR(-ENOMEM);
2215
2216		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2217				       0, size,
2218				       (char *) ondisk, version);
2219		if (ret < 0)
2220			goto out_err;
2221		if (WARN_ON((size_t) ret < size)) {
2222			ret = -ENXIO;
2223			rbd_warn(rbd_dev, "short header read (want %zu got %d)",
2224				size, ret);
2225			goto out_err;
2226		}
2227		if (!rbd_dev_ondisk_valid(ondisk)) {
2228			ret = -ENXIO;
2229			rbd_warn(rbd_dev, "invalid header");
2230			goto out_err;
2231		}
2232
2233		names_size = le64_to_cpu(ondisk->snap_names_len);
2234		want_count = snap_count;
2235		snap_count = le32_to_cpu(ondisk->snap_count);
2236	} while (snap_count != want_count);
2237
2238	return ondisk;
2239
2240out_err:
2241	kfree(ondisk);
2242
2243	return ERR_PTR(ret);
2244}
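
/*
 * [Editorial note]  The size computed in the loop above mirrors the
 * v1 on-disk layout: a fixed-size header, then one
 * rbd_image_snap_ondisk entry per snapshot, then all the snapshot
 * names packed as consecutive NUL-terminated strings:
 *
 *	+--------------------------+
 *	| rbd_image_header_ondisk  |
 *	| snap entries 0 .. N-1    |
 *	| "snap1\0snap2\0" ...     |
 *	+--------------------------+
 */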
2245
2246/*
2247 * reload the on-disk header
2248 */
2249static int rbd_read_header(struct rbd_device *rbd_dev,
2250			   struct rbd_image_header *header)
2251{
2252	struct rbd_image_header_ondisk *ondisk;
2253	u64 ver = 0;
2254	int ret;
2255
2256	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
2257	if (IS_ERR(ondisk))
2258		return PTR_ERR(ondisk);
2259	ret = rbd_header_from_disk(header, ondisk);
2260	if (ret >= 0)
2261		header->obj_version = ver;
2262	kfree(ondisk);
2263
2264	return ret;
2265}
2266
2267static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
2268{
2269	struct rbd_snap *snap;
2270	struct rbd_snap *next;
2271
2272	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
2273		rbd_remove_snap_dev(snap);
2274}
2275
2276static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2277{
2278	sector_t size;
2279
2280	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
2281		return;
2282
2283	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
2284	dout("setting size to %llu sectors\n", (unsigned long long) size);
2285	rbd_dev->mapping.size = (u64) size;
2286	set_capacity(rbd_dev->disk, size);
2287}
2288
2289/*
2290 * only read the first part of the ondisk header, without the snaps info
2291 */
2292static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
2293{
2294	int ret;
2295	struct rbd_image_header h;
2296
2297	ret = rbd_read_header(rbd_dev, &h);
2298	if (ret < 0)
2299		return ret;
2300
2301	down_write(&rbd_dev->header_rwsem);
2302
2303	/* Update image size, and check for resize of mapped image */
2304	rbd_dev->header.image_size = h.image_size;
2305	rbd_update_mapping_size(rbd_dev);
2306
2307	/* rbd_dev->header.object_prefix shouldn't change */
2308	kfree(rbd_dev->header.snap_sizes);
2309	kfree(rbd_dev->header.snap_names);
2310	/* osd requests may still refer to snapc */
2311	ceph_put_snap_context(rbd_dev->header.snapc);
2312
2313	if (hver)
2314		*hver = h.obj_version;
2315	rbd_dev->header.obj_version = h.obj_version;
2316	rbd_dev->header.image_size = h.image_size;
2317	rbd_dev->header.snapc = h.snapc;
2318	rbd_dev->header.snap_names = h.snap_names;
2319	rbd_dev->header.snap_sizes = h.snap_sizes;
2320	/* Free the extra copy of the object prefix */
2321	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
2322	kfree(h.object_prefix);
2323
2324	ret = rbd_dev_snaps_update(rbd_dev);
2325	if (!ret)
2326		ret = rbd_dev_snaps_register(rbd_dev);
2327
2328	up_write(&rbd_dev->header_rwsem);
2329
2330	return ret;
2331}
2332
2333static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
2334{
2335	int ret;
2336
2337	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
2338	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2339	if (rbd_dev->image_format == 1)
2340		ret = rbd_dev_v1_refresh(rbd_dev, hver);
2341	else
2342		ret = rbd_dev_v2_refresh(rbd_dev, hver);
2343	mutex_unlock(&ctl_mutex);
2344
2345	return ret;
2346}
2347
2348static int rbd_init_disk(struct rbd_device *rbd_dev)
2349{
2350	struct gendisk *disk;
2351	struct request_queue *q;
2352	u64 segment_size;
2353
2354	/* create gendisk info */
2355	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
2356	if (!disk)
2357		return -ENOMEM;
2358
2359	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
2360		 rbd_dev->dev_id);
2361	disk->major = rbd_dev->major;
2362	disk->first_minor = 0;
2363	disk->fops = &rbd_bd_ops;
2364	disk->private_data = rbd_dev;
2365
2366	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
2367	if (!q)
2368		goto out_disk;
2369
2370	/* We use the default size, but let's be explicit about it. */
2371	blk_queue_physical_block_size(q, SECTOR_SIZE);
2372
2373	/* set io sizes to object size */
2374	segment_size = rbd_obj_bytes(&rbd_dev->header);
2375	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
2376	blk_queue_max_segment_size(q, segment_size);
2377	blk_queue_io_min(q, segment_size);
2378	blk_queue_io_opt(q, segment_size);
2379
2380	blk_queue_merge_bvec(q, rbd_merge_bvec);
2381	disk->queue = q;
2382
2383	q->queuedata = rbd_dev;
2384
2385	rbd_dev->disk = disk;
2386
2387	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
2388
2389	return 0;
2390out_disk:
2391	put_disk(disk);
2392
2393	return -ENOMEM;
2394}
2395
2396/*
2397  sysfs
2398*/
2399
2400static struct rbd_device *dev_to_rbd_dev(struct device *dev)
2401{
2402	return container_of(dev, struct rbd_device, dev);
2403}
2404
2405static ssize_t rbd_size_show(struct device *dev,
2406			     struct device_attribute *attr, char *buf)
2407{
2408	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2409	sector_t size;
2410
2411	down_read(&rbd_dev->header_rwsem);
2412	size = get_capacity(rbd_dev->disk);
2413	up_read(&rbd_dev->header_rwsem);
2414
2415	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
2416}
2417
2418/*
2419 * Note this shows the features for whatever's mapped, which is not
2420 * necessarily the base image.
2421 */
2422static ssize_t rbd_features_show(struct device *dev,
2423			     struct device_attribute *attr, char *buf)
2424{
2425	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2426
2427	return sprintf(buf, "0x%016llx\n",
2428			(unsigned long long) rbd_dev->mapping.features);
2429}
2430
2431static ssize_t rbd_major_show(struct device *dev,
2432			      struct device_attribute *attr, char *buf)
2433{
2434	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2435
2436	return sprintf(buf, "%d\n", rbd_dev->major);
2437}
2438
2439static ssize_t rbd_client_id_show(struct device *dev,
2440				  struct device_attribute *attr, char *buf)
2441{
2442	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2443
2444	return sprintf(buf, "client%lld\n",
2445			ceph_client_id(rbd_dev->rbd_client->client));
2446}
2447
2448static ssize_t rbd_pool_show(struct device *dev,
2449			     struct device_attribute *attr, char *buf)
2450{
2451	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2452
2453	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
2454}
2455
2456static ssize_t rbd_pool_id_show(struct device *dev,
2457			     struct device_attribute *attr, char *buf)
2458{
2459	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2460
2461	return sprintf(buf, "%llu\n",
2462		(unsigned long long) rbd_dev->spec->pool_id);
2463}
2464
2465static ssize_t rbd_name_show(struct device *dev,
2466			     struct device_attribute *attr, char *buf)
2467{
2468	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2469
2470	if (rbd_dev->spec->image_name)
2471		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
2472
2473	return sprintf(buf, "(unknown)\n");
2474}
2475
2476static ssize_t rbd_image_id_show(struct device *dev,
2477			     struct device_attribute *attr, char *buf)
2478{
2479	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2480
2481	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
2482}
2483
2484/*
2485 * Shows the name of the currently-mapped snapshot (or
2486 * RBD_SNAP_HEAD_NAME for the base image).
2487 */
2488static ssize_t rbd_snap_show(struct device *dev,
2489			     struct device_attribute *attr,
2490			     char *buf)
2491{
2492	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2493
2494	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
2495}
2496
2497/*
2498 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2499 * for the parent image.  If there is no parent, simply shows
2500 * "(no parent image)".
2501 */
2502static ssize_t rbd_parent_show(struct device *dev,
2503			     struct device_attribute *attr,
2504			     char *buf)
2505{
2506	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2507	struct rbd_spec *spec = rbd_dev->parent_spec;
2508	int count;
2509	char *bufp = buf;
2510
2511	if (!spec)
2512		return sprintf(buf, "(no parent image)\n");
2513
2514	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
2515			(unsigned long long) spec->pool_id, spec->pool_name);
2516	if (count < 0)
2517		return count;
2518	bufp += count;
2519
2520	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
2521			spec->image_name ? spec->image_name : "(unknown)");
2522	if (count < 0)
2523		return count;
2524	bufp += count;
2525
2526	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
2527			(unsigned long long) spec->snap_id, spec->snap_name);
2528	if (count < 0)
2529		return count;
2530	bufp += count;
2531
2532	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
2533	if (count < 0)
2534		return count;
2535	bufp += count;
2536
2537	return (ssize_t) (bufp - buf);
2538}
2539
2540static ssize_t rbd_image_refresh(struct device *dev,
2541				 struct device_attribute *attr,
2542				 const char *buf,
2543				 size_t size)
2544{
2545	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2546	int ret;
2547
2548	ret = rbd_dev_refresh(rbd_dev, NULL);
2549
2550	return ret < 0 ? ret : size;
2551}
2552
2553static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
2554static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
2555static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
2556static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
2557static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
2558static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
2559static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
2560static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
2561static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
2562static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
2563static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
2564
2565static struct attribute *rbd_attrs[] = {
2566	&dev_attr_size.attr,
2567	&dev_attr_features.attr,
2568	&dev_attr_major.attr,
2569	&dev_attr_client_id.attr,
2570	&dev_attr_pool.attr,
2571	&dev_attr_pool_id.attr,
2572	&dev_attr_name.attr,
2573	&dev_attr_image_id.attr,
2574	&dev_attr_current_snap.attr,
2575	&dev_attr_parent.attr,
2576	&dev_attr_refresh.attr,
2577	NULL
2578};
2579
2580static struct attribute_group rbd_attr_group = {
2581	.attrs = rbd_attrs,
2582};
2583
2584static const struct attribute_group *rbd_attr_groups[] = {
2585	&rbd_attr_group,
2586	NULL
2587};
2588
2589static void rbd_sysfs_dev_release(struct device *dev)
2590{
2591}
2592
2593static struct device_type rbd_device_type = {
2594	.name		= "rbd",
2595	.groups		= rbd_attr_groups,
2596	.release	= rbd_sysfs_dev_release,
2597};
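
/*
 * [Editorial example]  Once a device is mapped, these attributes show
 * up under /sys/bus/rbd/devices/<id>/ (values below are made up):
 *
 *	# cat /sys/bus/rbd/devices/0/size
 *	1073741824
 *	# echo 1 > /sys/bus/rbd/devices/0/refresh
 */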
2598
2599
2600/*
2601  sysfs - snapshots
2602*/
2603
2604static ssize_t rbd_snap_size_show(struct device *dev,
2605				  struct device_attribute *attr,
2606				  char *buf)
2607{
2608	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2609
2610	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2611}
2612
2613static ssize_t rbd_snap_id_show(struct device *dev,
2614				struct device_attribute *attr,
2615				char *buf)
2616{
2617	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2618
2619	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2620}
2621
2622static ssize_t rbd_snap_features_show(struct device *dev,
2623				struct device_attribute *attr,
2624				char *buf)
2625{
2626	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2627
2628	return sprintf(buf, "0x%016llx\n",
2629			(unsigned long long) snap->features);
2630}
2631
2632static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2633static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2634static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2635
2636static struct attribute *rbd_snap_attrs[] = {
2637	&dev_attr_snap_size.attr,
2638	&dev_attr_snap_id.attr,
2639	&dev_attr_snap_features.attr,
2640	NULL,
2641};
2642
2643static struct attribute_group rbd_snap_attr_group = {
2644	.attrs = rbd_snap_attrs,
2645};
2646
2647static void rbd_snap_dev_release(struct device *dev)
2648{
2649	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2650	kfree(snap->name);
2651	kfree(snap);
2652}
2653
2654static const struct attribute_group *rbd_snap_attr_groups[] = {
2655	&rbd_snap_attr_group,
2656	NULL
2657};
2658
2659static struct device_type rbd_snap_device_type = {
2660	.groups		= rbd_snap_attr_groups,
2661	.release	= rbd_snap_dev_release,
2662};
2663
2664static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
2665{
2666	kref_get(&spec->kref);
2667
2668	return spec;
2669}
2670
2671static void rbd_spec_free(struct kref *kref);
2672static void rbd_spec_put(struct rbd_spec *spec)
2673{
2674	if (spec)
2675		kref_put(&spec->kref, rbd_spec_free);
2676}
2677
2678static struct rbd_spec *rbd_spec_alloc(void)
2679{
2680	struct rbd_spec *spec;
2681
2682	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
2683	if (!spec)
2684		return NULL;
2685	kref_init(&spec->kref);
2686
2687	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */
2688
2689	return spec;
2690}
2691
2692static void rbd_spec_free(struct kref *kref)
2693{
2694	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
2695
2696	kfree(spec->pool_name);
2697	kfree(spec->image_id);
2698	kfree(spec->image_name);
2699	kfree(spec->snap_name);
2700	kfree(spec);
2701}
2702
2703static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
2704				struct rbd_spec *spec)
2705{
2706	struct rbd_device *rbd_dev;
2707
2708	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
2709	if (!rbd_dev)
2710		return NULL;
2711
2712	spin_lock_init(&rbd_dev->lock);
2713	rbd_dev->flags = 0;
2714	INIT_LIST_HEAD(&rbd_dev->node);
2715	INIT_LIST_HEAD(&rbd_dev->snaps);
2716	init_rwsem(&rbd_dev->header_rwsem);
2717
2718	rbd_dev->spec = spec;
2719	rbd_dev->rbd_client = rbdc;
2720
2721	/* Initialize the layout used for all rbd requests */
2722
2723	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2724	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
2725	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
2726	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
2727
2728	return rbd_dev;
2729}
2730
2731static void rbd_dev_destroy(struct rbd_device *rbd_dev)
2732{
2733	rbd_spec_put(rbd_dev->parent_spec);
2734	kfree(rbd_dev->header_name);
2735	rbd_put_client(rbd_dev->rbd_client);
2736	rbd_spec_put(rbd_dev->spec);
2737	kfree(rbd_dev);
2738}
2739
2740static bool rbd_snap_registered(struct rbd_snap *snap)
2741{
2742	bool ret = snap->dev.type == &rbd_snap_device_type;
2743	bool reg = device_is_registered(&snap->dev);
2744
2745	rbd_assert(!ret ^ reg);
2746
2747	return ret;
2748}
2749
2750static void rbd_remove_snap_dev(struct rbd_snap *snap)
2751{
2752	list_del(&snap->node);
2753	if (device_is_registered(&snap->dev))
2754		device_unregister(&snap->dev);
2755}
2756
2757static int rbd_register_snap_dev(struct rbd_snap *snap,
2758				  struct device *parent)
2759{
2760	struct device *dev = &snap->dev;
2761	int ret;
2762
2763	dev->type = &rbd_snap_device_type;
2764	dev->parent = parent;
2765	dev->release = rbd_snap_dev_release;
2766	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2767	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2768
2769	ret = device_register(dev);
2770
2771	return ret;
2772}
2773
2774static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2775						const char *snap_name,
2776						u64 snap_id, u64 snap_size,
2777						u64 snap_features)
2778{
2779	struct rbd_snap *snap;
2780	int ret;
2781
2782	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2783	if (!snap)
2784		return ERR_PTR(-ENOMEM);
2785
2786	ret = -ENOMEM;
2787	snap->name = kstrdup(snap_name, GFP_KERNEL);
2788	if (!snap->name)
2789		goto err;
2790
2791	snap->id = snap_id;
2792	snap->size = snap_size;
2793	snap->features = snap_features;
2794
2795	return snap;
2796
2797err:
2798	kfree(snap->name);
2799	kfree(snap);
2800
2801	return ERR_PTR(ret);
2802}
2803
2804static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2805		u64 *snap_size, u64 *snap_features)
2806{
2807	char *snap_name;
2808
2809	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2810
2811	*snap_size = rbd_dev->header.snap_sizes[which];
2812	*snap_features = 0;	/* No features for v1 */
2813
2814	/* Skip over names until we find the one we are looking for */
2815
2816	snap_name = rbd_dev->header.snap_names;
2817	while (which--)
2818		snap_name += strlen(snap_name) + 1;
2819
2820	return snap_name;
2821}
2822
2823/*
2824 * Get the size and object order for an image snapshot, or if
2825 * snap_id is CEPH_NOSNAP, gets this information for the base
2826 * image.
2827 */
2828static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2829				u8 *order, u64 *snap_size)
2830{
2831	__le64 snapid = cpu_to_le64(snap_id);
2832	int ret;
2833	struct {
2834		u8 order;
2835		__le64 size;
2836	} __attribute__ ((packed)) size_buf = { 0 };
2837
2838	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2839				"rbd", "get_size",
2840				(char *) &snapid, sizeof (snapid),
2841				(char *) &size_buf, sizeof (size_buf), NULL);
2842	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2843	if (ret < 0)
2844		return ret;
2845
2846	*order = size_buf.order;
2847	*snap_size = le64_to_cpu(size_buf.size);
2848
2849	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2850		(unsigned long long) snap_id, (unsigned int) *order,
2851		(unsigned long long) *snap_size);
2852
2853	return 0;
2854}
2855
2856static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2857{
2858	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2859					&rbd_dev->header.obj_order,
2860					&rbd_dev->header.image_size);
2861}
2862
2863static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2864{
2865	void *reply_buf;
2866	int ret;
2867	void *p;
2868
2869	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2870	if (!reply_buf)
2871		return -ENOMEM;
2872
2873	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2874				"rbd", "get_object_prefix",
2875				NULL, 0,
2876				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
2877	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2878	if (ret < 0)
2879		goto out;
2880
2881	p = reply_buf;
2882	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2883						p + RBD_OBJ_PREFIX_LEN_MAX,
2884						NULL, GFP_NOIO);
2885
2886	if (IS_ERR(rbd_dev->header.object_prefix)) {
2887		ret = PTR_ERR(rbd_dev->header.object_prefix);
2888		rbd_dev->header.object_prefix = NULL;
2889	} else {
2890		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2891	}
2892
2893out:
2894	kfree(reply_buf);
2895
2896	return ret;
2897}
2898
2899static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2900		u64 *snap_features)
2901{
2902	__le64 snapid = cpu_to_le64(snap_id);
2903	struct {
2904		__le64 features;
2905		__le64 incompat;
2906	} features_buf = { 0 };
2907	u64 incompat;
2908	int ret;
2909
2910	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2911				"rbd", "get_features",
2912				(char *) &snapid, sizeof (snapid),
2913				(char *) &features_buf, sizeof (features_buf),
2914				NULL);
2915	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2916	if (ret < 0)
2917		return ret;
2918
2919	incompat = le64_to_cpu(features_buf.incompat);
2920	if (incompat & ~RBD_FEATURES_ALL)
2921		return -ENXIO;
2922
2923	*snap_features = le64_to_cpu(features_buf.features);
2924
2925	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2926		(unsigned long long) snap_id,
2927		(unsigned long long) *snap_features,
2928		(unsigned long long) le64_to_cpu(features_buf.incompat));
2929
2930	return 0;
2931}
2932
2933static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2934{
2935	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2936						&rbd_dev->header.features);
2937}
2938
2939static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2940{
2941	struct rbd_spec *parent_spec;
2942	size_t size;
2943	void *reply_buf = NULL;
2944	__le64 snapid;
2945	void *p;
2946	void *end;
2947	char *image_id;
2948	u64 overlap;
2949	int ret;
2950
2951	parent_spec = rbd_spec_alloc();
2952	if (!parent_spec)
2953		return -ENOMEM;
2954
2955	size = sizeof (__le64) +				/* pool_id */
2956		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
2957		sizeof (__le64) +				/* snap_id */
2958		sizeof (__le64);				/* overlap */
2959	reply_buf = kmalloc(size, GFP_KERNEL);
2960	if (!reply_buf) {
2961		ret = -ENOMEM;
2962		goto out_err;
2963	}
2964
2965	snapid = cpu_to_le64(CEPH_NOSNAP);
2966	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
2967				"rbd", "get_parent",
2968				(char *) &snapid, sizeof (snapid),
2969				(char *) reply_buf, size, NULL);
2970	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
2971	if (ret < 0)
2972		goto out_err;
2973
2974	ret = -ERANGE;
2975	p = reply_buf;
2976	end = (char *) reply_buf + size;
2977	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2978	if (parent_spec->pool_id == CEPH_NOPOOL)
2979		goto out;	/* No parent?  No problem. */
2980
2981	/* The ceph file layout needs to fit pool id in 32 bits */
2982
2983	ret = -EIO;
2984	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
2985		goto out_err;
2986
2987	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2988	if (IS_ERR(image_id)) {
2989		ret = PTR_ERR(image_id);
2990		goto out_err;
2991	}
2992	parent_spec->image_id = image_id;
2993	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2994	ceph_decode_64_safe(&p, end, overlap, out_err);
2995
2996	rbd_dev->parent_overlap = overlap;
2997	rbd_dev->parent_spec = parent_spec;
2998	parent_spec = NULL;	/* rbd_dev now owns this */
2999out:
3000	ret = 0;
3001out_err:
3002	kfree(reply_buf);
3003	rbd_spec_put(parent_spec);
3004
3005	return ret;
3006}
3007
3008static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3009{
3010	size_t image_id_size;
3011	char *image_id;
3012	void *p;
3013	void *end;
3014	size_t size;
3015	void *reply_buf = NULL;
3016	size_t len = 0;
3017	char *image_name = NULL;
3018	int ret;
3019
3020	rbd_assert(!rbd_dev->spec->image_name);
3021
3022	len = strlen(rbd_dev->spec->image_id);
3023	image_id_size = sizeof (__le32) + len;
3024	image_id = kmalloc(image_id_size, GFP_KERNEL);
3025	if (!image_id)
3026		return NULL;
3027
3028	p = image_id;
3029	end = (char *) image_id + image_id_size;
3030	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);
3031
3032	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3033	reply_buf = kmalloc(size, GFP_KERNEL);
3034	if (!reply_buf)
3035		goto out;
3036
3037	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3038				"rbd", "dir_get_name",
3039				image_id, image_id_size,
3040				(char *) reply_buf, size, NULL);
3041	if (ret < 0)
3042		goto out;
3043	p = reply_buf;
3044	end = (char *) reply_buf + size;
3045	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3046	if (IS_ERR(image_name))
3047		image_name = NULL;
3048	else
3049		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3050out:
3051	kfree(reply_buf);
3052	kfree(image_id);
3053
3054	return image_name;
3055}
3056
3057/*
3058 * When a parent image gets probed, we only have the pool, image,
3059 * and snapshot ids but not the names of any of them.  This call
3060 * is made later to fill in those names.  It has to be done after
3061 * rbd_dev_snaps_update() has completed because some of the
3062 * information (in particular, snapshot name) is not available
3063 * until then.
3064 */
3065static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3066{
3067	struct ceph_osd_client *osdc;
3068	const char *name;
3069	void *reply_buf = NULL;
3070	int ret = -ENOMEM;	/* default for the kstrdup() failure below */
3071
3072	if (rbd_dev->spec->pool_name)
3073		return 0;	/* Already have the names */
3074
3075	/* Look up the pool name */
3076
3077	osdc = &rbd_dev->rbd_client->client->osdc;
3078	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3079	if (!name) {
3080		rbd_warn(rbd_dev, "there is no pool with id %llu",
3081			rbd_dev->spec->pool_id);	/* Really a BUG() */
3082		return -EIO;
3083	}
3084
3085	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3086	if (!rbd_dev->spec->pool_name)
3087		return -ENOMEM;
3088
3089	/* Fetch the image name; tolerate failure here */
3090
3091	name = rbd_dev_image_name(rbd_dev);
3092	if (name)
3093		rbd_dev->spec->image_name = (char *) name;
3094	else
3095		rbd_warn(rbd_dev, "unable to get image name");
3096
3097	/* Look up the snapshot name. */
3098
3099	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3100	if (!name) {
3101		rbd_warn(rbd_dev, "no snapshot with id %llu",
3102			rbd_dev->spec->snap_id);	/* Really a BUG() */
3103		ret = -EIO;
3104		goto out_err;
3105	}
3106	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3107	if (!rbd_dev->spec->snap_name)
3108		goto out_err;
3109
3110	return 0;
3111out_err:
3112	kfree(reply_buf);
3113	kfree(rbd_dev->spec->pool_name);
3114	rbd_dev->spec->pool_name = NULL;
3115
3116	return ret;
3117}
3118
3119static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3120{
3121	size_t size;
3122	int ret;
3123	void *reply_buf;
3124	void *p;
3125	void *end;
3126	u64 seq;
3127	u32 snap_count;
3128	struct ceph_snap_context *snapc;
3129	u32 i;
3130
3131	/*
3132	 * We'll need room for the seq value (maximum snapshot id),
3133	 * snapshot count, and array of that many snapshot ids.
3134	 * For now we have a fixed upper limit on the number we're
3135	 * prepared to receive.
3136	 */
3137	size = sizeof (__le64) + sizeof (__le32) +
3138			RBD_MAX_SNAP_COUNT * sizeof (__le64);
3139	reply_buf = kzalloc(size, GFP_KERNEL);
3140	if (!reply_buf)
3141		return -ENOMEM;
3142
3143	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3144				"rbd", "get_snapcontext",
3145				NULL, 0,
3146				reply_buf, size, ver);
3147	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3148	if (ret < 0)
3149		goto out;
3150
3151	ret = -ERANGE;
3152	p = reply_buf;
3153	end = (char *) reply_buf + size;
3154	ceph_decode_64_safe(&p, end, seq, out);
3155	ceph_decode_32_safe(&p, end, snap_count, out);
3156
3157	/*
3158	 * Make sure the reported number of snapshot ids wouldn't go
3159	 * beyond the end of our buffer.  But before checking that,
3160	 * make sure the computed size of the snapshot context we
3161	 * allocate is representable in a size_t.
3162	 */
3163	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3164				 / sizeof (u64)) {
3165		ret = -EINVAL;
3166		goto out;
3167	}
3168	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3169		goto out;
3170
3171	size = sizeof (struct ceph_snap_context) +
3172				snap_count * sizeof (snapc->snaps[0]);
3173	snapc = kmalloc(size, GFP_KERNEL);
3174	if (!snapc) {
3175		ret = -ENOMEM;
3176		goto out;
3177	}
3178
3179	atomic_set(&snapc->nref, 1);
3180	snapc->seq = seq;
3181	snapc->num_snaps = snap_count;
3182	for (i = 0; i < snap_count; i++)
3183		snapc->snaps[i] = ceph_decode_64(&p);
3184
3185	rbd_dev->header.snapc = snapc;
3186
3187	dout("  snap context seq = %llu, snap_count = %u\n",
3188		(unsigned long long) seq, (unsigned int) snap_count);
3189	ret = 0;
3190out:
3191	kfree(reply_buf);
3192
3193	return ret;
3194}
3195
3196static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3197{
3198	size_t size;
3199	void *reply_buf;
3200	__le64 snap_id;
3201	int ret;
3202	void *p;
3203	void *end;
3204	char *snap_name;
3205
3206	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3207	reply_buf = kmalloc(size, GFP_KERNEL);
3208	if (!reply_buf)
3209		return ERR_PTR(-ENOMEM);
3210
3211	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3212	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3213				"rbd", "get_snapshot_name",
3214				(char *) &snap_id, sizeof (snap_id),
3215				reply_buf, size, NULL);
3216	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3217	if (ret < 0)
3218		goto out;
3219
3220	p = reply_buf;
3221	end = (char *) reply_buf + size;
3222	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3223	if (IS_ERR(snap_name)) {
3224		ret = PTR_ERR(snap_name);
3225		goto out;
3226	} else {
3227		dout("  snap_id 0x%016llx snap_name = %s\n",
3228			(unsigned long long) le64_to_cpu(snap_id), snap_name);
3229	}
3230	kfree(reply_buf);
3231
3232	return snap_name;
3233out:
3234	kfree(reply_buf);
3235
3236	return ERR_PTR(ret);
3237}
3238
3239static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3240		u64 *snap_size, u64 *snap_features)
3241{
3242	u64 snap_id;
3243	u8 order;
3244	int ret;
3245
3246	snap_id = rbd_dev->header.snapc->snaps[which];
3247	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3248	if (ret)
3249		return ERR_PTR(ret);
3250	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3251	if (ret)
3252		return ERR_PTR(ret);
3253
3254	return rbd_dev_v2_snap_name(rbd_dev, which);
3255}
3256
3257static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
3258		u64 *snap_size, u64 *snap_features)
3259{
3260	if (rbd_dev->image_format == 1)
3261		return rbd_dev_v1_snap_info(rbd_dev, which,
3262					snap_size, snap_features);
3263	if (rbd_dev->image_format == 2)
3264		return rbd_dev_v2_snap_info(rbd_dev, which,
3265					snap_size, snap_features);
3266	return ERR_PTR(-EINVAL);
3267}
3268
3269static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
3270{
3271	int ret;
3272	__u8 obj_order;
3273
3274	down_write(&rbd_dev->header_rwsem);
3275
3276	/* Grab old order first, to see if it changes */
3277
3278	obj_order = rbd_dev->header.obj_order;
3279	ret = rbd_dev_v2_image_size(rbd_dev);
3280	if (ret)
3281		goto out;
3282	if (rbd_dev->header.obj_order != obj_order) {
3283		ret = -EIO;
3284		goto out;
3285	}
3286	rbd_update_mapping_size(rbd_dev);
3287
3288	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
3289	dout("rbd_dev_v2_snap_context returned %d\n", ret);
3290	if (ret)
3291		goto out;
3292	ret = rbd_dev_snaps_update(rbd_dev);
3293	dout("rbd_dev_snaps_update returned %d\n", ret);
3294	if (ret)
3295		goto out;
3296	ret = rbd_dev_snaps_register(rbd_dev);
3297	dout("rbd_dev_snaps_register returned %d\n", ret);
3298out:
3299	up_write(&rbd_dev->header_rwsem);
3300
3301	return ret;
3302}
3303
3304/*
3305 * Scan the rbd device's current snapshot list and compare it to the
3306 * newly-received snapshot context.  Remove any existing snapshots
3307 * not present in the new snapshot context.  Add a new snapshot for
3308 * any snapshots in the snapshot context not in the current list.
3309 * And verify there are no changes to snapshots we already know
3310 * about.
3311 *
3312 * Assumes the snapshots in the snapshot context are sorted by
3313 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
3314 * are also maintained in that order.)
3315 */
3316static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
3317{
3318	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3319	const u32 snap_count = snapc->num_snaps;
3320	struct list_head *head = &rbd_dev->snaps;
3321	struct list_head *links = head->next;
3322	u32 index = 0;
3323
3324	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
3325	while (index < snap_count || links != head) {
3326		u64 snap_id;
3327		struct rbd_snap *snap;
3328		char *snap_name;
3329		u64 snap_size = 0;
3330		u64 snap_features = 0;
3331
3332		snap_id = index < snap_count ? snapc->snaps[index]
3333					     : CEPH_NOSNAP;
3334		snap = links != head ? list_entry(links, struct rbd_snap, node)
3335				     : NULL;
3336		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
3337
3338		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
3339			struct list_head *next = links->next;
3340
3341			/*
3342			 * A previously-existing snapshot is not in
3343			 * the new snap context.
3344			 *
3345			 * If the now missing snapshot is the one the
3346			 * image is mapped to, clear its exists flag
3347			 * so we can avoid sending any more requests
3348			 * to it.
3349			 */
3350			if (rbd_dev->spec->snap_id == snap->id)
3351				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3352			rbd_remove_snap_dev(snap);
3353			dout("%ssnap id %llu has been removed\n",
3354				rbd_dev->spec->snap_id == snap->id ?
3355							"mapped " : "",
3356				(unsigned long long) snap->id);
3357
3358			/* Done with this list entry; advance */
3359
3360			links = next;
3361			continue;
3362		}
3363
3364		snap_name = rbd_dev_snap_info(rbd_dev, index,
3365					&snap_size, &snap_features);
3366		if (IS_ERR(snap_name))
3367			return PTR_ERR(snap_name);
3368
3369		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
3370			(unsigned long long) snap_id);
3371		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
3372			struct rbd_snap *new_snap;
3373
3374			/* We haven't seen this snapshot before */
3375
3376			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
3377					snap_id, snap_size, snap_features);
3378			if (IS_ERR(new_snap)) {
3379				int err = PTR_ERR(new_snap);
3380
3381				dout("  failed to add dev, error %d\n", err);
3382
3383				return err;
3384			}
3385
3386			/* New goes before existing, or at end of list */
3387
3388			dout("  added dev%s\n", snap ? "" : " at end");
3389			if (snap)
3390				list_add_tail(&new_snap->node, &snap->node);
3391			else
3392				list_add_tail(&new_snap->node, head);
3393		} else {
3394			/* Already have this one */
3395
3396			dout("  already present\n");
3397
3398			rbd_assert(snap->size == snap_size);
3399			rbd_assert(!strcmp(snap->name, snap_name));
3400			rbd_assert(snap->features == snap_features);
3401
3402			/* Done with this list entry; advance */
3403
3404			links = links->next;
3405		}
3406
3407		/* Advance to the next entry in the snapshot context */
3408
3409		index++;
3410	}
3411	dout("%s: done\n", __func__);
3412
3413	return 0;
3414}
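
/*
 * [Editorial sketch]  With both lists sorted highest id first, the
 * merge above behaves like this (ids are made up):
 *
 *	snap context:  12  9  4
 *	device list:   12  7  4
 *
 *	12: in both      -> keep, advance both
 *	 9: context only -> new snapshot, inserted before 7
 *	 7: list only    -> snapshot was deleted, device removed
 *	 4: in both      -> keep, advance both
 */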
3415
3416/*
3417 * Scan the list of snapshots and register the devices for any that
3418 * have not already been registered.
3419 */
3420static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
3421{
3422	struct rbd_snap *snap;
3423	int ret = 0;
3424
3425	dout("%s:\n", __func__);
3426	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
3427		return -EIO;
3428
3429	list_for_each_entry(snap, &rbd_dev->snaps, node) {
3430		if (!rbd_snap_registered(snap)) {
3431			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
3432			if (ret < 0)
3433				break;
3434		}
3435	}
3436	dout("%s: returning %d\n", __func__, ret);
3437
3438	return ret;
3439}
3440
3441static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
3442{
3443	struct device *dev;
3444	int ret;
3445
3446	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3447
3448	dev = &rbd_dev->dev;
3449	dev->bus = &rbd_bus_type;
3450	dev->type = &rbd_device_type;
3451	dev->parent = &rbd_root_dev;
3452	dev->release = rbd_dev_release;
3453	dev_set_name(dev, "%d", rbd_dev->dev_id);
3454	ret = device_register(dev);
3455
3456	mutex_unlock(&ctl_mutex);
3457
3458	return ret;
3459}
3460
3461static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
3462{
3463	device_unregister(&rbd_dev->dev);
3464}
3465
3466static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3467
3468/*
3469 * Get a unique rbd identifier for the given new rbd_dev, and add
3470 * the rbd_dev to the global list.  The minimum rbd id is 1.
3471 */
3472static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3473{
3474	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3475
3476	spin_lock(&rbd_dev_list_lock);
3477	list_add_tail(&rbd_dev->node, &rbd_dev_list);
3478	spin_unlock(&rbd_dev_list_lock);
3479	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3480		(unsigned long long) rbd_dev->dev_id);
3481}
3482
3483/*
3484 * Remove an rbd_dev from the global list, and record that its
3485 * identifier is no longer in use.
3486 */
3487static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3488{
3489	struct list_head *tmp;
3490	int rbd_id = rbd_dev->dev_id;
3491	int max_id;
3492
3493	rbd_assert(rbd_id > 0);
3494
3495	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3496		(unsigned long long) rbd_dev->dev_id);
3497	spin_lock(&rbd_dev_list_lock);
3498	list_del_init(&rbd_dev->node);
3499
3500	/*
3501	 * If the id being "put" is not the current maximum, there
3502	 * is nothing special we need to do.
3503	 */
3504	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3505		spin_unlock(&rbd_dev_list_lock);
3506		return;
3507	}
3508
3509	/*
3510	 * We need to update the current maximum id.  Search the
3511	 * list to find out what it is.  We're more likely to find
3512	 * the maximum at the end, so search the list backward.
3513	 */
3514	max_id = 0;
3515	list_for_each_prev(tmp, &rbd_dev_list) {
3516		struct rbd_device *rbd_dev;
3517
3518		rbd_dev = list_entry(tmp, struct rbd_device, node);
3519		if (rbd_dev->dev_id > max_id)
3520			max_id = rbd_dev->dev_id;
3521	}
3522	spin_unlock(&rbd_dev_list_lock);
3523
3524	/*
3525	 * The max id could have been updated by rbd_dev_id_get(), in
3526	 * which case it now accurately reflects the new maximum.
3527	 * Be careful not to overwrite the maximum value in that
3528	 * case.
3529	 */
3530	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3531	dout("  max dev id has been reset\n");
3532}
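
/*
 * [Editorial example]  The race the atomic64_cmpxchg() above
 * tolerates, with A in rbd_dev_id_put() and B in rbd_dev_id_get():
 *
 *	A: removes id 5 (the maximum), computes new max 3
 *	B: atomic64_inc_return() hands out id 6; max is now 6
 *	A: atomic64_cmpxchg(&rbd_dev_id_max, 5, 3) fails, max stays 6
 *
 * The maximum is only lowered if nobody allocated a new id meanwhile.
 */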
3533
3534/*
3535 * Skips over white space at *buf, and updates *buf to point to the
3536 * first found non-space character (if any). Returns the length of
3537 * the token (string of non-white space characters) found.  Note
3538 * that *buf must be terminated with '\0'.
3539 */
3540static inline size_t next_token(const char **buf)
3541{
3542	/*
3543	 * These are the characters that produce nonzero for
3544	 * isspace() in the "C" and "POSIX" locales.
3545	 */
3546	const char *spaces = " \f\n\r\t\v";
3547
3548	*buf += strspn(*buf, spaces);	/* Find start of token */
3549
3550	return strcspn(*buf, spaces);	/* Return token length */
3551}
3552
3553/*
3554 * Finds the next token in *buf, and if the provided token buffer is
3555 * big enough, copies the found token into it.  The result, if
3556 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
3557 * must be terminated with '\0' on entry.
3558 *
3559 * Returns the length of the token found (not including the '\0').
3560 * Return value will be 0 if no token is found, and it will be >=
3561 * token_size if the token would not fit.
3562 *
3563 * The *buf pointer will be updated to point beyond the end of the
3564 * found token.  Note that this occurs even if the token buffer is
3565 * too small to hold it.
3566 */
3567static inline size_t copy_token(const char **buf,
3568				char *token,
3569				size_t token_size)
3570{
3571	size_t len;
3572
3573	len = next_token(buf);
3574	if (len < token_size) {
3575		memcpy(token, *buf, len);
3576		*(token + len) = '\0';
3577	}
3578	*buf += len;
3579
3580	return len;
3581}
3582
3583/*
3584 * Finds the next token in *buf, dynamically allocates a buffer big
3585 * enough to hold a copy of it, and copies the token into the new
3586 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
3587 * that a duplicate buffer is created even for a zero-length token.
3588 *
3589 * Returns a pointer to the newly-allocated duplicate, or a null
3590 * pointer if memory for the duplicate was not available.  If
3591 * the lenp argument is a non-null pointer, the length of the token
3592 * (not including the '\0') is returned in *lenp.
3593 *
3594 * If successful, the *buf pointer will be updated to point beyond
3595 * the end of the found token.
3596 *
3597 * Note: uses GFP_KERNEL for allocation.
3598 */
3599static inline char *dup_token(const char **buf, size_t *lenp)
3600{
3601	char *dup;
3602	size_t len;
3603
3604	len = next_token(buf);
3605	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
3606	if (!dup)
3607		return NULL;
3608	*(dup + len) = '\0';
3609	*buf += len;
3610
3611	if (lenp)
3612		*lenp = len;
3613
3614	return dup;
3615}
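
/*
 * [Editorial example]  How these helpers walk an options buffer.
 * Given buf pointing at "1.2.3.4:6789 name=admin", next_token()
 * returns 12 and leaves *buf at the '1'; a following dup_token()
 * call returns the copy "1.2.3.4:6789" and advances *buf to the
 * space before "name=admin".
 */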
3616
3617/*
3618 * Parse the options provided for an "rbd add" (i.e., rbd image
3619 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
3620 * and the data written is passed here via a NUL-terminated buffer.
3621 * Returns 0 if successful or an error code otherwise.
3622 *
3623 * The information extracted from these options is recorded in
3624 * the other parameters which return dynamically-allocated
3625 * structures:
3626 *  ceph_opts
3627 *      The address of a pointer that will refer to a ceph options
3628 *      structure.  Caller must release the returned pointer using
3629 *      ceph_destroy_options() when it is no longer needed.
3630 *  rbd_opts
3631 *	Address of an rbd options pointer.  Fully initialized by
3632 *	this function; caller must release with kfree().
3633 *  spec
3634 *	Address of an rbd image specification pointer.  Fully
3635 *	initialized by this function based on parsed options.
3636 *	Caller must release with rbd_spec_put().
3637 *
3638 * The options passed take this form:
3639 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3640 * where:
3641 *  <mon_addrs>
3642 *      A comma-separated list of one or more monitor addresses.
3643 *      A monitor address is an ip address, optionally followed
3644 *      by a port number (separated by a colon).
3645 *        I.e.:  ip1[:port1][,ip2[:port2]...]
3646 *  <options>
3647 *      A comma-separated list of ceph and/or rbd options.
3648 *  <pool_name>
3649 *      The name of the rados pool containing the rbd image.
3650 *  <image_name>
3651 *      The name of the image in that pool to map.
3652 *  <snap_name>
3653 *      An optional snapshot name.  If provided, the mapping will
3654 *      present data from the image at the time that snapshot was
3655 *      created.  The image head is used if no snapshot name is
3656 *      provided.  Snapshot mappings are always read-only.
3657 */
3658static int rbd_add_parse_args(const char *buf,
3659				struct ceph_options **ceph_opts,
3660				struct rbd_options **opts,
3661				struct rbd_spec **rbd_spec)
3662{
3663	size_t len;
3664	char *options;
3665	const char *mon_addrs;
3666	size_t mon_addrs_size;
3667	struct rbd_spec *spec = NULL;
3668	struct rbd_options *rbd_opts = NULL;
3669	struct ceph_options *copts;
3670	int ret;
3671
3672	/* The first four tokens are required */
3673
3674	len = next_token(&buf);
3675	if (!len) {
3676		rbd_warn(NULL, "no monitor address(es) provided");
3677		return -EINVAL;
3678	}
3679	mon_addrs = buf;
3680	mon_addrs_size = len + 1;
3681	buf += len;
3682
3683	ret = -EINVAL;
3684	options = dup_token(&buf, NULL);
3685	if (!options)
3686		return -ENOMEM;
3687	if (!*options) {
3688		rbd_warn(NULL, "no options provided");
3689		goto out_err;
3690	}
3691
3692	spec = rbd_spec_alloc();
3693	if (!spec)
3694		goto out_mem;
3695
3696	spec->pool_name = dup_token(&buf, NULL);
3697	if (!spec->pool_name)
3698		goto out_mem;
3699	if (!*spec->pool_name) {
3700		rbd_warn(NULL, "no pool name provided");
3701		goto out_err;
3702	}
3703
3704	spec->image_name = dup_token(&buf, NULL);
3705	if (!spec->image_name)
3706		goto out_mem;
3707	if (!*spec->image_name) {
3708		rbd_warn(NULL, "no image name provided");
3709		goto out_err;
3710	}
3711
3712	/*
3713	 * Snapshot name is optional; default is to use "-"
3714	 * (indicating the head/no snapshot).
3715	 */
3716	len = next_token(&buf);
3717	if (!len) {
3718		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3719		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3720	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
3721		ret = -ENAMETOOLONG;
3722		goto out_err;
3723	}
3724	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
3725	if (!spec->snap_name)
3726		goto out_mem;
3727	*(spec->snap_name + len) = '\0';
3728
3729	/* Initialize all rbd options to the defaults */
3730
3731	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3732	if (!rbd_opts)
3733		goto out_mem;
3734
3735	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3736
3737	copts = ceph_parse_options(options, mon_addrs,
3738					mon_addrs + mon_addrs_size - 1,
3739					parse_rbd_opts_token, rbd_opts);
3740	if (IS_ERR(copts)) {
3741		ret = PTR_ERR(copts);
3742		goto out_err;
3743	}
3744	kfree(options);
3745
3746	*ceph_opts = copts;
3747	*opts = rbd_opts;
3748	*rbd_spec = spec;
3749
3750	return 0;
3751out_mem:
3752	ret = -ENOMEM;
3753out_err:
3754	kfree(rbd_opts);
3755	rbd_spec_put(spec);
3756	kfree(options);
3757
3758	return ret;
3759}
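
/*
 * [Editorial example]  A string this parser accepts, as it would be
 * written to /sys/bus/rbd/add (monitor address and key are made up):
 *
 *	1.2.3.4:6789 name=admin,secret=AQB... rbd myimage mysnap
 *
 * which yields mon_addrs "1.2.3.4:6789", options
 * "name=admin,secret=AQB...", pool "rbd", image "myimage" and
 * snapshot "mysnap"; omitting "mysnap" maps the image head.
 */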
3760
3761/*
3762 * An rbd format 2 image has a unique identifier, distinct from the
3763 * name given to it by the user.  Internally, that identifier is
3764 * what's used to specify the names of objects related to the image.
3765 *
3766 * A special "rbd id" object is used to map an rbd image name to its
3767 * id.  If that object doesn't exist, then there is no v2 rbd image
3768 * with the supplied name.
3769 *
3770 * This function will record the given rbd_dev's image_id field if
3771 * it can be determined, and in that case will return 0.  If any
3772 * errors occur a negative errno will be returned and the rbd_dev's
3773 * image_id field will be unchanged (and should be NULL).
3774 */
3775static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3776{
3777	int ret;
3778	size_t size;
3779	char *object_name;
3780	void *response;
3781	void *p;
3782
3783	/*
3784	 * When probing a parent image, the image id is already
3785	 * known (and the image name likely is not).  There's no
3786	 * need to fetch the image id again in this case.
3787	 */
3788	if (rbd_dev->spec->image_id)
3789		return 0;
3790
3791	/*
3792	 * First, see if the format 2 image id file exists, and if
3793	 * so, get the image's persistent id from it.
3794	 */
3795	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
3796	object_name = kmalloc(size, GFP_NOIO);
3797	if (!object_name)
3798		return -ENOMEM;
3799	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3800	dout("rbd id object name is %s\n", object_name);
3801
3802	/* Response will be an encoded string, which includes a length */
3803
3804	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3805	response = kzalloc(size, GFP_NOIO);
3806	if (!response) {
3807		ret = -ENOMEM;
3808		goto out;
3809	}
3810
3811	ret = rbd_obj_method_sync(rbd_dev, object_name,
3812				"rbd", "get_id",
3813				NULL, 0,
3814				response, RBD_IMAGE_ID_LEN_MAX, NULL);
3815	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3816	if (ret < 0)
3817		goto out;
3818
3819	p = response;
3820	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3821						p + RBD_IMAGE_ID_LEN_MAX,
3822						NULL, GFP_NOIO);
3823	if (IS_ERR(rbd_dev->spec->image_id)) {
3824		ret = PTR_ERR(rbd_dev->spec->image_id);
3825		rbd_dev->spec->image_id = NULL;
3826	} else {
3827		dout("image_id is %s\n", rbd_dev->spec->image_id);
3828	}
3829out:
3830	kfree(response);
3831	kfree(object_name);
3832
3833	return ret;
3834}
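
/*
 * [Editorial example]  For a format 2 image named "myimage" the id
 * object is RBD_ID_PREFIX "myimage", i.e. "rbd_id.myimage", and its
 * "get_id" method returns the persistent image id as an encoded
 * string (for instance "1014b2ae8944a" -- value made up).
 */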
3835
3836static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3837{
3838	int ret;
3839	size_t size;
3840
3841	/* Version 1 images have no id; empty string is used */
3842
3843	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3844	if (!rbd_dev->spec->image_id)
3845		return -ENOMEM;
3846
3847	/* Record the header object name for this rbd image. */
3848
3849	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
3850	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3851	if (!rbd_dev->header_name) {
3852		ret = -ENOMEM;
3853		goto out_err;
3854	}
3855	sprintf(rbd_dev->header_name, "%s%s",
3856		rbd_dev->spec->image_name, RBD_SUFFIX);
3857
3858	/* Populate rbd image metadata */
3859
3860	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3861	if (ret < 0)
3862		goto out_err;
3863
3864	/* Version 1 images have no parent (no layering) */
3865
3866	rbd_dev->parent_spec = NULL;
3867	rbd_dev->parent_overlap = 0;
3868
3869	rbd_dev->image_format = 1;
3870
3871	dout("discovered version 1 image, header name is %s\n",
3872		rbd_dev->header_name);
3873
3874	return 0;
3875
3876out_err:
3877	kfree(rbd_dev->header_name);
3878	rbd_dev->header_name = NULL;
3879	kfree(rbd_dev->spec->image_id);
3880	rbd_dev->spec->image_id = NULL;
3881
3882	return ret;
3883}
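
/*
 * Editor's note: the buffer sizing above relies on sizeof () of a
 * string literal counting the terminating NUL, so
 * strlen(image_name) + sizeof (RBD_SUFFIX) is exactly the space the
 * sprintf() needs, including its trailing '\0'.  The same idiom is
 * used for the id and header object names elsewhere in this file.
 */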
3884
3885static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3886{
3887	size_t size;
3888	int ret;
3889	u64 ver = 0;
3890
3891	/*
3892	 * Image id was filled in by the caller.  Record the header
3893	 * object name for this rbd image.
3894	 */
3895	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
3896	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3897	if (!rbd_dev->header_name)
3898		return -ENOMEM;
3899	sprintf(rbd_dev->header_name, "%s%s",
3900			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3901
3902	/* Get the size and object order for the image */
3903
3904	ret = rbd_dev_v2_image_size(rbd_dev);
3905	if (ret < 0)
3906		goto out_err;
3907
3908	/* Get the object prefix (a.k.a. block_name) for the image */
3909
3910	ret = rbd_dev_v2_object_prefix(rbd_dev);
3911	if (ret < 0)
3912		goto out_err;
3913
3914	/* Get and check the features for the image */
3915
3916	ret = rbd_dev_v2_features(rbd_dev);
3917	if (ret < 0)
3918		goto out_err;
3919
3920	/* If the image supports layering, get the parent info */
3921
3922	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3923		ret = rbd_dev_v2_parent_info(rbd_dev);
3924		if (ret < 0)
3925			goto out_err;
3926	}
3927
3928	/* crypto and compression types aren't (yet) supported for v2 images */
3929
3930	rbd_dev->header.crypt_type = 0;
3931	rbd_dev->header.comp_type = 0;
3932
3933	/* Get the snapshot context, plus the header version */
3934
3935	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3936	if (ret)
3937		goto out_err;
3938	rbd_dev->header.obj_version = ver;
3939
3940	rbd_dev->image_format = 2;
3941
3942	dout("discovered version 2 image, header name is %s\n",
3943		rbd_dev->header_name);
3944
3945	return 0;
3946out_err:
3947	rbd_dev->parent_overlap = 0;
3948	rbd_spec_put(rbd_dev->parent_spec);
3949	rbd_dev->parent_spec = NULL;
3950	kfree(rbd_dev->header_name);
3951	rbd_dev->header_name = NULL;
3952	kfree(rbd_dev->header.object_prefix);
3953	rbd_dev->header.object_prefix = NULL;
3954
3955	return ret;
3956}
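
/*
 * Editor's note: the v2 probe gathers metadata strictly in the order
 * size, object prefix, features, (optional) parent info, then the
 * snapshot context; out_err above releases everything acquired up to
 * the point of failure, leaving the rbd_dev as it was found.
 */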
3957
3958static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3959{
3960	int ret;
3961
3962	/* no need to lock here, as rbd_dev is not registered yet */
3963	ret = rbd_dev_snaps_update(rbd_dev);
3964	if (ret)
3965		return ret;
3966
3967	ret = rbd_dev_probe_update_spec(rbd_dev);
3968	if (ret)
3969		goto err_out_snaps;
3970
3971	ret = rbd_dev_set_mapping(rbd_dev);
3972	if (ret)
3973		goto err_out_snaps;
3974
3975	/* generate a unique id: find the highest existing id, add one */
3976	rbd_dev_id_get(rbd_dev);
3977
3978	/* Fill in the device name, now that we have its id. */
3979	BUILD_BUG_ON(DEV_NAME_LEN
3980			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3981	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3982
3983	/* Get our block major device number. */
3984
3985	ret = register_blkdev(0, rbd_dev->name);
3986	if (ret < 0)
3987		goto err_out_id;
3988	rbd_dev->major = ret;
3989
3990	/* Set up the blkdev mapping. */
3991
3992	ret = rbd_init_disk(rbd_dev);
3993	if (ret)
3994		goto err_out_blkdev;
3995
3996	ret = rbd_bus_add_dev(rbd_dev);
3997	if (ret)
3998		goto err_out_disk;
3999
4000	/*
4001	 * At this point cleanup in the event of an error is the job
4002	 * of the sysfs code (initiated by rbd_bus_del_dev()).
4003	 */
4004	down_write(&rbd_dev->header_rwsem);
4005	ret = rbd_dev_snaps_register(rbd_dev);
4006	up_write(&rbd_dev->header_rwsem);
4007	if (ret)
4008		goto err_out_bus;
4009
4010	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4011	if (ret)
4012		goto err_out_bus;
4013
4014	/* Everything's ready.  Announce the disk to the world. */
4015
4016	add_disk(rbd_dev->disk);
4017
4018	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4019		(unsigned long long) rbd_dev->mapping.size);
4020
4021	return ret;
4022err_out_bus:
4023	/* this will also clean up the rest of the rbd_dev state */
4024
4025	rbd_bus_del_dev(rbd_dev);
4026
4027	return ret;
4028err_out_disk:
4029	rbd_free_disk(rbd_dev);
4030err_out_blkdev:
4031	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4032err_out_id:
4033	rbd_dev_id_put(rbd_dev);
4034err_out_snaps:
4035	rbd_remove_all_snaps(rbd_dev);
4036
4037	return ret;
4038}
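
/*
 * Editor's note: the error labels above undo setup in reverse order
 * (disk -> blkdev major -> device id -> snapshots), the usual kernel
 * goto-unwind idiom.  err_out_bus is the exception: once the device
 * is on the bus, rbd_bus_del_dev() hands the remaining teardown to
 * the sysfs release path instead of falling through the labels.
 */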
4039
4040/*
4041 * Probe for the existence of the header object for the given rbd
4042 * device.  For format 2 images this includes determining the image
4043 * id.
4044 */
4045static int rbd_dev_probe(struct rbd_device *rbd_dev)
4046{
4047	int ret;
4048
4049	/*
4050	 * Get the id from the image id object.  If it's not a
4051	 * format 2 image, we'll get ENOENT back, and we'll assume
4052	 * it's a format 1 image.
4053	 */
4054	ret = rbd_dev_image_id(rbd_dev);
4055	if (ret)
4056		ret = rbd_dev_v1_probe(rbd_dev);
4057	else
4058		ret = rbd_dev_v2_probe(rbd_dev);
4059	if (ret) {
4060		dout("probe failed, returning %d\n", ret);
4061
4062		return ret;
4063	}
4064
4065	ret = rbd_dev_probe_finish(rbd_dev);
4066	if (ret)
4067		rbd_header_free(&rbd_dev->header);
4068
4069	return ret;
4070}
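
/*
 * Editor's note: the comment in rbd_dev_probe() anticipates -ENOENT,
 * but as written *any* error from rbd_dev_image_id() -- including,
 * say, -ENOMEM -- falls back to the format 1 probe rather than being
 * reported to the caller.
 */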
4071
4072static ssize_t rbd_add(struct bus_type *bus,
4073		       const char *buf,
4074		       size_t count)
4075{
4076	struct rbd_device *rbd_dev = NULL;
4077	struct ceph_options *ceph_opts = NULL;
4078	struct rbd_options *rbd_opts = NULL;
4079	struct rbd_spec *spec = NULL;
4080	struct rbd_client *rbdc;
4081	struct ceph_osd_client *osdc;
4082	int rc = -ENOMEM;
4083
4084	if (!try_module_get(THIS_MODULE))
4085		return -ENODEV;
4086
4087	/* parse add command */
4088	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4089	if (rc < 0)
4090		goto err_out_module;
4091
4092	rbdc = rbd_get_client(ceph_opts);
4093	if (IS_ERR(rbdc)) {
4094		rc = PTR_ERR(rbdc);
4095		goto err_out_args;
4096	}
4097	ceph_opts = NULL;	/* rbd_dev client now owns this */
4098
4099	/* pick the pool */
4100	osdc = &rbdc->client->osdc;
4101	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4102	if (rc < 0)
4103		goto err_out_client;
4104	spec->pool_id = (u64) rc;
4105
4106	/* The ceph file layout needs the pool id to fit in 32 bits */
4107
4108	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4109		rc = -EIO;
4110		goto err_out_client;
4111	}
4112
4113	rbd_dev = rbd_dev_create(rbdc, spec);
4114	if (!rbd_dev)
4115		goto err_out_client;
4116	rbdc = NULL;		/* rbd_dev now owns this */
4117	spec = NULL;		/* rbd_dev now owns this */
4118
4119	rbd_dev->mapping.read_only = rbd_opts->read_only;
4120	kfree(rbd_opts);
4121	rbd_opts = NULL;	/* done with this */
4122
4123	rc = rbd_dev_probe(rbd_dev);
4124	if (rc < 0)
4125		goto err_out_rbd_dev;
4126
4127	return count;
4128err_out_rbd_dev:
4129	rbd_dev_destroy(rbd_dev);
4130err_out_client:
4131	rbd_put_client(rbdc);
4132err_out_args:
4133	if (ceph_opts)
4134		ceph_destroy_options(ceph_opts);
4135	kfree(rbd_opts);
4136	rbd_spec_put(spec);
4137err_out_module:
4138	module_put(THIS_MODULE);
4139
4140	dout("Error adding device %s\n", buf);
4141
4142	return (ssize_t) rc;
4143}
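
/*
 * Editor's usage sketch: rbd_add() runs when a mapping specification
 * is written to /sys/bus/rbd/add (see
 * Documentation/ABI/testing/sysfs-bus-rbd for the exact format).  The
 * monitor address, credentials, pool, and image name below are
 * hypothetical.  From user space:
 */
#if 0
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int main(void)
{
	/* "<mon addr> <options> <pool> <image>"; values are made up */
	const char spec[] = "1.2.3.4:6789 name=admin rbd myimage";
	int fd;
	ssize_t n;

	fd = open("/sys/bus/rbd/add", O_WRONLY);
	if (fd < 0) {
		perror("open");
		return 1;
	}
	n = write(fd, spec, strlen(spec));
	close(fd);

	return n < 0 ? 1 : 0;	/* on success a /dev/rbd<N> appears */
}
#endif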
4144
4145static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4146{
4147	struct list_head *tmp;
4148	struct rbd_device *rbd_dev;
4149
4150	spin_lock(&rbd_dev_list_lock);
4151	list_for_each(tmp, &rbd_dev_list) {
4152		rbd_dev = list_entry(tmp, struct rbd_device, node);
4153		if (rbd_dev->dev_id == dev_id) {
4154			spin_unlock(&rbd_dev_list_lock);
4155			return rbd_dev;
4156		}
4157	}
4158	spin_unlock(&rbd_dev_list_lock);
4159	return NULL;
4160}
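
/*
 * Editor's note: __rbd_get_dev() drops rbd_dev_list_lock before the
 * returned pointer is used; its caller, rbd_remove(), relies on
 * holding ctl_mutex to keep the device from going away underneath it.
 */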
4161
4162static void rbd_dev_release(struct device *dev)
4163{
4164	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4165
4166	if (rbd_dev->watch_event)
4167		rbd_dev_header_watch_sync(rbd_dev, 0);
4168
4169	/* clean up and free blkdev */
4170	rbd_free_disk(rbd_dev);
4171	unregister_blkdev(rbd_dev->major, rbd_dev->name);
4172
4173	/* release allocated disk header fields */
4174	rbd_header_free(&rbd_dev->header);
4175
4176	/* done with the id, and with the rbd_dev */
4177	rbd_dev_id_put(rbd_dev);
4178	rbd_assert(rbd_dev->rbd_client != NULL);
4179	rbd_dev_destroy(rbd_dev);
4180
4181	/* release module ref */
4182	module_put(THIS_MODULE);
4183}
4184
4185static ssize_t rbd_remove(struct bus_type *bus,
4186			  const char *buf,
4187			  size_t count)
4188{
4189	struct rbd_device *rbd_dev = NULL;
4190	int target_id, rc;
4191	unsigned long ul;
4192	int ret = count;
4193
4194	rc = strict_strtoul(buf, 10, &ul);
4195	if (rc)
4196		return rc;
4197
4198	/* convert to int; abort if we lost anything in the conversion */
4199	target_id = (int) ul;
4200	if (target_id != ul)
4201		return -EINVAL;
4202
4203	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4204
4205	rbd_dev = __rbd_get_dev(target_id);
4206	if (!rbd_dev) {
4207		ret = -ENOENT;
4208		goto done;
4209	}
4210
4211	spin_lock_irq(&rbd_dev->lock);
4212	if (rbd_dev->open_count)
4213		ret = -EBUSY;
4214	else
4215		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
4216	spin_unlock_irq(&rbd_dev->lock);
4217	if (ret < 0)
4218		goto done;
4219
4220	rbd_remove_all_snaps(rbd_dev);
4221	rbd_bus_del_dev(rbd_dev);
4222
4223done:
4224	mutex_unlock(&ctl_mutex);
4225
4226	return ret;
4227}
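
/*
 * Editor's usage note: removal mirrors rbd_add() -- write the decimal
 * device id (the N in /dev/rbdN) to /sys/bus/rbd/remove, e.g.
 * "echo 0 > /sys/bus/rbd/remove".  The write fails with -EBUSY while
 * the block device is still open.
 */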
4228
4229/*
4230 * create control files in sysfs
4231 * /sys/bus/rbd/...
4232 */
4233static int rbd_sysfs_init(void)
4234{
4235	int ret;
4236
4237	ret = device_register(&rbd_root_dev);
4238	if (ret < 0)
4239		return ret;
4240
4241	ret = bus_register(&rbd_bus_type);
4242	if (ret < 0)
4243		device_unregister(&rbd_root_dev);
4244
4245	return ret;
4246}
4247
4248static void rbd_sysfs_cleanup(void)
4249{
4250	bus_unregister(&rbd_bus_type);
4251	device_unregister(&rbd_root_dev);
4252}
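
/*
 * Editor's note: rbd_sysfs_cleanup() unregisters in the reverse of
 * the registration order in rbd_sysfs_init(), and rbd_sysfs_init()
 * itself unwinds the root device if bus registration fails.
 */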
4253
4254static int __init rbd_init(void)
4255{
4256	int rc;
4257
4258	if (!libceph_compatible(NULL)) {
4259		rbd_warn(NULL, "libceph incompatibility (quitting)");
4260
4261		return -EINVAL;
4262	}
4263	rc = rbd_sysfs_init();
4264	if (rc)
4265		return rc;
4266	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
4267	return 0;
4268}
4269
4270static void __exit rbd_exit(void)
4271{
4272	rbd_sysfs_cleanup();
4273}
4274
4275module_init(rbd_init);
4276module_exit(rbd_exit);
4277
4278MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4279MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4280MODULE_DESCRIPTION("rados block device");
4281
4282/* following authorship retained from original osdblk.c */
4283MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4284
4285MODULE_LICENSE("GPL");
4286