/* mon_client.c revision 735a72ef952d42a256f79ae3e6dc1c17a45c041b */
1#include <linux/ceph/ceph_debug.h> 2 3#include <linux/module.h> 4#include <linux/types.h> 5#include <linux/slab.h> 6#include <linux/random.h> 7#include <linux/sched.h> 8 9#include <linux/ceph/mon_client.h> 10#include <linux/ceph/libceph.h> 11#include <linux/ceph/debugfs.h> 12#include <linux/ceph/decode.h> 13#include <linux/ceph/auth.h> 14 15/* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33static const struct ceph_connection_operations mon_con_ops; 34 35static int __validate_auth(struct ceph_mon_client *monc); 36 37/* 38 * Decode a monmap blob (e.g., during mount). 
39 */ 40struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41{ 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86} 87 88/* 89 * return true if *addr is included in the monmap. 90 */ 91int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92{ 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99} 100 101/* 102 * Send an auth request. 
103 */ 104static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105{ 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_msg_revoke(monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(&monc->con, monc->m_auth); 112} 113 114/* 115 * Close monitor session, if any. 116 */ 117static void __close_session(struct ceph_mon_client *monc) 118{ 119 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_msg_revoke(monc->m_auth); 121 ceph_con_close(&monc->con); 122 monc->cur_mon = -1; 123 monc->pending_auth = 0; 124 ceph_auth_reset(monc->auth); 125} 126 127/* 128 * Open a session with a (new) monitor. 129 */ 130static int __open_session(struct ceph_mon_client *monc) 131{ 132 char r; 133 int ret; 134 135 if (monc->cur_mon < 0) { 136 get_random_bytes(&r, 1); 137 monc->cur_mon = r % monc->monmap->num_mon; 138 dout("open_session num=%d r=%d -> mon%d\n", 139 monc->monmap->num_mon, r, monc->cur_mon); 140 monc->sub_sent = 0; 141 monc->sub_renew_after = jiffies; /* i.e., expired */ 142 monc->want_next_osdmap = !!monc->want_next_osdmap; 143 144 dout("open_session mon%d opening\n", monc->cur_mon); 145 ceph_con_open(&monc->con, 146 CEPH_ENTITY_TYPE_MON, monc->cur_mon, 147 &monc->monmap->mon_inst[monc->cur_mon].addr); 148 149 /* initiatiate authentication handshake */ 150 ret = ceph_auth_build_hello(monc->auth, 151 monc->m_auth->front.iov_base, 152 monc->m_auth->front_max); 153 __send_prepared_auth_request(monc, ret); 154 } else { 155 dout("open_session mon%d already open\n", monc->cur_mon); 156 } 157 return 0; 158} 159 160static bool __sub_expired(struct ceph_mon_client *monc) 161{ 162 return time_after_eq(jiffies, monc->sub_renew_after); 163} 164 165/* 166 * Reschedule delayed work timer. 
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned int delay;

	/* hunt (or renew an expired sub) sooner than we keepalive a
	 * healthy, current session */
	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned int)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	/* (re)send only if nothing is in flight and the sub expired,
	 * or a next-osdmap request is newly pending (== 1) */
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		/* we always subscribe to "monmap"; osdmap/mdsmap are
		 * optional */
		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned int)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned int)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		/* standing subscription to monmap updates */
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_msg_revoke(msg);
		ceph_con_send(&monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1; /* never 0 */
	}
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	/* renew well before the grant expires: at half the advertised
	 * duration, measured from when we sent the request */
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	/* we have what we asked for; clear the pending request */
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	/* == 2 means a request is already on the wire; don't resend */
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * Open a session with a monitor and arm the periodic keepalive work.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

/*
 * The monitor responds with mount ack indicate mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	/* install the new map and free the one it replaces */
	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		mutex_unlock(&monc->mutex);
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(client);
		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 */

/* find a pending generic request by transaction id; caller holds
 * monc->mutex */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

/* insert a request keyed by tid; tids are allocated monotonically, so
 * a duplicate is a bug.  caller holds monc->mutex */
static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

/* kref release callback: drop message refs, then the request itself */
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

/* alloc_msg hook for replies to generic requests: hand the messenger
 * the preallocated reply buffer for the matching tid, or skip */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/* register req, send it, and wait (interruptibly) for the reply
 * handler to complete it; returns the handler's result */
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		/* take a ref under the mutex so req survives until we
		 * complete it below, outside the lock */
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
514 */ 515int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 516{ 517 struct ceph_mon_generic_request *req; 518 struct ceph_mon_statfs *h; 519 int err; 520 521 req = kzalloc(sizeof(*req), GFP_NOFS); 522 if (!req) 523 return -ENOMEM; 524 525 kref_init(&req->kref); 526 req->buf = buf; 527 req->buf_len = sizeof(*buf); 528 init_completion(&req->completion); 529 530 err = -ENOMEM; 531 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 532 true); 533 if (!req->request) 534 goto out; 535 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, 536 true); 537 if (!req->reply) 538 goto out; 539 540 /* fill out request */ 541 h = req->request->front.iov_base; 542 h->monhdr.have_version = 0; 543 h->monhdr.session_mon = cpu_to_le16(-1); 544 h->monhdr.session_mon_tid = 0; 545 h->fsid = monc->monmap->fsid; 546 547 err = do_generic_request(monc, req); 548 549out: 550 kref_put(&req->kref, release_generic_request); 551 return err; 552} 553EXPORT_SYMBOL(ceph_monc_do_statfs); 554 555/* 556 * pool ops 557 */ 558static int get_poolop_reply_buf(const char *src, size_t src_len, 559 char *dst, size_t dst_len) 560{ 561 u32 buf_len; 562 563 if (src_len != sizeof(u32) + dst_len) 564 return -EINVAL; 565 566 buf_len = le32_to_cpu(*(u32 *)src); 567 if (buf_len != dst_len) 568 return -EINVAL; 569 570 memcpy(dst, src + sizeof(u32), dst_len); 571 return 0; 572} 573 574static void handle_poolop_reply(struct ceph_mon_client *monc, 575 struct ceph_msg *msg) 576{ 577 struct ceph_mon_generic_request *req; 578 struct ceph_mon_poolop_reply *reply = msg->front.iov_base; 579 u64 tid = le64_to_cpu(msg->hdr.tid); 580 581 if (msg->front.iov_len < sizeof(*reply)) 582 goto bad; 583 dout("handle_poolop_reply %p tid %llu\n", msg, tid); 584 585 mutex_lock(&monc->mutex); 586 req = __lookup_generic_req(monc, tid); 587 if (req) { 588 if (req->buf_len && 589 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), 590 msg->front.iov_len - sizeof(*reply), 591 
req->buf, req->buf_len) < 0) { 592 mutex_unlock(&monc->mutex); 593 goto bad; 594 } 595 req->result = le32_to_cpu(reply->reply_code); 596 get_generic_request(req); 597 } 598 mutex_unlock(&monc->mutex); 599 if (req) { 600 complete(&req->completion); 601 put_generic_request(req); 602 } 603 return; 604 605bad: 606 pr_err("corrupt generic reply, tid %llu\n", tid); 607 ceph_msg_dump(msg); 608} 609 610/* 611 * Do a synchronous pool op. 612 */ 613int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, 614 u32 pool, u64 snapid, 615 char *buf, int len) 616{ 617 struct ceph_mon_generic_request *req; 618 struct ceph_mon_poolop *h; 619 int err; 620 621 req = kzalloc(sizeof(*req), GFP_NOFS); 622 if (!req) 623 return -ENOMEM; 624 625 kref_init(&req->kref); 626 req->buf = buf; 627 req->buf_len = len; 628 init_completion(&req->completion); 629 630 err = -ENOMEM; 631 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS, 632 true); 633 if (!req->request) 634 goto out; 635 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS, 636 true); 637 if (!req->reply) 638 goto out; 639 640 /* fill out request */ 641 req->request->hdr.version = cpu_to_le16(2); 642 h = req->request->front.iov_base; 643 h->monhdr.have_version = 0; 644 h->monhdr.session_mon = cpu_to_le16(-1); 645 h->monhdr.session_mon_tid = 0; 646 h->fsid = monc->monmap->fsid; 647 h->pool = cpu_to_le32(pool); 648 h->op = cpu_to_le32(op); 649 h->auid = 0; 650 h->snapid = cpu_to_le64(snapid); 651 h->name_len = 0; 652 653 err = do_generic_request(monc, req); 654 655out: 656 kref_put(&req->kref, release_generic_request); 657 return err; 658} 659 660int ceph_monc_create_snapid(struct ceph_mon_client *monc, 661 u32 pool, u64 *snapid) 662{ 663 return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 664 pool, 0, (char *)snapid, sizeof(*snapid)); 665 666} 667EXPORT_SYMBOL(ceph_monc_create_snapid); 668 669int ceph_monc_delete_snapid(struct ceph_mon_client *monc, 670 u32 pool, u64 snapid) 671{ 672 return 
ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 673 pool, snapid, 0, 0); 674 675} 676 677/* 678 * Resend pending generic requests. 679 */ 680static void __resend_generic_request(struct ceph_mon_client *monc) 681{ 682 struct ceph_mon_generic_request *req; 683 struct rb_node *p; 684 685 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 686 req = rb_entry(p, struct ceph_mon_generic_request, node); 687 ceph_msg_revoke(req->request); 688 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 689 } 690} 691 692/* 693 * Delayed work. If we haven't mounted yet, retry. Otherwise, 694 * renew/retry subscription as needed (in case it is timing out, or we 695 * got an ENOMEM). And keep the monitor connection alive. 696 */ 697static void delayed_work(struct work_struct *work) 698{ 699 struct ceph_mon_client *monc = 700 container_of(work, struct ceph_mon_client, delayed_work.work); 701 702 dout("monc delayed_work\n"); 703 mutex_lock(&monc->mutex); 704 if (monc->hunting) { 705 __close_session(monc); 706 __open_session(monc); /* continue hunting */ 707 } else { 708 ceph_con_keepalive(&monc->con); 709 710 __validate_auth(monc); 711 712 if (monc->auth->ops->is_authenticated(monc->auth)) 713 __send_subscribe(monc); 714 } 715 __schedule_delayed(monc); 716 mutex_unlock(&monc->mutex); 717} 718 719/* 720 * On startup, we build a temporary monmap populated with the IPs 721 * provided by mount(2). 
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		/* zero nonce: addresses from mount options carry none */
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;
	return 0;
}

/*
 * Initialize the mon_client: temporary monmap, auth handle, and the
 * preallocated messages used for the monitor session.  On error,
 * unwinds in reverse order of construction.
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;	/* no monitor selected yet */
	monc->hunting = true;
	monc->sub_renew_after = jiffies;
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	/* stop the keepalive/hunt timer before tearing anything down */
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	/* drop our refs on the preallocated session messages */
	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

/*
 * Feed an auth reply into the auth state machine; on failure wake any
 * mount waiter with the error, on progress send the next request, and
 * on newly-completed authentication start the session.
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	/* ops may not be set up yet on the very first reply */
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		/* ret is the length of the next prepared auth request */
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}

/* build and send an auth request if the auth layer says one is
 * needed; no-op while one is already pending.  caller holds mutex */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	/* drop the messenger's ref in all cases */
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		/* reuse the preallocated ack buffer */
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		/* maps are variable size; allocate fresh */
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}

1005/* 1006 * If the monitor connection resets, pick a new monitor and resubmit 1007 * any pending requests. 1008 */ 1009static void mon_fault(struct ceph_connection *con) 1010{ 1011 struct ceph_mon_client *monc = con->private; 1012 1013 if (!monc) 1014 return; 1015 1016 dout("mon_fault\n"); 1017 mutex_lock(&monc->mutex); 1018 if (!con->private) 1019 goto out; 1020 1021 if (!monc->hunting) 1022 pr_info("mon%d %s session lost, " 1023 "hunting for new mon\n", monc->cur_mon, 1024 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1025 1026 __close_session(monc); 1027 if (!monc->hunting) { 1028 /* start hunting */ 1029 monc->hunting = true; 1030 __open_session(monc); 1031 } else { 1032 /* already hunting, let's wait a bit */ 1033 __schedule_delayed(monc); 1034 } 1035out: 1036 mutex_unlock(&monc->mutex); 1037} 1038 1039/* 1040 * We can ignore refcounting on the connection struct, as all references 1041 * will come from the messenger workqueue, which is drained prior to 1042 * mon_client destruction. 1043 */ 1044static struct ceph_connection *con_get(struct ceph_connection *con) 1045{ 1046 return con; 1047} 1048 1049static void con_put(struct ceph_connection *con) 1050{ 1051} 1052 1053static const struct ceph_connection_operations mon_con_ops = { 1054 .get = con_get, 1055 .put = con_put, 1056 .dispatch = dispatch, 1057 .fault = mon_fault, 1058 .alloc_msg = mon_alloc_msg, 1059}; 1060