rbd.c revision 340c7a2b2c9a2da640af28a8c196356484ac8b50
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

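   As a quick, illustrative example (the document above is the
   authoritative reference, and the exact syntax can vary between
   kernel versions), an image is typically mapped and unmapped like:

       $ echo "1.2.3.4:6789 name=admin mypool myimage" > /sys/bus/rbd/add
       $ ls /dev/rbd0
       $ echo 0 > /sys/bus/rbd/remove

   where 1.2.3.4:6789 is a monitor address, mypool and myimage name
   the rados pool and rbd image, and the id written to "remove" is
   the device id listed under /sys/bus/rbd/devices.
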
 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
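 * (The formula below allows 2.5 decimal digits per byte, which is
 * sufficient since 2^8 < 10^2.5, plus one character for a sign; for
 * a 32-bit int that is (5 * 4) / 2 + 1 = 11 characters, enough for
 * "-2147483648".)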
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_NOTIFY_TIMEOUT_DEFAULT 10

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;
	size_t snap_names_len;
	u32 total_snaps;

	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	int	notify_timeout;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
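 *
 * When a block request spans several rados objects it is split into
 * one osd request per object; the collection holds one status slot
 * per sub-request so rbd_coll_end_req_index() can complete the blk
 * request in order as results arrive.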
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct	device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;
	/* name of the snapshot this device reads from */
	char                    *snap_name;
	/* id of the snapshot this device reads from */
	u64                     snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool                    snap_exists;
	int                     read_only;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};


static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (!ceph_compare_options(ceph_opts, client_node->client))
			return client_node;
	return NULL;
}

/*
 * mount options
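 *
 * These are the rbd-specific options; any token in the options string
 * passed to "add" that libceph does not itself recognize is handed to
 * parse_rbd_opts_token() below, e.g. "notify_timeout=10".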
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to unlink the client from the list.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	return !memcmp(&ondisk->text,
			RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
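 *
 * On disk (format v1, see rbd_types.h) the header object holds the
 * fixed-size rbd_image_header_ondisk, followed by snap_count
 * rbd_image_snap_ondisk records, followed by the packed,
 * NUL-terminated snapshot names (snap_names_len bytes in all); the
 * caller tells us via allocated_snaps how many snapshot records its
 * buffer actually contains.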
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps)
{
	u32 snap_count;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
				 / sizeof (u64))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				GFP_KERNEL);
	if (!header->snapc)
		return -ENOMEM;

	if (snap_count) {
		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     GFP_KERNEL);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names_len = 0;
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					GFP_KERNEL);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		int i;

		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
err_names:
	kfree(header->snap_names);
	header->snap_names = NULL;
err_snapc:
	kfree(header->snapc);
	header->snapc = NULL;

	return -ENOMEM;
}

static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	int i;
	char *p = header->snap_names;

	for (i = 0; i < header->total_snaps; i++) {
		if (!strcmp(snap_name, p)) {

			/* Found it.  Pass back its id and/or size */

			if (seq)
				*seq = header->snapc->snaps[i];
			if (size)
				*size = header->snap_sizes[i];
			return i;
		}
		p += strlen(p) + 1;	/* Skip ahead to the next name */
	}
	return -ENOENT;
}

static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	ceph_put_snap_context(header->snapc);
}

/*
 * get the actual striped segment name, offset and length
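 *
 * Example (illustrative values): with obj_order 22 (4 MiB objects),
 * object_prefix "rb.0.123", ofs 0x500000 and len 0x400000, the I/O
 * falls in segment 1 ("rb.0.123.000000000001") at offset 0x100000,
 * and the returned length is clamped to 0x300000, the bytes left in
 * that object; the caller iterates for the remainder.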
 */
static u64 rbd_get_segment(struct rbd_image_header *header,
			   const char *object_prefix,
			   u64 ofs, u64 len,
			   char *seg_name, u64 *segofs)
{
	u64 seg = ofs >> header->obj_order;

	if (seg_name)
		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
			 "%s.%012llx", object_prefix, seg);

	ofs = ofs & ((1 << header->obj_order) - 1);
	len = min_t(u64, len, (1 << header->obj_order) - ofs);

	if (segofs)
		*segofs = ofs;

	return len;
}

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d "
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	BUG_ON(ops == NULL);

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd notify ack (acknowledge a watch notification)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   "update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
			rbd_dev->header_name, (unsigned long long) notify_id,
			(unsigned int) opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd exec (call a method of an object class)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if (rq->cmd_type != REQ_TYPE_FS) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be single-page
 * bios, which we handle later in bio_chain_clone()
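 *
 * (For example, with obj_order 22 an object is 8192 512-byte sectors;
 * once a bio reaches the object boundary this returns 0, so the block
 * layer starts a new bio, which rbd_rq_fn can map to the next object.)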
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
			       const char *snap_name,
			       gfp_t gfp_flags)
{
	int name_len = strlen(snap_name);
	u64 new_snapid;
	int ret;
	void *data, *p, *e;
	struct ceph_mon_client *monc;

	/* we should create a snapshot only if we're pointing at the head */
	if (rbd_dev->snap_id != CEPH_NOSNAP)
		return -EINVAL;

	monc = &rbd_dev->rbd_client->client->monc;
	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
	if (ret < 0)
		return ret;

	data = kmalloc(name_len + 16, gfp_flags);
	if (!data)
		return -ENOMEM;

	p = data;
	e = data + name_len + 16;

	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
	ceph_encode_64_safe(&p, e, new_snapid, bad);

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "snap_add",
				data, p - data, NULL);

	kfree(data);

	return ret < 0 ? ret : 0;
bad:
	return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}

/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					      int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}

/*
 * search for the previous snap in a null delimited string list
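 *
 * The names are packed like "snap3\0snap2\0snap1\0" (newest first,
 * matching the order of snapc->snaps[]); given a pointer to the start
 * of one name (or to the end of the buffer), this walks backward over
 * the preceding NUL to the start of the name before it, or returns
 * NULL if there is none.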
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	if (name < start + 2)
		return NULL;

	name -= 2;
	while (*name) {
		if (name == start)
			return start;
		name--;
	}
	return name + 1;
}

/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing so (e.g., a snapshot that was removed and then recreated
 * with the same name).
 */
2093static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2094{
2095	const char *name, *first_name;
2096	int i = rbd_dev->header.total_snaps;
2097	struct rbd_snap *snap, *old_snap = NULL;
2098	struct list_head *p, *n;
2099
2100	first_name = rbd_dev->header.snap_names;
2101	name = first_name + rbd_dev->header.snap_names_len;
2102
2103	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2104		u64 cur_id;
2105
2106		old_snap = list_entry(p, struct rbd_snap, node);
2107
2108		if (i)
2109			cur_id = rbd_dev->header.snapc->snaps[i - 1];
2110
2111		if (!i || old_snap->id < cur_id) {
2112			/*
2113			 * old_snap->id was skipped, thus was
2114			 * removed.  If this rbd_dev is mapped to
2115			 * the removed snapshot, record that it no
2116			 * longer exists, to prevent further I/O.
2117			 */
2118			if (rbd_dev->snap_id == old_snap->id)
2119				rbd_dev->snap_exists = false;
2120			__rbd_remove_snap_dev(old_snap);
2121			continue;
2122		}
2123		if (old_snap->id == cur_id) {
2124			/* we have this snapshot already */
2125			i--;
2126			name = rbd_prev_snap_name(name, first_name);
2127			continue;
2128		}
2129		for (; i > 0;
2130		     i--, name = rbd_prev_snap_name(name, first_name)) {
2131			if (!name) {
2132				WARN_ON(1);
2133				return -EINVAL;
2134			}
2135			cur_id = rbd_dev->header.snapc->snaps[i];
2136			/* snapshot removal? handle it above */
2137			if (cur_id >= old_snap->id)
2138				break;
2139			/* a new snapshot */
2140			snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2141			if (IS_ERR(snap))
2142				return PTR_ERR(snap);
2143
2144			/* note that we add it backward so using n and not p */
2145			list_add(&snap->node, n);
2146			p = &snap->node;
2147		}
2148	}
2149	/* we're done going over the old snap list, just add what's left */
2150	for (; i > 0; i--) {
2151		name = rbd_prev_snap_name(name, first_name);
2152		if (!name) {
2153			WARN_ON(1);
2154			return -EINVAL;
2155		}
2156		snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2157		if (IS_ERR(snap))
2158			return PTR_ERR(snap);
2159		list_add(&snap->node, &rbd_dev->snaps);
2160	}
2161
2162	return 0;
2163}
2164
2165static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2166{
2167	int ret;
2168	struct device *dev;
2169	struct rbd_snap *snap;
2170
2171	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2172	dev = &rbd_dev->dev;
2173
2174	dev->bus = &rbd_bus_type;
2175	dev->type = &rbd_device_type;
2176	dev->parent = &rbd_root_dev;
2177	dev->release = rbd_dev_release;
2178	dev_set_name(dev, "%d", rbd_dev->dev_id);
2179	ret = device_register(dev);
2180	if (ret < 0)
2181		goto out;
2182
2183	list_for_each_entry(snap, &rbd_dev->snaps, node) {
2184		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2185		if (ret < 0)
2186			break;
2187	}
2188out:
2189	mutex_unlock(&ctl_mutex);
2190	return ret;
2191}
2192
2193static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2194{
2195	device_unregister(&rbd_dev->dev);
2196}
2197
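/*
 * Register a watch request on the header object.  If registration
 * fails with -ERANGE (our cached header version is out of date),
 * refresh the header and retry.
 */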
2198static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2199{
2200	int ret, rc;
2201
2202	do {
2203		ret = rbd_req_sync_watch(rbd_dev);
2204		if (ret == -ERANGE) {
2205			rc = rbd_refresh_header(rbd_dev, NULL);
2206			if (rc < 0)
2207				return rc;
2208		}
2209	} while (ret == -ERANGE);
2210
2211	return ret;
2212}
2213
2214static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216/*
2217 * Get a unique rbd identifier for the given new rbd_dev, and add
2218 * the rbd_dev to the global list.  The minimum rbd id is 1.
2219 */
2220static void rbd_id_get(struct rbd_device *rbd_dev)
2221{
2222	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2223
2224	spin_lock(&rbd_dev_list_lock);
2225	list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226	spin_unlock(&rbd_dev_list_lock);
2227}
2228
2229/*
2230 * Remove an rbd_dev from the global list, and record that its
2231 * identifier is no longer in use.
2232 */
2233static void rbd_id_put(struct rbd_device *rbd_dev)
2234{
2235	struct list_head *tmp;
2236	int rbd_id = rbd_dev->dev_id;
2237	int max_id;
2238
2239	BUG_ON(rbd_id < 1);
2240
2241	spin_lock(&rbd_dev_list_lock);
2242	list_del_init(&rbd_dev->node);
2243
2244	/*
2245	 * If the id being "put" is not the current maximum, there
2246	 * is nothing special we need to do.
2247	 */
2248	if (rbd_id != atomic64_read(&rbd_id_max)) {
2249		spin_unlock(&rbd_dev_list_lock);
2250		return;
2251	}
2252
2253	/*
2254	 * We need to update the current maximum id.  Search the
2255	 * list to find out what it is.  We're more likely to find
2256	 * the maximum at the end, so search the list backward.
2257	 */
2258	max_id = 0;
2259	list_for_each_prev(tmp, &rbd_dev_list) {
2260		struct rbd_device *rbd_dev;
2261
2262		rbd_dev = list_entry(tmp, struct rbd_device, node);
2263		if (rbd_dev->dev_id > max_id)
2264			max_id = rbd_dev->dev_id;
2265	}
2266	spin_unlock(&rbd_dev_list_lock);
2267
2268	/*
2269	 * The max id could have been updated by rbd_id_get(), in
2270	 * which case it now accurately reflects the new maximum.
2271	 * Be careful not to overwrite the maximum value in that
2272	 * case.
2273	 */
2274	atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275}
2276
2277/*
2278 * Skips over white space at *buf, and updates *buf to point to the
2279 * first found non-space character (if any). Returns the length of
2280 * the token (string of non-white space characters) found.  Note
2281 * that *buf must be terminated with '\0'.
2282 */
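/*
 * Example: given *buf == "  foo bar", next_token() advances *buf to
 * point at "foo bar" and returns 3.
 */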
2283static inline size_t next_token(const char **buf)
2284{
2285	/*
2286	 * These are the characters that produce nonzero for
2287	 * isspace() in the "C" and "POSIX" locales.
2288	 */
2289	const char *spaces = " \f\n\r\t\v";
2290
2291	*buf += strspn(*buf, spaces);	/* Find start of token */
2292
2293	return strcspn(*buf, spaces);   /* Return token length */
2294}
2295
2296/*
2297 * Finds the next token in *buf, and if the provided token buffer is
2298 * big enough, copies the found token into it.  The result, if
2299 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2300 * must be terminated with '\0' on entry.
2301 *
2302 * Returns the length of the token found (not including the '\0').
2303 * Return value will be 0 if no token is found, and it will be >=
2304 * token_size if the token would not fit.
2305 *
2306 * The *buf pointer will be updated to point beyond the end of the
2307 * found token.  Note that this occurs even if the token buffer is
2308 * too small to hold it.
2309 */
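/*
 * Example: with *buf == "foo bar" and token_size == 8, copy_token()
 * stores "foo" (with a terminating '\0'), advances *buf past "foo",
 * and returns 3.  With token_size == 3, *buf is still advanced but
 * nothing is copied, and the returned 3 (>= token_size) indicates
 * that the token did not fit.
 */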
2310static inline size_t copy_token(const char **buf,
2311				char *token,
2312				size_t token_size)
2313{
2314	size_t len;
2315
2316	len = next_token(buf);
2317	if (len < token_size) {
2318		memcpy(token, *buf, len);
2319		*(token + len) = '\0';
2320	}
2321	*buf += len;
2322
2323	return len;
2324}
2325
2326/*
2327 * Finds the next token in *buf, dynamically allocates a buffer big
2328 * enough to hold a copy of it, and copies the token into the new
2329 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2330 * that a duplicate buffer is created even for a zero-length token.
2331 *
2332 * Returns a pointer to the newly-allocated duplicate, or a null
2333 * pointer if memory for the duplicate was not available.  If
2334 * the lenp argument is a non-null pointer, the length of the token
2335 * (not including the '\0') is returned in *lenp.
2336 *
2337 * If successful, the *buf pointer will be updated to point beyond
2338 * the end of the found token.
2339 *
2340 * Note: uses GFP_KERNEL for allocation.
2341 */
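/*
 * Example: with *buf == "rbd foo", dup_token() returns a
 * newly-allocated copy of "rbd" and leaves *buf pointing at " foo".
 */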
2342static inline char *dup_token(const char **buf, size_t *lenp)
2343{
2344	char *dup;
2345	size_t len;
2346
2347	len = next_token(buf);
2348	dup = kmalloc(len + 1, GFP_KERNEL);
2349	if (!dup)
2350		return NULL;
2351
2352	memcpy(dup, *buf, len);
2353	*(dup + len) = '\0';
2354	*buf += len;
2355
2356	if (lenp)
2357		*lenp = len;
2358
2359	return dup;
2360}
2361
2362/*
2363 * This fills in the pool_name, image_name, image_name_len,
2364 * header_name, and snap_name fields of the given rbd_dev, based
2365 * on the list of monitor addresses and other options provided via
2366 * /sys/bus/rbd/add.
2367 *
2368 * Note: rbd_dev is assumed to have been initially zero-filled.
2369 */
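/*
 * Illustrative example (addresses and names are made up):
 *
 *   echo "1.2.3.4:6789 name=admin rbd foo snap1" > /sys/bus/rbd/add
 *
 * parses as monitor address "1.2.3.4:6789", options "name=admin",
 * pool "rbd", image "foo" (header object "foo" RBD_SUFFIX), and
 * snapshot "snap1".  The snapshot token may be omitted, in which
 * case the image head (RBD_SNAP_HEAD_NAME) is mapped.
 */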
2370static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371			      const char *buf,
2372			      const char **mon_addrs,
2373			      size_t *mon_addrs_size,
2374			      char *options,
2375			      size_t options_size)
2376{
2377	size_t len;
2378	int ret;
2379
2380	/* The first four tokens are required */
2381
2382	len = next_token(&buf);
2383	if (!len)
2384		return -EINVAL;
2385	*mon_addrs_size = len + 1;
2386	*mon_addrs = buf;
2387
2388	buf += len;
2389
2390	len = copy_token(&buf, options, options_size);
2391	if (!len || len >= options_size)
2392		return -EINVAL;
2393
2394	ret = -ENOMEM;
2395	rbd_dev->pool_name = dup_token(&buf, NULL);
2396	if (!rbd_dev->pool_name)
2397		goto out_err;
2398
2399	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400	if (!rbd_dev->image_name)
2401		goto out_err;
2402
2403	/* Create the name of the header object */
2404
2405	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406						+ sizeof (RBD_SUFFIX),
2407					GFP_KERNEL);
2408	if (!rbd_dev->header_name)
2409		goto out_err;
2410	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411
2412	/*
2413	 * The snapshot name is optional.  If none is supplied,
2414	 * we use the default value.
2415	 */
2416	rbd_dev->snap_name = dup_token(&buf, &len);
2417	if (!rbd_dev->snap_name)
2418		goto out_err;
2419	if (!len) {
2420		/* Replace the empty name with the default */
2421		kfree(rbd_dev->snap_name);
2422		rbd_dev->snap_name
2423			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424		if (!rbd_dev->snap_name)
2425			goto out_err;
2426
2427		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428			sizeof (RBD_SNAP_HEAD_NAME));
2429	}
2430
2431	return 0;
2432
2433out_err:
2434	kfree(rbd_dev->header_name);
2435	kfree(rbd_dev->image_name);
2436	kfree(rbd_dev->pool_name);
2437	rbd_dev->pool_name = NULL;
2438
2439	return ret;
2440}
2441
2442static ssize_t rbd_add(struct bus_type *bus,
2443		       const char *buf,
2444		       size_t count)
2445{
2446	char *options;
2447	struct rbd_device *rbd_dev = NULL;
2448	const char *mon_addrs = NULL;
2449	size_t mon_addrs_size = 0;
2450	struct ceph_osd_client *osdc;
2451	int rc = -ENOMEM;
2452
2453	if (!try_module_get(THIS_MODULE))
2454		return -ENODEV;
2455
2456	options = kmalloc(count, GFP_KERNEL);
2457	if (!options)
2458		goto err_nomem;
2459	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460	if (!rbd_dev)
2461		goto err_nomem;
2462
2463	/* static rbd_device initialization */
2464	spin_lock_init(&rbd_dev->lock);
2465	INIT_LIST_HEAD(&rbd_dev->node);
2466	INIT_LIST_HEAD(&rbd_dev->snaps);
2467	init_rwsem(&rbd_dev->header_rwsem);
2468
2469	/* generate unique id: find highest unique id, add one */
2470	rbd_id_get(rbd_dev);
2471
2472	/* Fill in the device name, now that we have its id. */
2473	BUILD_BUG_ON(DEV_NAME_LEN
2474			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2475	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2476
2477	/* parse add command */
2478	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2479				options, count);
2480	if (rc)
2481		goto err_put_id;
2482
2483	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2484						options);
2485	if (IS_ERR(rbd_dev->rbd_client)) {
2486		rc = PTR_ERR(rbd_dev->rbd_client);
2487		goto err_put_id;
2488	}
2489
2490	/* pick the pool */
2491	osdc = &rbd_dev->rbd_client->client->osdc;
2492	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2493	if (rc < 0)
2494		goto err_out_client;
2495	rbd_dev->pool_id = rc;
2496
2497	/* register our block device */
2498	rc = register_blkdev(0, rbd_dev->name);
2499	if (rc < 0)
2500		goto err_out_client;
2501	rbd_dev->major = rc;
2502
2503	rc = rbd_bus_add_dev(rbd_dev);
2504	if (rc)
2505		goto err_out_blkdev;
2506
2507	/*
2508	 * At this point cleanup in the event of an error is the job
2509	 * of the sysfs code (initiated by rbd_bus_del_dev()).
2510	 *
2511	 * Set up and announce blkdev mapping.
2512	 */
2513	rc = rbd_init_disk(rbd_dev);
2514	if (rc)
2515		goto err_out_bus;
2516
2517	rc = rbd_init_watch_dev(rbd_dev);
2518	if (rc)
2519		goto err_out_bus;
2520
2521	return count;
2522
2523err_out_bus:
2524	/* this will also clean up the rest of the rbd_dev state */
2525
2526	rbd_bus_del_dev(rbd_dev);
2527	kfree(options);
2528	return rc;
2529
2530err_out_blkdev:
2531	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2532err_out_client:
2533	rbd_put_client(rbd_dev);
2534err_put_id:
2535	if (rbd_dev->pool_name) {
2536		kfree(rbd_dev->snap_name);
2537		kfree(rbd_dev->header_name);
2538		kfree(rbd_dev->image_name);
2539		kfree(rbd_dev->pool_name);
2540	}
2541	rbd_id_put(rbd_dev);
2542err_nomem:
2543	kfree(rbd_dev);
2544	kfree(options);
2545
2546	dout("Error adding device %s\n", buf);
2547	module_put(THIS_MODULE);
2548
2549	return (ssize_t) rc;
2550}
2551
2552static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2553{
2554	struct list_head *tmp;
2555	struct rbd_device *rbd_dev;
2556
2557	spin_lock(&rbd_dev_list_lock);
2558	list_for_each(tmp, &rbd_dev_list) {
2559		rbd_dev = list_entry(tmp, struct rbd_device, node);
2560		if (rbd_dev->dev_id == dev_id) {
2561			spin_unlock(&rbd_dev_list_lock);
2562			return rbd_dev;
2563		}
2564	}
2565	spin_unlock(&rbd_dev_list_lock);
2566	return NULL;
2567}
2568
2569static void rbd_dev_release(struct device *dev)
2570{
2571	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2572
2573	if (rbd_dev->watch_request) {
2574		struct ceph_client *client = rbd_dev->rbd_client->client;
2575
2576		ceph_osdc_unregister_linger_request(&client->osdc,
2577						    rbd_dev->watch_request);
2578	}
2579	if (rbd_dev->watch_event)
2580		rbd_req_sync_unwatch(rbd_dev);
2581
2582	rbd_put_client(rbd_dev);
2583
2584	/* clean up and free blkdev */
2585	rbd_free_disk(rbd_dev);
2586	unregister_blkdev(rbd_dev->major, rbd_dev->name);
2587
2588	/* done with the id, and with the rbd_dev */
2589	kfree(rbd_dev->snap_name);
2590	kfree(rbd_dev->header_name);
2591	kfree(rbd_dev->pool_name);
2592	kfree(rbd_dev->image_name);
2593	rbd_id_put(rbd_dev);
2594	kfree(rbd_dev);
2595
2596	/* release module ref */
2597	module_put(THIS_MODULE);
2598}
2599
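/*
 * Illustrative usage: writing a device id to /sys/bus/rbd/remove
 * (e.g., echo 1 > /sys/bus/rbd/remove) unmaps the rbd device with
 * that id.
 */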
2600static ssize_t rbd_remove(struct bus_type *bus,
2601			  const char *buf,
2602			  size_t count)
2603{
2604	struct rbd_device *rbd_dev = NULL;
2605	int target_id, rc;
2606	unsigned long ul;
2607	int ret = count;
2608
2609	rc = strict_strtoul(buf, 10, &ul);
2610	if (rc)
2611		return rc;
2612
2613	/* convert to int; abort if we lost anything in the conversion */
2614	target_id = (int) ul;
2615	if (target_id != ul)
2616		return -EINVAL;
2617
2618	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2619
2620	rbd_dev = __rbd_get_dev(target_id);
2621	if (!rbd_dev) {
2622		ret = -ENOENT;
2623		goto done;
2624	}
2625
2626	__rbd_remove_all_snaps(rbd_dev);
2627	rbd_bus_del_dev(rbd_dev);
2628
2629done:
2630	mutex_unlock(&ctl_mutex);
2631	return ret;
2632}
2633
2634static ssize_t rbd_snap_add(struct device *dev,
2635			    struct device_attribute *attr,
2636			    const char *buf,
2637			    size_t count)
2638{
2639	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2640	int ret;
2641	char *name = kmalloc(count + 1, GFP_KERNEL);
2642	if (!name)
2643		return -ENOMEM;
2644
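	/*
	 * Note: at most count - 1 bytes are copied, so the final byte
	 * of buf (typically the newline from a sysfs write) is dropped.
	 */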
2645	snprintf(name, count, "%s", buf);
2646
2647	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2648
2649	ret = rbd_header_add_snap(rbd_dev,
2650				  name, GFP_KERNEL);
2651	if (ret < 0)
2652		goto err_unlock;
2653
2654	ret = __rbd_refresh_header(rbd_dev, NULL);
2655	if (ret < 0)
2656		goto err_unlock;
2657
2658	/* we shouldn't hold ctl_mutex when notifying; the notify might
2659	   trigger a watch callback that would need to take that mutex */
2660	mutex_unlock(&ctl_mutex);
2661
2662	/* make a best effort, don't error if failed */
2663	rbd_req_sync_notify(rbd_dev);
2664
2665	ret = count;
2666	kfree(name);
2667	return ret;
2668
2669err_unlock:
2670	mutex_unlock(&ctl_mutex);
2671	kfree(name);
2672	return ret;
2673}
2674
2675/*
2676 * create control files in sysfs
2677 * /sys/bus/rbd/...
2678 */
2679static int rbd_sysfs_init(void)
2680{
2681	int ret;
2682
2683	ret = device_register(&rbd_root_dev);
2684	if (ret < 0)
2685		return ret;
2686
2687	ret = bus_register(&rbd_bus_type);
2688	if (ret < 0)
2689		device_unregister(&rbd_root_dev);
2690
2691	return ret;
2692}
2693
2694static void rbd_sysfs_cleanup(void)
2695{
2696	bus_unregister(&rbd_bus_type);
2697	device_unregister(&rbd_root_dev);
2698}
2699
2700int __init rbd_init(void)
2701{
2702	int rc;
2703
2704	rc = rbd_sysfs_init();
2705	if (rc)
2706		return rc;
2707	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2708	return 0;
2709}
2710
2711void __exit rbd_exit(void)
2712{
2713	rbd_sysfs_cleanup();
2714}
2715
2716module_init(rbd_init);
2717module_exit(rbd_exit);
2718
2719MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2720MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2721MODULE_DESCRIPTION("rados block device");
2722
2723/* following authorship retained from original osdblk.c */
2724MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2725
2726MODULE_LICENSE("GPL");
2727