rbd.c revision be466c1cc36621590ef17b05a6d342dfd33f7280
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
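/*
 * Illustrative example (paraphrased from that ABI document, not
 * normative here): an image is mapped by writing a line of the form
 * "<mon_addrs> <options> <pool> <image> [<snap>]" to /sys/bus/rbd/add,
 * e.g.
 *
 *   $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * and unmapped by writing its assigned device id to /sys/bus/rbd/remove.
 */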

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

#define RBD_IMAGE_ID_LEN_MAX	64
#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING      1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL          (0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
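/*
 * Worked example (illustrative): with a 4-byte int, MAX_INT_FORMAT_WIDTH
 * is (5 * 4) / 2 + 1 = 11, enough for the 11 characters of "-2147483648".
 * The 5/2 factor over-approximates the log10(256) ~= 2.41 decimal digits
 * per byte, and the +1 leaves room for a sign.
 */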

#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These five fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct	device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	char                    *snap_name;
	u64                     snap_id;
	u64                     size;
	u64                     features;
	bool                    snap_exists;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_options	rbd_opts;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_id;
	size_t			image_id_len;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client)) {
		ret = PTR_ERR(rbdc->client);
		goto out_mutex;
	}
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
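/*
 * Illustrative note: the options string handed to rbd_get_client()
 * below is a comma-separated list in the usual mount-option style, so
 * a mapping request might carry "ro" or "read_only" to get a read-only
 * mapping; read-write is the default (RBD_READ_ONLY_DEFAULT is false).
 */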

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; if one
 * does not exist, create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
				size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
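/*
 * Worked example (illustrative, 64-bit size_t): with SIZE_MAX ~= 1.8e19
 * and 8-byte snapshot ids, the first check caps snap_count at roughly
 * 2.3e18 entries, and whatever remains of the budget bounds
 * snap_names_len.  The point is not the huge numbers but that a
 * corrupted header cannot cause an integer overflow when these sizes
 * are combined below in rbd_header_from_disk().
 */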

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
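/*
 * Illustrative layout sketch (assuming the rbd_types.h definitions): a
 * v1 on-disk header is the fixed struct rbd_image_header_ondisk,
 * followed immediately by snap_count struct rbd_image_snap_ondisk
 * entries, followed by the snapshot names as a snap_names_len-byte
 * block of NUL-terminated strings, e.g. for snapshots "a" and "bb":
 *
 *   [header][snap 0][snap 1]"a\0bb\0"
 *
 * which is why &ondisk->snaps[snap_count] above points at the first name.
 */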

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{

	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->mapping.snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		rbd_dev->mapping.snap_exists = false;
		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.snap_exists = true;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->mapping.snap_name = snap_name;
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
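/*
 * Worked example (illustrative values): with object_prefix "rb.0.1234"
 * and an obj_order of 22 (4 MiB objects), image offset 0x01400000
 * (20 MiB) falls in segment 20 MiB / 4 MiB = 5, so the backing object
 * is named "rb.0.1234.000000000005".
 */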

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
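/*
 * Worked example (illustrative): with 4 MiB segments, a request for
 * 16 KiB starting 8 KiB before a segment boundary has an in-segment
 * offset of 4 MiB - 8 KiB, so the length is clamped to the 8 KiB that
 * remain in that segment; the rest belongs to the next segment.
 */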

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
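/*
 * Worked example (illustrative): with obj_order 22 (4 MiB segments),
 * ofs = 4 MiB - 1 and len = 2 gives start_seg = 0 and end_seg = 1, so
 * the range straddles two segments even though it is only two bytes.
 */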

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
844			dout("bio_chain_clone split! total=%d remaining=%d"
845			     "bi_size=%u\n",
846			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, inbound_size, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Send an acknowledgement for an osd watch notification
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   "update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}
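/*
 * Illustrative note: because struct rbd_req_coll ends in the status[0]
 * flexible array, a collection covering e.g. 3 segments is a single
 * allocation of sizeof(struct rbd_req_coll) plus
 * 3 * sizeof(struct rbd_req_status) bytes; status[0..2] then track the
 * completion state of each per-segment request.
 */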

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
1511			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans
 * multiple osd objects. One exception would be a single-page bio,
 * which we handle later at bio_chain_clone()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				+ bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
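/*
 * Worked example (illustrative): with obj_order 22, chunk_sectors is
 * 1 << (22 - 9) = 8192 sectors (4 MiB).  A bio currently ending one
 * sector before a 4 MiB boundary that asks to add a 4 KiB bvec gets
 * max = 512, so the block layer stops growing the bio at the object
 * boundary instead of letting it straddle two osd objects.
 */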

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
1672			ret = -ENXIO;
1673			pr_warning("short header read for image %s"
1674					" (want %zd got %d)\n",
1675				rbd_dev->image_name, size, ret);
1676			goto out_err;
1677		}
1678		if (!rbd_dev_ondisk_valid(ondisk)) {
1679			ret = -ENXIO;
1680			pr_warning("invalid header for image %s\n",
1681				rbd_dev->image_name);
1682			goto out_err;
1683		}
1684
1685		names_size = le64_to_cpu(ondisk->snap_names_len);
1686		want_count = snap_count;
1687		snap_count = le32_to_cpu(ondisk->snap_count);
1688	} while (snap_count != want_count);
1689
1690	return ondisk;
1691
1692out_err:
1693	kfree(ondisk);
1694
1695	return ERR_PTR(ret);
1696}
1697
/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}

static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
1736	dout("setting size to %llu sectors", (unsigned long long) size);
1737	rbd_dev->mapping.size = (u64) size;
1738	set_capacity(rbd_dev->disk, size);
1739}
1740
1741/*
1742 * only read the first part of the ondisk header, without the snaps info
1743 */
1744static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
1745{
1746	int ret;
1747	struct rbd_image_header h;
1748
1749	ret = rbd_read_header(rbd_dev, &h);
1750	if (ret < 0)
1751		return ret;
1752
1753	down_write(&rbd_dev->header_rwsem);
1754
1755	/* Update image size, and check for resize of mapped image */
1756	rbd_dev->header.image_size = h.image_size;
1757	rbd_update_mapping_size(rbd_dev);
1758
1759	/* rbd_dev->header.object_prefix shouldn't change */
1760	kfree(rbd_dev->header.snap_sizes);
1761	kfree(rbd_dev->header.snap_names);
1762	/* osd requests may still refer to snapc */
1763	ceph_put_snap_context(rbd_dev->header.snapc);
1764
1765	if (hver)
1766		*hver = h.obj_version;
1767	rbd_dev->header.obj_version = h.obj_version;
1768	rbd_dev->header.image_size = h.image_size;
1769	rbd_dev->header.snapc = h.snapc;
1770	rbd_dev->header.snap_names = h.snap_names;
1771	rbd_dev->header.snap_sizes = h.snap_sizes;
1772	/* Free the extra copy of the object prefix */
1773	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1774	kfree(h.object_prefix);
1775
1776	ret = rbd_dev_snaps_update(rbd_dev);
1777	if (!ret)
1778		ret = rbd_dev_snaps_register(rbd_dev);
1779
1780	up_write(&rbd_dev->header_rwsem);
1781
1782	return ret;
1783}
1784
1785static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
1786{
1787	int ret;
1788
1789	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
1790	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1791	if (rbd_dev->image_format == 1)
1792		ret = rbd_dev_v1_refresh(rbd_dev, hver);
1793	else
1794		ret = rbd_dev_v2_refresh(rbd_dev, hver);
1795	mutex_unlock(&ctl_mutex);
1796
1797	return ret;
1798}
1799
1800static int rbd_init_disk(struct rbd_device *rbd_dev)
1801{
1802	struct gendisk *disk;
1803	struct request_queue *q;
1804	u64 segment_size;
1805
1806	/* create gendisk info */
1807	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1808	if (!disk)
1809		return -ENOMEM;
1810
1811	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1812		 rbd_dev->dev_id);
1813	disk->major = rbd_dev->major;
1814	disk->first_minor = 0;
1815	disk->fops = &rbd_bd_ops;
1816	disk->private_data = rbd_dev;
1817
1818	/* init rq */
1819	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1820	if (!q)
1821		goto out_disk;
1822
1823	/* We use the default size, but let's be explicit about it. */
1824	blk_queue_physical_block_size(q, SECTOR_SIZE);
1825
1826	/* set io sizes to object size */
1827	segment_size = rbd_obj_bytes(&rbd_dev->header);
1828	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1829	blk_queue_max_segment_size(q, segment_size);
1830	blk_queue_io_min(q, segment_size);
1831	blk_queue_io_opt(q, segment_size);
1832
1833	blk_queue_merge_bvec(q, rbd_merge_bvec);
1834	disk->queue = q;
1835
1836	q->queuedata = rbd_dev;
1837
1838	rbd_dev->disk = disk;
1839
1840	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
1841
1842	return 0;
1843out_disk:
1844	put_disk(disk);
1845
1846	return -ENOMEM;
1847}
1848
1849/*
1850  sysfs
1851*/
1852
1853static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1854{
1855	return container_of(dev, struct rbd_device, dev);
1856}
1857
1858static ssize_t rbd_size_show(struct device *dev,
1859			     struct device_attribute *attr, char *buf)
1860{
1861	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862	sector_t size;
1863
1864	down_read(&rbd_dev->header_rwsem);
1865	size = get_capacity(rbd_dev->disk);
1866	up_read(&rbd_dev->header_rwsem);
1867
1868	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1869}
1870
1871/*
1872 * Note this shows the features for whatever's mapped, which is not
1873 * necessarily the base image.
1874 */
1875static ssize_t rbd_features_show(struct device *dev,
1876			     struct device_attribute *attr, char *buf)
1877{
1878	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879
1880	return sprintf(buf, "0x%016llx\n",
1881			(unsigned long long) rbd_dev->mapping.features);
1882}
1883
1884static ssize_t rbd_major_show(struct device *dev,
1885			      struct device_attribute *attr, char *buf)
1886{
1887	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888
1889	return sprintf(buf, "%d\n", rbd_dev->major);
1890}
1891
1892static ssize_t rbd_client_id_show(struct device *dev,
1893				  struct device_attribute *attr, char *buf)
1894{
1895	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1896
1897	return sprintf(buf, "client%lld\n",
1898			ceph_client_id(rbd_dev->rbd_client->client));
1899}
1900
1901static ssize_t rbd_pool_show(struct device *dev,
1902			     struct device_attribute *attr, char *buf)
1903{
1904	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1905
1906	return sprintf(buf, "%s\n", rbd_dev->pool_name);
1907}
1908
1909static ssize_t rbd_pool_id_show(struct device *dev,
1910			     struct device_attribute *attr, char *buf)
1911{
1912	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913
1914	return sprintf(buf, "%d\n", rbd_dev->pool_id);
1915}
1916
1917static ssize_t rbd_name_show(struct device *dev,
1918			     struct device_attribute *attr, char *buf)
1919{
1920	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1921
1922	return sprintf(buf, "%s\n", rbd_dev->image_name);
1923}
1924
1925static ssize_t rbd_image_id_show(struct device *dev,
1926			     struct device_attribute *attr, char *buf)
1927{
1928	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1929
1930	return sprintf(buf, "%s\n", rbd_dev->image_id);
1931}
1932
1933/*
1934 * Shows the name of the currently-mapped snapshot (or
1935 * RBD_SNAP_HEAD_NAME for the base image).
1936 */
1937static ssize_t rbd_snap_show(struct device *dev,
1938			     struct device_attribute *attr,
1939			     char *buf)
1940{
1941	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1942
1943	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
1944}
1945
1946static ssize_t rbd_image_refresh(struct device *dev,
1947				 struct device_attribute *attr,
1948				 const char *buf,
1949				 size_t size)
1950{
1951	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1952	int ret;
1953
1954	ret = rbd_dev_refresh(rbd_dev, NULL);
1955
1956	return ret < 0 ? ret : size;
1957}
1958
1959static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1960static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
1961static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1962static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1963static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1964static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1965static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1966static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
1967static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1968static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1969
1970static struct attribute *rbd_attrs[] = {
1971	&dev_attr_size.attr,
1972	&dev_attr_features.attr,
1973	&dev_attr_major.attr,
1974	&dev_attr_client_id.attr,
1975	&dev_attr_pool.attr,
1976	&dev_attr_pool_id.attr,
1977	&dev_attr_name.attr,
1978	&dev_attr_image_id.attr,
1979	&dev_attr_current_snap.attr,
1980	&dev_attr_refresh.attr,
1981	NULL
1982};
1983
1984static struct attribute_group rbd_attr_group = {
1985	.attrs = rbd_attrs,
1986};
1987
1988static const struct attribute_group *rbd_attr_groups[] = {
1989	&rbd_attr_group,
1990	NULL
1991};
1992
1993static void rbd_sysfs_dev_release(struct device *dev)
1994{
1995}
1996
1997static struct device_type rbd_device_type = {
1998	.name		= "rbd",
1999	.groups		= rbd_attr_groups,
2000	.release	= rbd_sysfs_dev_release,
2001};
2002
2003
2004/*
2005  sysfs - snapshots
2006*/
2007
2008static ssize_t rbd_snap_size_show(struct device *dev,
2009				  struct device_attribute *attr,
2010				  char *buf)
2011{
2012	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2013
2014	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
2015}
2016
2017static ssize_t rbd_snap_id_show(struct device *dev,
2018				struct device_attribute *attr,
2019				char *buf)
2020{
2021	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2022
2023	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
2024}
2025
2026static ssize_t rbd_snap_features_show(struct device *dev,
2027				struct device_attribute *attr,
2028				char *buf)
2029{
2030	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2031
2032	return sprintf(buf, "0x%016llx\n",
2033			(unsigned long long) snap->features);
2034}
2035
2036static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
2037static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2038static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
2039
2040static struct attribute *rbd_snap_attrs[] = {
2041	&dev_attr_snap_size.attr,
2042	&dev_attr_snap_id.attr,
2043	&dev_attr_snap_features.attr,
2044	NULL,
2045};
2046
2047static struct attribute_group rbd_snap_attr_group = {
2048	.attrs = rbd_snap_attrs,
2049};
2050
2051static void rbd_snap_dev_release(struct device *dev)
2052{
2053	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2054	kfree(snap->name);
2055	kfree(snap);
2056}
2057
2058static const struct attribute_group *rbd_snap_attr_groups[] = {
2059	&rbd_snap_attr_group,
2060	NULL
2061};
2062
2063static struct device_type rbd_snap_device_type = {
2064	.groups		= rbd_snap_attr_groups,
2065	.release	= rbd_snap_dev_release,
2066};
2067
2068static bool rbd_snap_registered(struct rbd_snap *snap)
2069{
2070	bool ret = snap->dev.type == &rbd_snap_device_type;
2071	bool reg = device_is_registered(&snap->dev);
2072
2073	rbd_assert(!ret ^ reg);
2074
2075	return ret;
2076}
2077
2078static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2079{
2080	list_del(&snap->node);
2081	if (device_is_registered(&snap->dev))
2082		device_unregister(&snap->dev);
2083}
2084
2085static int rbd_register_snap_dev(struct rbd_snap *snap,
2086				  struct device *parent)
2087{
2088	struct device *dev = &snap->dev;
2089	int ret;
2090
2091	dev->type = &rbd_snap_device_type;
2092	dev->parent = parent;
2093	dev->release = rbd_snap_dev_release;
2094	dev_set_name(dev, "snap_%s", snap->name);
2095	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2096
2097	ret = device_register(dev);
2098
2099	return ret;
2100}
2101
2102static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2103						const char *snap_name,
2104						u64 snap_id, u64 snap_size,
2105						u64 snap_features)
2106{
2107	struct rbd_snap *snap;
2108	int ret;
2109
2110	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2111	if (!snap)
2112		return ERR_PTR(-ENOMEM);
2113
2114	ret = -ENOMEM;
2115	snap->name = kstrdup(snap_name, GFP_KERNEL);
2116	if (!snap->name)
2117		goto err;
2118
2119	snap->id = snap_id;
2120	snap->size = snap_size;
2121	snap->features = snap_features;
2122
2123	return snap;
2124
2125err:
2126	kfree(snap->name);
2127	kfree(snap);
2128
2129	return ERR_PTR(ret);
2130}
2131
2132static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2133		u64 *snap_size, u64 *snap_features)
2134{
2135	char *snap_name;
2136
2137	rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2138
2139	*snap_size = rbd_dev->header.snap_sizes[which];
2140	*snap_features = 0;	/* No features for v1 */
2141
2142	/* Skip over names until we find the one we are looking for */
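	/* (the names are packed end to end: "name1\0name2\0...") */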
2143
2144	snap_name = rbd_dev->header.snap_names;
2145	while (which--)
2146		snap_name += strlen(snap_name) + 1;
2147
2148	return snap_name;
2149}
2150
2151/*
2152 * Get the size and object order for an image snapshot, or if
2153 * snap_id is CEPH_NOSNAP, get this information for the base
2154 * image.
2155 */
2156static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2157				u8 *order, u64 *snap_size)
2158{
2159	__le64 snapid = cpu_to_le64(snap_id);
2160	int ret;
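	/* Wire layout of the reply from the "rbd" class "get_size" method */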
2161	struct {
2162		u8 order;
2163		__le64 size;
2164	} __attribute__ ((packed)) size_buf = { 0 };
2165
2166	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2167				"rbd", "get_size",
2168				(char *) &snapid, sizeof (snapid),
2169				(char *) &size_buf, sizeof (size_buf),
2170				CEPH_OSD_FLAG_READ, NULL);
2171	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2172	if (ret < 0)
2173		return ret;
2174
2175	*order = size_buf.order;
2176	*snap_size = le64_to_cpu(size_buf.size);
2177
2178	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
2179		(unsigned long long) snap_id, (unsigned int) *order,
2180		(unsigned long long) *snap_size);
2181
2182	return 0;
2183}
2184
2185static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2186{
2187	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2188					&rbd_dev->header.obj_order,
2189					&rbd_dev->header.image_size);
2190}
2191
2192static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2193{
2194	void *reply_buf;
2195	int ret;
2196	void *p;
2197
2198	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2199	if (!reply_buf)
2200		return -ENOMEM;
2201
2202	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2203				"rbd", "get_object_prefix",
2204				NULL, 0,
2205				reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2206				CEPH_OSD_FLAG_READ, NULL);
2207	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2208	if (ret < 0)
2209		goto out;
2210	ret = 0;    /* rbd_req_sync_exec() can return positive */
2211
2212	p = reply_buf;
2213	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2214						p + RBD_OBJ_PREFIX_LEN_MAX,
2215						NULL, GFP_NOIO);
2216
2217	if (IS_ERR(rbd_dev->header.object_prefix)) {
2218		ret = PTR_ERR(rbd_dev->header.object_prefix);
2219		rbd_dev->header.object_prefix = NULL;
2220	} else {
2221		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
2222	}
2223
2224out:
2225	kfree(reply_buf);
2226
2227	return ret;
2228}
2229
2230static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2231		u64 *snap_features)
2232{
2233	__le64 snapid = cpu_to_le64(snap_id);
2234	struct {
2235		__le64 features;
2236		__le64 incompat;
2237	} features_buf = { 0 };
2238	u64 incompat;
2239	int ret;
2240
2241	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2242				"rbd", "get_features",
2243				(char *) &snapid, sizeof (snapid),
2244				(char *) &features_buf, sizeof (features_buf),
2245				CEPH_OSD_FLAG_READ, NULL);
2246	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2247	if (ret < 0)
2248		return ret;
2249
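	/* Refuse to map the image if it requires features we don't support */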
2250	incompat = le64_to_cpu(features_buf.incompat);
2251	if (incompat & ~RBD_FEATURES_ALL)
2252		return -ENOTSUPP;
2253
2254	*snap_features = le64_to_cpu(features_buf.features);
2255
2256	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2257		(unsigned long long) snap_id,
2258		(unsigned long long) *snap_features,
2259		(unsigned long long) le64_to_cpu(features_buf.incompat));
2260
2261	return 0;
2262}
2263
2264static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2265{
2266	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2267						&rbd_dev->header.features);
2268}
2269
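/*
 * The "get_snapcontext" reply is encoded as:
 *	le64  seq (the maximum snapshot id)
 *	le32  snap_count
 *	le64  snaps[snap_count]
 */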
2270static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2271{
2272	size_t size;
2273	int ret;
2274	void *reply_buf;
2275	void *p;
2276	void *end;
2277	u64 seq;
2278	u32 snap_count;
2279	struct ceph_snap_context *snapc;
2280	u32 i;
2281
2282	/*
2283	 * We'll need room for the seq value (maximum snapshot id),
2284	 * snapshot count, and array of that many snapshot ids.
2285	 * For now we have a fixed upper limit on the number we're
2286	 * prepared to receive.
2287	 */
2288	size = sizeof (__le64) + sizeof (__le32) +
2289			RBD_MAX_SNAP_COUNT * sizeof (__le64);
2290	reply_buf = kzalloc(size, GFP_KERNEL);
2291	if (!reply_buf)
2292		return -ENOMEM;
2293
2294	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2295				"rbd", "get_snapcontext",
2296				NULL, 0,
2297				reply_buf, size,
2298				CEPH_OSD_FLAG_READ, ver);
2299	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2300	if (ret < 0)
2301		goto out;
2302
2303	ret = -ERANGE;
2304	p = reply_buf;
2305	end = (char *) reply_buf + size;
2306	ceph_decode_64_safe(&p, end, seq, out);
2307	ceph_decode_32_safe(&p, end, snap_count, out);
2308
2309	/*
2310	 * Make sure the reported number of snapshot ids wouldn't go
2311	 * beyond the end of our buffer.  But before checking that,
2312	 * make sure the computed size of the snapshot context we
2313	 * allocate is representable in a size_t.
2314	 */
2315	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2316				 / sizeof (u64)) {
2317		ret = -EINVAL;
2318		goto out;
2319	}
2320	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2321		goto out;
2322
2323	size = sizeof (struct ceph_snap_context) +
2324				snap_count * sizeof (snapc->snaps[0]);
2325	snapc = kmalloc(size, GFP_KERNEL);
2326	if (!snapc) {
2327		ret = -ENOMEM;
2328		goto out;
2329	}
2330
2331	atomic_set(&snapc->nref, 1);
2332	snapc->seq = seq;
2333	snapc->num_snaps = snap_count;
2334	for (i = 0; i < snap_count; i++)
2335		snapc->snaps[i] = ceph_decode_64(&p);
2336	rbd_dev->header.snapc = snapc;
2337	ret = 0;
2338
2339	dout("  snap context seq = %llu, snap_count = %u\n",
2340		(unsigned long long) seq, (unsigned int) snap_count);
2341
2342out:
2343	kfree(reply_buf);
2344
2345	return ret;
2346}
2347
2348static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2349{
2350	size_t size;
2351	void *reply_buf;
2352	__le64 snap_id;
2353	int ret;
2354	void *p;
2355	void *end;
2356	size_t snap_name_len;
2357	char *snap_name;
2358
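	/* The reply is an encoded string: an le32 length, then the name */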
2359	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2360	reply_buf = kmalloc(size, GFP_KERNEL);
2361	if (!reply_buf)
2362		return ERR_PTR(-ENOMEM);
2363
2364	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2365	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2366				"rbd", "get_snapshot_name",
2367				(char *) &snap_id, sizeof (snap_id),
2368				reply_buf, size,
2369				CEPH_OSD_FLAG_READ, NULL);
2370	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2371	if (ret < 0)
2372		goto out;
2373
2374	p = reply_buf;
2375	end = (char *) reply_buf + size;
2376	snap_name_len = 0;
2377	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
2378				GFP_KERNEL);
2379	if (IS_ERR(snap_name)) {
2380		ret = PTR_ERR(snap_name);
2381		goto out;
2382	} else {
2383		dout("  snap_id 0x%016llx snap_name = %s\n",
2384			(unsigned long long) le64_to_cpu(snap_id), snap_name);
2385	}
2386	kfree(reply_buf);
2387
2388	return snap_name;
2389out:
2390	kfree(reply_buf);
2391
2392	return ERR_PTR(ret);
2393}
2394
2395static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2396		u64 *snap_size, u64 *snap_features)
2397{
2398	u64 snap_id;	/* _rbd_dev_v2_snap_*() take cpu-order ids */
2399	u8 order;
2400	int ret;
2401
2402	snap_id = rbd_dev->header.snapc->snaps[which];
2403	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2404	if (ret)
2405		return ERR_PTR(ret);
2406	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2407	if (ret)
2408		return ERR_PTR(ret);
2409
2410	return rbd_dev_v2_snap_name(rbd_dev, which);
2411}
2412
2413static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2414		u64 *snap_size, u64 *snap_features)
2415{
2416	if (rbd_dev->image_format == 1)
2417		return rbd_dev_v1_snap_info(rbd_dev, which,
2418					snap_size, snap_features);
2419	if (rbd_dev->image_format == 2)
2420		return rbd_dev_v2_snap_info(rbd_dev, which,
2421					snap_size, snap_features);
2422	return ERR_PTR(-EINVAL);
2423}
2424
2425static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2426{
2427	int ret;
2428	__u8 obj_order;
2429
2430	down_write(&rbd_dev->header_rwsem);
2431
2432	/* Grab old order first, to see if it changes */
2433
2434	obj_order = rbd_dev->header.obj_order;
2435	ret = rbd_dev_v2_image_size(rbd_dev);
2436	if (ret)
2437		goto out;
2438	if (rbd_dev->header.obj_order != obj_order) {
2439		ret = -EIO;
2440		goto out;
2441	}
2442	rbd_update_mapping_size(rbd_dev);
2443
2444	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2445	dout("rbd_dev_v2_snap_context returned %d\n", ret);
2446	if (ret)
2447		goto out;
2448	ret = rbd_dev_snaps_update(rbd_dev);
2449	dout("rbd_dev_snaps_update returned %d\n", ret);
2450	if (ret)
2451		goto out;
2452	ret = rbd_dev_snaps_register(rbd_dev);
2453	dout("rbd_dev_snaps_register returned %d\n", ret);
2454out:
2455	up_write(&rbd_dev->header_rwsem);
2456
2457	return ret;
2458}
2459
2460/*
2461 * Scan the rbd device's current snapshot list and compare it to the
2462 * newly-received snapshot context.  Remove any existing snapshots
2463 * not present in the new snapshot context.  Add a new snapshot for
2464 * any snaphots in the snapshot context not in the current list.
2465 * And verify there are no changes to snapshots we already know
2466 * about.
2467 *
2468 * Assumes the snapshots in the snapshot context are sorted by
2469 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2470 * are also maintained in that order.)
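 *
 * For example (ids listed highest first): if the new context holds
 * {12, 10, 5} and the current list holds {12, 8, 5}, snapshots 12
 * and 5 are kept, 10 is added, and 8 is removed.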
2471 */
2472static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2473{
2474	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2475	const u32 snap_count = snapc->num_snaps;
2476	struct list_head *head = &rbd_dev->snaps;
2477	struct list_head *links = head->next;
2478	u32 index = 0;
2479
2480	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2481	while (index < snap_count || links != head) {
2482		u64 snap_id;
2483		struct rbd_snap *snap;
2484		char *snap_name;
2485		u64 snap_size = 0;
2486		u64 snap_features = 0;
2487
2488		snap_id = index < snap_count ? snapc->snaps[index]
2489					     : CEPH_NOSNAP;
2490		snap = links != head ? list_entry(links, struct rbd_snap, node)
2491				     : NULL;
2492		rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2493
2494		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2495			struct list_head *next = links->next;
2496
2497			/* Existing snapshot not in the new snap context */
2498
2499			if (rbd_dev->mapping.snap_id == snap->id)
2500				rbd_dev->mapping.snap_exists = false;
2501			dout("%ssnap id %llu has been removed\n",
2502				rbd_dev->mapping.snap_id == snap->id ?
2503								"mapped " : "",
2504				(unsigned long long) snap->id);
2505			__rbd_remove_snap_dev(snap);	/* may free snap */
2506
2507			/* Done with this list entry; advance */
2508
2509			links = next;
2510			continue;
2511		}
2512
2513		snap_name = rbd_dev_snap_info(rbd_dev, index,
2514					&snap_size, &snap_features);
2515		if (IS_ERR(snap_name))
2516			return PTR_ERR(snap_name);
2517
2518		dout("entry %u: snap_id = %llu\n", (unsigned int) index,
2519			(unsigned long long) snap_id);
2520		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2521			struct rbd_snap *new_snap;
2522
2523			/* We haven't seen this snapshot before */
2524
2525			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2526					snap_id, snap_size, snap_features);
2527			if (IS_ERR(new_snap)) {
2528				int err = PTR_ERR(new_snap);
2529
2530				dout("  failed to add dev, error %d\n", err);
2531
2532				return err;
2533			}
2534
2535			/* New goes before existing, or at end of list */
2536
2537			dout("  added dev%s\n", snap ? "" : " at end");
2538			if (snap)
2539				list_add_tail(&new_snap->node, &snap->node);
2540			else
2541				list_add_tail(&new_snap->node, head);
2542		} else {
2543			/* Already have this one */
2544
2545			dout("  already present\n");
2546
2547			rbd_assert(snap->size == snap_size);
2548			rbd_assert(!strcmp(snap->name, snap_name));
2549			rbd_assert(snap->features == snap_features);
2550
2551			/* Done with this list entry; advance */
2552
2553			links = links->next;
2554		}
2555
2556		/* Advance to the next entry in the snapshot context */
2557
2558		index++;
2559	}
2560	dout("%s: done\n", __func__);
2561
2562	return 0;
2563}
2564
2565/*
2566 * Scan the list of snapshots and register the devices for any that
2567 * have not already been registered.
2568 */
2569static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2570{
2571	struct rbd_snap *snap;
2572	int ret = 0;
2573
2574	dout("%s called\n", __func__);
2575	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2576		return -EIO;
2577
2578	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2579		if (!rbd_snap_registered(snap)) {
2580			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2581			if (ret < 0)
2582				break;
2583		}
2584	}
2585	dout("%s: returning %d\n", __func__, ret);
2586
2587	return ret;
2588}
2589
2590static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2591{
2592	struct device *dev;
2593	int ret;
2594
2595	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2596
2597	dev = &rbd_dev->dev;
2598	dev->bus = &rbd_bus_type;
2599	dev->type = &rbd_device_type;
2600	dev->parent = &rbd_root_dev;
2601	dev->release = rbd_dev_release;
2602	dev_set_name(dev, "%d", rbd_dev->dev_id);
2603	ret = device_register(dev);
2604
2605	mutex_unlock(&ctl_mutex);
2606
2607	return ret;
2608}
2609
2610static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2611{
2612	device_unregister(&rbd_dev->dev);
2613}
2614
2615static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2616{
2617	int ret, rc;
2618
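	/*
	 * An -ERANGE result from the watch request is taken to mean
	 * our header version is stale, so refresh it and retry.
	 */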
2619	do {
2620		ret = rbd_req_sync_watch(rbd_dev);
2621		if (ret == -ERANGE) {
2622			rc = rbd_dev_refresh(rbd_dev, NULL);
2623			if (rc < 0)
2624				return rc;
2625		}
2626	} while (ret == -ERANGE);
2627
2628	return ret;
2629}
2630
2631static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2632
2633/*
2634 * Get a unique rbd identifier for the given new rbd_dev, and add
2635 * the rbd_dev to the global list.  The minimum rbd id is 1.
2636 */
2637static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2638{
2639	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2640
2641	spin_lock(&rbd_dev_list_lock);
2642	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2643	spin_unlock(&rbd_dev_list_lock);
2644	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2645		(unsigned long long) rbd_dev->dev_id);
2646}
2647
2648/*
2649 * Remove an rbd_dev from the global list, and record that its
2650 * identifier is no longer in use.
2651 */
2652static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2653{
2654	struct list_head *tmp;
2655	int rbd_id = rbd_dev->dev_id;
2656	int max_id;
2657
2658	rbd_assert(rbd_id > 0);
2659
2660	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2661		(unsigned long long) rbd_dev->dev_id);
2662	spin_lock(&rbd_dev_list_lock);
2663	list_del_init(&rbd_dev->node);
2664
2665	/*
2666	 * If the id being "put" is not the current maximum, there
2667	 * is nothing special we need to do.
2668	 */
2669	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2670		spin_unlock(&rbd_dev_list_lock);
2671		return;
2672	}
2673
2674	/*
2675	 * We need to update the current maximum id.  Search the
2676	 * list to find out what it is.  We're more likely to find
2677	 * the maximum at the end, so search the list backward.
2678	 */
2679	max_id = 0;
2680	list_for_each_prev(tmp, &rbd_dev_list) {
2681		struct rbd_device *rbd_dev;
2682
2683		rbd_dev = list_entry(tmp, struct rbd_device, node);
2684		if (rbd_dev->dev_id > max_id)
2685			max_id = rbd_dev->dev_id;
2686	}
2687	spin_unlock(&rbd_dev_list_lock);
2688
2689	/*
2690	 * The max id could have been updated by rbd_dev_id_get(), in
2691	 * which case it now accurately reflects the new maximum.
2692	 * Be careful not to overwrite the maximum value in that
2693	 * case.
2694	 */
2695	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2696	dout("  max dev id has been reset\n");
2697}
2698
2699/*
2700 * Skips over white space at *buf, and updates *buf to point to the
2701 * first found non-space character (if any). Returns the length of
2702 * the token (string of non-white space characters) found.  Note
2703 * that *buf must be terminated with '\0'.
2704 */
2705static inline size_t next_token(const char **buf)
2706{
2707	/*
2708	 * These are the characters that produce nonzero for
2709	 * isspace() in the "C" and "POSIX" locales.
2710	 */
2711	const char *spaces = " \f\n\r\t\v";
2712
2713	*buf += strspn(*buf, spaces);	/* Find start of token */
2714
2715	return strcspn(*buf, spaces);	/* Return token length */
2716}
2717
2718/*
2719 * Finds the next token in *buf, and if the provided token buffer is
2720 * big enough, copies the found token into it.  The result, if
2721 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2722 * must be terminated with '\0' on entry.
2723 *
2724 * Returns the length of the token found (not including the '\0').
2725 * Return value will be 0 if no token is found, and it will be >=
2726 * token_size if the token would not fit.
2727 *
2728 * The *buf pointer will be updated to point beyond the end of the
2729 * found token.  Note that this occurs even if the token buffer is
2730 * too small to hold it.
2731 */
2732static inline size_t copy_token(const char **buf,
2733				char *token,
2734				size_t token_size)
2735{
2736	size_t len;
2737
2738	len = next_token(buf);
2739	if (len < token_size) {
2740		memcpy(token, *buf, len);
2741		*(token + len) = '\0';
2742	}
2743	*buf += len;
2744
2745	return len;
2746}
2747
2748/*
2749 * Finds the next token in *buf, dynamically allocates a buffer big
2750 * enough to hold a copy of it, and copies the token into the new
2751 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2752 * that a duplicate buffer is created even for a zero-length token.
2753 *
2754 * Returns a pointer to the newly-allocated duplicate, or a null
2755 * pointer if memory for the duplicate was not available.  If
2756 * the lenp argument is a non-null pointer, the length of the token
2757 * (not including the '\0') is returned in *lenp.
2758 *
2759 * If successful, the *buf pointer will be updated to point beyond
2760 * the end of the found token.
2761 *
2762 * Note: uses GFP_KERNEL for allocation.
2763 */
2764static inline char *dup_token(const char **buf, size_t *lenp)
2765{
2766	char *dup;
2767	size_t len;
2768
2769	len = next_token(buf);
2770	dup = kmalloc(len + 1, GFP_KERNEL);
2771	if (!dup)
2772		return NULL;
2773
2774	memcpy(dup, *buf, len);
2775	*(dup + len) = '\0';
2776	*buf += len;
2777
2778	if (lenp)
2779		*lenp = len;
2780
2781	return dup;
2782}
2783
2784/*
2785 * This fills in the pool_name, image_name, and image_name_len
2786 * fields of the given rbd_dev, based on the
2787 * list of monitor addresses and other options provided via
2788 * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
2789 * copy of the snapshot name to map if successful, or a
2790 * pointer-coded error otherwise.
2791 *
2792 * Note: rbd_dev is assumed to have been initially zero-filled.
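 *
 * The expected input is "<mon_addrs> <options> <pool> <image> [<snap>]";
 * for example (illustrative values only):
 *
 *	echo "1.2.3.4:6789 name=admin rbd myimage mysnap" > /sys/bus/rbd/add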
2793 */
2794static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2795				const char *buf,
2796				const char **mon_addrs,
2797				size_t *mon_addrs_size,
2798				char *options,
2799				size_t options_size)
2800{
2801	size_t len;
2802	char *err_ptr = ERR_PTR(-EINVAL);
2803	char *snap_name;
2804
2805	/* The first four tokens are required */
2806
2807	len = next_token(&buf);
2808	if (!len)
2809		return err_ptr;
2810	*mon_addrs_size = len + 1;
2811	*mon_addrs = buf;
2812
2813	buf += len;
2814
2815	len = copy_token(&buf, options, options_size);
2816	if (!len || len >= options_size)
2817		return err_ptr;
2818
2819	err_ptr = ERR_PTR(-ENOMEM);
2820	rbd_dev->pool_name = dup_token(&buf, NULL);
2821	if (!rbd_dev->pool_name)
2822		goto out_err;
2823
2824	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2825	if (!rbd_dev->image_name)
2826		goto out_err;
2827
2828	/* Snapshot name is optional */
2829	len = next_token(&buf);
2830	if (!len) {
2831		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2832		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2833	}
2834	snap_name = kmalloc(len + 1, GFP_KERNEL);
2835	if (!snap_name)
2836		goto out_err;
2837	memcpy(snap_name, buf, len);
2838	*(snap_name + len) = '\0';
2839
2840	dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2841
2842	return snap_name;
2843
2844out_err:
2845	kfree(rbd_dev->image_name);
2846	rbd_dev->image_name = NULL;
2847	rbd_dev->image_name_len = 0;
2848	kfree(rbd_dev->pool_name);
2849	rbd_dev->pool_name = NULL;
2850
2851	return err_ptr;
2852}
2853
2854/*
2855 * An rbd format 2 image has a unique identifier, distinct from the
2856 * name given to it by the user.  Internally, that identifier is
2857 * what's used to specify the names of objects related to the image.
2858 *
2859 * A special "rbd id" object is used to map an rbd image name to its
2860 * id.  If that object doesn't exist, then there is no v2 rbd image
2861 * with the supplied name.
2862 *
2863 * This function will record the given rbd_dev's image_id field if
2864 * it can be determined, and in that case will return 0.  If any
2865 * errors occur a negative errno will be returned and the rbd_dev's
2866 * image_id field will be unchanged (and should be NULL).
2867 */
2868static int rbd_dev_image_id(struct rbd_device *rbd_dev)
2869{
2870	int ret;
2871	size_t size;
2872	char *object_name;
2873	void *response;
2874	void *p;
2875
2876	/*
2877	 * First, see if the format 2 image id file exists, and if
2878	 * so, get the image's persistent id from it.
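	 * That object's name is the image name with RBD_ID_PREFIX
	 * (defined in rbd_types.h) prepended, as constructed below.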
2879	 */
2880	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
2881	object_name = kmalloc(size, GFP_NOIO);
2882	if (!object_name)
2883		return -ENOMEM;
2884	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
2885	dout("rbd id object name is %s\n", object_name);
2886
2887	/* Response will be an encoded string, which includes a length */
2888
2889	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
2890	response = kzalloc(size, GFP_NOIO);
2891	if (!response) {
2892		ret = -ENOMEM;
2893		goto out;
2894	}
2895
2896	ret = rbd_req_sync_exec(rbd_dev, object_name,
2897				"rbd", "get_id",
2898				NULL, 0,
2899				response, RBD_IMAGE_ID_LEN_MAX,
2900				CEPH_OSD_FLAG_READ, NULL);
2901	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2902	if (ret < 0)
2903		goto out;
2904	ret = 0;    /* rbd_req_sync_exec() can return positive */
2905
2906	p = response;
2907	rbd_dev->image_id = ceph_extract_encoded_string(&p,
2908						p + RBD_IMAGE_ID_LEN_MAX,
2909						&rbd_dev->image_id_len,
2910						GFP_NOIO);
2911	if (IS_ERR(rbd_dev->image_id)) {
2912		ret = PTR_ERR(rbd_dev->image_id);
2913		rbd_dev->image_id = NULL;
2914	} else {
2915		dout("image_id is %s\n", rbd_dev->image_id);
2916	}
2917out:
2918	kfree(response);
2919	kfree(object_name);
2920
2921	return ret;
2922}
2923
2924static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
2925{
2926	int ret;
2927	size_t size;
2928
2929	/* Version 1 images have no id; empty string is used */
2930
2931	rbd_dev->image_id = kstrdup("", GFP_KERNEL);
2932	if (!rbd_dev->image_id)
2933		return -ENOMEM;
2934	rbd_dev->image_id_len = 0;
2935
2936	/* Record the header object name for this rbd image. */
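	/* (the image name with RBD_SUFFIX, from rbd_types.h, appended) */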
2937
2938	size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
2939	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2940	if (!rbd_dev->header_name) {
2941		ret = -ENOMEM;
2942		goto out_err;
2943	}
2944	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2945
2946	/* Populate rbd image metadata */
2947
2948	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
2949	if (ret < 0)
2950		goto out_err;
2951	rbd_dev->image_format = 1;
2952
2953	dout("discovered version 1 image, header name is %s\n",
2954		rbd_dev->header_name);
2955
2956	return 0;
2957
2958out_err:
2959	kfree(rbd_dev->header_name);
2960	rbd_dev->header_name = NULL;
2961	kfree(rbd_dev->image_id);
2962	rbd_dev->image_id = NULL;
2963
2964	return ret;
2965}
2966
2967static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
2968{
2969	size_t size;
2970	int ret;
2971	u64 ver = 0;
2972
2973	/*
2974	 * Image id was filled in by the caller.  Record the header
2975	 * object name for this rbd image.
2976	 */
2977	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
2978	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
2979	if (!rbd_dev->header_name)
2980		return -ENOMEM;
2981	sprintf(rbd_dev->header_name, "%s%s",
2982			RBD_HEADER_PREFIX, rbd_dev->image_id);
2983
2984	/* Get the size and object order for the image */
2985
2986	ret = rbd_dev_v2_image_size(rbd_dev);
2987	if (ret < 0)
2988		goto out_err;
2989
2990	/* Get the object prefix (a.k.a. block_name) for the image */
2991
2992	ret = rbd_dev_v2_object_prefix(rbd_dev);
2993	if (ret < 0)
2994		goto out_err;
2995
2996	/* Get and check the features for the image */
2997
2998	ret = rbd_dev_v2_features(rbd_dev);
2999	if (ret < 0)
3000		goto out_err;
3001
3002	/* crypto and compression type aren't (yet) supported for v2 images */
3003
3004	rbd_dev->header.crypt_type = 0;
3005	rbd_dev->header.comp_type = 0;
3006
3007	/* Get the snapshot context, plus the header version */
3008
3009	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3010	if (ret)
3011		goto out_err;
3012	rbd_dev->header.obj_version = ver;
3013
3014	rbd_dev->image_format = 2;
3015
3016	dout("discovered version 2 image, header name is %s\n",
3017		rbd_dev->header_name);
3018
3019	return 0;
3020out_err:
3021	kfree(rbd_dev->header_name);
3022	rbd_dev->header_name = NULL;
3023	kfree(rbd_dev->header.object_prefix);
3024	rbd_dev->header.object_prefix = NULL;
3025
3026	return ret;
3027}
3028
3029/*
3030 * Probe for the existence of the header object for the given rbd
3031 * device.  For format 2 images this includes determining the image
3032 * id.
3033 */
3034static int rbd_dev_probe(struct rbd_device *rbd_dev)
3035{
3036	int ret;
3037
3038	/*
3039	 * Get the id from the image id object.  If it's not a
3040	 * format 2 image, we'll get ENOENT back, and we'll assume
3041	 * it's a format 1 image.
3042	 */
3043	ret = rbd_dev_image_id(rbd_dev);
3044	if (ret)
3045		ret = rbd_dev_v1_probe(rbd_dev);
3046	else
3047		ret = rbd_dev_v2_probe(rbd_dev);
3048	if (ret)
3049		dout("probe failed, returning %d\n", ret);
3050
3051	return ret;
3052}
3053
3054static ssize_t rbd_add(struct bus_type *bus,
3055		       const char *buf,
3056		       size_t count)
3057{
3058	char *options;
3059	struct rbd_device *rbd_dev = NULL;
3060	const char *mon_addrs = NULL;
3061	size_t mon_addrs_size = 0;
3062	struct ceph_osd_client *osdc;
3063	int rc = -ENOMEM;
3064	char *snap_name;
3065
3066	if (!try_module_get(THIS_MODULE))
3067		return -ENODEV;
3068
3069	options = kmalloc(count, GFP_KERNEL);
3070	if (!options)
3071		goto err_out_mem;
3072	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
3073	if (!rbd_dev)
3074		goto err_out_mem;
3075
3076	/* static rbd_device initialization */
3077	spin_lock_init(&rbd_dev->lock);
3078	INIT_LIST_HEAD(&rbd_dev->node);
3079	INIT_LIST_HEAD(&rbd_dev->snaps);
3080	init_rwsem(&rbd_dev->header_rwsem);
3081
3082	/* parse add command */
3083	snap_name = rbd_add_parse_args(rbd_dev, buf,
3084				&mon_addrs, &mon_addrs_size, options, count);
3085	if (IS_ERR(snap_name)) {
3086		rc = PTR_ERR(snap_name);
3087		goto err_out_mem;
3088	}
3089
3090	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
3091	if (rc < 0)
3092		goto err_out_args;
3093
3094	/* pick the pool */
3095	osdc = &rbd_dev->rbd_client->client->osdc;
3096	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
3097	if (rc < 0)
3098		goto err_out_client;
3099	rbd_dev->pool_id = rc;
3100
3101	rc = rbd_dev_probe(rbd_dev);
3102	if (rc < 0)
3103		goto err_out_client;
3104
3105	/* no need to lock here, as rbd_dev is not registered yet */
3106	rc = rbd_dev_snaps_update(rbd_dev);
3107	if (rc)
3108		goto err_out_header;
3109
3110	rc = rbd_dev_set_mapping(rbd_dev, snap_name);
3111	if (rc)
3112		goto err_out_header;
3113
3114	/* generate unique id: find highest unique id, add one */
3115	rbd_dev_id_get(rbd_dev);
3116
3117	/* Fill in the device name, now that we have its id. */
3118	BUILD_BUG_ON(DEV_NAME_LEN
3119			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3120	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3121
3122	/* Get our block major device number. */
3123
3124	rc = register_blkdev(0, rbd_dev->name);
3125	if (rc < 0)
3126		goto err_out_id;
3127	rbd_dev->major = rc;
3128
3129	/* Set up the blkdev mapping. */
3130
3131	rc = rbd_init_disk(rbd_dev);
3132	if (rc)
3133		goto err_out_blkdev;
3134
3135	rc = rbd_bus_add_dev(rbd_dev);
3136	if (rc)
3137		goto err_out_disk;
3138
3139	/*
3140	 * At this point cleanup in the event of an error is the job
3141	 * of the sysfs code (initiated by rbd_bus_del_dev()).
3142	 */
3143
3144	down_write(&rbd_dev->header_rwsem);
3145	rc = rbd_dev_snaps_register(rbd_dev);
3146	up_write(&rbd_dev->header_rwsem);
3147	if (rc)
3148		goto err_out_bus;
3149
3150	rc = rbd_init_watch_dev(rbd_dev);
3151	if (rc)
3152		goto err_out_bus;
3153
3154	/* Everything's ready.  Announce the disk to the world. */
3155
3156	add_disk(rbd_dev->disk);
3157
3158	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3159		(unsigned long long) rbd_dev->mapping.size);
3160
3161	return count;
3162
3163err_out_bus:
3164	/* this will also clean up rest of rbd_dev stuff */
3165
3166	rbd_bus_del_dev(rbd_dev);
3167	kfree(options);
3168	return rc;
3169
3170err_out_disk:
3171	rbd_free_disk(rbd_dev);
3172err_out_blkdev:
3173	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3174err_out_id:
3175	rbd_dev_id_put(rbd_dev);
3176err_out_header:
3177	rbd_header_free(&rbd_dev->header);
3178err_out_client:
3179	kfree(rbd_dev->header_name);
3180	rbd_put_client(rbd_dev);
3181	kfree(rbd_dev->image_id);
3182err_out_args:
3183	kfree(rbd_dev->mapping.snap_name);
3184	kfree(rbd_dev->image_name);
3185	kfree(rbd_dev->pool_name);
3186err_out_mem:
3187	kfree(rbd_dev);
3188	kfree(options);
3189
3190	dout("Error adding device %s\n", buf);
3191	module_put(THIS_MODULE);
3192
3193	return (ssize_t) rc;
3194}
3195
3196static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3197{
3198	struct list_head *tmp;
3199	struct rbd_device *rbd_dev;
3200
3201	spin_lock(&rbd_dev_list_lock);
3202	list_for_each(tmp, &rbd_dev_list) {
3203		rbd_dev = list_entry(tmp, struct rbd_device, node);
3204		if (rbd_dev->dev_id == dev_id) {
3205			spin_unlock(&rbd_dev_list_lock);
3206			return rbd_dev;
3207		}
3208	}
3209	spin_unlock(&rbd_dev_list_lock);
3210	return NULL;
3211}
3212
3213static void rbd_dev_release(struct device *dev)
3214{
3215	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3216
3217	if (rbd_dev->watch_request) {
3218		struct ceph_client *client = rbd_dev->rbd_client->client;
3219
3220		ceph_osdc_unregister_linger_request(&client->osdc,
3221						    rbd_dev->watch_request);
3222	}
3223	if (rbd_dev->watch_event)
3224		rbd_req_sync_unwatch(rbd_dev);
3225
3226	rbd_put_client(rbd_dev);
3227
3228	/* clean up and free blkdev */
3229	rbd_free_disk(rbd_dev);
3230	unregister_blkdev(rbd_dev->major, rbd_dev->name);
3231
3232	/* release allocated disk header fields */
3233	rbd_header_free(&rbd_dev->header);
3234
3235	/* done with the id, and with the rbd_dev */
3236	kfree(rbd_dev->mapping.snap_name);
3237	kfree(rbd_dev->image_id);
3238	kfree(rbd_dev->header_name);
3239	kfree(rbd_dev->pool_name);
3240	kfree(rbd_dev->image_name);
3241	rbd_dev_id_put(rbd_dev);
3242	kfree(rbd_dev);
3243
3244	/* release module ref */
3245	module_put(THIS_MODULE);
3246}
3247
3248static ssize_t rbd_remove(struct bus_type *bus,
3249			  const char *buf,
3250			  size_t count)
3251{
3252	struct rbd_device *rbd_dev = NULL;
3253	int target_id, rc;
3254	unsigned long ul;
3255	int ret = count;
3256
3257	rc = strict_strtoul(buf, 10, &ul);
3258	if (rc)
3259		return rc;
3260
3261	/* convert to int; abort if we lost anything in the conversion */
3262	target_id = (int) ul;
3263	if (target_id != ul)
3264		return -EINVAL;
3265
3266	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3267
3268	rbd_dev = __rbd_get_dev(target_id);
3269	if (!rbd_dev) {
3270		ret = -ENOENT;
3271		goto done;
3272	}
3273
3274	__rbd_remove_all_snaps(rbd_dev);
3275	rbd_bus_del_dev(rbd_dev);
3276
3277done:
3278	mutex_unlock(&ctl_mutex);
3279
3280	return ret;
3281}
3282
3283/*
3284 * create control files in sysfs
3285 * /sys/bus/rbd/...
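 *
 * (rbd_add() and rbd_remove() back the bus's "add" and "remove"
 * control files.)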
3286 */
3287static int rbd_sysfs_init(void)
3288{
3289	int ret;
3290
3291	ret = device_register(&rbd_root_dev);
3292	if (ret < 0)
3293		return ret;
3294
3295	ret = bus_register(&rbd_bus_type);
3296	if (ret < 0)
3297		device_unregister(&rbd_root_dev);
3298
3299	return ret;
3300}
3301
3302static void rbd_sysfs_cleanup(void)
3303{
3304	bus_unregister(&rbd_bus_type);
3305	device_unregister(&rbd_root_dev);
3306}
3307
3308int __init rbd_init(void)
3309{
3310	int rc;
3311
3312	rc = rbd_sysfs_init();
3313	if (rc)
3314		return rc;
3315	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3316	return 0;
3317}
3318
3319void __exit rbd_exit(void)
3320{
3321	rbd_sysfs_cleanup();
3322}
3323
3324module_init(rbd_init);
3325module_exit(rbd_exit);
3326
3327MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3328MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3329MODULE_DESCRIPTION("rados block device");
3330
3331/* following authorship retained from original osdblk.c */
3332MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3333
3334MODULE_LICENSE("GPL");
3335