rbd.c revision c0cd10db4685a76397f32bed246e861705642576
1/* 2 rbd.c -- Export ceph rados objects as a Linux block device 3 4 5 based on drivers/block/osdblk.c: 6 7 Copyright 2009 Red Hat, Inc. 8 9 This program is free software; you can redistribute it and/or modify 10 it under the terms of the GNU General Public License as published by 11 the Free Software Foundation. 12 13 This program is distributed in the hope that it will be useful, 14 but WITHOUT ANY WARRANTY; without even the implied warranty of 15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 16 GNU General Public License for more details. 17 18 You should have received a copy of the GNU General Public License 19 along with this program; see the file COPYING. If not, write to 20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 21 22 23 24 For usage instructions, please refer to: 25 26 Documentation/ABI/testing/sysfs-bus-rbd 27 28 */ 29 30#include <linux/ceph/libceph.h> 31#include <linux/ceph/osd_client.h> 32#include <linux/ceph/mon_client.h> 33#include <linux/ceph/decode.h> 34#include <linux/parser.h> 35 36#include <linux/kernel.h> 37#include <linux/device.h> 38#include <linux/module.h> 39#include <linux/fs.h> 40#include <linux/blkdev.h> 41 42#include "rbd_types.h" 43 44#define RBD_DEBUG /* Activate rbd_assert() calls */ 45 46/* 47 * The basic unit of block I/O is a sector. It is interpreted in a 48 * number of contexts in Linux (blk, bio, genhd), but the default is 49 * universally 512 bytes. These symbols are just slightly more 50 * meaningful than the bare numbers they represent. 
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

/* Longest snapshot name that still fits in NAME_MAX once prefixed */
#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

/* Snapshot name used to denote the image head (no snapshot mapped) */
#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These five fields never change for a given rbd image */
	char *object_prefix;	/* prefix of every data object's name */
	u64 features;
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* copy of the on-disk snapshot name bytes */
	u64 *snap_sizes;	/* one image size per snapshot */

	u64 stripe_unit;
	u64 stripe_count;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 * Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;	/* may be a null pointer (see above) */

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;		/* spec may be shared by parent and child */
};

/*
 * An instance of the client.  Multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;	/* entry in rbd_client_list */
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which?
 */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Selects which member of the data union in rbd_obj_request is in use */
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

/* Bit numbers used with rbd_obj_request->flags */
enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;		/* enum obj_req_flags bits */

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;	/* OBJ_REQUEST_BIO */
		struct {			/* OBJ_REQUEST_PAGES */
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

/* Bit numbers used with rbd_img_request->flags */
enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;	/* enum img_req_flags bits */
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

/*
 * Iterators over the object requests of an image request.  Note that
 * the "safe" variant walks the list in reverse order.
 */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

/* One snapshot of an image; linked on rbd_device->snaps */
struct rbd_snap {
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

/* Size, features and access mode of whatever is currently mapped */
struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
343 */ 344enum rbd_dev_flags { 345 RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */ 346 RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */ 347}; 348 349static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */ 350 351static LIST_HEAD(rbd_dev_list); /* devices */ 352static DEFINE_SPINLOCK(rbd_dev_list_lock); 353 354static LIST_HEAD(rbd_client_list); /* clients */ 355static DEFINE_SPINLOCK(rbd_client_list_lock); 356 357static int rbd_img_request_submit(struct rbd_img_request *img_request); 358 359static int rbd_dev_snaps_update(struct rbd_device *rbd_dev); 360 361static void rbd_dev_release(struct device *dev); 362static void rbd_snap_destroy(struct rbd_snap *snap); 363 364static ssize_t rbd_add(struct bus_type *bus, const char *buf, 365 size_t count); 366static ssize_t rbd_remove(struct bus_type *bus, const char *buf, 367 size_t count); 368static int rbd_dev_probe(struct rbd_device *rbd_dev); 369 370static struct bus_attribute rbd_bus_attrs[] = { 371 __ATTR(add, S_IWUSR, NULL, rbd_add), 372 __ATTR(remove, S_IWUSR, NULL, rbd_remove), 373 __ATTR_NULL 374}; 375 376static struct bus_type rbd_bus_type = { 377 .name = "rbd", 378 .bus_attrs = rbd_bus_attrs, 379}; 380 381static void rbd_root_dev_release(struct device *dev) 382{ 383} 384 385static struct device rbd_root_dev = { 386 .init_name = "rbd", 387 .release = rbd_root_dev_release, 388}; 389 390static __printf(2, 3) 391void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...) 
392{ 393 struct va_format vaf; 394 va_list args; 395 396 va_start(args, fmt); 397 vaf.fmt = fmt; 398 vaf.va = &args; 399 400 if (!rbd_dev) 401 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf); 402 else if (rbd_dev->disk) 403 printk(KERN_WARNING "%s: %s: %pV\n", 404 RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf); 405 else if (rbd_dev->spec && rbd_dev->spec->image_name) 406 printk(KERN_WARNING "%s: image %s: %pV\n", 407 RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf); 408 else if (rbd_dev->spec && rbd_dev->spec->image_id) 409 printk(KERN_WARNING "%s: id %s: %pV\n", 410 RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf); 411 else /* punt */ 412 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n", 413 RBD_DRV_NAME, rbd_dev, &vaf); 414 va_end(args); 415} 416 417#ifdef RBD_DEBUG 418#define rbd_assert(expr) \ 419 if (unlikely(!(expr))) { \ 420 printk(KERN_ERR "\nAssertion failure in %s() " \ 421 "at line %d:\n\n" \ 422 "\trbd_assert(%s);\n\n", \ 423 __func__, __LINE__, #expr); \ 424 BUG(); \ 425 } 426#else /* !RBD_DEBUG */ 427# define rbd_assert(expr) ((void) 0) 428#endif /* !RBD_DEBUG */ 429 430static void rbd_img_parent_read(struct rbd_obj_request *obj_request); 431static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request); 432 433static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver); 434static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver); 435 436static int rbd_open(struct block_device *bdev, fmode_t mode) 437{ 438 struct rbd_device *rbd_dev = bdev->bd_disk->private_data; 439 bool removing = false; 440 441 if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only) 442 return -EROFS; 443 444 spin_lock_irq(&rbd_dev->lock); 445 if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags)) 446 removing = true; 447 else 448 rbd_dev->open_count++; 449 spin_unlock_irq(&rbd_dev->lock); 450 if (removing) 451 return -ENOENT; 452 453 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 454 (void) get_device(&rbd_dev->dev); 455 set_device_ro(bdev, 
rbd_dev->mapping.read_only); 456 mutex_unlock(&ctl_mutex); 457 458 return 0; 459} 460 461static int rbd_release(struct gendisk *disk, fmode_t mode) 462{ 463 struct rbd_device *rbd_dev = disk->private_data; 464 unsigned long open_count_before; 465 466 spin_lock_irq(&rbd_dev->lock); 467 open_count_before = rbd_dev->open_count--; 468 spin_unlock_irq(&rbd_dev->lock); 469 rbd_assert(open_count_before > 0); 470 471 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 472 put_device(&rbd_dev->dev); 473 mutex_unlock(&ctl_mutex); 474 475 return 0; 476} 477 478static const struct block_device_operations rbd_bd_ops = { 479 .owner = THIS_MODULE, 480 .open = rbd_open, 481 .release = rbd_release, 482}; 483 484/* 485 * Initialize an rbd client instance. 486 * We own *ceph_opts. 487 */ 488static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts) 489{ 490 struct rbd_client *rbdc; 491 int ret = -ENOMEM; 492 493 dout("%s:\n", __func__); 494 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL); 495 if (!rbdc) 496 goto out_opt; 497 498 kref_init(&rbdc->kref); 499 INIT_LIST_HEAD(&rbdc->node); 500 501 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 502 503 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0); 504 if (IS_ERR(rbdc->client)) 505 goto out_mutex; 506 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */ 507 508 ret = ceph_open_session(rbdc->client); 509 if (ret < 0) 510 goto out_err; 511 512 spin_lock(&rbd_client_list_lock); 513 list_add_tail(&rbdc->node, &rbd_client_list); 514 spin_unlock(&rbd_client_list_lock); 515 516 mutex_unlock(&ctl_mutex); 517 dout("%s: rbdc %p\n", __func__, rbdc); 518 519 return rbdc; 520 521out_err: 522 ceph_destroy_client(rbdc->client); 523out_mutex: 524 mutex_unlock(&ctl_mutex); 525 kfree(rbdc); 526out_opt: 527 if (ceph_opts) 528 ceph_destroy_options(ceph_opts); 529 dout("%s: error %d\n", __func__, ret); 530 531 return ERR_PTR(ret); 532} 533 534static struct rbd_client *__rbd_get_client(struct rbd_client 
*rbdc) 535{ 536 kref_get(&rbdc->kref); 537 538 return rbdc; 539} 540 541/* 542 * Find a ceph client with specific addr and configuration. If 543 * found, bump its reference count. 544 */ 545static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts) 546{ 547 struct rbd_client *client_node; 548 bool found = false; 549 550 if (ceph_opts->flags & CEPH_OPT_NOSHARE) 551 return NULL; 552 553 spin_lock(&rbd_client_list_lock); 554 list_for_each_entry(client_node, &rbd_client_list, node) { 555 if (!ceph_compare_options(ceph_opts, client_node->client)) { 556 __rbd_get_client(client_node); 557 558 found = true; 559 break; 560 } 561 } 562 spin_unlock(&rbd_client_list_lock); 563 564 return found ? client_node : NULL; 565} 566 567/* 568 * mount options 569 */ 570enum { 571 Opt_last_int, 572 /* int args above */ 573 Opt_last_string, 574 /* string args above */ 575 Opt_read_only, 576 Opt_read_write, 577 /* Boolean args above */ 578 Opt_last_bool, 579}; 580 581static match_table_t rbd_opts_tokens = { 582 /* int args above */ 583 /* string args above */ 584 {Opt_read_only, "read_only"}, 585 {Opt_read_only, "ro"}, /* Alternate spelling */ 586 {Opt_read_write, "read_write"}, 587 {Opt_read_write, "rw"}, /* Alternate spelling */ 588 /* Boolean args above */ 589 {-1, NULL} 590}; 591 592struct rbd_options { 593 bool read_only; 594}; 595 596#define RBD_READ_ONLY_DEFAULT false 597 598static int parse_rbd_opts_token(char *c, void *private) 599{ 600 struct rbd_options *rbd_opts = private; 601 substring_t argstr[MAX_OPT_ARGS]; 602 int token, intval, ret; 603 604 token = match_token(c, rbd_opts_tokens, argstr); 605 if (token < 0) 606 return -EINVAL; 607 608 if (token < Opt_last_int) { 609 ret = match_int(&argstr[0], &intval); 610 if (ret < 0) { 611 pr_err("bad mount option arg (not int) " 612 "at '%s'\n", c); 613 return ret; 614 } 615 dout("got int token %d val %d\n", token, intval); 616 } else if (token > Opt_last_int && token < Opt_last_string) { 617 dout("got string token %d 
val %s\n", token, 618 argstr[0].from); 619 } else if (token > Opt_last_string && token < Opt_last_bool) { 620 dout("got Boolean token %d\n", token); 621 } else { 622 dout("got token %d\n", token); 623 } 624 625 switch (token) { 626 case Opt_read_only: 627 rbd_opts->read_only = true; 628 break; 629 case Opt_read_write: 630 rbd_opts->read_only = false; 631 break; 632 default: 633 rbd_assert(false); 634 break; 635 } 636 return 0; 637} 638 639/* 640 * Get a ceph client with specific addr and configuration, if one does 641 * not exist create it. 642 */ 643static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts) 644{ 645 struct rbd_client *rbdc; 646 647 rbdc = rbd_client_find(ceph_opts); 648 if (rbdc) /* using an existing client */ 649 ceph_destroy_options(ceph_opts); 650 else 651 rbdc = rbd_client_create(ceph_opts); 652 653 return rbdc; 654} 655 656/* 657 * Destroy ceph client 658 * 659 * Caller must hold rbd_client_list_lock. 660 */ 661static void rbd_client_release(struct kref *kref) 662{ 663 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref); 664 665 dout("%s: rbdc %p\n", __func__, rbdc); 666 spin_lock(&rbd_client_list_lock); 667 list_del(&rbdc->node); 668 spin_unlock(&rbd_client_list_lock); 669 670 ceph_destroy_client(rbdc->client); 671 kfree(rbdc); 672} 673 674/* 675 * Drop reference to ceph client node. If it's not referenced anymore, release 676 * it. 
677 */ 678static void rbd_put_client(struct rbd_client *rbdc) 679{ 680 if (rbdc) 681 kref_put(&rbdc->kref, rbd_client_release); 682} 683 684static bool rbd_image_format_valid(u32 image_format) 685{ 686 return image_format == 1 || image_format == 2; 687} 688 689static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) 690{ 691 size_t size; 692 u32 snap_count; 693 694 /* The header has to start with the magic rbd header text */ 695 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT))) 696 return false; 697 698 /* The bio layer requires at least sector-sized I/O */ 699 700 if (ondisk->options.order < SECTOR_SHIFT) 701 return false; 702 703 /* If we use u64 in a few spots we may be able to loosen this */ 704 705 if (ondisk->options.order > 8 * sizeof (int) - 1) 706 return false; 707 708 /* 709 * The size of a snapshot header has to fit in a size_t, and 710 * that limits the number of snapshots. 711 */ 712 snap_count = le32_to_cpu(ondisk->snap_count); 713 size = SIZE_MAX - sizeof (struct ceph_snap_context); 714 if (snap_count > size / sizeof (__le64)) 715 return false; 716 717 /* 718 * Not only that, but the size of the entire the snapshot 719 * header must also be representable in a size_t. 720 */ 721 size -= snap_count * sizeof (__le64); 722 if ((u64) size < le64_to_cpu(ondisk->snap_names_len)) 723 return false; 724 725 return true; 726} 727 728/* 729 * Create a new header structure, translate header format from the on-disk 730 * header. 
731 */ 732static int rbd_header_from_disk(struct rbd_image_header *header, 733 struct rbd_image_header_ondisk *ondisk) 734{ 735 u32 snap_count; 736 size_t len; 737 size_t size; 738 u32 i; 739 740 memset(header, 0, sizeof (*header)); 741 742 snap_count = le32_to_cpu(ondisk->snap_count); 743 744 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix)); 745 header->object_prefix = kmalloc(len + 1, GFP_KERNEL); 746 if (!header->object_prefix) 747 return -ENOMEM; 748 memcpy(header->object_prefix, ondisk->object_prefix, len); 749 header->object_prefix[len] = '\0'; 750 751 if (snap_count) { 752 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len); 753 754 /* Save a copy of the snapshot names */ 755 756 if (snap_names_len > (u64) SIZE_MAX) 757 return -EIO; 758 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL); 759 if (!header->snap_names) 760 goto out_err; 761 /* 762 * Note that rbd_dev_v1_header_read() guarantees 763 * the ondisk buffer we're working with has 764 * snap_names_len bytes beyond the end of the 765 * snapshot id array, this memcpy() is safe. 
766 */ 767 memcpy(header->snap_names, &ondisk->snaps[snap_count], 768 snap_names_len); 769 770 /* Record each snapshot's size */ 771 772 size = snap_count * sizeof (*header->snap_sizes); 773 header->snap_sizes = kmalloc(size, GFP_KERNEL); 774 if (!header->snap_sizes) 775 goto out_err; 776 for (i = 0; i < snap_count; i++) 777 header->snap_sizes[i] = 778 le64_to_cpu(ondisk->snaps[i].image_size); 779 } else { 780 header->snap_names = NULL; 781 header->snap_sizes = NULL; 782 } 783 784 header->features = 0; /* No features support in v1 images */ 785 header->obj_order = ondisk->options.order; 786 header->crypt_type = ondisk->options.crypt_type; 787 header->comp_type = ondisk->options.comp_type; 788 789 /* Allocate and fill in the snapshot context */ 790 791 header->image_size = le64_to_cpu(ondisk->image_size); 792 size = sizeof (struct ceph_snap_context); 793 size += snap_count * sizeof (header->snapc->snaps[0]); 794 header->snapc = kzalloc(size, GFP_KERNEL); 795 if (!header->snapc) 796 goto out_err; 797 798 atomic_set(&header->snapc->nref, 1); 799 header->snapc->seq = le64_to_cpu(ondisk->snap_seq); 800 header->snapc->num_snaps = snap_count; 801 for (i = 0; i < snap_count; i++) 802 header->snapc->snaps[i] = 803 le64_to_cpu(ondisk->snaps[i].id); 804 805 return 0; 806 807out_err: 808 kfree(header->snap_sizes); 809 header->snap_sizes = NULL; 810 kfree(header->snap_names); 811 header->snap_names = NULL; 812 kfree(header->object_prefix); 813 header->object_prefix = NULL; 814 815 return -ENOMEM; 816} 817 818static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id) 819{ 820 struct rbd_snap *snap; 821 822 if (snap_id == CEPH_NOSNAP) 823 return RBD_SNAP_HEAD_NAME; 824 825 list_for_each_entry(snap, &rbd_dev->snaps, node) 826 if (snap_id == snap->id) 827 return snap->name; 828 829 return NULL; 830} 831 832static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev, 833 const char *snap_name) 834{ 835 struct rbd_snap *snap; 836 837 list_for_each_entry(snap, 
&rbd_dev->snaps, node) 838 if (!strcmp(snap_name, snap->name)) 839 return snap; 840 841 return NULL; 842} 843 844static int rbd_dev_set_mapping(struct rbd_device *rbd_dev) 845{ 846 if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME, 847 sizeof (RBD_SNAP_HEAD_NAME))) { 848 rbd_dev->mapping.size = rbd_dev->header.image_size; 849 rbd_dev->mapping.features = rbd_dev->header.features; 850 } else { 851 struct rbd_snap *snap; 852 853 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 854 if (!snap) 855 return -ENOENT; 856 rbd_dev->mapping.size = snap->size; 857 rbd_dev->mapping.features = snap->features; 858 rbd_dev->mapping.read_only = true; 859 } 860 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 861 862 return 0; 863} 864 865static void rbd_header_free(struct rbd_image_header *header) 866{ 867 kfree(header->object_prefix); 868 header->object_prefix = NULL; 869 kfree(header->snap_sizes); 870 header->snap_sizes = NULL; 871 kfree(header->snap_names); 872 header->snap_names = NULL; 873 ceph_put_snap_context(header->snapc); 874 header->snapc = NULL; 875} 876 877static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) 878{ 879 char *name; 880 u64 segment; 881 int ret; 882 883 name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO); 884 if (!name) 885 return NULL; 886 segment = offset >> rbd_dev->header.obj_order; 887 ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx", 888 rbd_dev->header.object_prefix, segment); 889 if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) { 890 pr_err("error formatting segment name for #%llu (%d)\n", 891 segment, ret); 892 kfree(name); 893 name = NULL; 894 } 895 896 return name; 897} 898 899static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) 900{ 901 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 902 903 return offset & (segment_size - 1); 904} 905 906static u64 rbd_segment_length(struct rbd_device *rbd_dev, 907 u64 offset, u64 length) 908{ 909 u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; 
910 911 offset &= segment_size - 1; 912 913 rbd_assert(length <= U64_MAX - offset); 914 if (offset + length > segment_size) 915 length = segment_size - offset; 916 917 return length; 918} 919 920/* 921 * returns the size of an object in the image 922 */ 923static u64 rbd_obj_bytes(struct rbd_image_header *header) 924{ 925 return 1 << header->obj_order; 926} 927 928/* 929 * bio helpers 930 */ 931 932static void bio_chain_put(struct bio *chain) 933{ 934 struct bio *tmp; 935 936 while (chain) { 937 tmp = chain; 938 chain = chain->bi_next; 939 bio_put(tmp); 940 } 941} 942 943/* 944 * zeros a bio chain, starting at specific offset 945 */ 946static void zero_bio_chain(struct bio *chain, int start_ofs) 947{ 948 struct bio_vec *bv; 949 unsigned long flags; 950 void *buf; 951 int i; 952 int pos = 0; 953 954 while (chain) { 955 bio_for_each_segment(bv, chain, i) { 956 if (pos + bv->bv_len > start_ofs) { 957 int remainder = max(start_ofs - pos, 0); 958 buf = bvec_kmap_irq(bv, &flags); 959 memset(buf + remainder, 0, 960 bv->bv_len - remainder); 961 bvec_kunmap_irq(buf, &flags); 962 } 963 pos += bv->bv_len; 964 } 965 966 chain = chain->bi_next; 967 } 968} 969 970/* 971 * similar to zero_bio_chain(), zeros data defined by a page array, 972 * starting at the given byte offset from the start of the array and 973 * continuing up to the given end offset. The pages array is 974 * assumed to be big enough to hold all bytes up to the end. 
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	/* First page touched by the range */
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		/* Zero from page_offset to the end of the page or range */
		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 *
 * Returns the new bio, or a null pointer if the requested range is
 * empty or not contained in the source bio, or if allocation fails.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;	/* offset of the range within the first segment */

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	/* resid is now the number of bytes used in the last segment */
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
		vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
1095 */ 1096static struct bio *bio_chain_clone_range(struct bio **bio_src, 1097 unsigned int *offset, 1098 unsigned int len, 1099 gfp_t gfpmask) 1100{ 1101 struct bio *bi = *bio_src; 1102 unsigned int off = *offset; 1103 struct bio *chain = NULL; 1104 struct bio **end; 1105 1106 /* Build up a chain of clone bios up to the limit */ 1107 1108 if (!bi || off >= bi->bi_size || !len) 1109 return NULL; /* Nothing to clone */ 1110 1111 end = &chain; 1112 while (len) { 1113 unsigned int bi_size; 1114 struct bio *bio; 1115 1116 if (!bi) { 1117 rbd_warn(NULL, "bio_chain exhausted with %u left", len); 1118 goto out_err; /* EINVAL; ran out of bio's */ 1119 } 1120 bi_size = min_t(unsigned int, bi->bi_size - off, len); 1121 bio = bio_clone_range(bi, off, bi_size, gfpmask); 1122 if (!bio) 1123 goto out_err; /* ENOMEM */ 1124 1125 *end = bio; 1126 end = &bio->bi_next; 1127 1128 off += bi_size; 1129 if (off == bi->bi_size) { 1130 bi = bi->bi_next; 1131 off = 0; 1132 } 1133 len -= bi_size; 1134 } 1135 *bio_src = bi; 1136 *offset = off; 1137 1138 return chain; 1139out_err: 1140 bio_chain_put(chain); 1141 1142 return NULL; 1143} 1144 1145/* 1146 * The default/initial value for all object request flags is 0. For 1147 * each flag, once its value is set to 1 it is never reset to 0 1148 * again. 
1149 */ 1150static void obj_request_img_data_set(struct rbd_obj_request *obj_request) 1151{ 1152 if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) { 1153 struct rbd_device *rbd_dev; 1154 1155 rbd_dev = obj_request->img_request->rbd_dev; 1156 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n", 1157 obj_request); 1158 } 1159} 1160 1161static bool obj_request_img_data_test(struct rbd_obj_request *obj_request) 1162{ 1163 smp_mb(); 1164 return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0; 1165} 1166 1167static void obj_request_done_set(struct rbd_obj_request *obj_request) 1168{ 1169 if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) { 1170 struct rbd_device *rbd_dev = NULL; 1171 1172 if (obj_request_img_data_test(obj_request)) 1173 rbd_dev = obj_request->img_request->rbd_dev; 1174 rbd_warn(rbd_dev, "obj_request %p already marked done\n", 1175 obj_request); 1176 } 1177} 1178 1179static bool obj_request_done_test(struct rbd_obj_request *obj_request) 1180{ 1181 smp_mb(); 1182 return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0; 1183} 1184 1185/* 1186 * This sets the KNOWN flag after (possibly) setting the EXISTS 1187 * flag. The latter is set based on the "exists" value provided. 1188 * 1189 * Note that for our purposes once an object exists it never goes 1190 * away again. It's possible that the response from two existence 1191 * checks are separated by the creation of the target object, and 1192 * the first ("doesn't exist") response arrives *after* the second 1193 * ("does exist"). In that case we ignore the second one. 
1194 */ 1195static void obj_request_existence_set(struct rbd_obj_request *obj_request, 1196 bool exists) 1197{ 1198 if (exists) 1199 set_bit(OBJ_REQ_EXISTS, &obj_request->flags); 1200 set_bit(OBJ_REQ_KNOWN, &obj_request->flags); 1201 smp_mb(); 1202} 1203 1204static bool obj_request_known_test(struct rbd_obj_request *obj_request) 1205{ 1206 smp_mb(); 1207 return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0; 1208} 1209 1210static bool obj_request_exists_test(struct rbd_obj_request *obj_request) 1211{ 1212 smp_mb(); 1213 return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0; 1214} 1215 1216static void rbd_obj_request_get(struct rbd_obj_request *obj_request) 1217{ 1218 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1219 atomic_read(&obj_request->kref.refcount)); 1220 kref_get(&obj_request->kref); 1221} 1222 1223static void rbd_obj_request_destroy(struct kref *kref); 1224static void rbd_obj_request_put(struct rbd_obj_request *obj_request) 1225{ 1226 rbd_assert(obj_request != NULL); 1227 dout("%s: obj %p (was %d)\n", __func__, obj_request, 1228 atomic_read(&obj_request->kref.refcount)); 1229 kref_put(&obj_request->kref, rbd_obj_request_destroy); 1230} 1231 1232static void rbd_img_request_get(struct rbd_img_request *img_request) 1233{ 1234 dout("%s: img %p (was %d)\n", __func__, img_request, 1235 atomic_read(&img_request->kref.refcount)); 1236 kref_get(&img_request->kref); 1237} 1238 1239static void rbd_img_request_destroy(struct kref *kref); 1240static void rbd_img_request_put(struct rbd_img_request *img_request) 1241{ 1242 rbd_assert(img_request != NULL); 1243 dout("%s: img %p (was %d)\n", __func__, img_request, 1244 atomic_read(&img_request->kref.refcount)); 1245 kref_put(&img_request->kref, rbd_img_request_destroy); 1246} 1247 1248static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request, 1249 struct rbd_obj_request *obj_request) 1250{ 1251 rbd_assert(obj_request->img_request == NULL); 1252 1253 /* Image request now owns object's 
original reference */ 1254 obj_request->img_request = img_request; 1255 obj_request->which = img_request->obj_request_count; 1256 rbd_assert(!obj_request_img_data_test(obj_request)); 1257 obj_request_img_data_set(obj_request); 1258 rbd_assert(obj_request->which != BAD_WHICH); 1259 img_request->obj_request_count++; 1260 list_add_tail(&obj_request->links, &img_request->obj_requests); 1261 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1262 obj_request->which); 1263} 1264 1265static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request, 1266 struct rbd_obj_request *obj_request) 1267{ 1268 rbd_assert(obj_request->which != BAD_WHICH); 1269 1270 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request, 1271 obj_request->which); 1272 list_del(&obj_request->links); 1273 rbd_assert(img_request->obj_request_count > 0); 1274 img_request->obj_request_count--; 1275 rbd_assert(obj_request->which == img_request->obj_request_count); 1276 obj_request->which = BAD_WHICH; 1277 rbd_assert(obj_request_img_data_test(obj_request)); 1278 rbd_assert(obj_request->img_request == img_request); 1279 obj_request->img_request = NULL; 1280 obj_request->callback = NULL; 1281 rbd_obj_request_put(obj_request); 1282} 1283 1284static bool obj_request_type_valid(enum obj_request_type type) 1285{ 1286 switch (type) { 1287 case OBJ_REQUEST_NODATA: 1288 case OBJ_REQUEST_BIO: 1289 case OBJ_REQUEST_PAGES: 1290 return true; 1291 default: 1292 return false; 1293 } 1294} 1295 1296static int rbd_obj_request_submit(struct ceph_osd_client *osdc, 1297 struct rbd_obj_request *obj_request) 1298{ 1299 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request); 1300 1301 return ceph_osdc_start_request(osdc, obj_request->osd_req, false); 1302} 1303 1304static void rbd_img_request_complete(struct rbd_img_request *img_request) 1305{ 1306 1307 dout("%s: img %p\n", __func__, img_request); 1308 1309 /* 1310 * If no error occurred, compute the aggregate transfer 1311 * count 
for the image request. We could instead use 1312 * atomic64_cmpxchg() to update it as each object request 1313 * completes; not clear which way is better off hand. 1314 */ 1315 if (!img_request->result) { 1316 struct rbd_obj_request *obj_request; 1317 u64 xferred = 0; 1318 1319 for_each_obj_request(img_request, obj_request) 1320 xferred += obj_request->xferred; 1321 img_request->xferred = xferred; 1322 } 1323 1324 if (img_request->callback) 1325 img_request->callback(img_request); 1326 else 1327 rbd_img_request_put(img_request); 1328} 1329 1330/* Caller is responsible for rbd_obj_request_destroy(obj_request) */ 1331 1332static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) 1333{ 1334 dout("%s: obj %p\n", __func__, obj_request); 1335 1336 return wait_for_completion_interruptible(&obj_request->completion); 1337} 1338 1339/* 1340 * The default/initial value for all image request flags is 0. Each 1341 * is conditionally set to 1 at image request initialization time 1342 * and currently never change thereafter. 
1343 */ 1344static void img_request_write_set(struct rbd_img_request *img_request) 1345{ 1346 set_bit(IMG_REQ_WRITE, &img_request->flags); 1347 smp_mb(); 1348} 1349 1350static bool img_request_write_test(struct rbd_img_request *img_request) 1351{ 1352 smp_mb(); 1353 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0; 1354} 1355 1356static void img_request_child_set(struct rbd_img_request *img_request) 1357{ 1358 set_bit(IMG_REQ_CHILD, &img_request->flags); 1359 smp_mb(); 1360} 1361 1362static bool img_request_child_test(struct rbd_img_request *img_request) 1363{ 1364 smp_mb(); 1365 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0; 1366} 1367 1368static void img_request_layered_set(struct rbd_img_request *img_request) 1369{ 1370 set_bit(IMG_REQ_LAYERED, &img_request->flags); 1371 smp_mb(); 1372} 1373 1374static bool img_request_layered_test(struct rbd_img_request *img_request) 1375{ 1376 smp_mb(); 1377 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0; 1378} 1379 1380static void 1381rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request) 1382{ 1383 u64 xferred = obj_request->xferred; 1384 u64 length = obj_request->length; 1385 1386 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1387 obj_request, obj_request->img_request, obj_request->result, 1388 xferred, length); 1389 /* 1390 * ENOENT means a hole in the image. We zero-fill the 1391 * entire length of the request. A short read also implies 1392 * zero-fill to the end of the request. Either way we 1393 * update the xferred count to indicate the whole request 1394 * was satisfied. 
1395 */ 1396 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA); 1397 if (obj_request->result == -ENOENT) { 1398 if (obj_request->type == OBJ_REQUEST_BIO) 1399 zero_bio_chain(obj_request->bio_list, 0); 1400 else 1401 zero_pages(obj_request->pages, 0, length); 1402 obj_request->result = 0; 1403 obj_request->xferred = length; 1404 } else if (xferred < length && !obj_request->result) { 1405 if (obj_request->type == OBJ_REQUEST_BIO) 1406 zero_bio_chain(obj_request->bio_list, xferred); 1407 else 1408 zero_pages(obj_request->pages, xferred, length); 1409 obj_request->xferred = length; 1410 } 1411 obj_request_done_set(obj_request); 1412} 1413 1414static void rbd_obj_request_complete(struct rbd_obj_request *obj_request) 1415{ 1416 dout("%s: obj %p cb %p\n", __func__, obj_request, 1417 obj_request->callback); 1418 if (obj_request->callback) 1419 obj_request->callback(obj_request); 1420 else 1421 complete_all(&obj_request->completion); 1422} 1423 1424static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request) 1425{ 1426 dout("%s: obj %p\n", __func__, obj_request); 1427 obj_request_done_set(obj_request); 1428} 1429 1430static void rbd_osd_read_callback(struct rbd_obj_request *obj_request) 1431{ 1432 struct rbd_img_request *img_request = NULL; 1433 struct rbd_device *rbd_dev = NULL; 1434 bool layered = false; 1435 1436 if (obj_request_img_data_test(obj_request)) { 1437 img_request = obj_request->img_request; 1438 layered = img_request && img_request_layered_test(img_request); 1439 rbd_dev = img_request->rbd_dev; 1440 } 1441 1442 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__, 1443 obj_request, img_request, obj_request->result, 1444 obj_request->xferred, obj_request->length); 1445 if (layered && obj_request->result == -ENOENT && 1446 obj_request->img_offset < rbd_dev->parent_overlap) 1447 rbd_img_parent_read(obj_request); 1448 else if (img_request) 1449 rbd_img_obj_request_read_callback(obj_request); 1450 else 1451 obj_request_done_set(obj_request); 
1452} 1453 1454static void rbd_osd_write_callback(struct rbd_obj_request *obj_request) 1455{ 1456 dout("%s: obj %p result %d %llu\n", __func__, obj_request, 1457 obj_request->result, obj_request->length); 1458 /* 1459 * There is no such thing as a successful short write. Set 1460 * it to our originally-requested length. 1461 */ 1462 obj_request->xferred = obj_request->length; 1463 obj_request_done_set(obj_request); 1464} 1465 1466/* 1467 * For a simple stat call there's nothing to do. We'll do more if 1468 * this is part of a write sequence for a layered image. 1469 */ 1470static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request) 1471{ 1472 dout("%s: obj %p\n", __func__, obj_request); 1473 obj_request_done_set(obj_request); 1474} 1475 1476static void rbd_osd_req_callback(struct ceph_osd_request *osd_req, 1477 struct ceph_msg *msg) 1478{ 1479 struct rbd_obj_request *obj_request = osd_req->r_priv; 1480 u16 opcode; 1481 1482 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg); 1483 rbd_assert(osd_req == obj_request->osd_req); 1484 if (obj_request_img_data_test(obj_request)) { 1485 rbd_assert(obj_request->img_request); 1486 rbd_assert(obj_request->which != BAD_WHICH); 1487 } else { 1488 rbd_assert(obj_request->which == BAD_WHICH); 1489 } 1490 1491 if (osd_req->r_result < 0) 1492 obj_request->result = osd_req->r_result; 1493 obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version); 1494 1495 BUG_ON(osd_req->r_num_ops > 2); 1496 1497 /* 1498 * We support a 64-bit length, but ultimately it has to be 1499 * passed to blk_end_request(), which takes an unsigned int. 
1500 */ 1501 obj_request->xferred = osd_req->r_reply_op_len[0]; 1502 rbd_assert(obj_request->xferred < (u64)UINT_MAX); 1503 opcode = osd_req->r_ops[0].op; 1504 switch (opcode) { 1505 case CEPH_OSD_OP_READ: 1506 rbd_osd_read_callback(obj_request); 1507 break; 1508 case CEPH_OSD_OP_WRITE: 1509 rbd_osd_write_callback(obj_request); 1510 break; 1511 case CEPH_OSD_OP_STAT: 1512 rbd_osd_stat_callback(obj_request); 1513 break; 1514 case CEPH_OSD_OP_CALL: 1515 case CEPH_OSD_OP_NOTIFY_ACK: 1516 case CEPH_OSD_OP_WATCH: 1517 rbd_osd_trivial_callback(obj_request); 1518 break; 1519 default: 1520 rbd_warn(NULL, "%s: unsupported op %hu\n", 1521 obj_request->object_name, (unsigned short) opcode); 1522 break; 1523 } 1524 1525 if (obj_request_done_test(obj_request)) 1526 rbd_obj_request_complete(obj_request); 1527} 1528 1529static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request) 1530{ 1531 struct rbd_img_request *img_request = obj_request->img_request; 1532 struct ceph_osd_request *osd_req = obj_request->osd_req; 1533 u64 snap_id; 1534 1535 rbd_assert(osd_req != NULL); 1536 1537 snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP; 1538 ceph_osdc_build_request(osd_req, obj_request->offset, 1539 NULL, snap_id, NULL); 1540} 1541 1542static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) 1543{ 1544 struct rbd_img_request *img_request = obj_request->img_request; 1545 struct ceph_osd_request *osd_req = obj_request->osd_req; 1546 struct ceph_snap_context *snapc; 1547 struct timespec mtime = CURRENT_TIME; 1548 1549 rbd_assert(osd_req != NULL); 1550 1551 snapc = img_request ? 
img_request->snapc : NULL; 1552 ceph_osdc_build_request(osd_req, obj_request->offset, 1553 snapc, CEPH_NOSNAP, &mtime); 1554} 1555 1556static struct ceph_osd_request *rbd_osd_req_create( 1557 struct rbd_device *rbd_dev, 1558 bool write_request, 1559 struct rbd_obj_request *obj_request) 1560{ 1561 struct ceph_snap_context *snapc = NULL; 1562 struct ceph_osd_client *osdc; 1563 struct ceph_osd_request *osd_req; 1564 1565 if (obj_request_img_data_test(obj_request)) { 1566 struct rbd_img_request *img_request = obj_request->img_request; 1567 1568 rbd_assert(write_request == 1569 img_request_write_test(img_request)); 1570 if (write_request) 1571 snapc = img_request->snapc; 1572 } 1573 1574 /* Allocate and initialize the request, for the single op */ 1575 1576 osdc = &rbd_dev->rbd_client->client->osdc; 1577 osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC); 1578 if (!osd_req) 1579 return NULL; /* ENOMEM */ 1580 1581 if (write_request) 1582 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1583 else 1584 osd_req->r_flags = CEPH_OSD_FLAG_READ; 1585 1586 osd_req->r_callback = rbd_osd_req_callback; 1587 osd_req->r_priv = obj_request; 1588 1589 osd_req->r_oid_len = strlen(obj_request->object_name); 1590 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1591 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1592 1593 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1594 1595 return osd_req; 1596} 1597 1598/* 1599 * Create a copyup osd request based on the information in the 1600 * object request supplied. A copyup request has two osd ops, 1601 * a copyup method call, and a "normal" write request. 
1602 */ 1603static struct ceph_osd_request * 1604rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) 1605{ 1606 struct rbd_img_request *img_request; 1607 struct ceph_snap_context *snapc; 1608 struct rbd_device *rbd_dev; 1609 struct ceph_osd_client *osdc; 1610 struct ceph_osd_request *osd_req; 1611 1612 rbd_assert(obj_request_img_data_test(obj_request)); 1613 img_request = obj_request->img_request; 1614 rbd_assert(img_request); 1615 rbd_assert(img_request_write_test(img_request)); 1616 1617 /* Allocate and initialize the request, for the two ops */ 1618 1619 snapc = img_request->snapc; 1620 rbd_dev = img_request->rbd_dev; 1621 osdc = &rbd_dev->rbd_client->client->osdc; 1622 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC); 1623 if (!osd_req) 1624 return NULL; /* ENOMEM */ 1625 1626 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; 1627 osd_req->r_callback = rbd_osd_req_callback; 1628 osd_req->r_priv = obj_request; 1629 1630 osd_req->r_oid_len = strlen(obj_request->object_name); 1631 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid)); 1632 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len); 1633 1634 osd_req->r_file_layout = rbd_dev->layout; /* struct */ 1635 1636 return osd_req; 1637} 1638 1639 1640static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) 1641{ 1642 ceph_osdc_put_request(osd_req); 1643} 1644 1645/* object_name is assumed to be a non-null pointer and NUL-terminated */ 1646 1647static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, 1648 u64 offset, u64 length, 1649 enum obj_request_type type) 1650{ 1651 struct rbd_obj_request *obj_request; 1652 size_t size; 1653 char *name; 1654 1655 rbd_assert(obj_request_type_valid(type)); 1656 1657 size = strlen(object_name) + 1; 1658 obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL); 1659 if (!obj_request) 1660 return NULL; 1661 1662 name = (char *)(obj_request + 1); 1663 obj_request->object_name = 
memcpy(name, object_name, size); 1664 obj_request->offset = offset; 1665 obj_request->length = length; 1666 obj_request->flags = 0; 1667 obj_request->which = BAD_WHICH; 1668 obj_request->type = type; 1669 INIT_LIST_HEAD(&obj_request->links); 1670 init_completion(&obj_request->completion); 1671 kref_init(&obj_request->kref); 1672 1673 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, 1674 offset, length, (int)type, obj_request); 1675 1676 return obj_request; 1677} 1678 1679static void rbd_obj_request_destroy(struct kref *kref) 1680{ 1681 struct rbd_obj_request *obj_request; 1682 1683 obj_request = container_of(kref, struct rbd_obj_request, kref); 1684 1685 dout("%s: obj %p\n", __func__, obj_request); 1686 1687 rbd_assert(obj_request->img_request == NULL); 1688 rbd_assert(obj_request->which == BAD_WHICH); 1689 1690 if (obj_request->osd_req) 1691 rbd_osd_req_destroy(obj_request->osd_req); 1692 1693 rbd_assert(obj_request_type_valid(obj_request->type)); 1694 switch (obj_request->type) { 1695 case OBJ_REQUEST_NODATA: 1696 break; /* Nothing to do */ 1697 case OBJ_REQUEST_BIO: 1698 if (obj_request->bio_list) 1699 bio_chain_put(obj_request->bio_list); 1700 break; 1701 case OBJ_REQUEST_PAGES: 1702 if (obj_request->pages) 1703 ceph_release_page_vector(obj_request->pages, 1704 obj_request->page_count); 1705 break; 1706 } 1707 1708 kfree(obj_request); 1709} 1710 1711/* 1712 * Caller is responsible for filling in the list of object requests 1713 * that comprises the image request, and the Linux request pointer 1714 * (if there is one). 
1715 */ 1716static struct rbd_img_request *rbd_img_request_create( 1717 struct rbd_device *rbd_dev, 1718 u64 offset, u64 length, 1719 bool write_request, 1720 bool child_request) 1721{ 1722 struct rbd_img_request *img_request; 1723 struct ceph_snap_context *snapc = NULL; 1724 1725 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC); 1726 if (!img_request) 1727 return NULL; 1728 1729 if (write_request) { 1730 down_read(&rbd_dev->header_rwsem); 1731 snapc = ceph_get_snap_context(rbd_dev->header.snapc); 1732 up_read(&rbd_dev->header_rwsem); 1733 if (WARN_ON(!snapc)) { 1734 kfree(img_request); 1735 return NULL; /* Shouldn't happen */ 1736 } 1737 1738 } 1739 1740 img_request->rq = NULL; 1741 img_request->rbd_dev = rbd_dev; 1742 img_request->offset = offset; 1743 img_request->length = length; 1744 img_request->flags = 0; 1745 if (write_request) { 1746 img_request_write_set(img_request); 1747 img_request->snapc = snapc; 1748 } else { 1749 img_request->snap_id = rbd_dev->spec->snap_id; 1750 } 1751 if (child_request) 1752 img_request_child_set(img_request); 1753 if (rbd_dev->parent_spec) 1754 img_request_layered_set(img_request); 1755 spin_lock_init(&img_request->completion_lock); 1756 img_request->next_completion = 0; 1757 img_request->callback = NULL; 1758 img_request->result = 0; 1759 img_request->obj_request_count = 0; 1760 INIT_LIST_HEAD(&img_request->obj_requests); 1761 kref_init(&img_request->kref); 1762 1763 rbd_img_request_get(img_request); /* Avoid a warning */ 1764 rbd_img_request_put(img_request); /* TEMPORARY */ 1765 1766 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev, 1767 write_request ? 
"write" : "read", offset, length, 1768 img_request); 1769 1770 return img_request; 1771} 1772 1773static void rbd_img_request_destroy(struct kref *kref) 1774{ 1775 struct rbd_img_request *img_request; 1776 struct rbd_obj_request *obj_request; 1777 struct rbd_obj_request *next_obj_request; 1778 1779 img_request = container_of(kref, struct rbd_img_request, kref); 1780 1781 dout("%s: img %p\n", __func__, img_request); 1782 1783 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1784 rbd_img_obj_request_del(img_request, obj_request); 1785 rbd_assert(img_request->obj_request_count == 0); 1786 1787 if (img_request_write_test(img_request)) 1788 ceph_put_snap_context(img_request->snapc); 1789 1790 if (img_request_child_test(img_request)) 1791 rbd_obj_request_put(img_request->obj_request); 1792 1793 kfree(img_request); 1794} 1795 1796static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request) 1797{ 1798 struct rbd_img_request *img_request; 1799 unsigned int xferred; 1800 int result; 1801 bool more; 1802 1803 rbd_assert(obj_request_img_data_test(obj_request)); 1804 img_request = obj_request->img_request; 1805 1806 rbd_assert(obj_request->xferred <= (u64)UINT_MAX); 1807 xferred = (unsigned int)obj_request->xferred; 1808 result = obj_request->result; 1809 if (result) { 1810 struct rbd_device *rbd_dev = img_request->rbd_dev; 1811 1812 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n", 1813 img_request_write_test(img_request) ? 
"write" : "read", 1814 obj_request->length, obj_request->img_offset, 1815 obj_request->offset); 1816 rbd_warn(rbd_dev, " result %d xferred %x\n", 1817 result, xferred); 1818 if (!img_request->result) 1819 img_request->result = result; 1820 } 1821 1822 /* Image object requests don't own their page array */ 1823 1824 if (obj_request->type == OBJ_REQUEST_PAGES) { 1825 obj_request->pages = NULL; 1826 obj_request->page_count = 0; 1827 } 1828 1829 if (img_request_child_test(img_request)) { 1830 rbd_assert(img_request->obj_request != NULL); 1831 more = obj_request->which < img_request->obj_request_count - 1; 1832 } else { 1833 rbd_assert(img_request->rq != NULL); 1834 more = blk_end_request(img_request->rq, result, xferred); 1835 } 1836 1837 return more; 1838} 1839 1840static void rbd_img_obj_callback(struct rbd_obj_request *obj_request) 1841{ 1842 struct rbd_img_request *img_request; 1843 u32 which = obj_request->which; 1844 bool more = true; 1845 1846 rbd_assert(obj_request_img_data_test(obj_request)); 1847 img_request = obj_request->img_request; 1848 1849 dout("%s: img %p obj %p\n", __func__, img_request, obj_request); 1850 rbd_assert(img_request != NULL); 1851 rbd_assert(img_request->obj_request_count > 0); 1852 rbd_assert(which != BAD_WHICH); 1853 rbd_assert(which < img_request->obj_request_count); 1854 rbd_assert(which >= img_request->next_completion); 1855 1856 spin_lock_irq(&img_request->completion_lock); 1857 if (which != img_request->next_completion) 1858 goto out; 1859 1860 for_each_obj_request_from(img_request, obj_request) { 1861 rbd_assert(more); 1862 rbd_assert(which < img_request->obj_request_count); 1863 1864 if (!obj_request_done_test(obj_request)) 1865 break; 1866 more = rbd_img_obj_end_request(obj_request); 1867 which++; 1868 } 1869 1870 rbd_assert(more ^ (which == img_request->obj_request_count)); 1871 img_request->next_completion = which; 1872out: 1873 spin_unlock_irq(&img_request->completion_lock); 1874 1875 if (!more) 1876 
rbd_img_request_complete(img_request); 1877} 1878 1879/* 1880 * Split up an image request into one or more object requests, each 1881 * to a different object. The "type" parameter indicates whether 1882 * "data_desc" is the pointer to the head of a list of bio 1883 * structures, or the base of a page array. In either case this 1884 * function assumes data_desc describes memory sufficient to hold 1885 * all data described by the image request. 1886 */ 1887static int rbd_img_request_fill(struct rbd_img_request *img_request, 1888 enum obj_request_type type, 1889 void *data_desc) 1890{ 1891 struct rbd_device *rbd_dev = img_request->rbd_dev; 1892 struct rbd_obj_request *obj_request = NULL; 1893 struct rbd_obj_request *next_obj_request; 1894 bool write_request = img_request_write_test(img_request); 1895 struct bio *bio_list; 1896 unsigned int bio_offset = 0; 1897 struct page **pages; 1898 u64 img_offset; 1899 u64 resid; 1900 u16 opcode; 1901 1902 dout("%s: img %p type %d data_desc %p\n", __func__, img_request, 1903 (int)type, data_desc); 1904 1905 opcode = write_request ? 
CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ; 1906 img_offset = img_request->offset; 1907 resid = img_request->length; 1908 rbd_assert(resid > 0); 1909 1910 if (type == OBJ_REQUEST_BIO) { 1911 bio_list = data_desc; 1912 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT); 1913 } else { 1914 rbd_assert(type == OBJ_REQUEST_PAGES); 1915 pages = data_desc; 1916 } 1917 1918 while (resid) { 1919 struct ceph_osd_request *osd_req; 1920 const char *object_name; 1921 u64 offset; 1922 u64 length; 1923 1924 object_name = rbd_segment_name(rbd_dev, img_offset); 1925 if (!object_name) 1926 goto out_unwind; 1927 offset = rbd_segment_offset(rbd_dev, img_offset); 1928 length = rbd_segment_length(rbd_dev, img_offset, resid); 1929 obj_request = rbd_obj_request_create(object_name, 1930 offset, length, type); 1931 kfree(object_name); /* object request has its own copy */ 1932 if (!obj_request) 1933 goto out_unwind; 1934 1935 if (type == OBJ_REQUEST_BIO) { 1936 unsigned int clone_size; 1937 1938 rbd_assert(length <= (u64)UINT_MAX); 1939 clone_size = (unsigned int)length; 1940 obj_request->bio_list = 1941 bio_chain_clone_range(&bio_list, 1942 &bio_offset, 1943 clone_size, 1944 GFP_ATOMIC); 1945 if (!obj_request->bio_list) 1946 goto out_partial; 1947 } else { 1948 unsigned int page_count; 1949 1950 obj_request->pages = pages; 1951 page_count = (u32)calc_pages_for(offset, length); 1952 obj_request->page_count = page_count; 1953 if ((offset + length) & ~PAGE_MASK) 1954 page_count--; /* more on last page */ 1955 pages += page_count; 1956 } 1957 1958 osd_req = rbd_osd_req_create(rbd_dev, write_request, 1959 obj_request); 1960 if (!osd_req) 1961 goto out_partial; 1962 obj_request->osd_req = osd_req; 1963 obj_request->callback = rbd_img_obj_callback; 1964 1965 osd_req_op_extent_init(osd_req, 0, opcode, offset, length, 1966 0, 0); 1967 if (type == OBJ_REQUEST_BIO) 1968 osd_req_op_extent_osd_data_bio(osd_req, 0, 1969 obj_request->bio_list, length); 1970 else 1971 
osd_req_op_extent_osd_data_pages(osd_req, 0, 1972 obj_request->pages, length, 1973 offset & ~PAGE_MASK, false, false); 1974 1975 if (write_request) 1976 rbd_osd_req_format_write(obj_request); 1977 else 1978 rbd_osd_req_format_read(obj_request); 1979 1980 obj_request->img_offset = img_offset; 1981 rbd_img_obj_request_add(img_request, obj_request); 1982 1983 img_offset += length; 1984 resid -= length; 1985 } 1986 1987 return 0; 1988 1989out_partial: 1990 rbd_obj_request_put(obj_request); 1991out_unwind: 1992 for_each_obj_request_safe(img_request, obj_request, next_obj_request) 1993 rbd_obj_request_put(obj_request); 1994 1995 return -ENOMEM; 1996} 1997 1998static void 1999rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request) 2000{ 2001 struct rbd_img_request *img_request; 2002 struct rbd_device *rbd_dev; 2003 u64 length; 2004 u32 page_count; 2005 2006 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2007 rbd_assert(obj_request_img_data_test(obj_request)); 2008 img_request = obj_request->img_request; 2009 rbd_assert(img_request); 2010 2011 rbd_dev = img_request->rbd_dev; 2012 rbd_assert(rbd_dev); 2013 length = (u64)1 << rbd_dev->header.obj_order; 2014 page_count = (u32)calc_pages_for(0, length); 2015 2016 rbd_assert(obj_request->copyup_pages); 2017 ceph_release_page_vector(obj_request->copyup_pages, page_count); 2018 obj_request->copyup_pages = NULL; 2019 2020 /* 2021 * We want the transfer count to reflect the size of the 2022 * original write request. There is no such thing as a 2023 * successful short write, so if the request was successful 2024 * we can just set it to the originally-requested length. 
2025 */ 2026 if (!obj_request->result) 2027 obj_request->xferred = obj_request->length; 2028 2029 /* Finish up with the normal image object callback */ 2030 2031 rbd_img_obj_callback(obj_request); 2032} 2033 2034static void 2035rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request) 2036{ 2037 struct rbd_obj_request *orig_request; 2038 struct ceph_osd_request *osd_req; 2039 struct ceph_osd_client *osdc; 2040 struct rbd_device *rbd_dev; 2041 struct page **pages; 2042 int result; 2043 u64 obj_size; 2044 u64 xferred; 2045 2046 rbd_assert(img_request_child_test(img_request)); 2047 2048 /* First get what we need from the image request */ 2049 2050 pages = img_request->copyup_pages; 2051 rbd_assert(pages != NULL); 2052 img_request->copyup_pages = NULL; 2053 2054 orig_request = img_request->obj_request; 2055 rbd_assert(orig_request != NULL); 2056 rbd_assert(orig_request->type == OBJ_REQUEST_BIO); 2057 result = img_request->result; 2058 obj_size = img_request->length; 2059 xferred = img_request->xferred; 2060 2061 rbd_dev = img_request->rbd_dev; 2062 rbd_assert(rbd_dev); 2063 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order); 2064 2065 rbd_img_request_put(img_request); 2066 2067 if (result) 2068 goto out_err; 2069 2070 /* Allocate the new copyup osd request for the original request */ 2071 2072 result = -ENOMEM; 2073 rbd_assert(!orig_request->osd_req); 2074 osd_req = rbd_osd_req_create_copyup(orig_request); 2075 if (!osd_req) 2076 goto out_err; 2077 orig_request->osd_req = osd_req; 2078 orig_request->copyup_pages = pages; 2079 2080 /* Initialize the copyup op */ 2081 2082 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup"); 2083 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0, 2084 false, false); 2085 2086 /* Then the original write request op */ 2087 2088 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE, 2089 orig_request->offset, 2090 orig_request->length, 0, 0); 2091 osd_req_op_extent_osd_data_bio(osd_req, 
1, orig_request->bio_list, 2092 orig_request->length); 2093 2094 rbd_osd_req_format_write(orig_request); 2095 2096 /* All set, send it off. */ 2097 2098 orig_request->callback = rbd_img_obj_copyup_callback; 2099 osdc = &rbd_dev->rbd_client->client->osdc; 2100 result = rbd_obj_request_submit(osdc, orig_request); 2101 if (!result) 2102 return; 2103out_err: 2104 /* Record the error code and complete the request */ 2105 2106 orig_request->result = result; 2107 orig_request->xferred = 0; 2108 obj_request_done_set(orig_request); 2109 rbd_obj_request_complete(orig_request); 2110} 2111 2112/* 2113 * Read from the parent image the range of data that covers the 2114 * entire target of the given object request. This is used for 2115 * satisfying a layered image write request when the target of an 2116 * object request from the image request does not exist. 2117 * 2118 * A page array big enough to hold the returned data is allocated 2119 * and supplied to rbd_img_request_fill() as the "data descriptor." 2120 * When the read completes, this page array will be transferred to 2121 * the original object request for the copyup operation. 2122 * 2123 * If an error occurs, record it as the result of the original 2124 * object request and mark it done so it gets completed. 2125 */ 2126static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) 2127{ 2128 struct rbd_img_request *img_request = NULL; 2129 struct rbd_img_request *parent_request = NULL; 2130 struct rbd_device *rbd_dev; 2131 u64 img_offset; 2132 u64 length; 2133 struct page **pages = NULL; 2134 u32 page_count; 2135 int result; 2136 2137 rbd_assert(obj_request_img_data_test(obj_request)); 2138 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2139 2140 img_request = obj_request->img_request; 2141 rbd_assert(img_request != NULL); 2142 rbd_dev = img_request->rbd_dev; 2143 rbd_assert(rbd_dev->parent != NULL); 2144 2145 /* 2146 * First things first. 
The original osd request is of no 2147 * use to use any more, we'll need a new one that can hold 2148 * the two ops in a copyup request. We'll get that later, 2149 * but for now we can release the old one. 2150 */ 2151 rbd_osd_req_destroy(obj_request->osd_req); 2152 obj_request->osd_req = NULL; 2153 2154 /* 2155 * Determine the byte range covered by the object in the 2156 * child image to which the original request was to be sent. 2157 */ 2158 img_offset = obj_request->img_offset - obj_request->offset; 2159 length = (u64)1 << rbd_dev->header.obj_order; 2160 2161 /* 2162 * There is no defined parent data beyond the parent 2163 * overlap, so limit what we read at that boundary if 2164 * necessary. 2165 */ 2166 if (img_offset + length > rbd_dev->parent_overlap) { 2167 rbd_assert(img_offset < rbd_dev->parent_overlap); 2168 length = rbd_dev->parent_overlap - img_offset; 2169 } 2170 2171 /* 2172 * Allocate a page array big enough to receive the data read 2173 * from the parent. 2174 */ 2175 page_count = (u32)calc_pages_for(0, length); 2176 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2177 if (IS_ERR(pages)) { 2178 result = PTR_ERR(pages); 2179 pages = NULL; 2180 goto out_err; 2181 } 2182 2183 result = -ENOMEM; 2184 parent_request = rbd_img_request_create(rbd_dev->parent, 2185 img_offset, length, 2186 false, true); 2187 if (!parent_request) 2188 goto out_err; 2189 rbd_obj_request_get(obj_request); 2190 parent_request->obj_request = obj_request; 2191 2192 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages); 2193 if (result) 2194 goto out_err; 2195 parent_request->copyup_pages = pages; 2196 2197 parent_request->callback = rbd_img_obj_parent_read_full_callback; 2198 result = rbd_img_request_submit(parent_request); 2199 if (!result) 2200 return 0; 2201 2202 parent_request->copyup_pages = NULL; 2203 parent_request->obj_request = NULL; 2204 rbd_obj_request_put(obj_request); 2205out_err: 2206 if (pages) 2207 ceph_release_page_vector(pages, 
page_count); 2208 if (parent_request) 2209 rbd_img_request_put(parent_request); 2210 obj_request->result = result; 2211 obj_request->xferred = 0; 2212 obj_request_done_set(obj_request); 2213 2214 return result; 2215} 2216 2217static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request) 2218{ 2219 struct rbd_obj_request *orig_request; 2220 int result; 2221 2222 rbd_assert(!obj_request_img_data_test(obj_request)); 2223 2224 /* 2225 * All we need from the object request is the original 2226 * request and the result of the STAT op. Grab those, then 2227 * we're done with the request. 2228 */ 2229 orig_request = obj_request->obj_request; 2230 obj_request->obj_request = NULL; 2231 rbd_assert(orig_request); 2232 rbd_assert(orig_request->img_request); 2233 2234 result = obj_request->result; 2235 obj_request->result = 0; 2236 2237 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__, 2238 obj_request, orig_request, result, 2239 obj_request->xferred, obj_request->length); 2240 rbd_obj_request_put(obj_request); 2241 2242 rbd_assert(orig_request); 2243 rbd_assert(orig_request->img_request); 2244 2245 /* 2246 * Our only purpose here is to determine whether the object 2247 * exists, and we don't want to treat the non-existence as 2248 * an error. If something else comes back, transfer the 2249 * error to the original request and complete it now. 2250 */ 2251 if (!result) { 2252 obj_request_existence_set(orig_request, true); 2253 } else if (result == -ENOENT) { 2254 obj_request_existence_set(orig_request, false); 2255 } else if (result) { 2256 orig_request->result = result; 2257 goto out; 2258 } 2259 2260 /* 2261 * Resubmit the original request now that we have recorded 2262 * whether the target object exists. 
2263 */ 2264 orig_request->result = rbd_img_obj_request_submit(orig_request); 2265out: 2266 if (orig_request->result) 2267 rbd_obj_request_complete(orig_request); 2268 rbd_obj_request_put(orig_request); 2269} 2270 2271static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) 2272{ 2273 struct rbd_obj_request *stat_request; 2274 struct rbd_device *rbd_dev; 2275 struct ceph_osd_client *osdc; 2276 struct page **pages = NULL; 2277 u32 page_count; 2278 size_t size; 2279 int ret; 2280 2281 /* 2282 * The response data for a STAT call consists of: 2283 * le64 length; 2284 * struct { 2285 * le32 tv_sec; 2286 * le32 tv_nsec; 2287 * } mtime; 2288 */ 2289 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32); 2290 page_count = (u32)calc_pages_for(0, size); 2291 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2292 if (IS_ERR(pages)) 2293 return PTR_ERR(pages); 2294 2295 ret = -ENOMEM; 2296 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, 2297 OBJ_REQUEST_PAGES); 2298 if (!stat_request) 2299 goto out; 2300 2301 rbd_obj_request_get(obj_request); 2302 stat_request->obj_request = obj_request; 2303 stat_request->pages = pages; 2304 stat_request->page_count = page_count; 2305 2306 rbd_assert(obj_request->img_request); 2307 rbd_dev = obj_request->img_request->rbd_dev; 2308 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false, 2309 stat_request); 2310 if (!stat_request->osd_req) 2311 goto out; 2312 stat_request->callback = rbd_img_obj_exists_callback; 2313 2314 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT); 2315 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0, 2316 false, false); 2317 rbd_osd_req_format_read(stat_request); 2318 2319 osdc = &rbd_dev->rbd_client->client->osdc; 2320 ret = rbd_obj_request_submit(osdc, stat_request); 2321out: 2322 if (ret) 2323 rbd_obj_request_put(obj_request); 2324 2325 return ret; 2326} 2327 2328static int rbd_img_obj_request_submit(struct rbd_obj_request 
*obj_request) 2329{ 2330 struct rbd_img_request *img_request; 2331 struct rbd_device *rbd_dev; 2332 bool known; 2333 2334 rbd_assert(obj_request_img_data_test(obj_request)); 2335 2336 img_request = obj_request->img_request; 2337 rbd_assert(img_request); 2338 rbd_dev = img_request->rbd_dev; 2339 2340 /* 2341 * Only writes to layered images need special handling. 2342 * Reads and non-layered writes are simple object requests. 2343 * Layered writes that start beyond the end of the overlap 2344 * with the parent have no parent data, so they too are 2345 * simple object requests. Finally, if the target object is 2346 * known to already exist, its parent data has already been 2347 * copied, so a write to the object can also be handled as a 2348 * simple object request. 2349 */ 2350 if (!img_request_write_test(img_request) || 2351 !img_request_layered_test(img_request) || 2352 rbd_dev->parent_overlap <= obj_request->img_offset || 2353 ((known = obj_request_known_test(obj_request)) && 2354 obj_request_exists_test(obj_request))) { 2355 2356 struct rbd_device *rbd_dev; 2357 struct ceph_osd_client *osdc; 2358 2359 rbd_dev = obj_request->img_request->rbd_dev; 2360 osdc = &rbd_dev->rbd_client->client->osdc; 2361 2362 return rbd_obj_request_submit(osdc, obj_request); 2363 } 2364 2365 /* 2366 * It's a layered write. The target object might exist but 2367 * we may not know that yet. If we know it doesn't exist, 2368 * start by reading the data for the full target object from 2369 * the parent so we can use it for a copyup to the target. 2370 */ 2371 if (known) 2372 return rbd_img_obj_parent_read_full(obj_request); 2373 2374 /* We don't know whether the target exists. Go find out. 
*/ 2375 2376 return rbd_img_obj_exists_submit(obj_request); 2377} 2378 2379static int rbd_img_request_submit(struct rbd_img_request *img_request) 2380{ 2381 struct rbd_obj_request *obj_request; 2382 struct rbd_obj_request *next_obj_request; 2383 2384 dout("%s: img %p\n", __func__, img_request); 2385 for_each_obj_request_safe(img_request, obj_request, next_obj_request) { 2386 int ret; 2387 2388 ret = rbd_img_obj_request_submit(obj_request); 2389 if (ret) 2390 return ret; 2391 } 2392 2393 return 0; 2394} 2395 2396static void rbd_img_parent_read_callback(struct rbd_img_request *img_request) 2397{ 2398 struct rbd_obj_request *obj_request; 2399 struct rbd_device *rbd_dev; 2400 u64 obj_end; 2401 2402 rbd_assert(img_request_child_test(img_request)); 2403 2404 obj_request = img_request->obj_request; 2405 rbd_assert(obj_request); 2406 rbd_assert(obj_request->img_request); 2407 2408 obj_request->result = img_request->result; 2409 if (obj_request->result) 2410 goto out; 2411 2412 /* 2413 * We need to zero anything beyond the parent overlap 2414 * boundary. Since rbd_img_obj_request_read_callback() 2415 * will zero anything beyond the end of a short read, an 2416 * easy way to do this is to pretend the data from the 2417 * parent came up short--ending at the overlap boundary. 
2418 */ 2419 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length); 2420 obj_end = obj_request->img_offset + obj_request->length; 2421 rbd_dev = obj_request->img_request->rbd_dev; 2422 if (obj_end > rbd_dev->parent_overlap) { 2423 u64 xferred = 0; 2424 2425 if (obj_request->img_offset < rbd_dev->parent_overlap) 2426 xferred = rbd_dev->parent_overlap - 2427 obj_request->img_offset; 2428 2429 obj_request->xferred = min(img_request->xferred, xferred); 2430 } else { 2431 obj_request->xferred = img_request->xferred; 2432 } 2433out: 2434 rbd_img_obj_request_read_callback(obj_request); 2435 rbd_obj_request_complete(obj_request); 2436} 2437 2438static void rbd_img_parent_read(struct rbd_obj_request *obj_request) 2439{ 2440 struct rbd_device *rbd_dev; 2441 struct rbd_img_request *img_request; 2442 int result; 2443 2444 rbd_assert(obj_request_img_data_test(obj_request)); 2445 rbd_assert(obj_request->img_request != NULL); 2446 rbd_assert(obj_request->result == (s32) -ENOENT); 2447 rbd_assert(obj_request->type == OBJ_REQUEST_BIO); 2448 2449 rbd_dev = obj_request->img_request->rbd_dev; 2450 rbd_assert(rbd_dev->parent != NULL); 2451 /* rbd_read_finish(obj_request, obj_request->length); */ 2452 img_request = rbd_img_request_create(rbd_dev->parent, 2453 obj_request->img_offset, 2454 obj_request->length, 2455 false, true); 2456 result = -ENOMEM; 2457 if (!img_request) 2458 goto out_err; 2459 2460 rbd_obj_request_get(obj_request); 2461 img_request->obj_request = obj_request; 2462 2463 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 2464 obj_request->bio_list); 2465 if (result) 2466 goto out_err; 2467 2468 img_request->callback = rbd_img_parent_read_callback; 2469 result = rbd_img_request_submit(img_request); 2470 if (result) 2471 goto out_err; 2472 2473 return; 2474out_err: 2475 if (img_request) 2476 rbd_img_request_put(img_request); 2477 obj_request->result = result; 2478 obj_request->xferred = 0; 2479 obj_request_done_set(obj_request); 2480} 2481 
2482static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, 2483 u64 ver, u64 notify_id) 2484{ 2485 struct rbd_obj_request *obj_request; 2486 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2487 int ret; 2488 2489 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 2490 OBJ_REQUEST_NODATA); 2491 if (!obj_request) 2492 return -ENOMEM; 2493 2494 ret = -ENOMEM; 2495 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2496 if (!obj_request->osd_req) 2497 goto out; 2498 obj_request->callback = rbd_obj_request_put; 2499 2500 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK, 2501 notify_id, ver, 0); 2502 rbd_osd_req_format_read(obj_request); 2503 2504 ret = rbd_obj_request_submit(osdc, obj_request); 2505out: 2506 if (ret) 2507 rbd_obj_request_put(obj_request); 2508 2509 return ret; 2510} 2511 2512static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data) 2513{ 2514 struct rbd_device *rbd_dev = (struct rbd_device *)data; 2515 u64 hver; 2516 2517 if (!rbd_dev) 2518 return; 2519 2520 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__, 2521 rbd_dev->header_name, (unsigned long long) notify_id, 2522 (unsigned int) opcode); 2523 (void)rbd_dev_refresh(rbd_dev, &hver); 2524 2525 rbd_obj_notify_ack(rbd_dev, hver, notify_id); 2526} 2527 2528/* 2529 * Request sync osd watch/unwatch. The value of "start" determines 2530 * whether a watch request is being initiated or torn down. 
2531 */ 2532static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start) 2533{ 2534 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2535 struct rbd_obj_request *obj_request; 2536 int ret; 2537 2538 rbd_assert(start ^ !!rbd_dev->watch_event); 2539 rbd_assert(start ^ !!rbd_dev->watch_request); 2540 2541 if (start) { 2542 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev, 2543 &rbd_dev->watch_event); 2544 if (ret < 0) 2545 return ret; 2546 rbd_assert(rbd_dev->watch_event != NULL); 2547 } 2548 2549 ret = -ENOMEM; 2550 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0, 2551 OBJ_REQUEST_NODATA); 2552 if (!obj_request) 2553 goto out_cancel; 2554 2555 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request); 2556 if (!obj_request->osd_req) 2557 goto out_cancel; 2558 2559 if (start) 2560 ceph_osdc_set_request_linger(osdc, obj_request->osd_req); 2561 else 2562 ceph_osdc_unregister_linger_request(osdc, 2563 rbd_dev->watch_request->osd_req); 2564 2565 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH, 2566 rbd_dev->watch_event->cookie, 2567 rbd_dev->header.obj_version, start); 2568 rbd_osd_req_format_write(obj_request); 2569 2570 ret = rbd_obj_request_submit(osdc, obj_request); 2571 if (ret) 2572 goto out_cancel; 2573 ret = rbd_obj_request_wait(obj_request); 2574 if (ret) 2575 goto out_cancel; 2576 ret = obj_request->result; 2577 if (ret) 2578 goto out_cancel; 2579 2580 /* 2581 * A watch request is set to linger, so the underlying osd 2582 * request won't go away until we unregister it. We retain 2583 * a pointer to the object request during that time (in 2584 * rbd_dev->watch_request), so we'll keep a reference to 2585 * it. We'll drop that reference (below) after we've 2586 * unregistered it. 
2587 */ 2588 if (start) { 2589 rbd_dev->watch_request = obj_request; 2590 2591 return 0; 2592 } 2593 2594 /* We have successfully torn down the watch request */ 2595 2596 rbd_obj_request_put(rbd_dev->watch_request); 2597 rbd_dev->watch_request = NULL; 2598out_cancel: 2599 /* Cancel the event if we're tearing down, or on error */ 2600 ceph_osdc_cancel_event(rbd_dev->watch_event); 2601 rbd_dev->watch_event = NULL; 2602 if (obj_request) 2603 rbd_obj_request_put(obj_request); 2604 2605 return ret; 2606} 2607 2608/* 2609 * Synchronous osd object method call. Returns the number of bytes 2610 * returned in the outbound buffer, or a negative error code. 2611 */ 2612static int rbd_obj_method_sync(struct rbd_device *rbd_dev, 2613 const char *object_name, 2614 const char *class_name, 2615 const char *method_name, 2616 const void *outbound, 2617 size_t outbound_size, 2618 void *inbound, 2619 size_t inbound_size, 2620 u64 *version) 2621{ 2622 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2623 struct rbd_obj_request *obj_request; 2624 struct page **pages; 2625 u32 page_count; 2626 int ret; 2627 2628 /* 2629 * Method calls are ultimately read operations. The result 2630 * should placed into the inbound buffer provided. They 2631 * also supply outbound data--parameters for the object 2632 * method. Currently if this is present it will be a 2633 * snapshot id. 
2634 */ 2635 page_count = (u32)calc_pages_for(0, inbound_size); 2636 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2637 if (IS_ERR(pages)) 2638 return PTR_ERR(pages); 2639 2640 ret = -ENOMEM; 2641 obj_request = rbd_obj_request_create(object_name, 0, inbound_size, 2642 OBJ_REQUEST_PAGES); 2643 if (!obj_request) 2644 goto out; 2645 2646 obj_request->pages = pages; 2647 obj_request->page_count = page_count; 2648 2649 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2650 if (!obj_request->osd_req) 2651 goto out; 2652 2653 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, 2654 class_name, method_name); 2655 if (outbound_size) { 2656 struct ceph_pagelist *pagelist; 2657 2658 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 2659 if (!pagelist) 2660 goto out; 2661 2662 ceph_pagelist_init(pagelist); 2663 ceph_pagelist_append(pagelist, outbound, outbound_size); 2664 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, 2665 pagelist); 2666 } 2667 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, 2668 obj_request->pages, inbound_size, 2669 0, false, false); 2670 rbd_osd_req_format_read(obj_request); 2671 2672 ret = rbd_obj_request_submit(osdc, obj_request); 2673 if (ret) 2674 goto out; 2675 ret = rbd_obj_request_wait(obj_request); 2676 if (ret) 2677 goto out; 2678 2679 ret = obj_request->result; 2680 if (ret < 0) 2681 goto out; 2682 2683 rbd_assert(obj_request->xferred < (u64)INT_MAX); 2684 ret = (int)obj_request->xferred; 2685 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); 2686 if (version) 2687 *version = obj_request->version; 2688out: 2689 if (obj_request) 2690 rbd_obj_request_put(obj_request); 2691 else 2692 ceph_release_page_vector(pages, page_count); 2693 2694 return ret; 2695} 2696 2697static void rbd_request_fn(struct request_queue *q) 2698 __releases(q->queue_lock) __acquires(q->queue_lock) 2699{ 2700 struct rbd_device *rbd_dev = q->queuedata; 2701 bool read_only = 
rbd_dev->mapping.read_only; 2702 struct request *rq; 2703 int result; 2704 2705 while ((rq = blk_fetch_request(q))) { 2706 bool write_request = rq_data_dir(rq) == WRITE; 2707 struct rbd_img_request *img_request; 2708 u64 offset; 2709 u64 length; 2710 2711 /* Ignore any non-FS requests that filter through. */ 2712 2713 if (rq->cmd_type != REQ_TYPE_FS) { 2714 dout("%s: non-fs request type %d\n", __func__, 2715 (int) rq->cmd_type); 2716 __blk_end_request_all(rq, 0); 2717 continue; 2718 } 2719 2720 /* Ignore/skip any zero-length requests */ 2721 2722 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT; 2723 length = (u64) blk_rq_bytes(rq); 2724 2725 if (!length) { 2726 dout("%s: zero-length request\n", __func__); 2727 __blk_end_request_all(rq, 0); 2728 continue; 2729 } 2730 2731 spin_unlock_irq(q->queue_lock); 2732 2733 /* Disallow writes to a read-only device */ 2734 2735 if (write_request) { 2736 result = -EROFS; 2737 if (read_only) 2738 goto end_request; 2739 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP); 2740 } 2741 2742 /* 2743 * Quit early if the mapped snapshot no longer 2744 * exists. It's still possible the snapshot will 2745 * have disappeared by the time our request arrives 2746 * at the osd, but there's no sense in sending it if 2747 * we already know. 
2748 */ 2749 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) { 2750 dout("request for non-existent snapshot"); 2751 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP); 2752 result = -ENXIO; 2753 goto end_request; 2754 } 2755 2756 result = -EINVAL; 2757 if (offset && length > U64_MAX - offset + 1) { 2758 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n", 2759 offset, length); 2760 goto end_request; /* Shouldn't happen */ 2761 } 2762 2763 result = -ENOMEM; 2764 img_request = rbd_img_request_create(rbd_dev, offset, length, 2765 write_request, false); 2766 if (!img_request) 2767 goto end_request; 2768 2769 img_request->rq = rq; 2770 2771 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO, 2772 rq->bio); 2773 if (!result) 2774 result = rbd_img_request_submit(img_request); 2775 if (result) 2776 rbd_img_request_put(img_request); 2777end_request: 2778 spin_lock_irq(q->queue_lock); 2779 if (result < 0) { 2780 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n", 2781 write_request ? "write" : "read", 2782 length, offset, result); 2783 2784 __blk_end_request_all(rq, result); 2785 } 2786 } 2787} 2788 2789/* 2790 * a queue callback. Makes sure that we don't create a bio that spans across 2791 * multiple osd objects. One exception would be with a single page bios, 2792 * which we handle later at bio_chain_clone_range() 2793 */ 2794static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd, 2795 struct bio_vec *bvec) 2796{ 2797 struct rbd_device *rbd_dev = q->queuedata; 2798 sector_t sector_offset; 2799 sector_t sectors_per_obj; 2800 sector_t obj_sector_offset; 2801 int ret; 2802 2803 /* 2804 * Find how far into its rbd object the partition-relative 2805 * bio start sector is to offset relative to the enclosing 2806 * device. 
2807 */ 2808 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector; 2809 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT); 2810 obj_sector_offset = sector_offset & (sectors_per_obj - 1); 2811 2812 /* 2813 * Compute the number of bytes from that offset to the end 2814 * of the object. Account for what's already used by the bio. 2815 */ 2816 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT; 2817 if (ret > bmd->bi_size) 2818 ret -= bmd->bi_size; 2819 else 2820 ret = 0; 2821 2822 /* 2823 * Don't send back more than was asked for. And if the bio 2824 * was empty, let the whole thing through because: "Note 2825 * that a block device *must* allow a single page to be 2826 * added to an empty bio." 2827 */ 2828 rbd_assert(bvec->bv_len <= PAGE_SIZE); 2829 if (ret > (int) bvec->bv_len || !bmd->bi_size) 2830 ret = (int) bvec->bv_len; 2831 2832 return ret; 2833} 2834 2835static void rbd_free_disk(struct rbd_device *rbd_dev) 2836{ 2837 struct gendisk *disk = rbd_dev->disk; 2838 2839 if (!disk) 2840 return; 2841 2842 rbd_dev->disk = NULL; 2843 if (disk->flags & GENHD_FL_UP) { 2844 del_gendisk(disk); 2845 if (disk->queue) 2846 blk_cleanup_queue(disk->queue); 2847 } 2848 put_disk(disk); 2849} 2850 2851static int rbd_obj_read_sync(struct rbd_device *rbd_dev, 2852 const char *object_name, 2853 u64 offset, u64 length, 2854 void *buf, u64 *version) 2855 2856{ 2857 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; 2858 struct rbd_obj_request *obj_request; 2859 struct page **pages = NULL; 2860 u32 page_count; 2861 size_t size; 2862 int ret; 2863 2864 page_count = (u32) calc_pages_for(offset, length); 2865 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); 2866 if (IS_ERR(pages)) 2867 ret = PTR_ERR(pages); 2868 2869 ret = -ENOMEM; 2870 obj_request = rbd_obj_request_create(object_name, offset, length, 2871 OBJ_REQUEST_PAGES); 2872 if (!obj_request) 2873 goto out; 2874 2875 obj_request->pages = pages; 2876 
obj_request->page_count = page_count; 2877 2878 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request); 2879 if (!obj_request->osd_req) 2880 goto out; 2881 2882 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, 2883 offset, length, 0, 0); 2884 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, 2885 obj_request->pages, 2886 obj_request->length, 2887 obj_request->offset & ~PAGE_MASK, 2888 false, false); 2889 rbd_osd_req_format_read(obj_request); 2890 2891 ret = rbd_obj_request_submit(osdc, obj_request); 2892 if (ret) 2893 goto out; 2894 ret = rbd_obj_request_wait(obj_request); 2895 if (ret) 2896 goto out; 2897 2898 ret = obj_request->result; 2899 if (ret < 0) 2900 goto out; 2901 2902 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); 2903 size = (size_t) obj_request->xferred; 2904 ceph_copy_from_page_vector(pages, buf, 0, size); 2905 rbd_assert(size <= (size_t) INT_MAX); 2906 ret = (int) size; 2907 if (version) 2908 *version = obj_request->version; 2909out: 2910 if (obj_request) 2911 rbd_obj_request_put(obj_request); 2912 else 2913 ceph_release_page_vector(pages, page_count); 2914 2915 return ret; 2916} 2917 2918/* 2919 * Read the complete header for the given rbd device. 2920 * 2921 * Returns a pointer to a dynamically-allocated buffer containing 2922 * the complete and validated header. Caller can pass the address 2923 * of a variable that will be filled in with the version of the 2924 * header object at the time it was read. 2925 * 2926 * Returns a pointer-coded errno if a failure occurs. 
2927 */ 2928static struct rbd_image_header_ondisk * 2929rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version) 2930{ 2931 struct rbd_image_header_ondisk *ondisk = NULL; 2932 u32 snap_count = 0; 2933 u64 names_size = 0; 2934 u32 want_count; 2935 int ret; 2936 2937 /* 2938 * The complete header will include an array of its 64-bit 2939 * snapshot ids, followed by the names of those snapshots as 2940 * a contiguous block of NUL-terminated strings. Note that 2941 * the number of snapshots could change by the time we read 2942 * it in, in which case we re-read it. 2943 */ 2944 do { 2945 size_t size; 2946 2947 kfree(ondisk); 2948 2949 size = sizeof (*ondisk); 2950 size += snap_count * sizeof (struct rbd_image_snap_ondisk); 2951 size += names_size; 2952 ondisk = kmalloc(size, GFP_KERNEL); 2953 if (!ondisk) 2954 return ERR_PTR(-ENOMEM); 2955 2956 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name, 2957 0, size, ondisk, version); 2958 if (ret < 0) 2959 goto out_err; 2960 if ((size_t)ret < size) { 2961 ret = -ENXIO; 2962 rbd_warn(rbd_dev, "short header read (want %zd got %d)", 2963 size, ret); 2964 goto out_err; 2965 } 2966 if (!rbd_dev_ondisk_valid(ondisk)) { 2967 ret = -ENXIO; 2968 rbd_warn(rbd_dev, "invalid header"); 2969 goto out_err; 2970 } 2971 2972 names_size = le64_to_cpu(ondisk->snap_names_len); 2973 want_count = snap_count; 2974 snap_count = le32_to_cpu(ondisk->snap_count); 2975 } while (snap_count != want_count); 2976 2977 return ondisk; 2978 2979out_err: 2980 kfree(ondisk); 2981 2982 return ERR_PTR(ret); 2983} 2984 2985/* 2986 * reload the ondisk the header 2987 */ 2988static int rbd_read_header(struct rbd_device *rbd_dev, 2989 struct rbd_image_header *header) 2990{ 2991 struct rbd_image_header_ondisk *ondisk; 2992 u64 ver = 0; 2993 int ret; 2994 2995 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver); 2996 if (IS_ERR(ondisk)) 2997 return PTR_ERR(ondisk); 2998 ret = rbd_header_from_disk(header, ondisk); 2999 if (ret >= 0) 3000 header->obj_version = ver; 
3001 kfree(ondisk); 3002 3003 return ret; 3004} 3005 3006static void rbd_remove_all_snaps(struct rbd_device *rbd_dev) 3007{ 3008 struct rbd_snap *snap; 3009 struct rbd_snap *next; 3010 3011 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) { 3012 list_del(&snap->node); 3013 rbd_snap_destroy(snap); 3014 } 3015} 3016 3017static void rbd_update_mapping_size(struct rbd_device *rbd_dev) 3018{ 3019 sector_t size; 3020 3021 if (rbd_dev->spec->snap_id != CEPH_NOSNAP) 3022 return; 3023 3024 size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE; 3025 dout("setting size to %llu sectors", (unsigned long long) size); 3026 rbd_dev->mapping.size = (u64) size; 3027 set_capacity(rbd_dev->disk, size); 3028} 3029 3030/* 3031 * only read the first part of the ondisk header, without the snaps info 3032 */ 3033static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver) 3034{ 3035 int ret; 3036 struct rbd_image_header h; 3037 3038 ret = rbd_read_header(rbd_dev, &h); 3039 if (ret < 0) 3040 return ret; 3041 3042 down_write(&rbd_dev->header_rwsem); 3043 3044 /* Update image size, and check for resize of mapped image */ 3045 rbd_dev->header.image_size = h.image_size; 3046 rbd_update_mapping_size(rbd_dev); 3047 3048 /* rbd_dev->header.object_prefix shouldn't change */ 3049 kfree(rbd_dev->header.snap_sizes); 3050 kfree(rbd_dev->header.snap_names); 3051 /* osd requests may still refer to snapc */ 3052 ceph_put_snap_context(rbd_dev->header.snapc); 3053 3054 if (hver) 3055 *hver = h.obj_version; 3056 rbd_dev->header.obj_version = h.obj_version; 3057 rbd_dev->header.image_size = h.image_size; 3058 rbd_dev->header.snapc = h.snapc; 3059 rbd_dev->header.snap_names = h.snap_names; 3060 rbd_dev->header.snap_sizes = h.snap_sizes; 3061 /* Free the extra copy of the object prefix */ 3062 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix)) 3063 rbd_warn(rbd_dev, "object prefix changed (ignoring)"); 3064 kfree(h.object_prefix); 3065 3066 ret = rbd_dev_snaps_update(rbd_dev); 
3067 3068 up_write(&rbd_dev->header_rwsem); 3069 3070 return ret; 3071} 3072 3073static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver) 3074{ 3075 int ret; 3076 3077 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 3078 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 3079 if (rbd_dev->image_format == 1) 3080 ret = rbd_dev_v1_refresh(rbd_dev, hver); 3081 else 3082 ret = rbd_dev_v2_refresh(rbd_dev, hver); 3083 mutex_unlock(&ctl_mutex); 3084 revalidate_disk(rbd_dev->disk); 3085 if (ret) 3086 rbd_warn(rbd_dev, "got notification but failed to " 3087 " update snaps: %d\n", ret); 3088 3089 return ret; 3090} 3091 3092static int rbd_init_disk(struct rbd_device *rbd_dev) 3093{ 3094 struct gendisk *disk; 3095 struct request_queue *q; 3096 u64 segment_size; 3097 3098 /* create gendisk info */ 3099 disk = alloc_disk(RBD_MINORS_PER_MAJOR); 3100 if (!disk) 3101 return -ENOMEM; 3102 3103 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d", 3104 rbd_dev->dev_id); 3105 disk->major = rbd_dev->major; 3106 disk->first_minor = 0; 3107 disk->fops = &rbd_bd_ops; 3108 disk->private_data = rbd_dev; 3109 3110 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock); 3111 if (!q) 3112 goto out_disk; 3113 3114 /* We use the default size, but let's be explicit about it. 
*/ 3115 blk_queue_physical_block_size(q, SECTOR_SIZE); 3116 3117 /* set io sizes to object size */ 3118 segment_size = rbd_obj_bytes(&rbd_dev->header); 3119 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE); 3120 blk_queue_max_segment_size(q, segment_size); 3121 blk_queue_io_min(q, segment_size); 3122 blk_queue_io_opt(q, segment_size); 3123 3124 blk_queue_merge_bvec(q, rbd_merge_bvec); 3125 disk->queue = q; 3126 3127 q->queuedata = rbd_dev; 3128 3129 rbd_dev->disk = disk; 3130 3131 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE); 3132 3133 return 0; 3134out_disk: 3135 put_disk(disk); 3136 3137 return -ENOMEM; 3138} 3139 3140/* 3141 sysfs 3142*/ 3143 3144static struct rbd_device *dev_to_rbd_dev(struct device *dev) 3145{ 3146 return container_of(dev, struct rbd_device, dev); 3147} 3148 3149static ssize_t rbd_size_show(struct device *dev, 3150 struct device_attribute *attr, char *buf) 3151{ 3152 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3153 sector_t size; 3154 3155 down_read(&rbd_dev->header_rwsem); 3156 size = get_capacity(rbd_dev->disk); 3157 up_read(&rbd_dev->header_rwsem); 3158 3159 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE); 3160} 3161 3162/* 3163 * Note this shows the features for whatever's mapped, which is not 3164 * necessarily the base image. 
3165 */ 3166static ssize_t rbd_features_show(struct device *dev, 3167 struct device_attribute *attr, char *buf) 3168{ 3169 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3170 3171 return sprintf(buf, "0x%016llx\n", 3172 (unsigned long long) rbd_dev->mapping.features); 3173} 3174 3175static ssize_t rbd_major_show(struct device *dev, 3176 struct device_attribute *attr, char *buf) 3177{ 3178 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3179 3180 return sprintf(buf, "%d\n", rbd_dev->major); 3181} 3182 3183static ssize_t rbd_client_id_show(struct device *dev, 3184 struct device_attribute *attr, char *buf) 3185{ 3186 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3187 3188 return sprintf(buf, "client%lld\n", 3189 ceph_client_id(rbd_dev->rbd_client->client)); 3190} 3191 3192static ssize_t rbd_pool_show(struct device *dev, 3193 struct device_attribute *attr, char *buf) 3194{ 3195 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3196 3197 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name); 3198} 3199 3200static ssize_t rbd_pool_id_show(struct device *dev, 3201 struct device_attribute *attr, char *buf) 3202{ 3203 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3204 3205 return sprintf(buf, "%llu\n", 3206 (unsigned long long) rbd_dev->spec->pool_id); 3207} 3208 3209static ssize_t rbd_name_show(struct device *dev, 3210 struct device_attribute *attr, char *buf) 3211{ 3212 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3213 3214 if (rbd_dev->spec->image_name) 3215 return sprintf(buf, "%s\n", rbd_dev->spec->image_name); 3216 3217 return sprintf(buf, "(unknown)\n"); 3218} 3219 3220static ssize_t rbd_image_id_show(struct device *dev, 3221 struct device_attribute *attr, char *buf) 3222{ 3223 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3224 3225 return sprintf(buf, "%s\n", rbd_dev->spec->image_id); 3226} 3227 3228/* 3229 * Shows the name of the currently-mapped snapshot (or 3230 * RBD_SNAP_HEAD_NAME for the base image). 
3231 */ 3232static ssize_t rbd_snap_show(struct device *dev, 3233 struct device_attribute *attr, 3234 char *buf) 3235{ 3236 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3237 3238 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name); 3239} 3240 3241/* 3242 * For an rbd v2 image, shows the pool id, image id, and snapshot id 3243 * for the parent image. If there is no parent, simply shows 3244 * "(no parent image)". 3245 */ 3246static ssize_t rbd_parent_show(struct device *dev, 3247 struct device_attribute *attr, 3248 char *buf) 3249{ 3250 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3251 struct rbd_spec *spec = rbd_dev->parent_spec; 3252 int count; 3253 char *bufp = buf; 3254 3255 if (!spec) 3256 return sprintf(buf, "(no parent image)\n"); 3257 3258 count = sprintf(bufp, "pool_id %llu\npool_name %s\n", 3259 (unsigned long long) spec->pool_id, spec->pool_name); 3260 if (count < 0) 3261 return count; 3262 bufp += count; 3263 3264 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id, 3265 spec->image_name ? spec->image_name : "(unknown)"); 3266 if (count < 0) 3267 return count; 3268 bufp += count; 3269 3270 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n", 3271 (unsigned long long) spec->snap_id, spec->snap_name); 3272 if (count < 0) 3273 return count; 3274 bufp += count; 3275 3276 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap); 3277 if (count < 0) 3278 return count; 3279 bufp += count; 3280 3281 return (ssize_t) (bufp - buf); 3282} 3283 3284static ssize_t rbd_image_refresh(struct device *dev, 3285 struct device_attribute *attr, 3286 const char *buf, 3287 size_t size) 3288{ 3289 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 3290 int ret; 3291 3292 ret = rbd_dev_refresh(rbd_dev, NULL); 3293 3294 return ret < 0 ? 
ret : size; 3295} 3296 3297static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL); 3298static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL); 3299static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL); 3300static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL); 3301static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL); 3302static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL); 3303static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL); 3304static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL); 3305static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh); 3306static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL); 3307static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL); 3308 3309static struct attribute *rbd_attrs[] = { 3310 &dev_attr_size.attr, 3311 &dev_attr_features.attr, 3312 &dev_attr_major.attr, 3313 &dev_attr_client_id.attr, 3314 &dev_attr_pool.attr, 3315 &dev_attr_pool_id.attr, 3316 &dev_attr_name.attr, 3317 &dev_attr_image_id.attr, 3318 &dev_attr_current_snap.attr, 3319 &dev_attr_parent.attr, 3320 &dev_attr_refresh.attr, 3321 NULL 3322}; 3323 3324static struct attribute_group rbd_attr_group = { 3325 .attrs = rbd_attrs, 3326}; 3327 3328static const struct attribute_group *rbd_attr_groups[] = { 3329 &rbd_attr_group, 3330 NULL 3331}; 3332 3333static void rbd_sysfs_dev_release(struct device *dev) 3334{ 3335} 3336 3337static struct device_type rbd_device_type = { 3338 .name = "rbd", 3339 .groups = rbd_attr_groups, 3340 .release = rbd_sysfs_dev_release, 3341}; 3342 3343static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec) 3344{ 3345 kref_get(&spec->kref); 3346 3347 return spec; 3348} 3349 3350static void rbd_spec_free(struct kref *kref); 3351static void rbd_spec_put(struct rbd_spec *spec) 3352{ 3353 if (spec) 3354 kref_put(&spec->kref, rbd_spec_free); 3355} 3356 3357static struct rbd_spec *rbd_spec_alloc(void) 3358{ 3359 struct rbd_spec *spec; 3360 3361 spec = kzalloc(sizeof (*spec), GFP_KERNEL); 3362 
if (!spec) 3363 return NULL; 3364 kref_init(&spec->kref); 3365 3366 return spec; 3367} 3368 3369static void rbd_spec_free(struct kref *kref) 3370{ 3371 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref); 3372 3373 kfree(spec->pool_name); 3374 kfree(spec->image_id); 3375 kfree(spec->image_name); 3376 kfree(spec->snap_name); 3377 kfree(spec); 3378} 3379 3380static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc, 3381 struct rbd_spec *spec) 3382{ 3383 struct rbd_device *rbd_dev; 3384 3385 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL); 3386 if (!rbd_dev) 3387 return NULL; 3388 3389 spin_lock_init(&rbd_dev->lock); 3390 rbd_dev->flags = 0; 3391 INIT_LIST_HEAD(&rbd_dev->node); 3392 INIT_LIST_HEAD(&rbd_dev->snaps); 3393 init_rwsem(&rbd_dev->header_rwsem); 3394 3395 rbd_dev->spec = spec; 3396 rbd_dev->rbd_client = rbdc; 3397 3398 /* Initialize the layout used for all rbd requests */ 3399 3400 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 3401 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1); 3402 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER); 3403 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id); 3404 3405 return rbd_dev; 3406} 3407 3408static void rbd_dev_destroy(struct rbd_device *rbd_dev) 3409{ 3410 rbd_spec_put(rbd_dev->parent_spec); 3411 kfree(rbd_dev->header_name); 3412 rbd_put_client(rbd_dev->rbd_client); 3413 rbd_spec_put(rbd_dev->spec); 3414 kfree(rbd_dev); 3415} 3416 3417static void rbd_snap_destroy(struct rbd_snap *snap) 3418{ 3419 kfree(snap->name); 3420 kfree(snap); 3421} 3422 3423static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev, 3424 const char *snap_name, 3425 u64 snap_id, u64 snap_size, 3426 u64 snap_features) 3427{ 3428 struct rbd_snap *snap; 3429 3430 snap = kzalloc(sizeof (*snap), GFP_KERNEL); 3431 if (!snap) 3432 return ERR_PTR(-ENOMEM); 3433 3434 snap->name = snap_name; 3435 snap->id = snap_id; 3436 snap->size = snap_size; 3437 snap->features = 
snap_features; 3438 3439 return snap; 3440} 3441 3442/* 3443 * Returns a dynamically-allocated snapshot name if successful, or a 3444 * pointer-coded error otherwise. 3445 */ 3446static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which, 3447 u64 *snap_size, u64 *snap_features) 3448{ 3449 char *snap_name; 3450 int i; 3451 3452 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 3453 3454 /* Skip over names until we find the one we are looking for */ 3455 3456 snap_name = rbd_dev->header.snap_names; 3457 for (i = 0; i < which; i++) 3458 snap_name += strlen(snap_name) + 1; 3459 3460 snap_name = kstrdup(snap_name, GFP_KERNEL); 3461 if (!snap_name) 3462 return ERR_PTR(-ENOMEM); 3463 3464 *snap_size = rbd_dev->header.snap_sizes[which]; 3465 *snap_features = 0; /* No features for v1 */ 3466 3467 return snap_name; 3468} 3469 3470/* 3471 * Get the size and object order for an image snapshot, or if 3472 * snap_id is CEPH_NOSNAP, gets this information for the base 3473 * image. 3474 */ 3475static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, 3476 u8 *order, u64 *snap_size) 3477{ 3478 __le64 snapid = cpu_to_le64(snap_id); 3479 int ret; 3480 struct { 3481 u8 order; 3482 __le64 size; 3483 } __attribute__ ((packed)) size_buf = { 0 }; 3484 3485 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3486 "rbd", "get_size", 3487 &snapid, sizeof (snapid), 3488 &size_buf, sizeof (size_buf), NULL); 3489 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3490 if (ret < 0) 3491 return ret; 3492 if (ret < sizeof (size_buf)) 3493 return -ERANGE; 3494 3495 if (order) 3496 *order = size_buf.order; 3497 *snap_size = le64_to_cpu(size_buf.size); 3498 3499 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n", 3500 (unsigned long long)snap_id, (unsigned int)*order, 3501 (unsigned long long)*snap_size); 3502 3503 return 0; 3504} 3505 3506static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev) 3507{ 3508 return 
_rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP, 3509 &rbd_dev->header.obj_order, 3510 &rbd_dev->header.image_size); 3511} 3512 3513static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) 3514{ 3515 void *reply_buf; 3516 int ret; 3517 void *p; 3518 3519 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL); 3520 if (!reply_buf) 3521 return -ENOMEM; 3522 3523 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3524 "rbd", "get_object_prefix", NULL, 0, 3525 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL); 3526 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3527 if (ret < 0) 3528 goto out; 3529 3530 p = reply_buf; 3531 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p, 3532 p + ret, NULL, GFP_NOIO); 3533 ret = 0; 3534 3535 if (IS_ERR(rbd_dev->header.object_prefix)) { 3536 ret = PTR_ERR(rbd_dev->header.object_prefix); 3537 rbd_dev->header.object_prefix = NULL; 3538 } else { 3539 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix); 3540 } 3541out: 3542 kfree(reply_buf); 3543 3544 return ret; 3545} 3546 3547static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, 3548 u64 *snap_features) 3549{ 3550 __le64 snapid = cpu_to_le64(snap_id); 3551 struct { 3552 __le64 features; 3553 __le64 incompat; 3554 } __attribute__ ((packed)) features_buf = { 0 }; 3555 u64 incompat; 3556 int ret; 3557 3558 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3559 "rbd", "get_features", 3560 &snapid, sizeof (snapid), 3561 &features_buf, sizeof (features_buf), NULL); 3562 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3563 if (ret < 0) 3564 return ret; 3565 if (ret < sizeof (features_buf)) 3566 return -ERANGE; 3567 3568 incompat = le64_to_cpu(features_buf.incompat); 3569 if (incompat & ~RBD_FEATURES_SUPPORTED) 3570 return -ENXIO; 3571 3572 *snap_features = le64_to_cpu(features_buf.features); 3573 3574 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n", 3575 (unsigned long long)snap_id, 3576 
(unsigned long long)*snap_features, 3577 (unsigned long long)le64_to_cpu(features_buf.incompat)); 3578 3579 return 0; 3580} 3581 3582static int rbd_dev_v2_features(struct rbd_device *rbd_dev) 3583{ 3584 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP, 3585 &rbd_dev->header.features); 3586} 3587 3588static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) 3589{ 3590 struct rbd_spec *parent_spec; 3591 size_t size; 3592 void *reply_buf = NULL; 3593 __le64 snapid; 3594 void *p; 3595 void *end; 3596 char *image_id; 3597 u64 overlap; 3598 int ret; 3599 3600 parent_spec = rbd_spec_alloc(); 3601 if (!parent_spec) 3602 return -ENOMEM; 3603 3604 size = sizeof (__le64) + /* pool_id */ 3605 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */ 3606 sizeof (__le64) + /* snap_id */ 3607 sizeof (__le64); /* overlap */ 3608 reply_buf = kmalloc(size, GFP_KERNEL); 3609 if (!reply_buf) { 3610 ret = -ENOMEM; 3611 goto out_err; 3612 } 3613 3614 snapid = cpu_to_le64(CEPH_NOSNAP); 3615 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3616 "rbd", "get_parent", 3617 &snapid, sizeof (snapid), 3618 reply_buf, size, NULL); 3619 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3620 if (ret < 0) 3621 goto out_err; 3622 3623 p = reply_buf; 3624 end = reply_buf + ret; 3625 ret = -ERANGE; 3626 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err); 3627 if (parent_spec->pool_id == CEPH_NOPOOL) 3628 goto out; /* No parent? No problem. 
*/ 3629 3630 /* The ceph file layout needs to fit pool id in 32 bits */ 3631 3632 ret = -EIO; 3633 if (parent_spec->pool_id > (u64)U32_MAX) { 3634 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n", 3635 (unsigned long long)parent_spec->pool_id, U32_MAX); 3636 goto out_err; 3637 } 3638 3639 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3640 if (IS_ERR(image_id)) { 3641 ret = PTR_ERR(image_id); 3642 goto out_err; 3643 } 3644 parent_spec->image_id = image_id; 3645 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err); 3646 ceph_decode_64_safe(&p, end, overlap, out_err); 3647 3648 rbd_dev->parent_overlap = overlap; 3649 rbd_dev->parent_spec = parent_spec; 3650 parent_spec = NULL; /* rbd_dev now owns this */ 3651out: 3652 ret = 0; 3653out_err: 3654 kfree(reply_buf); 3655 rbd_spec_put(parent_spec); 3656 3657 return ret; 3658} 3659 3660static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) 3661{ 3662 struct { 3663 __le64 stripe_unit; 3664 __le64 stripe_count; 3665 } __attribute__ ((packed)) striping_info_buf = { 0 }; 3666 size_t size = sizeof (striping_info_buf); 3667 void *p; 3668 u64 obj_size; 3669 u64 stripe_unit; 3670 u64 stripe_count; 3671 int ret; 3672 3673 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3674 "rbd", "get_stripe_unit_count", NULL, 0, 3675 (char *)&striping_info_buf, size, NULL); 3676 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3677 if (ret < 0) 3678 return ret; 3679 if (ret < size) 3680 return -ERANGE; 3681 3682 /* 3683 * We don't actually support the "fancy striping" feature 3684 * (STRIPINGV2) yet, but if the striping sizes are the 3685 * defaults the behavior is the same as before. So find 3686 * out, and only fail if the image has non-default values. 
3687 */ 3688 ret = -EINVAL; 3689 obj_size = (u64)1 << rbd_dev->header.obj_order; 3690 p = &striping_info_buf; 3691 stripe_unit = ceph_decode_64(&p); 3692 if (stripe_unit != obj_size) { 3693 rbd_warn(rbd_dev, "unsupported stripe unit " 3694 "(got %llu want %llu)", 3695 stripe_unit, obj_size); 3696 return -EINVAL; 3697 } 3698 stripe_count = ceph_decode_64(&p); 3699 if (stripe_count != 1) { 3700 rbd_warn(rbd_dev, "unsupported stripe count " 3701 "(got %llu want 1)", stripe_count); 3702 return -EINVAL; 3703 } 3704 rbd_dev->header.stripe_unit = stripe_unit; 3705 rbd_dev->header.stripe_count = stripe_count; 3706 3707 return 0; 3708} 3709 3710static char *rbd_dev_image_name(struct rbd_device *rbd_dev) 3711{ 3712 size_t image_id_size; 3713 char *image_id; 3714 void *p; 3715 void *end; 3716 size_t size; 3717 void *reply_buf = NULL; 3718 size_t len = 0; 3719 char *image_name = NULL; 3720 int ret; 3721 3722 rbd_assert(!rbd_dev->spec->image_name); 3723 3724 len = strlen(rbd_dev->spec->image_id); 3725 image_id_size = sizeof (__le32) + len; 3726 image_id = kmalloc(image_id_size, GFP_KERNEL); 3727 if (!image_id) 3728 return NULL; 3729 3730 p = image_id; 3731 end = image_id + image_id_size; 3732 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len); 3733 3734 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX; 3735 reply_buf = kmalloc(size, GFP_KERNEL); 3736 if (!reply_buf) 3737 goto out; 3738 3739 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, 3740 "rbd", "dir_get_name", 3741 image_id, image_id_size, 3742 reply_buf, size, NULL); 3743 if (ret < 0) 3744 goto out; 3745 p = reply_buf; 3746 end = reply_buf + ret; 3747 3748 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL); 3749 if (IS_ERR(image_name)) 3750 image_name = NULL; 3751 else 3752 dout("%s: name is %s len is %zd\n", __func__, image_name, len); 3753out: 3754 kfree(reply_buf); 3755 kfree(image_id); 3756 3757 return image_name; 3758} 3759 3760/* 3761 * When a parent image gets probed, we only have the 
pool, image, 3762 * and snapshot ids but not the names of any of them. This call 3763 * is made later to fill in those names. It has to be done after 3764 * rbd_dev_snaps_update() has completed because some of the 3765 * information (in particular, snapshot name) is not available 3766 * until then. 3767 * 3768 * When an image being mapped (not a parent) is probed, we have the 3769 * pool name and pool id, image name and image id, and the snapshot 3770 * name. The only thing we're missing is the snapshot id. 3771 */ 3772static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev) 3773{ 3774 struct ceph_osd_client *osdc; 3775 const char *name; 3776 void *reply_buf = NULL; 3777 int ret; 3778 3779 /* 3780 * An image being mapped will have the pool name (etc.), but 3781 * we need to look up the snapshot id. 3782 */ 3783 if (rbd_dev->spec->pool_name) { 3784 if (strcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME)) { 3785 struct rbd_snap *snap; 3786 3787 snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name); 3788 if (!snap) 3789 return -ENOENT; 3790 rbd_dev->spec->snap_id = snap->id; 3791 } else { 3792 rbd_dev->spec->snap_id = CEPH_NOSNAP; 3793 } 3794 3795 return 0; 3796 } 3797 3798 /* Look up the pool name */ 3799 3800 osdc = &rbd_dev->rbd_client->client->osdc; 3801 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id); 3802 if (!name) { 3803 rbd_warn(rbd_dev, "there is no pool with id %llu", 3804 rbd_dev->spec->pool_id); /* Really a BUG() */ 3805 return -EIO; 3806 } 3807 3808 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL); 3809 if (!rbd_dev->spec->pool_name) 3810 return -ENOMEM; 3811 3812 /* Fetch the image name; tolerate failure here */ 3813 3814 name = rbd_dev_image_name(rbd_dev); 3815 if (name) 3816 rbd_dev->spec->image_name = (char *)name; 3817 else 3818 rbd_warn(rbd_dev, "unable to get image name"); 3819 3820 /* Look up the snapshot name. 
*/ 3821 3822 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id); 3823 if (!name) { 3824 rbd_warn(rbd_dev, "no snapshot with id %llu", 3825 rbd_dev->spec->snap_id); /* Really a BUG() */ 3826 ret = -EIO; 3827 goto out_err; 3828 } 3829 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL); 3830 if(!rbd_dev->spec->snap_name) 3831 goto out_err; 3832 3833 return 0; 3834out_err: 3835 kfree(reply_buf); 3836 kfree(rbd_dev->spec->pool_name); 3837 rbd_dev->spec->pool_name = NULL; 3838 3839 return ret; 3840} 3841 3842static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver) 3843{ 3844 size_t size; 3845 int ret; 3846 void *reply_buf; 3847 void *p; 3848 void *end; 3849 u64 seq; 3850 u32 snap_count; 3851 struct ceph_snap_context *snapc; 3852 u32 i; 3853 3854 /* 3855 * We'll need room for the seq value (maximum snapshot id), 3856 * snapshot count, and array of that many snapshot ids. 3857 * For now we have a fixed upper limit on the number we're 3858 * prepared to receive. 3859 */ 3860 size = sizeof (__le64) + sizeof (__le32) + 3861 RBD_MAX_SNAP_COUNT * sizeof (__le64); 3862 reply_buf = kzalloc(size, GFP_KERNEL); 3863 if (!reply_buf) 3864 return -ENOMEM; 3865 3866 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3867 "rbd", "get_snapcontext", NULL, 0, 3868 reply_buf, size, ver); 3869 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3870 if (ret < 0) 3871 goto out; 3872 3873 p = reply_buf; 3874 end = reply_buf + ret; 3875 ret = -ERANGE; 3876 ceph_decode_64_safe(&p, end, seq, out); 3877 ceph_decode_32_safe(&p, end, snap_count, out); 3878 3879 /* 3880 * Make sure the reported number of snapshot ids wouldn't go 3881 * beyond the end of our buffer. But before checking that, 3882 * make sure the computed size of the snapshot context we 3883 * allocate is representable in a size_t. 
3884 */ 3885 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context)) 3886 / sizeof (u64)) { 3887 ret = -EINVAL; 3888 goto out; 3889 } 3890 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64))) 3891 goto out; 3892 3893 size = sizeof (struct ceph_snap_context) + 3894 snap_count * sizeof (snapc->snaps[0]); 3895 snapc = kmalloc(size, GFP_KERNEL); 3896 if (!snapc) { 3897 ret = -ENOMEM; 3898 goto out; 3899 } 3900 ret = 0; 3901 3902 atomic_set(&snapc->nref, 1); 3903 snapc->seq = seq; 3904 snapc->num_snaps = snap_count; 3905 for (i = 0; i < snap_count; i++) 3906 snapc->snaps[i] = ceph_decode_64(&p); 3907 3908 rbd_dev->header.snapc = snapc; 3909 3910 dout(" snap context seq = %llu, snap_count = %u\n", 3911 (unsigned long long)seq, (unsigned int)snap_count); 3912out: 3913 kfree(reply_buf); 3914 3915 return ret; 3916} 3917 3918static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which) 3919{ 3920 size_t size; 3921 void *reply_buf; 3922 __le64 snap_id; 3923 int ret; 3924 void *p; 3925 void *end; 3926 char *snap_name; 3927 3928 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN; 3929 reply_buf = kmalloc(size, GFP_KERNEL); 3930 if (!reply_buf) 3931 return ERR_PTR(-ENOMEM); 3932 3933 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 3934 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]); 3935 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name, 3936 "rbd", "get_snapshot_name", 3937 &snap_id, sizeof (snap_id), 3938 reply_buf, size, NULL); 3939 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 3940 if (ret < 0) { 3941 snap_name = ERR_PTR(ret); 3942 goto out; 3943 } 3944 3945 p = reply_buf; 3946 end = reply_buf + ret; 3947 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL); 3948 if (IS_ERR(snap_name)) 3949 goto out; 3950 3951 dout(" snap_id 0x%016llx snap_name = %s\n", 3952 (unsigned long long)le64_to_cpu(snap_id), snap_name); 3953out: 3954 kfree(reply_buf); 3955 3956 return snap_name; 3957} 3958 3959static char 
*rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which, 3960 u64 *snap_size, u64 *snap_features) 3961{ 3962 u64 snap_id; 3963 u64 size; 3964 u64 features; 3965 char *snap_name; 3966 int ret; 3967 3968 rbd_assert(which < rbd_dev->header.snapc->num_snaps); 3969 snap_id = rbd_dev->header.snapc->snaps[which]; 3970 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size); 3971 if (ret) 3972 goto out_err; 3973 3974 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features); 3975 if (ret) 3976 goto out_err; 3977 3978 snap_name = rbd_dev_v2_snap_name(rbd_dev, which); 3979 if (!IS_ERR(snap_name)) { 3980 *snap_size = size; 3981 *snap_features = features; 3982 } 3983 3984 return snap_name; 3985out_err: 3986 return ERR_PTR(ret); 3987} 3988 3989static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which, 3990 u64 *snap_size, u64 *snap_features) 3991{ 3992 if (rbd_dev->image_format == 1) 3993 return rbd_dev_v1_snap_info(rbd_dev, which, 3994 snap_size, snap_features); 3995 if (rbd_dev->image_format == 2) 3996 return rbd_dev_v2_snap_info(rbd_dev, which, 3997 snap_size, snap_features); 3998 return ERR_PTR(-EINVAL); 3999} 4000 4001static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver) 4002{ 4003 int ret; 4004 __u8 obj_order; 4005 4006 down_write(&rbd_dev->header_rwsem); 4007 4008 /* Grab old order first, to see if it changes */ 4009 4010 obj_order = rbd_dev->header.obj_order, 4011 ret = rbd_dev_v2_image_size(rbd_dev); 4012 if (ret) 4013 goto out; 4014 if (rbd_dev->header.obj_order != obj_order) { 4015 ret = -EIO; 4016 goto out; 4017 } 4018 rbd_update_mapping_size(rbd_dev); 4019 4020 ret = rbd_dev_v2_snap_context(rbd_dev, hver); 4021 dout("rbd_dev_v2_snap_context returned %d\n", ret); 4022 if (ret) 4023 goto out; 4024 ret = rbd_dev_snaps_update(rbd_dev); 4025 dout("rbd_dev_snaps_update returned %d\n", ret); 4026 if (ret) 4027 goto out; 4028out: 4029 up_write(&rbd_dev->header_rwsem); 4030 4031 return ret; 4032} 4033 4034/* 4035 * Scan the rbd device's 
current snapshot list and compare it to the 4036 * newly-received snapshot context. Remove any existing snapshots 4037 * not present in the new snapshot context. Add a new snapshot for 4038 * any snaphots in the snapshot context not in the current list. 4039 * And verify there are no changes to snapshots we already know 4040 * about. 4041 * 4042 * Assumes the snapshots in the snapshot context are sorted by 4043 * snapshot id, highest id first. (Snapshots in the rbd_dev's list 4044 * are also maintained in that order.) 4045 * 4046 * Note that any error occurs while updating the snapshot list 4047 * aborts the update, and the entire list is cleared. The snapshot 4048 * list becomes inconsistent at that point anyway, so it might as 4049 * well be empty. 4050 */ 4051static int rbd_dev_snaps_update(struct rbd_device *rbd_dev) 4052{ 4053 struct ceph_snap_context *snapc = rbd_dev->header.snapc; 4054 const u32 snap_count = snapc->num_snaps; 4055 struct list_head *head = &rbd_dev->snaps; 4056 struct list_head *links = head->next; 4057 u32 index = 0; 4058 int ret = 0; 4059 4060 dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count); 4061 while (index < snap_count || links != head) { 4062 u64 snap_id; 4063 struct rbd_snap *snap; 4064 char *snap_name; 4065 u64 snap_size = 0; 4066 u64 snap_features = 0; 4067 4068 snap_id = index < snap_count ? snapc->snaps[index] 4069 : CEPH_NOSNAP; 4070 snap = links != head ? list_entry(links, struct rbd_snap, node) 4071 : NULL; 4072 rbd_assert(!snap || snap->id != CEPH_NOSNAP); 4073 4074 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) { 4075 struct list_head *next = links->next; 4076 4077 /* 4078 * A previously-existing snapshot is not in 4079 * the new snap context. 4080 * 4081 * If the now-missing snapshot is the one 4082 * the image represents, clear its existence 4083 * flag so we can avoid sending any more 4084 * requests to it. 
4085 */ 4086 if (rbd_dev->spec->snap_id == snap->id) 4087 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags); 4088 dout("removing %ssnap id %llu\n", 4089 rbd_dev->spec->snap_id == snap->id ? 4090 "mapped " : "", 4091 (unsigned long long)snap->id); 4092 4093 list_del(&snap->node); 4094 rbd_snap_destroy(snap); 4095 4096 /* Done with this list entry; advance */ 4097 4098 links = next; 4099 continue; 4100 } 4101 4102 snap_name = rbd_dev_snap_info(rbd_dev, index, 4103 &snap_size, &snap_features); 4104 if (IS_ERR(snap_name)) { 4105 ret = PTR_ERR(snap_name); 4106 dout("failed to get snap info, error %d\n", ret); 4107 goto out_err; 4108 } 4109 4110 dout("entry %u: snap_id = %llu\n", (unsigned int)snap_count, 4111 (unsigned long long)snap_id); 4112 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) { 4113 struct rbd_snap *new_snap; 4114 4115 /* We haven't seen this snapshot before */ 4116 4117 new_snap = rbd_snap_create(rbd_dev, snap_name, 4118 snap_id, snap_size, snap_features); 4119 if (IS_ERR(new_snap)) { 4120 ret = PTR_ERR(new_snap); 4121 dout(" failed to add dev, error %d\n", ret); 4122 goto out_err; 4123 } 4124 4125 /* New goes before existing, or at end of list */ 4126 4127 dout(" added dev%s\n", snap ? 
"" : " at end\n"); 4128 if (snap) 4129 list_add_tail(&new_snap->node, &snap->node); 4130 else 4131 list_add_tail(&new_snap->node, head); 4132 } else { 4133 /* Already have this one */ 4134 4135 dout(" already present\n"); 4136 4137 rbd_assert(snap->size == snap_size); 4138 rbd_assert(!strcmp(snap->name, snap_name)); 4139 rbd_assert(snap->features == snap_features); 4140 4141 /* Done with this list entry; advance */ 4142 4143 links = links->next; 4144 } 4145 4146 /* Advance to the next entry in the snapshot context */ 4147 4148 index++; 4149 } 4150 dout("%s: done\n", __func__); 4151 4152 return 0; 4153out_err: 4154 rbd_remove_all_snaps(rbd_dev); 4155 4156 return ret; 4157} 4158 4159static int rbd_bus_add_dev(struct rbd_device *rbd_dev) 4160{ 4161 struct device *dev; 4162 int ret; 4163 4164 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 4165 4166 dev = &rbd_dev->dev; 4167 dev->bus = &rbd_bus_type; 4168 dev->type = &rbd_device_type; 4169 dev->parent = &rbd_root_dev; 4170 dev->release = rbd_dev_release; 4171 dev_set_name(dev, "%d", rbd_dev->dev_id); 4172 ret = device_register(dev); 4173 4174 mutex_unlock(&ctl_mutex); 4175 4176 return ret; 4177} 4178 4179static void rbd_bus_del_dev(struct rbd_device *rbd_dev) 4180{ 4181 device_unregister(&rbd_dev->dev); 4182} 4183 4184static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0); 4185 4186/* 4187 * Get a unique rbd identifier for the given new rbd_dev, and add 4188 * the rbd_dev to the global list. The minimum rbd id is 1. 4189 */ 4190static void rbd_dev_id_get(struct rbd_device *rbd_dev) 4191{ 4192 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max); 4193 4194 spin_lock(&rbd_dev_list_lock); 4195 list_add_tail(&rbd_dev->node, &rbd_dev_list); 4196 spin_unlock(&rbd_dev_list_lock); 4197 dout("rbd_dev %p given dev id %llu\n", rbd_dev, 4198 (unsigned long long) rbd_dev->dev_id); 4199} 4200 4201/* 4202 * Remove an rbd_dev from the global list, and record that its 4203 * identifier is no longer in use. 
4204 */ 4205static void rbd_dev_id_put(struct rbd_device *rbd_dev) 4206{ 4207 struct list_head *tmp; 4208 int rbd_id = rbd_dev->dev_id; 4209 int max_id; 4210 4211 rbd_assert(rbd_id > 0); 4212 4213 dout("rbd_dev %p released dev id %llu\n", rbd_dev, 4214 (unsigned long long) rbd_dev->dev_id); 4215 spin_lock(&rbd_dev_list_lock); 4216 list_del_init(&rbd_dev->node); 4217 4218 /* 4219 * If the id being "put" is not the current maximum, there 4220 * is nothing special we need to do. 4221 */ 4222 if (rbd_id != atomic64_read(&rbd_dev_id_max)) { 4223 spin_unlock(&rbd_dev_list_lock); 4224 return; 4225 } 4226 4227 /* 4228 * We need to update the current maximum id. Search the 4229 * list to find out what it is. We're more likely to find 4230 * the maximum at the end, so search the list backward. 4231 */ 4232 max_id = 0; 4233 list_for_each_prev(tmp, &rbd_dev_list) { 4234 struct rbd_device *rbd_dev; 4235 4236 rbd_dev = list_entry(tmp, struct rbd_device, node); 4237 if (rbd_dev->dev_id > max_id) 4238 max_id = rbd_dev->dev_id; 4239 } 4240 spin_unlock(&rbd_dev_list_lock); 4241 4242 /* 4243 * The max id could have been updated by rbd_dev_id_get(), in 4244 * which case it now accurately reflects the new maximum. 4245 * Be careful not to overwrite the maximum value in that 4246 * case. 4247 */ 4248 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id); 4249 dout(" max dev id has been reset\n"); 4250} 4251 4252/* 4253 * Skips over white space at *buf, and updates *buf to point to the 4254 * first found non-space character (if any). Returns the length of 4255 * the token (string of non-white space characters) found. Note 4256 * that *buf must be terminated with '\0'. 4257 */ 4258static inline size_t next_token(const char **buf) 4259{ 4260 /* 4261 * These are the characters that produce nonzero for 4262 * isspace() in the "C" and "POSIX" locales. 
4263 */ 4264 const char *spaces = " \f\n\r\t\v"; 4265 4266 *buf += strspn(*buf, spaces); /* Find start of token */ 4267 4268 return strcspn(*buf, spaces); /* Return token length */ 4269} 4270 4271/* 4272 * Finds the next token in *buf, and if the provided token buffer is 4273 * big enough, copies the found token into it. The result, if 4274 * copied, is guaranteed to be terminated with '\0'. Note that *buf 4275 * must be terminated with '\0' on entry. 4276 * 4277 * Returns the length of the token found (not including the '\0'). 4278 * Return value will be 0 if no token is found, and it will be >= 4279 * token_size if the token would not fit. 4280 * 4281 * The *buf pointer will be updated to point beyond the end of the 4282 * found token. Note that this occurs even if the token buffer is 4283 * too small to hold it. 4284 */ 4285static inline size_t copy_token(const char **buf, 4286 char *token, 4287 size_t token_size) 4288{ 4289 size_t len; 4290 4291 len = next_token(buf); 4292 if (len < token_size) { 4293 memcpy(token, *buf, len); 4294 *(token + len) = '\0'; 4295 } 4296 *buf += len; 4297 4298 return len; 4299} 4300 4301/* 4302 * Finds the next token in *buf, dynamically allocates a buffer big 4303 * enough to hold a copy of it, and copies the token into the new 4304 * buffer. The copy is guaranteed to be terminated with '\0'. Note 4305 * that a duplicate buffer is created even for a zero-length token. 4306 * 4307 * Returns a pointer to the newly-allocated duplicate, or a null 4308 * pointer if memory for the duplicate was not available. If 4309 * the lenp argument is a non-null pointer, the length of the token 4310 * (not including the '\0') is returned in *lenp. 4311 * 4312 * If successful, the *buf pointer will be updated to point beyond 4313 * the end of the found token. 4314 * 4315 * Note: uses GFP_KERNEL for allocation. 
4316 */ 4317static inline char *dup_token(const char **buf, size_t *lenp) 4318{ 4319 char *dup; 4320 size_t len; 4321 4322 len = next_token(buf); 4323 dup = kmemdup(*buf, len + 1, GFP_KERNEL); 4324 if (!dup) 4325 return NULL; 4326 *(dup + len) = '\0'; 4327 *buf += len; 4328 4329 if (lenp) 4330 *lenp = len; 4331 4332 return dup; 4333} 4334 4335/* 4336 * Parse the options provided for an "rbd add" (i.e., rbd image 4337 * mapping) request. These arrive via a write to /sys/bus/rbd/add, 4338 * and the data written is passed here via a NUL-terminated buffer. 4339 * Returns 0 if successful or an error code otherwise. 4340 * 4341 * The information extracted from these options is recorded in 4342 * the other parameters which return dynamically-allocated 4343 * structures: 4344 * ceph_opts 4345 * The address of a pointer that will refer to a ceph options 4346 * structure. Caller must release the returned pointer using 4347 * ceph_destroy_options() when it is no longer needed. 4348 * rbd_opts 4349 * Address of an rbd options pointer. Fully initialized by 4350 * this function; caller must release with kfree(). 4351 * spec 4352 * Address of an rbd image specification pointer. Fully 4353 * initialized by this function based on parsed options. 4354 * Caller must release with rbd_spec_put(). 4355 * 4356 * The options passed take this form: 4357 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>] 4358 * where: 4359 * <mon_addrs> 4360 * A comma-separated list of one or more monitor addresses. 4361 * A monitor address is an ip address, optionally followed 4362 * by a port number (separated by a colon). 4363 * I.e.: ip1[:port1][,ip2[:port2]...] 4364 * <options> 4365 * A comma-separated list of ceph and/or rbd options. 4366 * <pool_name> 4367 * The name of the rados pool containing the rbd image. 4368 * <image_name> 4369 * The name of the image in that pool to map. 4370 * <snap_id> 4371 * An optional snapshot id. 
If provided, the mapping will 4372 * present data from the image at the time that snapshot was 4373 * created. The image head is used if no snapshot id is 4374 * provided. Snapshot mappings are always read-only. 4375 */ 4376static int rbd_add_parse_args(const char *buf, 4377 struct ceph_options **ceph_opts, 4378 struct rbd_options **opts, 4379 struct rbd_spec **rbd_spec) 4380{ 4381 size_t len; 4382 char *options; 4383 const char *mon_addrs; 4384 char *snap_name; 4385 size_t mon_addrs_size; 4386 struct rbd_spec *spec = NULL; 4387 struct rbd_options *rbd_opts = NULL; 4388 struct ceph_options *copts; 4389 int ret; 4390 4391 /* The first four tokens are required */ 4392 4393 len = next_token(&buf); 4394 if (!len) { 4395 rbd_warn(NULL, "no monitor address(es) provided"); 4396 return -EINVAL; 4397 } 4398 mon_addrs = buf; 4399 mon_addrs_size = len + 1; 4400 buf += len; 4401 4402 ret = -EINVAL; 4403 options = dup_token(&buf, NULL); 4404 if (!options) 4405 return -ENOMEM; 4406 if (!*options) { 4407 rbd_warn(NULL, "no options provided"); 4408 goto out_err; 4409 } 4410 4411 spec = rbd_spec_alloc(); 4412 if (!spec) 4413 goto out_mem; 4414 4415 spec->pool_name = dup_token(&buf, NULL); 4416 if (!spec->pool_name) 4417 goto out_mem; 4418 if (!*spec->pool_name) { 4419 rbd_warn(NULL, "no pool name provided"); 4420 goto out_err; 4421 } 4422 4423 spec->image_name = dup_token(&buf, NULL); 4424 if (!spec->image_name) 4425 goto out_mem; 4426 if (!*spec->image_name) { 4427 rbd_warn(NULL, "no image name provided"); 4428 goto out_err; 4429 } 4430 4431 /* 4432 * Snapshot name is optional; default is to use "-" 4433 * (indicating the head/no snapshot). 
4434 */ 4435 len = next_token(&buf); 4436 if (!len) { 4437 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */ 4438 len = sizeof (RBD_SNAP_HEAD_NAME) - 1; 4439 } else if (len > RBD_MAX_SNAP_NAME_LEN) { 4440 ret = -ENAMETOOLONG; 4441 goto out_err; 4442 } 4443 snap_name = kmemdup(buf, len + 1, GFP_KERNEL); 4444 if (!snap_name) 4445 goto out_mem; 4446 *(snap_name + len) = '\0'; 4447 spec->snap_name = snap_name; 4448 4449 /* Initialize all rbd options to the defaults */ 4450 4451 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL); 4452 if (!rbd_opts) 4453 goto out_mem; 4454 4455 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT; 4456 4457 copts = ceph_parse_options(options, mon_addrs, 4458 mon_addrs + mon_addrs_size - 1, 4459 parse_rbd_opts_token, rbd_opts); 4460 if (IS_ERR(copts)) { 4461 ret = PTR_ERR(copts); 4462 goto out_err; 4463 } 4464 kfree(options); 4465 4466 *ceph_opts = copts; 4467 *opts = rbd_opts; 4468 *rbd_spec = spec; 4469 4470 return 0; 4471out_mem: 4472 ret = -ENOMEM; 4473out_err: 4474 kfree(rbd_opts); 4475 rbd_spec_put(spec); 4476 kfree(options); 4477 4478 return ret; 4479} 4480 4481/* 4482 * An rbd format 2 image has a unique identifier, distinct from the 4483 * name given to it by the user. Internally, that identifier is 4484 * what's used to specify the names of objects related to the image. 4485 * 4486 * A special "rbd id" object is used to map an rbd image name to its 4487 * id. If that object doesn't exist, then there is no v2 rbd image 4488 * with the supplied name. 4489 * 4490 * This function will record the given rbd_dev's image_id field if 4491 * it can be determined, and in that case will return 0. If any 4492 * errors occur a negative errno will be returned and the rbd_dev's 4493 * image_id field will be unchanged (and should be NULL). 
4494 */ 4495static int rbd_dev_image_id(struct rbd_device *rbd_dev) 4496{ 4497 int ret; 4498 size_t size; 4499 char *object_name; 4500 void *response; 4501 char *image_id; 4502 4503 /* 4504 * When probing a parent image, the image id is already 4505 * known (and the image name likely is not). There's no 4506 * need to fetch the image id again in this case. We 4507 * do still need to set the image format though. 4508 */ 4509 if (rbd_dev->spec->image_id) { 4510 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1; 4511 4512 return 0; 4513 } 4514 4515 /* 4516 * First, see if the format 2 image id file exists, and if 4517 * so, get the image's persistent id from it. 4518 */ 4519 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); 4520 object_name = kmalloc(size, GFP_NOIO); 4521 if (!object_name) 4522 return -ENOMEM; 4523 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); 4524 dout("rbd id object name is %s\n", object_name); 4525 4526 /* Response will be an encoded string, which includes a length */ 4527 4528 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX; 4529 response = kzalloc(size, GFP_NOIO); 4530 if (!response) { 4531 ret = -ENOMEM; 4532 goto out; 4533 } 4534 4535 /* If it doesn't exist we'll assume it's a format 1 image */ 4536 4537 ret = rbd_obj_method_sync(rbd_dev, object_name, 4538 "rbd", "get_id", NULL, 0, 4539 response, RBD_IMAGE_ID_LEN_MAX, NULL); 4540 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); 4541 if (ret == -ENOENT) { 4542 image_id = kstrdup("", GFP_KERNEL); 4543 ret = image_id ? 0 : -ENOMEM; 4544 if (!ret) 4545 rbd_dev->image_format = 1; 4546 } else if (ret > sizeof (__le32)) { 4547 void *p = response; 4548 4549 image_id = ceph_extract_encoded_string(&p, p + ret, 4550 NULL, GFP_NOIO); 4551 ret = IS_ERR(image_id) ? 
PTR_ERR(image_id) : 0; 4552 if (!ret) 4553 rbd_dev->image_format = 2; 4554 } else { 4555 ret = -EINVAL; 4556 } 4557 4558 if (!ret) { 4559 rbd_dev->spec->image_id = image_id; 4560 dout("image_id is %s\n", image_id); 4561 } 4562out: 4563 kfree(response); 4564 kfree(object_name); 4565 4566 return ret; 4567} 4568 4569static int rbd_dev_v1_probe(struct rbd_device *rbd_dev) 4570{ 4571 int ret; 4572 size_t size; 4573 4574 /* Record the header object name for this rbd image. */ 4575 4576 size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX); 4577 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 4578 if (!rbd_dev->header_name) { 4579 ret = -ENOMEM; 4580 goto out_err; 4581 } 4582 sprintf(rbd_dev->header_name, "%s%s", 4583 rbd_dev->spec->image_name, RBD_SUFFIX); 4584 4585 /* Populate rbd image metadata */ 4586 4587 ret = rbd_read_header(rbd_dev, &rbd_dev->header); 4588 if (ret < 0) 4589 goto out_err; 4590 4591 /* Version 1 images have no parent (no layering) */ 4592 4593 rbd_dev->parent_spec = NULL; 4594 rbd_dev->parent_overlap = 0; 4595 4596 dout("discovered version 1 image, header name is %s\n", 4597 rbd_dev->header_name); 4598 4599 return 0; 4600 4601out_err: 4602 kfree(rbd_dev->header_name); 4603 rbd_dev->header_name = NULL; 4604 kfree(rbd_dev->spec->image_id); 4605 rbd_dev->spec->image_id = NULL; 4606 4607 return ret; 4608} 4609 4610static int rbd_dev_v2_probe(struct rbd_device *rbd_dev) 4611{ 4612 size_t size; 4613 int ret; 4614 u64 ver = 0; 4615 4616 /* 4617 * Image id was filled in by the caller. Record the header 4618 * object name for this rbd image. 
4619 */ 4620 size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id); 4621 rbd_dev->header_name = kmalloc(size, GFP_KERNEL); 4622 if (!rbd_dev->header_name) 4623 return -ENOMEM; 4624 sprintf(rbd_dev->header_name, "%s%s", 4625 RBD_HEADER_PREFIX, rbd_dev->spec->image_id); 4626 4627 /* Get the size and object order for the image */ 4628 ret = rbd_dev_v2_image_size(rbd_dev); 4629 if (ret) 4630 goto out_err; 4631 4632 /* Get the object prefix (a.k.a. block_name) for the image */ 4633 4634 ret = rbd_dev_v2_object_prefix(rbd_dev); 4635 if (ret) 4636 goto out_err; 4637 4638 /* Get the and check features for the image */ 4639 4640 ret = rbd_dev_v2_features(rbd_dev); 4641 if (ret) 4642 goto out_err; 4643 4644 /* If the image supports layering, get the parent info */ 4645 4646 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) { 4647 ret = rbd_dev_v2_parent_info(rbd_dev); 4648 if (ret) 4649 goto out_err; 4650 rbd_warn(rbd_dev, "WARNING: kernel support for " 4651 "layered rbd images is EXPERIMENTAL!"); 4652 } 4653 4654 /* If the image supports fancy striping, get its parameters */ 4655 4656 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) { 4657 ret = rbd_dev_v2_striping_info(rbd_dev); 4658 if (ret < 0) 4659 goto out_err; 4660 } 4661 4662 /* crypto and compression type aren't (yet) supported for v2 images */ 4663 4664 rbd_dev->header.crypt_type = 0; 4665 rbd_dev->header.comp_type = 0; 4666 4667 /* Get the snapshot context, plus the header version */ 4668 4669 ret = rbd_dev_v2_snap_context(rbd_dev, &ver); 4670 if (ret) 4671 goto out_err; 4672 rbd_dev->header.obj_version = ver; 4673 4674 dout("discovered version 2 image, header name is %s\n", 4675 rbd_dev->header_name); 4676 4677 return 0; 4678out_err: 4679 rbd_dev->parent_overlap = 0; 4680 rbd_spec_put(rbd_dev->parent_spec); 4681 rbd_dev->parent_spec = NULL; 4682 kfree(rbd_dev->header_name); 4683 rbd_dev->header_name = NULL; 4684 kfree(rbd_dev->header.object_prefix); 4685 rbd_dev->header.object_prefix = 
NULL; 4686 4687 return ret; 4688} 4689 4690static int rbd_dev_probe_finish(struct rbd_device *rbd_dev) 4691{ 4692 struct rbd_device *parent = NULL; 4693 struct rbd_spec *parent_spec = NULL; 4694 struct rbd_client *rbdc = NULL; 4695 int ret; 4696 4697 /* no need to lock here, as rbd_dev is not registered yet */ 4698 ret = rbd_dev_snaps_update(rbd_dev); 4699 if (ret) 4700 return ret; 4701 4702 ret = rbd_dev_probe_update_spec(rbd_dev); 4703 if (ret) 4704 goto err_out_snaps; 4705 4706 ret = rbd_dev_set_mapping(rbd_dev); 4707 if (ret) 4708 goto err_out_snaps; 4709 4710 /* generate unique id: find highest unique id, add one */ 4711 rbd_dev_id_get(rbd_dev); 4712 4713 /* Fill in the device name, now that we have its id. */ 4714 BUILD_BUG_ON(DEV_NAME_LEN 4715 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH); 4716 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id); 4717 4718 /* Get our block major device number. */ 4719 4720 ret = register_blkdev(0, rbd_dev->name); 4721 if (ret < 0) 4722 goto err_out_id; 4723 rbd_dev->major = ret; 4724 4725 /* Set up the blkdev mapping. */ 4726 4727 ret = rbd_init_disk(rbd_dev); 4728 if (ret) 4729 goto err_out_blkdev; 4730 4731 ret = rbd_bus_add_dev(rbd_dev); 4732 if (ret) 4733 goto err_out_disk; 4734 4735 /* 4736 * At this point cleanup in the event of an error is the job 4737 * of the sysfs code (initiated by rbd_bus_del_dev()). 4738 */ 4739 /* Probe the parent if there is one */ 4740 4741 if (rbd_dev->parent_spec) { 4742 /* 4743 * We need to pass a reference to the client and the 4744 * parent spec when creating the parent rbd_dev. 4745 * Images related by parent/child relationships 4746 * always share both. 
4747 */ 4748 parent_spec = rbd_spec_get(rbd_dev->parent_spec); 4749 rbdc = __rbd_get_client(rbd_dev->rbd_client); 4750 4751 parent = rbd_dev_create(rbdc, parent_spec); 4752 if (!parent) { 4753 ret = -ENOMEM; 4754 goto err_out_spec; 4755 } 4756 rbdc = NULL; /* parent now owns reference */ 4757 parent_spec = NULL; /* parent now owns reference */ 4758 ret = rbd_dev_probe(parent); 4759 if (ret < 0) 4760 goto err_out_parent; 4761 rbd_dev->parent = parent; 4762 } 4763 4764 ret = rbd_dev_header_watch_sync(rbd_dev, 1); 4765 if (ret) 4766 goto err_out_bus; 4767 4768 /* Everything's ready. Announce the disk to the world. */ 4769 4770 add_disk(rbd_dev->disk); 4771 4772 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name, 4773 (unsigned long long) rbd_dev->mapping.size); 4774 4775 return ret; 4776 4777err_out_parent: 4778 rbd_dev_destroy(parent); 4779err_out_spec: 4780 rbd_spec_put(parent_spec); 4781 rbd_put_client(rbdc); 4782err_out_bus: 4783 /* this will also clean up rest of rbd_dev stuff */ 4784 4785 rbd_bus_del_dev(rbd_dev); 4786 4787 return ret; 4788err_out_disk: 4789 rbd_free_disk(rbd_dev); 4790err_out_blkdev: 4791 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4792err_out_id: 4793 rbd_dev_id_put(rbd_dev); 4794err_out_snaps: 4795 rbd_remove_all_snaps(rbd_dev); 4796 4797 return ret; 4798} 4799 4800/* 4801 * Probe for the existence of the header object for the given rbd 4802 * device. For format 2 images this includes determining the image 4803 * id. 4804 */ 4805static int rbd_dev_probe(struct rbd_device *rbd_dev) 4806{ 4807 int ret; 4808 4809 /* 4810 * Get the id from the image id object. If it's not a 4811 * format 2 image, we'll get ENOENT back, and we'll assume 4812 * it's a format 1 image. 
4813 */ 4814 ret = rbd_dev_image_id(rbd_dev); 4815 if (ret) 4816 return ret; 4817 rbd_assert(rbd_dev->spec->image_id); 4818 rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); 4819 4820 if (rbd_dev->image_format == 1) 4821 ret = rbd_dev_v1_probe(rbd_dev); 4822 else 4823 ret = rbd_dev_v2_probe(rbd_dev); 4824 if (ret) 4825 goto out_err; 4826 4827 ret = rbd_dev_probe_finish(rbd_dev); 4828 if (ret) 4829 rbd_header_free(&rbd_dev->header); 4830 4831 return ret; 4832out_err: 4833 kfree(rbd_dev->spec->image_id); 4834 rbd_dev->spec->image_id = NULL; 4835 4836 dout("probe failed, returning %d\n", ret); 4837 4838 return ret; 4839} 4840 4841static ssize_t rbd_add(struct bus_type *bus, 4842 const char *buf, 4843 size_t count) 4844{ 4845 struct rbd_device *rbd_dev = NULL; 4846 struct ceph_options *ceph_opts = NULL; 4847 struct rbd_options *rbd_opts = NULL; 4848 struct rbd_spec *spec = NULL; 4849 struct rbd_client *rbdc; 4850 struct ceph_osd_client *osdc; 4851 int rc = -ENOMEM; 4852 4853 if (!try_module_get(THIS_MODULE)) 4854 return -ENODEV; 4855 4856 /* parse add command */ 4857 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec); 4858 if (rc < 0) 4859 goto err_out_module; 4860 4861 rbdc = rbd_get_client(ceph_opts); 4862 if (IS_ERR(rbdc)) { 4863 rc = PTR_ERR(rbdc); 4864 goto err_out_args; 4865 } 4866 ceph_opts = NULL; /* rbd_dev client now owns this */ 4867 4868 /* pick the pool */ 4869 osdc = &rbdc->client->osdc; 4870 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name); 4871 if (rc < 0) 4872 goto err_out_client; 4873 spec->pool_id = (u64)rc; 4874 4875 /* The ceph file layout needs to fit pool id in 32 bits */ 4876 4877 if (spec->pool_id > (u64)U32_MAX) { 4878 rbd_warn(NULL, "pool id too large (%llu > %u)\n", 4879 (unsigned long long)spec->pool_id, U32_MAX); 4880 rc = -EIO; 4881 goto err_out_client; 4882 } 4883 4884 rbd_dev = rbd_dev_create(rbdc, spec); 4885 if (!rbd_dev) 4886 goto err_out_client; 4887 rbdc = NULL; /* rbd_dev now owns this */ 4888 spec = 
NULL; /* rbd_dev now owns this */ 4889 4890 rbd_dev->mapping.read_only = rbd_opts->read_only; 4891 kfree(rbd_opts); 4892 rbd_opts = NULL; /* done with this */ 4893 4894 rc = rbd_dev_probe(rbd_dev); 4895 if (rc < 0) 4896 goto err_out_rbd_dev; 4897 4898 return count; 4899err_out_rbd_dev: 4900 rbd_dev_destroy(rbd_dev); 4901err_out_client: 4902 rbd_put_client(rbdc); 4903err_out_args: 4904 if (ceph_opts) 4905 ceph_destroy_options(ceph_opts); 4906 kfree(rbd_opts); 4907 rbd_spec_put(spec); 4908err_out_module: 4909 module_put(THIS_MODULE); 4910 4911 dout("Error adding device %s\n", buf); 4912 4913 return (ssize_t)rc; 4914} 4915 4916static struct rbd_device *__rbd_get_dev(unsigned long dev_id) 4917{ 4918 struct list_head *tmp; 4919 struct rbd_device *rbd_dev; 4920 4921 spin_lock(&rbd_dev_list_lock); 4922 list_for_each(tmp, &rbd_dev_list) { 4923 rbd_dev = list_entry(tmp, struct rbd_device, node); 4924 if (rbd_dev->dev_id == dev_id) { 4925 spin_unlock(&rbd_dev_list_lock); 4926 return rbd_dev; 4927 } 4928 } 4929 spin_unlock(&rbd_dev_list_lock); 4930 return NULL; 4931} 4932 4933static void rbd_dev_release(struct device *dev) 4934{ 4935 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev); 4936 4937 if (rbd_dev->watch_event) 4938 rbd_dev_header_watch_sync(rbd_dev, 0); 4939 4940 /* clean up and free blkdev */ 4941 rbd_free_disk(rbd_dev); 4942 unregister_blkdev(rbd_dev->major, rbd_dev->name); 4943 4944 /* release allocated disk header fields */ 4945 rbd_header_free(&rbd_dev->header); 4946 4947 /* done with the id, and with the rbd_dev */ 4948 rbd_dev_id_put(rbd_dev); 4949 rbd_assert(rbd_dev->rbd_client != NULL); 4950 rbd_dev_destroy(rbd_dev); 4951 4952 /* release module ref */ 4953 module_put(THIS_MODULE); 4954} 4955 4956static void __rbd_remove(struct rbd_device *rbd_dev) 4957{ 4958 rbd_remove_all_snaps(rbd_dev); 4959 rbd_bus_del_dev(rbd_dev); 4960} 4961 4962static ssize_t rbd_remove(struct bus_type *bus, 4963 const char *buf, 4964 size_t count) 4965{ 4966 struct rbd_device *rbd_dev 
= NULL; 4967 int target_id, rc; 4968 unsigned long ul; 4969 int ret = count; 4970 4971 rc = strict_strtoul(buf, 10, &ul); 4972 if (rc) 4973 return rc; 4974 4975 /* convert to int; abort if we lost anything in the conversion */ 4976 target_id = (int) ul; 4977 if (target_id != ul) 4978 return -EINVAL; 4979 4980 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING); 4981 4982 rbd_dev = __rbd_get_dev(target_id); 4983 if (!rbd_dev) { 4984 ret = -ENOENT; 4985 goto done; 4986 } 4987 4988 spin_lock_irq(&rbd_dev->lock); 4989 if (rbd_dev->open_count) 4990 ret = -EBUSY; 4991 else 4992 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags); 4993 spin_unlock_irq(&rbd_dev->lock); 4994 if (ret < 0) 4995 goto done; 4996 4997 while (rbd_dev->parent_spec) { 4998 struct rbd_device *first = rbd_dev; 4999 struct rbd_device *second = first->parent; 5000 struct rbd_device *third; 5001 5002 /* 5003 * Follow to the parent with no grandparent and 5004 * remove it. 5005 */ 5006 while (second && (third = second->parent)) { 5007 first = second; 5008 second = third; 5009 } 5010 __rbd_remove(second); 5011 rbd_spec_put(first->parent_spec); 5012 first->parent_spec = NULL; 5013 first->parent_overlap = 0; 5014 first->parent = NULL; 5015 } 5016 __rbd_remove(rbd_dev); 5017 5018done: 5019 mutex_unlock(&ctl_mutex); 5020 5021 return ret; 5022} 5023 5024/* 5025 * create control files in sysfs 5026 * /sys/bus/rbd/... 
5027 */ 5028static int rbd_sysfs_init(void) 5029{ 5030 int ret; 5031 5032 ret = device_register(&rbd_root_dev); 5033 if (ret < 0) 5034 return ret; 5035 5036 ret = bus_register(&rbd_bus_type); 5037 if (ret < 0) 5038 device_unregister(&rbd_root_dev); 5039 5040 return ret; 5041} 5042 5043static void rbd_sysfs_cleanup(void) 5044{ 5045 bus_unregister(&rbd_bus_type); 5046 device_unregister(&rbd_root_dev); 5047} 5048 5049static int __init rbd_init(void) 5050{ 5051 int rc; 5052 5053 if (!libceph_compatible(NULL)) { 5054 rbd_warn(NULL, "libceph incompatibility (quitting)"); 5055 5056 return -EINVAL; 5057 } 5058 rc = rbd_sysfs_init(); 5059 if (rc) 5060 return rc; 5061 pr_info("loaded " RBD_DRV_NAME_LONG "\n"); 5062 return 0; 5063} 5064 5065static void __exit rbd_exit(void) 5066{ 5067 rbd_sysfs_cleanup(); 5068} 5069 5070module_init(rbd_init); 5071module_exit(rbd_exit); 5072 5073MODULE_AUTHOR("Sage Weil <sage@newdream.net>"); 5074MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>"); 5075MODULE_DESCRIPTION("rados block device"); 5076 5077/* following authorship retained from original osdblk.c */ 5078MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>"); 5079 5080MODULE_LICENSE("GPL"); 5081