rbd.c revision c47f9371545abe2510ac3b66c3fc180921816f65
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8)	(~0U))
#define	U16_MAX	((u16)	(~0U))
#define	U32_MAX	((u32)	(~0U))
#define	U64_MAX	((u64)	(~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Snapshot sysfs names get this prefix; cap the snap name accordingly */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Name used for a mapping of the base image rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;		/* object size is 1 << obj_order bytes */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* NUL-separated names, one per snapshot */
	u64 *snap_sizes;	/* image size at each snapshot, same order */

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 * Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	char		*image_name;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;	/* entry on rbd_client_list */
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

/*
 * A request for I/O on a single rados object; zero or more of these
 * make up an rbd_img_request.
 */
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		/* which union member is valid is determined by "type" */
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	s32			result;
	atomic_t		done;		/* nonzero once completed */

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

/*
 * An image-level I/O request; carries the object requests that
 * implement it.
 */
struct rbd_img_request {
	struct request		*rq;
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */
	union {
		struct ceph_snap_context *snapc;	/* for writes */
		u64		snap_id;	/* for reads */
	};
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

/* What this device is currently mapped to (base image or one snapshot) */
struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;	/* entry on rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

/* The "add" and "remove" bus attributes are the user-space control API */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
348{ 349 struct va_format vaf; 350 va_list args; 351 352 va_start(args, fmt); 353 vaf.fmt = fmt; 354 vaf.va = &args; 355 356 if (!rbd_dev) 357 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 358 else if (rbd_dev->disk) 359 printk(KERN_WARNING "%s: %s: %pV\n", 360 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 361 else if (rbd_dev->spec && rbd_dev->spec->image_name) 362 printk(KERN_WARNING "%s: image %s: %pV\n", 363 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 364 else if (rbd_dev->spec && rbd_dev->spec->image_id) 365 printk(KERN_WARNING "%s: id %s: %pV\n", 366 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 367 else /* punt */ 368 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 369 RBD_DRV_NAME, rbd_dev, &vaf); 370 va_end(args); 371} 372 373#ifdef RBD_DEBUG 374#define rbd_assert(expr) \ 375 if (unlikely(!(expr))) { \ 376 printk(KERN_ERR "\nAssertion failure in %s() " \ 377 "at line %d:\n\n" \ 378 "\trbd_assert(%s);\n\n", \ 379 __func__, __LINE__, #expr); \ 380 BUG(); \ 381 } 382#else /* !RBD_DEBUG */ 383# define rbd_assert(expr) ((void) 0) 384#endif /* !RBD_DEBUG */ 385 386static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 387static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 388 389static int rbd_open(struct block_device *bdev, fmode_t mode) 390{ 391 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 392 bool removing = false; 393 394 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 395 return -EROFS; 396 397 spin_lock_irq(&rbd_dev->lock); 398 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 399 removing = true; 400 else 401 rbd_dev->open_count++; 402 spin_unlock_irq(&rbd_dev->lock); 403 if (removing) 404 return -ENOENT; 405 406 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 407 (void) get_device(&rbd_dev->dev); 408 set_device_ro(bdev, rbd_dev->mapping.read_only); 409 mutex_unlock(&ctl_mutex); 410 411 return 0; 412} 413 414static int rbd_release(struct gendisk *disk, fmode_t mode) 415{ 416 struct 
rbd_device *rbd_dev = disk->private_data; 417 unsigned long open_count_before; 418 419 spin_lock_irq(&rbd_dev->lock); 420 open_count_before = rbd_dev->open_count--; 421 spin_unlock_irq(&rbd_dev->lock); 422 rbd_assert(open_count_before > 0); 423 424 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 425 put_device(&rbd_dev->dev); 426 mutex_unlock(&ctl_mutex); 427 428 return 0; 429} 430 431static const struct block_device_operations rbd_bd_ops = { 432 .owner = THIS_MODULE, 433 .open = rbd_open, 434 .release = rbd_release, 435}; 436 437/* 438 * Initialize an rbd client instance. 439 * We own *ceph_opts. 440 */ 441static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 442{ 443 struct rbd_client *rbdc; 444 int ret = -ENOMEM; 445 446 dout("%s:\n", __func__); 447 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 448 if (!rbdc) 449 goto out_opt; 450 451 kref_init(&rbdc->kref); 452 INIT_LIST_HEAD(&rbdc->node); 453 454 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 455 456 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); 457 if (IS_ERR(rbdc->client)) 458 goto out_mutex; 459 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 460 461 ret = ceph_open_session(rbdc->client); 462 if (ret < 0) 463 goto out_err; 464 465 spin_lock(&rbd_client_list_lock); 466 list_add_tail(&rbdc->node, &rbd_client_list); 467 spin_unlock(&rbd_client_list_lock); 468 469 mutex_unlock(&ctl_mutex); 470 dout("%s: rbdc %p\n", __func__, rbdc); 471 472 return rbdc; 473 474out_err: 475 ceph_destroy_client(rbdc->client); 476out_mutex: 477 mutex_unlock(&ctl_mutex); 478 kfree(rbdc); 479out_opt: 480 if (ceph_opts) 481 ceph_destroy_options(ceph_opts); 482 dout("%s: error %d\n", __func__, ret); 483 484 return ERR_PTR(ret); 485} 486 487/* 488 * Find a ceph client with specific addr and configuration. If 489 * found, bump its reference count. 
490 */ 491static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 492{ 493 struct rbd_client *client_node; 494 bool found = false; 495 496 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 497 return NULL; 498 499 spin_lock(&rbd_client_list_lock); 500 list_for_each_entry(client_node, &rbd_client_list, node) { 501 if (!ceph_compare_options(ceph_opts, client_node->client)) { 502 kref_get(&client_node->kref); 503 found = true; 504 break; 505 } 506 } 507 spin_unlock(&rbd_client_list_lock); 508 509 return found ? client_node : NULL; 510} 511 512/* 513 * mount options 514 */ 515enum { 516 Opt_last_int, 517 /* int args above */ 518 Opt_last_string, 519 /* string args above */ 520 Opt_read_only, 521 Opt_read_write, 522 /* Boolean args above */ 523 Opt_last_bool, 524}; 525 526static match_table_t rbd_opts_tokens = { 527 /* int args above */ 528 /* string args above */ 529 {Opt_read_only, "read_only"}, 530 {Opt_read_only, "ro"}, /* Alternate spelling */ 531 {Opt_read_write, "read_write"}, 532 {Opt_read_write, "rw"}, /* Alternate spelling */ 533 /* Boolean args above */ 534 {-1, NULL} 535}; 536 537struct rbd_options { 538 bool read_only; 539}; 540 541#define RBD_READ_ONLY_DEFAULT false 542 543static int parse_rbd_opts_token(char *c, void *private) 544{ 545 struct rbd_options *rbd_opts = private; 546 substring_t argstr[MAX_OPT_ARGS]; 547 int token, intval, ret; 548 549 token = match_token(c, rbd_opts_tokens, argstr); 550 if (token < 0) 551 return -EINVAL; 552 553 if (token < Opt_last_int) { 554 ret = match_int(&argstr[0], &intval); 555 if (ret < 0) { 556 pr_err("bad mount option arg (not int) " 557 "at '%s'\n", c); 558 return ret; 559 } 560 dout("got int token %d val %d\n", token, intval); 561 } else if (token > Opt_last_int && token < Opt_last_string) { 562 dout("got string token %d val %s\n", token, 563 argstr[0].from); 564 } else if (token > Opt_last_string && token < Opt_last_bool) { 565 dout("got Boolean token %d\n", token); 566 } else { 567 dout("got token 
%d\n", token); 568 } 569 570 switch (token) { 571 case Opt_read_only: 572 rbd_opts->read_only = true; 573 break; 574 case Opt_read_write: 575 rbd_opts->read_only = false; 576 break; 577 default: 578 rbd_assert(false); 579 break; 580 } 581 return 0; 582} 583 584/* 585 * Get a ceph client with specific addr and configuration, if one does 586 * not exist create it. 587 */ 588static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 589{ 590 struct rbd_client *rbdc; 591 592 rbdc = rbd_client_find(ceph_opts); 593 if (rbdc) /* using an existing client */ 594 ceph_destroy_options(ceph_opts); 595 else 596 rbdc = rbd_client_create(ceph_opts); 597 598 return rbdc; 599} 600 601/* 602 * Destroy ceph client 603 * 604 * Caller must hold rbd_client_list_lock. 605 */ 606static void rbd_client_release(struct kref *kref) 607{ 608 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 609 610 dout("%s: rbdc %p\n", __func__, rbdc); 611 spin_lock(&rbd_client_list_lock); 612 list_del(&rbdc->node); 613 spin_unlock(&rbd_client_list_lock); 614 615 ceph_destroy_client(rbdc->client); 616 kfree(rbdc); 617} 618 619/* 620 * Drop reference to ceph client node. If it's not referenced anymore, release 621 * it. 
622 */ 623static void rbd_put_client(struct rbd_client *rbdc) 624{ 625 if (rbdc) 626 kref_put(&rbdc->kref, rbd_client_release); 627} 628 629static bool rbd_image_format_valid(u32 image_format) 630{ 631 return image_format == 1 || image_format == 2; 632} 633 634static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 635{ 636 size_t size; 637 u32 snap_count; 638 639 /* The header has to start with the magic rbd header text */ 640 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 641 return false; 642 643 /* The bio layer requires at least sector-sized I/O */ 644 645 if (ondisk->options.order < SECTOR_SHIFT) 646 return false; 647 648 /* If we use u64 in a few spots we may be able to loosen this */ 649 650 if (ondisk->options.order > 8 * sizeof (int) - 1) 651 return false; 652 653 /* 654 * The size of a snapshot header has to fit in a size_t, and 655 * that limits the number of snapshots. 656 */ 657 snap_count = le32_to_cpu(ondisk->snap_count); 658 size = SIZE_MAX - sizeof (struct ceph_snap_context); 659 if (snap_count > size / sizeof (__le64)) 660 return false; 661 662 /* 663 * Not only that, but the size of the entire the snapshot 664 * header must also be representable in a size_t. 665 */ 666 size -= snap_count * sizeof (__le64); 667 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 668 return false; 669 670 return true; 671} 672 673/* 674 * Create a new header structure, translate header format from the on-disk 675 * header. 
676 */ 677static int rbd_header_from_disk(struct rbd_image_header *header, 678 struct rbd_image_header_ondisk *ondisk) 679{ 680 u32 snap_count; 681 size_t len; 682 size_t size; 683 u32 i; 684 685 memset(header, 0, sizeof (*header)); 686 687 snap_count = le32_to_cpu(ondisk->snap_count); 688 689 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); 690 header->object_prefix = kmalloc(len + 1, GFP_KERNEL); 691 if (!header->object_prefix) 692 return -ENOMEM; 693 memcpy(header->object_prefix, ondisk->object_prefix, len); 694 header->object_prefix[len] = '\0'; 695 696 if (snap_count) { 697 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 698 699 /* Save a copy of the snapshot names */ 700 701 if (snap_names_len > (u64) SIZE_MAX) 702 return -EIO; 703 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); 704 if (!header->snap_names) 705 goto out_err; 706 /* 707 * Note that rbd_dev_v1_header_read() guarantees 708 * the ondisk buffer we're working with has 709 * snap_names_len bytes beyond the end of the 710 * snapshot id array, this memcpy() is safe. 
711 */ 712 memcpy(header->snap_names, &ondisk->snaps[snap_count], 713 snap_names_len); 714 715 /* Record each snapshot's size */ 716 717 size = snap_count * sizeof (*header->snap_sizes); 718 header->snap_sizes = kmalloc(size, GFP_KERNEL); 719 if (!header->snap_sizes) 720 goto out_err; 721 for (i = 0; i < snap_count; i++) 722 header->snap_sizes[i] = 723 le64_to_cpu(ondisk->snaps[i].image_size); 724 } else { 725 WARN_ON(ondisk->snap_names_len); 726 header->snap_names = NULL; 727 header->snap_sizes = NULL; 728 } 729 730 header->features = 0; /* No features support in v1 images */ 731 header->obj_order = ondisk->options.order; 732 header->crypt_type = ondisk->options.crypt_type; 733 header->comp_type = ondisk->options.comp_type; 734 735 /* Allocate and fill in the snapshot context */ 736 737 header->image_size = le64_to_cpu(ondisk->image_size); 738 size = sizeof (struct ceph_snap_context); 739 size += snap_count * sizeof (header->snapc->snaps[0]); 740 header->snapc = kzalloc(size, GFP_KERNEL); 741 if (!header->snapc) 742 goto out_err; 743 744 atomic_set(&header->snapc->nref, 1); 745 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 746 header->snapc->num_snaps = snap_count; 747 for (i = 0; i < snap_count; i++) 748 header->snapc->snaps[i] = 749 le64_to_cpu(ondisk->snaps[i].id); 750 751 return 0; 752 753out_err: 754 kfree(header->snap_sizes); 755 header->snap_sizes = NULL; 756 kfree(header->snap_names); 757 header->snap_names = NULL; 758 kfree(header->object_prefix); 759 header->object_prefix = NULL; 760 761 return -ENOMEM; 762} 763 764static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 765{ 766 struct rbd_snap *snap; 767 768 if (snap_id == CEPH_NOSNAP) 769 return RBD_SNAP_HEAD_NAME; 770 771 list_for_each_entry(snap, &rbd_dev->snaps, node) 772 if (snap_id == snap->id) 773 return snap->name; 774 775 return NULL; 776} 777 778static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name) 779{ 780 781 struct rbd_snap *snap; 782 783 
list_for_each_entry(snap, &rbd_dev->snaps, node) { 784 if (!strcmp(snap_name, snap->name)) { 785 rbd_dev->spec->snap_id = snap->id; 786 rbd_dev->mapping.size = snap->size; 787 rbd_dev->mapping.features = snap->features; 788 789 return 0; 790 } 791 } 792 793 return -ENOENT; 794} 795 796static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 797{ 798 int ret; 799 800 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 801 sizeof (RBD_SNAP_HEAD_NAME))) { 802 rbd_dev->spec->snap_id = CEPH_NOSNAP; 803 rbd_dev->mapping.size = rbd_dev->header.image_size; 804 rbd_dev->mapping.features = rbd_dev->header.features; 805 ret = 0; 806 } else { 807 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 808 if (ret < 0) 809 goto done; 810 rbd_dev->mapping.read_only = true; 811 } 812 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 813 814done: 815 return ret; 816} 817 818static void rbd_header_free(struct rbd_image_header *header) 819{ 820 kfree(header->object_prefix); 821 header->object_prefix = NULL; 822 kfree(header->snap_sizes); 823 header->snap_sizes = NULL; 824 kfree(header->snap_names); 825 header->snap_names = NULL; 826 ceph_put_snap_context(header->snapc); 827 header->snapc = NULL; 828} 829 830static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 831{ 832 char *name; 833 u64 segment; 834 int ret; 835 836 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 837 if (!name) 838 return NULL; 839 segment = offset >> rbd_dev->header.obj_order; 840 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 841 rbd_dev->header.object_prefix, segment); 842 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 843 pr_err("error formatting segment name for #%llu (%d)\n", 844 segment, ret); 845 kfree(name); 846 name = NULL; 847 } 848 849 return name; 850} 851 852static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 853{ 854 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 855 856 return offset & (segment_size - 1); 857} 858 859static u64 
rbd_segment_length(struct rbd_device *rbd_dev, 860 u64 offset, u64 length) 861{ 862 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 863 864 offset &= segment_size - 1; 865 866 rbd_assert(length <= U64_MAX - offset); 867 if (offset + length > segment_size) 868 length = segment_size - offset; 869 870 return length; 871} 872 873/* 874 * returns the size of an object in the image 875 */ 876static u64 rbd_obj_bytes(struct rbd_image_header *header) 877{ 878 return 1 << header->obj_order; 879} 880 881/* 882 * bio helpers 883 */ 884 885static void bio_chain_put(struct bio *chain) 886{ 887 struct bio *tmp; 888 889 while (chain) { 890 tmp = chain; 891 chain = chain->bi_next; 892 bio_put(tmp); 893 } 894} 895 896/* 897 * zeros a bio chain, starting at specific offset 898 */ 899static void zero_bio_chain(struct bio *chain, int start_ofs) 900{ 901 struct bio_vec *bv; 902 unsigned long flags; 903 void *buf; 904 int i; 905 int pos = 0; 906 907 while (chain) { 908 bio_for_each_segment(bv, chain, i) { 909 if (pos + bv->bv_len > start_ofs) { 910 int remainder = max(start_ofs - pos, 0); 911 buf = bvec_kmap_irq(bv, &flags); 912 memset(buf + remainder, 0, 913 bv->bv_len - remainder); 914 bvec_kunmap_irq(buf, &flags); 915 } 916 pos += bv->bv_len; 917 } 918 919 chain = chain->bi_next; 920 } 921} 922 923/* 924 * Clone a portion of a bio, starting at the given byte offset 925 * and continuing for the number of bytes indicated. 
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;		/* byte offset into first copied bvec */
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
		vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		/* here, resid is the bytes of len within the last bvec */
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			/* Caller asked for more bytes than the chain holds */
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref
*kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

/*
 * Link an object request onto its image request's list, taking a
 * reference on the object request.  "which" records its position in
 * the list.
 */
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	rbd_obj_request_get(obj_request);
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

/*
 * Unlink an object request from its image request and drop the
 * reference taken by rbd_img_obj_request_add().  The asserts require
 * requests be removed in reverse order of addition ("which" must
 * equal the resulting count).
 */
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

/*
 * Allocate and fill in an osd request op for the given opcode.  The
 * variable arguments depend on the opcode (see the per-case comments
 * below).  Returns NULL on allocation failure or for an unsupported
 * opcode.  Free the result with rbd_osd_req_op_destroy().
 */
static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
	struct ceph_osd_req_op *op;
	va_list args;
	size_t size;

	op = kzalloc(sizeof (*op), GFP_NOIO);
	if (!op)
		return NULL;
	op->op = opcode;
	va_start(args, opcode);
	switch (opcode) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		/* rbd_osd_req_op_create(READ, offset, length) */
		/* rbd_osd_req_op_create(WRITE, offset, length) */
		op->extent.offset = va_arg(args, u64);
		op->extent.length = va_arg(args, u64);
		if (opcode == CEPH_OSD_OP_WRITE)
			op->payload_len = op->extent.length;
		break;
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_CALL:
		/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
		op->cls.class_name = va_arg(args, char *);
		size = strlen(op->cls.class_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.class_len = size;
		op->payload_len = size;

		op->cls.method_name = va_arg(args, char *);
		size = strlen(op->cls.method_name);
		rbd_assert(size <= (size_t) U8_MAX);
		op->cls.method_len = size;
		op->payload_len += size;

		op->cls.argc = 0;
		op->cls.indata = va_arg(args, void *);
		size = va_arg(args, size_t);
		rbd_assert(size <= (size_t) U32_MAX);
		op->cls.indata_len = (u32) size;
		op->payload_len += size;
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		/* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
		/* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
		op->watch.cookie = va_arg(args, u64);
		op->watch.ver = va_arg(args, u64);
		/*
		 * NOTE(review): ver is byte-swapped here while cookie is
		 * left in CPU order -- verify against the OSD client's
		 * op encoding that ver is not converted a second time.
		 */
		op->watch.ver = cpu_to_le64(op->watch.ver);
		if (opcode == CEPH_OSD_OP_WATCH && va_arg(args, int))
			op->watch.flag = (u8) 1;
		break;
	default:
		rbd_warn(NULL, "unsupported opcode %hu\n", opcode);
		kfree(op);
		op = NULL;
		break;
	}
	va_end(args);

	return op;
}

static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
{
1211 kfree(op); 1212} 1213 1214static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1215 struct rbd_obj_request *obj_request) 1216{ 1217 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request); 1218 1219 return ceph_osdc_start_request(osdc, obj_request->osd_req, false); 1220} 1221 1222static void rbd_img_request_complete(struct rbd_img_request *img_request) 1223{ 1224 dout("%s: img %p\n", __func__, img_request); 1225 if (img_request->callback) 1226 img_request->callback(img_request); 1227 else 1228 rbd_img_request_put(img_request); 1229} 1230 1231/* Caller is responsible for rbd_obj_request_destroy(obj_request) */ 1232 1233static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) 1234{ 1235 dout("%s: obj %p\n", __func__, obj_request); 1236 1237 return wait_for_completion_interruptible(&obj_request->completion); 1238} 1239 1240static void obj_request_done_init(struct rbd_obj_request *obj_request) 1241{ 1242 atomic_set(&obj_request->done, 0); 1243 smp_wmb(); 1244} 1245 1246static void obj_request_done_set(struct rbd_obj_request *obj_request) 1247{ 1248 int done; 1249 1250 done = atomic_inc_return(&obj_request->done); 1251 if (done > 1) { 1252 struct rbd_img_request *img_request = obj_request->img_request; 1253 struct rbd_device *rbd_dev; 1254 1255 rbd_dev = img_request ? 
img_request->rbd_dev : NULL; 1256 rbd_warn(rbd_dev, "obj_request %p was already done\n", 1257 obj_request); 1258 } 1259} 1260 1261static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1262{ 1263 smp_mb(); 1264 return atomic_read(&obj_request->done) != 0; 1265} 1266 1267static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) 1268{ 1269 dout("%s: obj %p cb %p\n", __func__, obj_request, 1270 obj_request->callback); 1271 if (obj_request->callback) 1272 obj_request->callback(obj_request); 1273 else 1274 complete_all(&obj_request->completion); 1275} 1276 1277static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) 1278{ 1279 dout("%s: obj %p\n", __func__, obj_request); 1280 obj_request_done_set(obj_request); 1281} 1282 1283static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1284{ 1285 1286 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1287 obj_request->result, obj_request->xferred, obj_request->length); 1288 if (obj_request->result == (s32) -ENOENT) { 1289 zero_bio_chain(obj_request->bio_list, 0); 1290 obj_request->result = 0; 1291 } else if (obj_request->xferred < obj_request->length && 1292 !obj_request->result) { 1293 zero_bio_chain(obj_request->bio_list, obj_request->xferred); 1294 obj_request->xferred = obj_request->length; 1295 } 1296 obj_request_done_set(obj_request); 1297} 1298 1299static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1300{ 1301 dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request, 1302 obj_request->result, obj_request->xferred, obj_request->length); 1303 1304 /* A short write really shouldn't occur. Warn if we see one */ 1305 1306 if (obj_request->xferred != obj_request->length) { 1307 struct rbd_img_request *img_request = obj_request->img_request; 1308 struct rbd_device *rbd_dev; 1309 1310 rbd_dev = img_request ? 
img_request->rbd_dev : NULL; 1311 rbd_warn(rbd_dev, "wrote %llu want %llu\n", 1312 obj_request->xferred, obj_request->length); 1313 } 1314 1315 obj_request_done_set(obj_request); 1316} 1317 1318/* 1319 * For a simple stat call there's nothing to do. We'll do more if 1320 * this is part of a write sequence for a layered image. 1321 */ 1322static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) 1323{ 1324 dout("%s: obj %p\n", __func__, obj_request); 1325 obj_request_done_set(obj_request); 1326} 1327 1328static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, 1329 struct ceph_msg *msg) 1330{ 1331 struct rbd_obj_request *obj_request = osd_req->r_priv; 1332 struct ceph_osd_reply_head *reply_head; 1333 struct ceph_osd_op *op; 1334 u32 num_ops; 1335 u16 opcode; 1336 1337 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1338 rbd_assert(osd_req == obj_request->osd_req); 1339 rbd_assert(!!obj_request->img_request ^ 1340 (obj_request->which == BAD_WHICH)); 1341 1342 reply_head = msg->front.iov_base; 1343 obj_request->result = (s32) le32_to_cpu(reply_head->result); 1344 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); 1345 1346 num_ops = le32_to_cpu(reply_head->num_ops); 1347 WARN_ON(num_ops != 1); /* For now */ 1348 1349 /* 1350 * We support a 64-bit length, but ultimately it has to be 1351 * passed to blk_end_request(), which takes an unsigned int. 
1352 */ 1353 op = &reply_head->ops[0]; 1354 obj_request->xferred = le64_to_cpu(op->extent.length); 1355 rbd_assert(obj_request->xferred < (u64) UINT_MAX); 1356 1357 opcode = le16_to_cpu(op->op); 1358 switch (opcode) { 1359 case CEPH_OSD_OP_READ: 1360 rbd_osd_read_callback(obj_request); 1361 break; 1362 case CEPH_OSD_OP_WRITE: 1363 rbd_osd_write_callback(obj_request); 1364 break; 1365 case CEPH_OSD_OP_STAT: 1366 rbd_osd_stat_callback(obj_request); 1367 break; 1368 case CEPH_OSD_OP_CALL: 1369 case CEPH_OSD_OP_NOTIFY_ACK: 1370 case CEPH_OSD_OP_WATCH: 1371 rbd_osd_trivial_callback(obj_request); 1372 break; 1373 default: 1374 rbd_warn(NULL, "%s: unsupported op %hu\n", 1375 obj_request->object_name, (unsigned short) opcode); 1376 break; 1377 } 1378 1379 if (obj_request_done_test(obj_request)) 1380 rbd_obj_request_complete(obj_request); 1381} 1382 1383static struct ceph_osd_request *rbd_osd_req_create( 1384 struct rbd_device *rbd_dev, 1385 bool write_request, 1386 struct rbd_obj_request *obj_request, 1387 struct ceph_osd_req_op *op) 1388{ 1389 struct rbd_img_request *img_request = obj_request->img_request; 1390 struct ceph_snap_context *snapc = NULL; 1391 struct ceph_osd_client *osdc; 1392 struct ceph_osd_request *osd_req; 1393 struct timespec now; 1394 struct timespec *mtime; 1395 u64 snap_id = CEPH_NOSNAP; 1396 u64 offset = obj_request->offset; 1397 u64 length = obj_request->length; 1398 1399 if (img_request) { 1400 rbd_assert(img_request->write_request == write_request); 1401 if (img_request->write_request) 1402 snapc = img_request->snapc; 1403 else 1404 snap_id = img_request->snap_id; 1405 } 1406 1407 /* Allocate and initialize the request, for the single op */ 1408 1409 osdc = &rbd_dev->rbd_client->client->osdc; 1410 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); 1411 if (!osd_req) 1412 return NULL; /* ENOMEM */ 1413 1414 rbd_assert(obj_request_type_valid(obj_request->type)); 1415 switch (obj_request->type) { 1416 case OBJ_REQUEST_NODATA: 1417 
break; /* Nothing to do */ 1418 case OBJ_REQUEST_BIO: 1419 rbd_assert(obj_request->bio_list != NULL); 1420 osd_req->r_bio = obj_request->bio_list; 1421 break; 1422 case OBJ_REQUEST_PAGES: 1423 osd_req->r_pages = obj_request->pages; 1424 osd_req->r_num_pages = obj_request->page_count; 1425 osd_req->r_page_alignment = offset & ~PAGE_MASK; 1426 break; 1427 } 1428 1429 if (write_request) { 1430 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1431 now = CURRENT_TIME; 1432 mtime = &now; 1433 } else { 1434 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1435 mtime = NULL; /* not needed for reads */ 1436 offset = 0; /* These are not used... */ 1437 length = 0; /* ...for osd read requests */ 1438 } 1439 1440 osd_req->r_callback = rbd_osd_req_callback; 1441 osd_req->r_priv = obj_request; 1442 1443 osd_req->r_oid_len = strlen(obj_request->object_name); 1444 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1445 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1446 1447 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1448 1449 /* osd_req will get its own reference to snapc (if non-null) */ 1450 1451 ceph_osdc_build_request(osd_req, offset, length, 1, op, 1452 snapc, snap_id, mtime); 1453 1454 return osd_req; 1455} 1456 1457static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1458{ 1459 ceph_osdc_put_request(osd_req); 1460} 1461 1462/* object_name is assumed to be a non-null pointer and NUL-terminated */ 1463 1464static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, 1465 u64 offset, u64 length, 1466 enum obj_request_type type) 1467{ 1468 struct rbd_obj_request *obj_request; 1469 size_t size; 1470 char *name; 1471 1472 rbd_assert(obj_request_type_valid(type)); 1473 1474 size = strlen(object_name) + 1; 1475 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); 1476 if (!obj_request) 1477 return NULL; 1478 1479 name = (char *)(obj_request + 1); 1480 obj_request->object_name = memcpy(name, 
object_name, size); 1481 obj_request->offset = offset; 1482 obj_request->length = length; 1483 obj_request->which = BAD_WHICH; 1484 obj_request->type = type; 1485 INIT_LIST_HEAD(&obj_request->links); 1486 obj_request_done_init(obj_request); 1487 init_completion(&obj_request->completion); 1488 kref_init(&obj_request->kref); 1489 1490 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, 1491 offset, length, (int)type, obj_request); 1492 1493 return obj_request; 1494} 1495 1496static void rbd_obj_request_destroy(struct kref *kref) 1497{ 1498 struct rbd_obj_request *obj_request; 1499 1500 obj_request = container_of(kref, struct rbd_obj_request, kref); 1501 1502 dout("%s: obj %p\n", __func__, obj_request); 1503 1504 rbd_assert(obj_request->img_request == NULL); 1505 rbd_assert(obj_request->which == BAD_WHICH); 1506 1507 if (obj_request->osd_req) 1508 rbd_osd_req_destroy(obj_request->osd_req); 1509 1510 rbd_assert(obj_request_type_valid(obj_request->type)); 1511 switch (obj_request->type) { 1512 case OBJ_REQUEST_NODATA: 1513 break; /* Nothing to do */ 1514 case OBJ_REQUEST_BIO: 1515 if (obj_request->bio_list) 1516 bio_chain_put(obj_request->bio_list); 1517 break; 1518 case OBJ_REQUEST_PAGES: 1519 if (obj_request->pages) 1520 ceph_release_page_vector(obj_request->pages, 1521 obj_request->page_count); 1522 break; 1523 } 1524 1525 kfree(obj_request); 1526} 1527 1528/* 1529 * Caller is responsible for filling in the list of object requests 1530 * that comprises the image request, and the Linux request pointer 1531 * (if there is one). 
 */
/*
 * Allocate and initialize an image request covering [offset, length)
 * of the mapped image.  A write request takes a reference to the
 * device's current snapshot context; a read records the mapped
 * snapshot id instead.  Returns NULL on allocation failure.
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc = NULL;

	img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
		if (WARN_ON(!snapc)) {
			kfree(img_request);
			return NULL;	/* Shouldn't happen */
		}
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->write_request = write_request;
	/* snapc and snap_id share storage; only one is meaningful */
	if (write_request)
		img_request->snapc = snapc;
	else
		img_request->snap_id = rbd_dev->spec->snap_id;
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

/*
 * kref release routine.  Detaches and drops every remaining object
 * request, releases the snapshot context (writes only) and frees
 * the image request.
 */
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request->write_request)
		ceph_put_snap_context(img_request->snapc);

	kfree(img_request);
}

/*
 * Split the bio chain of a block-layer request into per-object
 * requests, one for each rbd object (segment) the byte range touches,
 * and attach them to the image request.  On failure all object
 * requests created so far are released and -ENOMEM is returned.
 */
static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
					struct bio *bio_list)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	unsigned int bio_offset;
	u64 image_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p bio %p\n", __func__, img_request, bio_list);

	opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
					    : CEPH_OSD_OP_READ;
	bio_offset = 0;
	image_offset = img_request->offset;
	rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
	resid = img_request->length;
	rbd_assert(resid > 0);
	while (resid) {
		const char *object_name;
		unsigned int clone_size;
		struct ceph_osd_req_op *op;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, image_offset);
		if (!object_name)
			goto out_unwind;
		/* offset/length of this chunk within its object */
		offset = rbd_segment_offset(rbd_dev, image_offset);
		length = rbd_segment_length(rbd_dev, image_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length,
						OBJ_REQUEST_BIO);
		kfree(object_name);	/* object request has its own copy */
		if (!obj_request)
			goto out_unwind;

		rbd_assert(length <= (u64) UINT_MAX);
		clone_size = (unsigned int) length;
		obj_request->bio_list = bio_chain_clone_range(&bio_list,
						&bio_offset, clone_size,
						GFP_ATOMIC);
		if (!obj_request->bio_list)
			goto out_partial;

		/*
		 * Build up the op to use in building the osd
		 * request.  Note that the contents of the op are
		 * copied by rbd_osd_req_create().
		 */
		op = rbd_osd_req_op_create(opcode, offset, length);
		if (!op)
			goto out_partial;
		obj_request->osd_req = rbd_osd_req_create(rbd_dev,
						img_request->write_request,
						obj_request, op);
		rbd_osd_req_op_destroy(op);
		if (!obj_request->osd_req)
			goto out_partial;
		/* status and version are initially zero-filled */

		rbd_img_obj_request_add(img_request, obj_request);

		image_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	/* The failed request was not yet added to the image request */
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

/*
 * Object request completion for requests that belong to an image
 * request.  Object requests complete in arbitrary order, but the
 * block layer must see them in order; completions are therefore
 * deferred until all lower-numbered requests are done, and whoever
 * completes the "next" slot sweeps forward through any that finished
 * early.
 */
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->rq != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	/* Out-of-order completion; the sweep below will pick it up later */
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		unsigned int xferred;
		int result;

		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;

		rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
		xferred = (unsigned int) obj_request->xferred;
		result = (int) obj_request->result;
		if (result)
			rbd_warn(NULL, "obj_request %s result %d xferred %u\n",
				img_request->write_request ? "write" : "read",
				result, xferred);

		more = blk_end_request(img_request->rq, result, xferred);
		which++;
	}
	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

/*
 * Submit all of an image request's object requests to the osd
 * client.  Returns 0 on success or the first submission error.
 */
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request(img_request, obj_request) {
		int ret;

		obj_request->callback = rbd_img_obj_callback;
		ret = rbd_obj_request_submit(osdc, obj_request);
		if (ret)
			return ret;
		/*
		 * The image request has its own reference to each
		 * of its object requests, so we can safely drop the
		 * initial one here.
		 */
		rbd_obj_request_put(obj_request);
	}

	return 0;
}

/*
 * Send an asynchronous NOTIFY_ACK for the header object, confirming
 * receipt of a notification with the given id and header version.
 */
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
				u64 ver, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	struct ceph_osd_client *osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	/* Completion callback drops the last reference */
	obj_request->callback = rbd_obj_request_put;
	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Watch event callback:  the header object changed on the osd.
 * Refresh the device's view of its header and acknowledge the
 * notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		rbd_warn(rbd_dev, "got notification but failed to "
			   " update snaps: %d\n", rc);

	/* Acknowledge even if the refresh failed */
	rbd_obj_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct ceph_osd_req_op *op;
	int ret;

	/* Starting requires no existing event/request; stopping requires both */
	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	/* WATCH with a zero flag means unwatch */
	op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie,
				rbd_dev->header.obj_version, start);
	if (!op)
		goto out_cancel;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
							obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     u64 *version)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc;
	struct ceph_osd_req_op *op;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations but they
	 * don't involve object data (so no offset or length).
	 * The result should be placed into the inbound buffer
	 * provided.  They also supply outbound data--parameters for
	 * the object method.  Currently if this is present it will
	 * be a snapshot id.
	 */
	page_count = (u32) calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	/* Outbound data travels inside the op; the reply lands in pages */
	op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
					method_name, outbound, outbound_size);
	if (!op)
		goto out;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						obj_request, op);
	rbd_osd_req_op_destroy(op);
	if (!obj_request->osd_req)
		goto out;

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;
	ret = 0;
	/*
	 * NOTE(review): xferred is not clamped against inbound_size
	 * before the copy -- confirm the osd cannot return more than
	 * the caller's buffer holds.
	 */
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
	if (version)
		*version = obj_request->version;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Block-layer request function.  Pulls requests off the queue,
 * validates them, and turns each into an image request that is
 * filled from the request's bio chain and submitted to the osds.
 * The queue lock is dropped while a request is being set up and
 * re-taken before fetching the next one.
 */
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			/* A mapped snapshot is never writable */
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (WARN_ON(offset && length > U64_MAX - offset + 1))
			goto end_request;	/* Shouldn't happen */

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill_bio(img_request, rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "obj_request %s result %d\n",
				write_request ? "write" : "read", result);
			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * a queue callback.
Makes sure that we don't create a bio that spans across 2055 * multiple osd objects. One exception would be with a single page bios, 2056 * which we handle later at bio_chain_clone_range() 2057 */ 2058static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 2059 struct bio_vec *bvec) 2060{ 2061 struct rbd_device *rbd_dev = q->queuedata; 2062 sector_t sector_offset; 2063 sector_t sectors_per_obj; 2064 sector_t obj_sector_offset; 2065 int ret; 2066 2067 /* 2068 * Find how far into its rbd object the partition-relative 2069 * bio start sector is to offset relative to the enclosing 2070 * device. 2071 */ 2072 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 2073 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 2074 obj_sector_offset = sector_offset & (sectors_per_obj - 1); 2075 2076 /* 2077 * Compute the number of bytes from that offset to the end 2078 * of the object. Account for what's already used by the bio. 2079 */ 2080 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 2081 if (ret > bmd->bi_size) 2082 ret -= bmd->bi_size; 2083 else 2084 ret = 0; 2085 2086 /* 2087 * Don't send back more than was asked for. And if the bio 2088 * was empty, let the whole thing through because: "Note 2089 * that a block device *must* allow a single page to be 2090 * added to an empty bio." 
2091 */ 2092 rbd_assert(bvec->bv_len <= PAGE_SIZE); 2093 if (ret > (int) bvec->bv_len || !bmd->bi_size) 2094 ret = (int) bvec->bv_len; 2095 2096 return ret; 2097} 2098 2099static void rbd_free_disk(struct rbd_device *rbd_dev) 2100{ 2101 struct gendisk *disk = rbd_dev->disk; 2102 2103 if (!disk) 2104 return; 2105 2106 if (disk->flags & GENHD_FL_UP) 2107 del_gendisk(disk); 2108 if (disk->queue) 2109 blk_cleanup_queue(disk->queue); 2110 put_disk(disk); 2111} 2112 2113static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 2114 const char *object_name, 2115 u64 offset, u64 length, 2116 char *buf, u64 *version) 2117 2118{ 2119 struct ceph_osd_req_op *op; 2120 struct rbd_obj_request *obj_request; 2121 struct ceph_osd_client *osdc; 2122 struct page **pages = NULL; 2123 u32 page_count; 2124 size_t size; 2125 int ret; 2126 2127 page_count = (u32) calc_pages_for(offset, length); 2128 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2129 if (IS_ERR(pages)) 2130 ret = PTR_ERR(pages); 2131 2132 ret = -ENOMEM; 2133 obj_request = rbd_obj_request_create(object_name, offset, length, 2134 OBJ_REQUEST_PAGES); 2135 if (!obj_request) 2136 goto out; 2137 2138 obj_request->pages = pages; 2139 obj_request->page_count = page_count; 2140 2141 op = rbd_osd_req_op_create(CEPH_OSD_OP_READ, offset, length); 2142 if (!op) 2143 goto out; 2144 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2145 obj_request, op); 2146 rbd_osd_req_op_destroy(op); 2147 if (!obj_request->osd_req) 2148 goto out; 2149 2150 osdc = &rbd_dev->rbd_client->client->osdc; 2151 ret = rbd_obj_request_submit(osdc, obj_request); 2152 if (ret) 2153 goto out; 2154 ret = rbd_obj_request_wait(obj_request); 2155 if (ret) 2156 goto out; 2157 2158 ret = obj_request->result; 2159 if (ret < 0) 2160 goto out; 2161 2162 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); 2163 size = (size_t) obj_request->xferred; 2164 ceph_copy_from_page_vector(pages, buf, 0, size); 2165 rbd_assert(size <= (size_t) INT_MAX); 2166 ret = 
(int) size; 2167 if (version) 2168 *version = obj_request->version; 2169out: 2170 if (obj_request) 2171 rbd_obj_request_put(obj_request); 2172 else 2173 ceph_release_page_vector(pages, page_count); 2174 2175 return ret; 2176} 2177 2178/* 2179 * Read the complete header for the given rbd device. 2180 * 2181 * Returns a pointer to a dynamically-allocated buffer containing 2182 * the complete and validated header. Caller can pass the address 2183 * of a variable that will be filled in with the version of the 2184 * header object at the time it was read. 2185 * 2186 * Returns a pointer-coded errno if a failure occurs. 2187 */ 2188static struct rbd_image_header_ondisk * 2189rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) 2190{ 2191 struct rbd_image_header_ondisk *ondisk = NULL; 2192 u32 snap_count = 0; 2193 u64 names_size = 0; 2194 u32 want_count; 2195 int ret; 2196 2197 /* 2198 * The complete header will include an array of its 64-bit 2199 * snapshot ids, followed by the names of those snapshots as 2200 * a contiguous block of NUL-terminated strings. Note that 2201 * the number of snapshots could change by the time we read 2202 * it in, in which case we re-read it. 
2203 */ 2204 do { 2205 size_t size; 2206 2207 kfree(ondisk); 2208 2209 size = sizeof (*ondisk); 2210 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 2211 size += names_size; 2212 ondisk = kmalloc(size, GFP_KERNEL); 2213 if (!ondisk) 2214 return ERR_PTR(-ENOMEM); 2215 2216 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 2217 0, size, 2218 (char *) ondisk, version); 2219 if (ret < 0) 2220 goto out_err; 2221 if (WARN_ON((size_t) ret < size)) { 2222 ret = -ENXIO; 2223 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 2224 size, ret); 2225 goto out_err; 2226 } 2227 if (!rbd_dev_ondisk_valid(ondisk)) { 2228 ret = -ENXIO; 2229 rbd_warn(rbd_dev, "invalid header"); 2230 goto out_err; 2231 } 2232 2233 names_size = le64_to_cpu(ondisk->snap_names_len); 2234 want_count = snap_count; 2235 snap_count = le32_to_cpu(ondisk->snap_count); 2236 } while (snap_count != want_count); 2237 2238 return ondisk; 2239 2240out_err: 2241 kfree(ondisk); 2242 2243 return ERR_PTR(ret); 2244} 2245 2246/* 2247 * reload the ondisk the header 2248 */ 2249static int rbd_read_header(struct rbd_device *rbd_dev, 2250 struct rbd_image_header *header) 2251{ 2252 struct rbd_image_header_ondisk *ondisk; 2253 u64 ver = 0; 2254 int ret; 2255 2256 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); 2257 if (IS_ERR(ondisk)) 2258 return PTR_ERR(ondisk); 2259 ret = rbd_header_from_disk(header, ondisk); 2260 if (ret >= 0) 2261 header->obj_version = ver; 2262 kfree(ondisk); 2263 2264 return ret; 2265} 2266 2267static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) 2268{ 2269 struct rbd_snap *snap; 2270 struct rbd_snap *next; 2271 2272 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) 2273 rbd_remove_snap_dev(snap); 2274} 2275 2276static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 2277{ 2278 sector_t size; 2279 2280 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 2281 return; 2282 2283 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 2284 dout("setting size to 
%llu sectors", (unsigned long long) size); 2285 rbd_dev->mapping.size = (u64) size; 2286 set_capacity(rbd_dev->disk, size); 2287} 2288 2289/* 2290 * only read the first part of the ondisk header, without the snaps info 2291 */ 2292static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 2293{ 2294 int ret; 2295 struct rbd_image_header h; 2296 2297 ret = rbd_read_header(rbd_dev, &h); 2298 if (ret < 0) 2299 return ret; 2300 2301 down_write(&rbd_dev->header_rwsem); 2302 2303 /* Update image size, and check for resize of mapped image */ 2304 rbd_dev->header.image_size = h.image_size; 2305 rbd_update_mapping_size(rbd_dev); 2306 2307 /* rbd_dev->header.object_prefix shouldn't change */ 2308 kfree(rbd_dev->header.snap_sizes); 2309 kfree(rbd_dev->header.snap_names); 2310 /* osd requests may still refer to snapc */ 2311 ceph_put_snap_context(rbd_dev->header.snapc); 2312 2313 if (hver) 2314 *hver = h.obj_version; 2315 rbd_dev->header.obj_version = h.obj_version; 2316 rbd_dev->header.image_size = h.image_size; 2317 rbd_dev->header.snapc = h.snapc; 2318 rbd_dev->header.snap_names = h.snap_names; 2319 rbd_dev->header.snap_sizes = h.snap_sizes; 2320 /* Free the extra copy of the object prefix */ 2321 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix)); 2322 kfree(h.object_prefix); 2323 2324 ret = rbd_dev_snaps_update(rbd_dev); 2325 if (!ret) 2326 ret = rbd_dev_snaps_register(rbd_dev); 2327 2328 up_write(&rbd_dev->header_rwsem); 2329 2330 return ret; 2331} 2332 2333static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 2334{ 2335 int ret; 2336 2337 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 2338 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 2339 if (rbd_dev->image_format == 1) 2340 ret = rbd_dev_v1_refresh(rbd_dev, hver); 2341 else 2342 ret = rbd_dev_v2_refresh(rbd_dev, hver); 2343 mutex_unlock(&ctl_mutex); 2344 2345 return ret; 2346} 2347 2348static int rbd_init_disk(struct rbd_device *rbd_dev) 2349{ 2350 struct gendisk 
*disk; 2351 struct request_queue *q; 2352 u64 segment_size; 2353 2354 /* create gendisk info */ 2355 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 2356 if (!disk) 2357 return -ENOMEM; 2358 2359 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 2360 rbd_dev->dev_id); 2361 disk->major = rbd_dev->major; 2362 disk->first_minor = 0; 2363 disk->fops = &rbd_bd_ops; 2364 disk->private_data = rbd_dev; 2365 2366 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); 2367 if (!q) 2368 goto out_disk; 2369 2370 /* We use the default size, but let's be explicit about it. */ 2371 blk_queue_physical_block_size(q, SECTOR_SIZE); 2372 2373 /* set io sizes to object size */ 2374 segment_size = rbd_obj_bytes(&rbd_dev->header); 2375 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 2376 blk_queue_max_segment_size(q, segment_size); 2377 blk_queue_io_min(q, segment_size); 2378 blk_queue_io_opt(q, segment_size); 2379 2380 blk_queue_merge_bvec(q, rbd_merge_bvec); 2381 disk->queue = q; 2382 2383 q->queuedata = rbd_dev; 2384 2385 rbd_dev->disk = disk; 2386 2387 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 2388 2389 return 0; 2390out_disk: 2391 put_disk(disk); 2392 2393 return -ENOMEM; 2394} 2395 2396/* 2397 sysfs 2398*/ 2399 2400static struct rbd_device *dev_to_rbd_dev(struct device *dev) 2401{ 2402 return container_of(dev, struct rbd_device, dev); 2403} 2404 2405static ssize_t rbd_size_show(struct device *dev, 2406 struct device_attribute *attr, char *buf) 2407{ 2408 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2409 sector_t size; 2410 2411 down_read(&rbd_dev->header_rwsem); 2412 size = get_capacity(rbd_dev->disk); 2413 up_read(&rbd_dev->header_rwsem); 2414 2415 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE); 2416} 2417 2418/* 2419 * Note this shows the features for whatever's mapped, which is not 2420 * necessarily the base image. 
2421 */ 2422static ssize_t rbd_features_show(struct device *dev, 2423 struct device_attribute *attr, char *buf) 2424{ 2425 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2426 2427 return sprintf(buf, "0x%016llx\n", 2428 (unsigned long long) rbd_dev->mapping.features); 2429} 2430 2431static ssize_t rbd_major_show(struct device *dev, 2432 struct device_attribute *attr, char *buf) 2433{ 2434 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2435 2436 return sprintf(buf, "%d\n", rbd_dev->major); 2437} 2438 2439static ssize_t rbd_client_id_show(struct device *dev, 2440 struct device_attribute *attr, char *buf) 2441{ 2442 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2443 2444 return sprintf(buf, "client%lld\n", 2445 ceph_client_id(rbd_dev->rbd_client->client)); 2446} 2447 2448static ssize_t rbd_pool_show(struct device *dev, 2449 struct device_attribute *attr, char *buf) 2450{ 2451 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2452 2453 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 2454} 2455 2456static ssize_t rbd_pool_id_show(struct device *dev, 2457 struct device_attribute *attr, char *buf) 2458{ 2459 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2460 2461 return sprintf(buf, "%llu\n", 2462 (unsigned long long) rbd_dev->spec->pool_id); 2463} 2464 2465static ssize_t rbd_name_show(struct device *dev, 2466 struct device_attribute *attr, char *buf) 2467{ 2468 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2469 2470 if (rbd_dev->spec->image_name) 2471 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 2472 2473 return sprintf(buf, "(unknown)\n"); 2474} 2475 2476static ssize_t rbd_image_id_show(struct device *dev, 2477 struct device_attribute *attr, char *buf) 2478{ 2479 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2480 2481 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 2482} 2483 2484/* 2485 * Shows the name of the currently-mapped snapshot (or 2486 * RBD_SNAP_HEAD_NAME for the base image). 
2487 */ 2488static ssize_t rbd_snap_show(struct device *dev, 2489 struct device_attribute *attr, 2490 char *buf) 2491{ 2492 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2493 2494 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 2495} 2496 2497/* 2498 * For an rbd v2 image, shows the pool id, image id, and snapshot id 2499 * for the parent image. If there is no parent, simply shows 2500 * "(no parent image)". 2501 */ 2502static ssize_t rbd_parent_show(struct device *dev, 2503 struct device_attribute *attr, 2504 char *buf) 2505{ 2506 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2507 struct rbd_spec *spec = rbd_dev->parent_spec; 2508 int count; 2509 char *bufp = buf; 2510 2511 if (!spec) 2512 return sprintf(buf, "(no parent image)\n"); 2513 2514 count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 2515 (unsigned long long) spec->pool_id, spec->pool_name); 2516 if (count < 0) 2517 return count; 2518 bufp += count; 2519 2520 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 2521 spec->image_name ? spec->image_name : "(unknown)"); 2522 if (count < 0) 2523 return count; 2524 bufp += count; 2525 2526 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 2527 (unsigned long long) spec->snap_id, spec->snap_name); 2528 if (count < 0) 2529 return count; 2530 bufp += count; 2531 2532 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 2533 if (count < 0) 2534 return count; 2535 bufp += count; 2536 2537 return (ssize_t) (bufp - buf); 2538} 2539 2540static ssize_t rbd_image_refresh(struct device *dev, 2541 struct device_attribute *attr, 2542 const char *buf, 2543 size_t size) 2544{ 2545 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 2546 int ret; 2547 2548 ret = rbd_dev_refresh(rbd_dev, NULL); 2549 2550 return ret < 0 ? 
ret : size; 2551} 2552 2553static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 2554static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 2555static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 2556static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 2557static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 2558static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 2559static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 2560static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 2561static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 2562static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 2563static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 2564 2565static struct attribute *rbd_attrs[] = { 2566 &dev_attr_size.attr, 2567 &dev_attr_features.attr, 2568 &dev_attr_major.attr, 2569 &dev_attr_client_id.attr, 2570 &dev_attr_pool.attr, 2571 &dev_attr_pool_id.attr, 2572 &dev_attr_name.attr, 2573 &dev_attr_image_id.attr, 2574 &dev_attr_current_snap.attr, 2575 &dev_attr_parent.attr, 2576 &dev_attr_refresh.attr, 2577 NULL 2578}; 2579 2580static struct attribute_group rbd_attr_group = { 2581 .attrs = rbd_attrs, 2582}; 2583 2584static const struct attribute_group *rbd_attr_groups[] = { 2585 &rbd_attr_group, 2586 NULL 2587}; 2588 2589static void rbd_sysfs_dev_release(struct device *dev) 2590{ 2591} 2592 2593static struct device_type rbd_device_type = { 2594 .name = "rbd", 2595 .groups = rbd_attr_groups, 2596 .release = rbd_sysfs_dev_release, 2597}; 2598 2599 2600/* 2601 sysfs - snapshots 2602*/ 2603 2604static ssize_t rbd_snap_size_show(struct device *dev, 2605 struct device_attribute *attr, 2606 char *buf) 2607{ 2608 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2609 2610 return sprintf(buf, "%llu\n", (unsigned long long)snap->size); 2611} 2612 2613static ssize_t rbd_snap_id_show(struct device *dev, 2614 struct device_attribute *attr, 2615 char *buf) 2616{ 2617 struct 
rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2618 2619 return sprintf(buf, "%llu\n", (unsigned long long)snap->id); 2620} 2621 2622static ssize_t rbd_snap_features_show(struct device *dev, 2623 struct device_attribute *attr, 2624 char *buf) 2625{ 2626 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2627 2628 return sprintf(buf, "0x%016llx\n", 2629 (unsigned long long) snap->features); 2630} 2631 2632static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL); 2633static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL); 2634static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL); 2635 2636static struct attribute *rbd_snap_attrs[] = { 2637 &dev_attr_snap_size.attr, 2638 &dev_attr_snap_id.attr, 2639 &dev_attr_snap_features.attr, 2640 NULL, 2641}; 2642 2643static struct attribute_group rbd_snap_attr_group = { 2644 .attrs = rbd_snap_attrs, 2645}; 2646 2647static void rbd_snap_dev_release(struct device *dev) 2648{ 2649 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev); 2650 kfree(snap->name); 2651 kfree(snap); 2652} 2653 2654static const struct attribute_group *rbd_snap_attr_groups[] = { 2655 &rbd_snap_attr_group, 2656 NULL 2657}; 2658 2659static struct device_type rbd_snap_device_type = { 2660 .groups = rbd_snap_attr_groups, 2661 .release = rbd_snap_dev_release, 2662}; 2663 2664static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 2665{ 2666 kref_get(&spec->kref); 2667 2668 return spec; 2669} 2670 2671static void rbd_spec_free(struct kref *kref); 2672static void rbd_spec_put(struct rbd_spec *spec) 2673{ 2674 if (spec) 2675 kref_put(&spec->kref, rbd_spec_free); 2676} 2677 2678static struct rbd_spec *rbd_spec_alloc(void) 2679{ 2680 struct rbd_spec *spec; 2681 2682 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 2683 if (!spec) 2684 return NULL; 2685 kref_init(&spec->kref); 2686 2687 rbd_spec_put(rbd_spec_get(spec)); /* TEMPORARY */ 2688 2689 return spec; 2690} 2691 2692static void 
rbd_spec_free(struct kref *kref) 2693{ 2694 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 2695 2696 kfree(spec->pool_name); 2697 kfree(spec->image_id); 2698 kfree(spec->image_name); 2699 kfree(spec->snap_name); 2700 kfree(spec); 2701} 2702 2703static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 2704 struct rbd_spec *spec) 2705{ 2706 struct rbd_device *rbd_dev; 2707 2708 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 2709 if (!rbd_dev) 2710 return NULL; 2711 2712 spin_lock_init(&rbd_dev->lock); 2713 rbd_dev->flags = 0; 2714 INIT_LIST_HEAD(&rbd_dev->node); 2715 INIT_LIST_HEAD(&rbd_dev->snaps); 2716 init_rwsem(&rbd_dev->header_rwsem); 2717 2718 rbd_dev->spec = spec; 2719 rbd_dev->rbd_client = rbdc; 2720 2721 /* Initialize the layout used for all rbd requests */ 2722 2723 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 2724 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); 2725 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 2726 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); 2727 2728 return rbd_dev; 2729} 2730 2731static void rbd_dev_destroy(struct rbd_device *rbd_dev) 2732{ 2733 rbd_spec_put(rbd_dev->parent_spec); 2734 kfree(rbd_dev->header_name); 2735 rbd_put_client(rbd_dev->rbd_client); 2736 rbd_spec_put(rbd_dev->spec); 2737 kfree(rbd_dev); 2738} 2739 2740static bool rbd_snap_registered(struct rbd_snap *snap) 2741{ 2742 bool ret = snap->dev.type == &rbd_snap_device_type; 2743 bool reg = device_is_registered(&snap->dev); 2744 2745 rbd_assert(!ret ^ reg); 2746 2747 return ret; 2748} 2749 2750static void rbd_remove_snap_dev(struct rbd_snap *snap) 2751{ 2752 list_del(&snap->node); 2753 if (device_is_registered(&snap->dev)) 2754 device_unregister(&snap->dev); 2755} 2756 2757static int rbd_register_snap_dev(struct rbd_snap *snap, 2758 struct device *parent) 2759{ 2760 struct device *dev = &snap->dev; 2761 int ret; 2762 2763 dev->type = &rbd_snap_device_type; 2764 
dev->parent = parent; 2765 dev->release = rbd_snap_dev_release; 2766 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name); 2767 dout("%s: registering device for snapshot %s\n", __func__, snap->name); 2768 2769 ret = device_register(dev); 2770 2771 return ret; 2772} 2773 2774static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev, 2775 const char *snap_name, 2776 u64 snap_id, u64 snap_size, 2777 u64 snap_features) 2778{ 2779 struct rbd_snap *snap; 2780 int ret; 2781 2782 snap = kzalloc(sizeof (*snap), GFP_KERNEL); 2783 if (!snap) 2784 return ERR_PTR(-ENOMEM); 2785 2786 ret = -ENOMEM; 2787 snap->name = kstrdup(snap_name, GFP_KERNEL); 2788 if (!snap->name) 2789 goto err; 2790 2791 snap->id = snap_id; 2792 snap->size = snap_size; 2793 snap->features = snap_features; 2794 2795 return snap; 2796 2797err: 2798 kfree(snap->name); 2799 kfree(snap); 2800 2801 return ERR_PTR(ret); 2802} 2803 2804static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which, 2805 u64 *snap_size, u64 *snap_features) 2806{ 2807 char *snap_name; 2808 2809 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 2810 2811 *snap_size = rbd_dev->header.snap_sizes[which]; 2812 *snap_features = 0; /* No features for v1 */ 2813 2814 /* Skip over names until we find the one we are looking for */ 2815 2816 snap_name = rbd_dev->header.snap_names; 2817 while (which--) 2818 snap_name += strlen(snap_name) + 1; 2819 2820 return snap_name; 2821} 2822 2823/* 2824 * Get the size and object order for an image snapshot, or if 2825 * snap_id is CEPH_NOSNAP, gets this information for the base 2826 * image. 
2827 */ 2828static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 2829 u8 *order, u64 *snap_size) 2830{ 2831 __le64 snapid = cpu_to_le64(snap_id); 2832 int ret; 2833 struct { 2834 u8 order; 2835 __le64 size; 2836 } __attribute__ ((packed)) size_buf = { 0 }; 2837 2838 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2839 "rbd", "get_size", 2840 (char *) &snapid, sizeof (snapid), 2841 (char *) &size_buf, sizeof (size_buf), NULL); 2842 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2843 if (ret < 0) 2844 return ret; 2845 2846 *order = size_buf.order; 2847 *snap_size = le64_to_cpu(size_buf.size); 2848 2849 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 2850 (unsigned long long) snap_id, (unsigned int) *order, 2851 (unsigned long long) *snap_size); 2852 2853 return 0; 2854} 2855 2856static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 2857{ 2858 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 2859 &rbd_dev->header.obj_order, 2860 &rbd_dev->header.image_size); 2861} 2862 2863static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 2864{ 2865 void *reply_buf; 2866 int ret; 2867 void *p; 2868 2869 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 2870 if (!reply_buf) 2871 return -ENOMEM; 2872 2873 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2874 "rbd", "get_object_prefix", 2875 NULL, 0, 2876 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL); 2877 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2878 if (ret < 0) 2879 goto out; 2880 2881 p = reply_buf; 2882 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 2883 p + RBD_OBJ_PREFIX_LEN_MAX, 2884 NULL, GFP_NOIO); 2885 2886 if (IS_ERR(rbd_dev->header.object_prefix)) { 2887 ret = PTR_ERR(rbd_dev->header.object_prefix); 2888 rbd_dev->header.object_prefix = NULL; 2889 } else { 2890 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 2891 } 2892 2893out: 2894 kfree(reply_buf); 2895 2896 return ret; 2897} 2898 
2899static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 2900 u64 *snap_features) 2901{ 2902 __le64 snapid = cpu_to_le64(snap_id); 2903 struct { 2904 __le64 features; 2905 __le64 incompat; 2906 } features_buf = { 0 }; 2907 u64 incompat; 2908 int ret; 2909 2910 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2911 "rbd", "get_features", 2912 (char *) &snapid, sizeof (snapid), 2913 (char *) &features_buf, sizeof (features_buf), 2914 NULL); 2915 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2916 if (ret < 0) 2917 return ret; 2918 2919 incompat = le64_to_cpu(features_buf.incompat); 2920 if (incompat & ~RBD_FEATURES_ALL) 2921 return -ENXIO; 2922 2923 *snap_features = le64_to_cpu(features_buf.features); 2924 2925 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 2926 (unsigned long long) snap_id, 2927 (unsigned long long) *snap_features, 2928 (unsigned long long) le64_to_cpu(features_buf.incompat)); 2929 2930 return 0; 2931} 2932 2933static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 2934{ 2935 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 2936 &rbd_dev->header.features); 2937} 2938 2939static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 2940{ 2941 struct rbd_spec *parent_spec; 2942 size_t size; 2943 void *reply_buf = NULL; 2944 __le64 snapid; 2945 void *p; 2946 void *end; 2947 char *image_id; 2948 u64 overlap; 2949 int ret; 2950 2951 parent_spec = rbd_spec_alloc(); 2952 if (!parent_spec) 2953 return -ENOMEM; 2954 2955 size = sizeof (__le64) + /* pool_id */ 2956 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 2957 sizeof (__le64) + /* snap_id */ 2958 sizeof (__le64); /* overlap */ 2959 reply_buf = kmalloc(size, GFP_KERNEL); 2960 if (!reply_buf) { 2961 ret = -ENOMEM; 2962 goto out_err; 2963 } 2964 2965 snapid = cpu_to_le64(CEPH_NOSNAP); 2966 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 2967 "rbd", "get_parent", 2968 (char *) &snapid, sizeof (snapid), 2969 
(char *) reply_buf, size, NULL); 2970 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 2971 if (ret < 0) 2972 goto out_err; 2973 2974 ret = -ERANGE; 2975 p = reply_buf; 2976 end = (char *) reply_buf + size; 2977 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 2978 if (parent_spec->pool_id == CEPH_NOPOOL) 2979 goto out; /* No parent? No problem. */ 2980 2981 /* The ceph file layout needs to fit pool id in 32 bits */ 2982 2983 ret = -EIO; 2984 if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX)) 2985 goto out; 2986 2987 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 2988 if (IS_ERR(image_id)) { 2989 ret = PTR_ERR(image_id); 2990 goto out_err; 2991 } 2992 parent_spec->image_id = image_id; 2993 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 2994 ceph_decode_64_safe(&p, end, overlap, out_err); 2995 2996 rbd_dev->parent_overlap = overlap; 2997 rbd_dev->parent_spec = parent_spec; 2998 parent_spec = NULL; /* rbd_dev now owns this */ 2999out: 3000 ret = 0; 3001out_err: 3002 kfree(reply_buf); 3003 rbd_spec_put(parent_spec); 3004 3005 return ret; 3006} 3007 3008static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 3009{ 3010 size_t image_id_size; 3011 char *image_id; 3012 void *p; 3013 void *end; 3014 size_t size; 3015 void *reply_buf = NULL; 3016 size_t len = 0; 3017 char *image_name = NULL; 3018 int ret; 3019 3020 rbd_assert(!rbd_dev->spec->image_name); 3021 3022 len = strlen(rbd_dev->spec->image_id); 3023 image_id_size = sizeof (__le32) + len; 3024 image_id = kmalloc(image_id_size, GFP_KERNEL); 3025 if (!image_id) 3026 return NULL; 3027 3028 p = image_id; 3029 end = (char *) image_id + image_id_size; 3030 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len); 3031 3032 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3033 reply_buf = kmalloc(size, GFP_KERNEL); 3034 if (!reply_buf) 3035 goto out; 3036 3037 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, 3038 "rbd", "dir_get_name", 3039 image_id, 
image_id_size, 3040 (char *) reply_buf, size, NULL); 3041 if (ret < 0) 3042 goto out; 3043 p = reply_buf; 3044 end = (char *) reply_buf + size; 3045 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 3046 if (IS_ERR(image_name)) 3047 image_name = NULL; 3048 else 3049 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 3050out: 3051 kfree(reply_buf); 3052 kfree(image_id); 3053 3054 return image_name; 3055} 3056 3057/* 3058 * When a parent image gets probed, we only have the pool, image, 3059 * and snapshot ids but not the names of any of them. This call 3060 * is made later to fill in those names. It has to be done after 3061 * rbd_dev_snaps_update() has completed because some of the 3062 * information (in particular, snapshot name) is not available 3063 * until then. 3064 */ 3065static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 3066{ 3067 struct ceph_osd_client *osdc; 3068 const char *name; 3069 void *reply_buf = NULL; 3070 int ret; 3071 3072 if (rbd_dev->spec->pool_name) 3073 return 0; /* Already have the names */ 3074 3075 /* Look up the pool name */ 3076 3077 osdc = &rbd_dev->rbd_client->client->osdc; 3078 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 3079 if (!name) { 3080 rbd_warn(rbd_dev, "there is no pool with id %llu", 3081 rbd_dev->spec->pool_id); /* Really a BUG() */ 3082 return -EIO; 3083 } 3084 3085 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 3086 if (!rbd_dev->spec->pool_name) 3087 return -ENOMEM; 3088 3089 /* Fetch the image name; tolerate failure here */ 3090 3091 name = rbd_dev_image_name(rbd_dev); 3092 if (name) 3093 rbd_dev->spec->image_name = (char *) name; 3094 else 3095 rbd_warn(rbd_dev, "unable to get image name"); 3096 3097 /* Look up the snapshot name. 
*/ 3098 3099 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 3100 if (!name) { 3101 rbd_warn(rbd_dev, "no snapshot with id %llu", 3102 rbd_dev->spec->snap_id); /* Really a BUG() */ 3103 ret = -EIO; 3104 goto out_err; 3105 } 3106 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 3107 if(!rbd_dev->spec->snap_name) 3108 goto out_err; 3109 3110 return 0; 3111out_err: 3112 kfree(reply_buf); 3113 kfree(rbd_dev->spec->pool_name); 3114 rbd_dev->spec->pool_name = NULL; 3115 3116 return ret; 3117} 3118 3119static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 3120{ 3121 size_t size; 3122 int ret; 3123 void *reply_buf; 3124 void *p; 3125 void *end; 3126 u64 seq; 3127 u32 snap_count; 3128 struct ceph_snap_context *snapc; 3129 u32 i; 3130 3131 /* 3132 * We'll need room for the seq value (maximum snapshot id), 3133 * snapshot count, and array of that many snapshot ids. 3134 * For now we have a fixed upper limit on the number we're 3135 * prepared to receive. 3136 */ 3137 size = sizeof (__le64) + sizeof (__le32) + 3138 RBD_MAX_SNAP_COUNT * sizeof (__le64); 3139 reply_buf = kzalloc(size, GFP_KERNEL); 3140 if (!reply_buf) 3141 return -ENOMEM; 3142 3143 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3144 "rbd", "get_snapcontext", 3145 NULL, 0, 3146 reply_buf, size, ver); 3147 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3148 if (ret < 0) 3149 goto out; 3150 3151 ret = -ERANGE; 3152 p = reply_buf; 3153 end = (char *) reply_buf + size; 3154 ceph_decode_64_safe(&p, end, seq, out); 3155 ceph_decode_32_safe(&p, end, snap_count, out); 3156 3157 /* 3158 * Make sure the reported number of snapshot ids wouldn't go 3159 * beyond the end of our buffer. But before checking that, 3160 * make sure the computed size of the snapshot context we 3161 * allocate is representable in a size_t. 
3162 */ 3163 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 3164 / sizeof (u64)) { 3165 ret = -EINVAL; 3166 goto out; 3167 } 3168 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 3169 goto out; 3170 3171 size = sizeof (struct ceph_snap_context) + 3172 snap_count * sizeof (snapc->snaps[0]); 3173 snapc = kmalloc(size, GFP_KERNEL); 3174 if (!snapc) { 3175 ret = -ENOMEM; 3176 goto out; 3177 } 3178 3179 atomic_set(&snapc->nref, 1); 3180 snapc->seq = seq; 3181 snapc->num_snaps = snap_count; 3182 for (i = 0; i < snap_count; i++) 3183 snapc->snaps[i] = ceph_decode_64(&p); 3184 3185 rbd_dev->header.snapc = snapc; 3186 3187 dout(" snap context seq = %llu, snap_count = %u\n", 3188 (unsigned long long) seq, (unsigned int) snap_count); 3189 3190out: 3191 kfree(reply_buf); 3192 3193 return 0; 3194} 3195 3196static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) 3197{ 3198 size_t size; 3199 void *reply_buf; 3200 __le64 snap_id; 3201 int ret; 3202 void *p; 3203 void *end; 3204 char *snap_name; 3205 3206 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 3207 reply_buf = kmalloc(size, GFP_KERNEL); 3208 if (!reply_buf) 3209 return ERR_PTR(-ENOMEM); 3210 3211 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 3212 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3213 "rbd", "get_snapshot_name", 3214 (char *) &snap_id, sizeof (snap_id), 3215 reply_buf, size, NULL); 3216 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3217 if (ret < 0) 3218 goto out; 3219 3220 p = reply_buf; 3221 end = (char *) reply_buf + size; 3222 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3223 if (IS_ERR(snap_name)) { 3224 ret = PTR_ERR(snap_name); 3225 goto out; 3226 } else { 3227 dout(" snap_id 0x%016llx snap_name = %s\n", 3228 (unsigned long long) le64_to_cpu(snap_id), snap_name); 3229 } 3230 kfree(reply_buf); 3231 3232 return snap_name; 3233out: 3234 kfree(reply_buf); 3235 3236 return ERR_PTR(ret); 3237} 3238 
3239static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 3240 u64 *snap_size, u64 *snap_features) 3241{ 3242 u64 snap_id; 3243 u8 order; 3244 int ret; 3245 3246 snap_id = rbd_dev->header.snapc->snaps[which]; 3247 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size); 3248 if (ret) 3249 return ERR_PTR(ret); 3250 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features); 3251 if (ret) 3252 return ERR_PTR(ret); 3253 3254 return rbd_dev_v2_snap_name(rbd_dev, which); 3255} 3256 3257static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, 3258 u64 *snap_size, u64 *snap_features) 3259{ 3260 if (rbd_dev->image_format == 1) 3261 return rbd_dev_v1_snap_info(rbd_dev, which, 3262 snap_size, snap_features); 3263 if (rbd_dev->image_format == 2) 3264 return rbd_dev_v2_snap_info(rbd_dev, which, 3265 snap_size, snap_features); 3266 return ERR_PTR(-EINVAL); 3267} 3268 3269static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 3270{ 3271 int ret; 3272 __u8 obj_order; 3273 3274 down_write(&rbd_dev->header_rwsem); 3275 3276 /* Grab old order first, to see if it changes */ 3277 3278 obj_order = rbd_dev->header.obj_order, 3279 ret = rbd_dev_v2_image_size(rbd_dev); 3280 if (ret) 3281 goto out; 3282 if (rbd_dev->header.obj_order != obj_order) { 3283 ret = -EIO; 3284 goto out; 3285 } 3286 rbd_update_mapping_size(rbd_dev); 3287 3288 ret = rbd_dev_v2_snap_context(rbd_dev, hver); 3289 dout("rbd_dev_v2_snap_context returned %d\n", ret); 3290 if (ret) 3291 goto out; 3292 ret = rbd_dev_snaps_update(rbd_dev); 3293 dout("rbd_dev_snaps_update returned %d\n", ret); 3294 if (ret) 3295 goto out; 3296 ret = rbd_dev_snaps_register(rbd_dev); 3297 dout("rbd_dev_snaps_register returned %d\n", ret); 3298out: 3299 up_write(&rbd_dev->header_rwsem); 3300 3301 return ret; 3302} 3303 3304/* 3305 * Scan the rbd device's current snapshot list and compare it to the 3306 * newly-received snapshot context. 
Remove any existing snapshots 3307 * not present in the new snapshot context. Add a new snapshot for 3308 * any snaphots in the snapshot context not in the current list. 3309 * And verify there are no changes to snapshots we already know 3310 * about. 3311 * 3312 * Assumes the snapshots in the snapshot context are sorted by 3313 * snapshot id, highest id first. (Snapshots in the rbd_dev's list 3314 * are also maintained in that order.) 3315 */ 3316static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) 3317{ 3318 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 3319 const u32 snap_count = snapc->num_snaps; 3320 struct list_head *head = &rbd_dev->snaps; 3321 struct list_head *links = head->next; 3322 u32 index = 0; 3323 3324 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count); 3325 while (index < snap_count || links != head) { 3326 u64 snap_id; 3327 struct rbd_snap *snap; 3328 char *snap_name; 3329 u64 snap_size = 0; 3330 u64 snap_features = 0; 3331 3332 snap_id = index < snap_count ? snapc->snaps[index] 3333 : CEPH_NOSNAP; 3334 snap = links != head ? list_entry(links, struct rbd_snap, node) 3335 : NULL; 3336 rbd_assert(!snap || snap->id != CEPH_NOSNAP); 3337 3338 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { 3339 struct list_head *next = links->next; 3340 3341 /* 3342 * A previously-existing snapshot is not in 3343 * the new snap context. 3344 * 3345 * If the now missing snapshot is the one the 3346 * image is mapped to, clear its exists flag 3347 * so we can avoid sending any more requests 3348 * to it. 3349 */ 3350 if (rbd_dev->spec->snap_id == snap->id) 3351 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 3352 rbd_remove_snap_dev(snap); 3353 dout("%ssnap id %llu has been removed\n", 3354 rbd_dev->spec->snap_id == snap->id ? 
3355 "mapped " : "", 3356 (unsigned long long) snap->id); 3357 3358 /* Done with this list entry; advance */ 3359 3360 links = next; 3361 continue; 3362 } 3363 3364 snap_name = rbd_dev_snap_info(rbd_dev, index, 3365 &snap_size, &snap_features); 3366 if (IS_ERR(snap_name)) 3367 return PTR_ERR(snap_name); 3368 3369 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count, 3370 (unsigned long long) snap_id); 3371 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) { 3372 struct rbd_snap *new_snap; 3373 3374 /* We haven't seen this snapshot before */ 3375 3376 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name, 3377 snap_id, snap_size, snap_features); 3378 if (IS_ERR(new_snap)) { 3379 int err = PTR_ERR(new_snap); 3380 3381 dout(" failed to add dev, error %d\n", err); 3382 3383 return err; 3384 } 3385 3386 /* New goes before existing, or at end of list */ 3387 3388 dout(" added dev%s\n", snap ? "" : " at end\n"); 3389 if (snap) 3390 list_add_tail(&new_snap->node, &snap->node); 3391 else 3392 list_add_tail(&new_snap->node, head); 3393 } else { 3394 /* Already have this one */ 3395 3396 dout(" already present\n"); 3397 3398 rbd_assert(snap->size == snap_size); 3399 rbd_assert(!strcmp(snap->name, snap_name)); 3400 rbd_assert(snap->features == snap_features); 3401 3402 /* Done with this list entry; advance */ 3403 3404 links = links->next; 3405 } 3406 3407 /* Advance to the next entry in the snapshot context */ 3408 3409 index++; 3410 } 3411 dout("%s: done\n", __func__); 3412 3413 return 0; 3414} 3415 3416/* 3417 * Scan the list of snapshots and register the devices for any that 3418 * have not already been registered. 
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s:\n", __func__);
	/* The parent rbd device must be in sysfs before its snaps can be */
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	/* Stop at the first registration failure; earlier
	 * registrations are left in place for sysfs teardown */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}

/*
 * Register the rbd device in the driver model (sysfs), named by its
 * numeric dev_id and parented under rbd_root_dev.  Serialized by
 * ctl_mutex.  Returns the device_register() result.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}

/* Tear down the rbd device's sysfs presence; the final reference
 * drop invokes rbd_dev_release() */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}

/* Highest rbd device id handed out so far (see rbd_dev_id_get/put) */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	/*
	 * NOTE(review): rbd_id/max_id are ints while dev_id comes from
	 * a 64-bit counter (rbd_dev_id_max) -- presumably ids never
	 * exceed INT_MAX in practice, but confirm.
	 */
	int rbd_id = rbd_dev->dev_id;
	int max_id;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	max_id = 0;
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout(" max dev id has been reset\n");
}

/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	/* Copy only if it fits; the caller detects overflow via the
	 * returned length being >= token_size */
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
3598 */ 3599static inline char *dup_token(const char **buf, size_t *lenp) 3600{ 3601 char *dup; 3602 size_t len; 3603 3604 len = next_token(buf); 3605 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 3606 if (!dup) 3607 return NULL; 3608 *(dup + len) = '\0'; 3609 *buf += len; 3610 3611 if (lenp) 3612 *lenp = len; 3613 3614 return dup; 3615} 3616 3617/* 3618 * Parse the options provided for an "rbd add" (i.e., rbd image 3619 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 3620 * and the data written is passed here via a NUL-terminated buffer. 3621 * Returns 0 if successful or an error code otherwise. 3622 * 3623 * The information extracted from these options is recorded in 3624 * the other parameters which return dynamically-allocated 3625 * structures: 3626 * ceph_opts 3627 * The address of a pointer that will refer to a ceph options 3628 * structure. Caller must release the returned pointer using 3629 * ceph_destroy_options() when it is no longer needed. 3630 * rbd_opts 3631 * Address of an rbd options pointer. Fully initialized by 3632 * this function; caller must release with kfree(). 3633 * spec 3634 * Address of an rbd image specification pointer. Fully 3635 * initialized by this function based on parsed options. 3636 * Caller must release with rbd_spec_put(). 3637 * 3638 * The options passed take this form: 3639 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 3640 * where: 3641 * <mon_addrs> 3642 * A comma-separated list of one or more monitor addresses. 3643 * A monitor address is an ip address, optionally followed 3644 * by a port number (separated by a colon). 3645 * I.e.: ip1[:port1][,ip2[:port2]...] 3646 * <options> 3647 * A comma-separated list of ceph and/or rbd options. 3648 * <pool_name> 3649 * The name of the rados pool containing the rbd image. 3650 * <image_name> 3651 * The name of the image in that pool to map. 3652 * <snap_id> 3653 * An optional snapshot id. 
 * If provided, the mapping will
 * present data from the image at the time that snapshot was
 * created.  The image head is used if no snapshot id is
 * provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	/* mon_addrs is not duplicated; it points into the caller's
	 * buffer and is consumed by ceph_parse_options() below */
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	/* len + 1 bytes are copied; the last is overwritten with '\0' */
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	/* Success: ownership of all three structures passes to the caller */
	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	/* rbd_spec_put() also frees the partially-filled name fields */
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}

/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	/* Invoke the "get_id" class method on the id object */
	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}

/*
 * Probe a format 1 image: record its (empty) image id and header
 * object name, then read the on-disk header.  On error all fields
 * set here are cleaned up again.
 */
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}

/*
 * Probe a format 2 image: record the header object name (derived
 * from the image id), then fetch size, object prefix, features,
 * optional parent info, and the snapshot context from the OSDs.
 */
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	/* Undo everything set above, in reverse order */
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}

/*
 * Complete device setup after the image header has been probed:
 * update snapshots and the mapping, allocate a device id, register
 * the block device and sysfs entries, start the header watch, and
 * finally announce the disk.
 */
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;
err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}

/*
 * Handle a write to /sys/bus/rbd/add: parse the mapping request,
 * connect to the cluster, create the rbd_dev and probe the image.
 * Returns count on success or a negative errno.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	/* Module reference is dropped in rbd_dev_release() on success */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	/* rbd_dev_destroy() also drops the client and spec references */
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}

/*
 * Look up an rbd device by its numeric id on the global device
 * list.  Returns the device, or NULL if the id is not in use.
 * The returned pointer is not reference counted by this lookup.
 */
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}

/*
 * Device-model release callback: final teardown once the last
 * reference to rbd_dev->dev is dropped (see rbd_bus_add_dev()).
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}

/*
 * Handle a write to /sys/bus/rbd/remove: unmap the rbd device with
 * the given id, unless it is still open.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	/* NOTE(review): strict_strtoul is deprecated in favor of
	 * kstrtoul in later kernels; kept as-is here */
	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* Refuse removal while open; otherwise mark the device as
	 * being removed so no new opens succeed */
	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}

/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	/* Undo the root device registration if the bus fails */
	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");