mon_client.c revision 6740a845b2543cc46e1902ba21bac743fbadd0dc
1#include <linux/ceph/ceph_debug.h> 2 3#include <linux/module.h> 4#include <linux/types.h> 5#include <linux/slab.h> 6#include <linux/random.h> 7#include <linux/sched.h> 8 9#include <linux/ceph/mon_client.h> 10#include <linux/ceph/libceph.h> 11#include <linux/ceph/debugfs.h> 12#include <linux/ceph/decode.h> 13#include <linux/ceph/auth.h> 14 15/* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33static const struct ceph_connection_operations mon_con_ops; 34 35static int __validate_auth(struct ceph_mon_client *monc); 36 37/* 38 * Decode a monmap blob (e.g., during mount). 
39 */ 40struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41{ 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86} 87 88/* 89 * return true if *addr is included in the monmap. 90 */ 91int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92{ 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99} 100 101/* 102 * Send an auth request. 
103 */ 104static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105{ 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_msg_revoke(monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(&monc->con, monc->m_auth); 112} 113 114/* 115 * Close monitor session, if any. 116 */ 117static void __close_session(struct ceph_mon_client *monc) 118{ 119 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_msg_revoke(monc->m_auth); 121 ceph_con_close(&monc->con); 122 monc->con.private = NULL; 123 monc->cur_mon = -1; 124 monc->pending_auth = 0; 125 ceph_auth_reset(monc->auth); 126} 127 128/* 129 * Open a session with a (new) monitor. 130 */ 131static int __open_session(struct ceph_mon_client *monc) 132{ 133 char r; 134 int ret; 135 136 if (monc->cur_mon < 0) { 137 get_random_bytes(&r, 1); 138 monc->cur_mon = r % monc->monmap->num_mon; 139 dout("open_session num=%d r=%d -> mon%d\n", 140 monc->monmap->num_mon, r, monc->cur_mon); 141 monc->sub_sent = 0; 142 monc->sub_renew_after = jiffies; /* i.e., expired */ 143 monc->want_next_osdmap = !!monc->want_next_osdmap; 144 145 ceph_con_init(&monc->con, monc, &mon_con_ops, 146 &monc->client->msgr, 147 CEPH_ENTITY_TYPE_MON, monc->cur_mon); 148 149 dout("open_session mon%d opening\n", monc->cur_mon); 150 ceph_con_open(&monc->con, 151 &monc->monmap->mon_inst[monc->cur_mon].addr); 152 153 /* initiatiate authentication handshake */ 154 ret = ceph_auth_build_hello(monc->auth, 155 monc->m_auth->front.iov_base, 156 monc->m_auth->front_max); 157 __send_prepared_auth_request(monc, ret); 158 } else { 159 dout("open_session mon%d already open\n", monc->cur_mon); 160 } 161 return 0; 162} 163 164static bool __sub_expired(struct ceph_mon_client *monc) 165{ 166 return time_after_eq(jiffies, monc->sub_renew_after); 167} 168 169/* 170 * Reschedule delayed work timer. 
/*
 * Reschedule delayed work timer.
 *
 * Caller must hold monc->mutex.  Use a shorter delay while we are
 * hunting for a monitor or our subscription has lapsed.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned delay;

	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 *
 * Caller must hold monc->mutex.  The payload encoded into m_subscribe
 * is a u32 item count followed by (string name, subscribe_item) pairs;
 * we always subscribe to the monmap, and conditionally to the osdmap
 * (one-shot) and mdsmap (continuous).
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	/* only (re)send if expired-and-idle, or an osdmap was requested */
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		/* monmap always; osdmap/mdsmap only if wanted */
		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;	/* one-shot: just the next map */
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;	/* continuous updates */
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_msg_revoke(msg);
		ceph_con_send(&monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}

/*
 * Handle the monitor's SUBSCRIBE_ACK: record when to renew the
 * subscription (halfway through the granted duration), and note that a
 * newly hunted session is now established.
 */
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	/* renew at half the granted duration to avoid a lapse */
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	monc->want_next_osdmap = 0;	/* got what we asked for */
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	/* want_next_osdmap: 0 = no, 1 = want, 2 = already requested */
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * Open a monitor session (if not already open) and arm the periodic
 * keepalive/renew timer.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);
/*
 * The monitor responds with mount ack indicate mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 *
 * Decode the new monmap, verify its fsid, and swap it in for the old
 * one.  On the first map we also initialize debugfs (with the mutex
 * dropped, to avoid a locking dependency).
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		mutex_unlock(&monc->mutex);
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(client);
		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 */

/* Find a pending generic request by tid.  Caller must hold monc->mutex. */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

/*
 * Insert a new generic request into the tid-ordered rbtree.  Caller
 * must hold monc->mutex; tids are unique (duplicate insert is a BUG).
 */
static void __insert_generic_request(struct ceph_mon_client *monc,
			    struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

/* kref release: drop message refs and free the request */
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

/*
 * Messenger callback: hand back the preallocated reply buffer for the
 * generic request matching hdr->tid, or set *skip if the request is
 * gone (e.g., already completed or interrupted).
 */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					 struct ceph_msg_header *hdr,
					 int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * Register a generic request, send it, and block until it completes
 * (or the wait is interrupted).  Returns the request's result, or a
 * negative errno from the interrupted wait.
 */
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}

/*
 * statfs
 */

/* Copy a STATFS reply into the waiting request's buffer and complete it. */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		/* hold a ref so req survives until after complete_all() */
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}
518 */ 519int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 520{ 521 struct ceph_mon_generic_request *req; 522 struct ceph_mon_statfs *h; 523 int err; 524 525 req = kzalloc(sizeof(*req), GFP_NOFS); 526 if (!req) 527 return -ENOMEM; 528 529 kref_init(&req->kref); 530 req->buf = buf; 531 req->buf_len = sizeof(*buf); 532 init_completion(&req->completion); 533 534 err = -ENOMEM; 535 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 536 true); 537 if (!req->request) 538 goto out; 539 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, 540 true); 541 if (!req->reply) 542 goto out; 543 544 /* fill out request */ 545 h = req->request->front.iov_base; 546 h->monhdr.have_version = 0; 547 h->monhdr.session_mon = cpu_to_le16(-1); 548 h->monhdr.session_mon_tid = 0; 549 h->fsid = monc->monmap->fsid; 550 551 err = do_generic_request(monc, req); 552 553out: 554 kref_put(&req->kref, release_generic_request); 555 return err; 556} 557EXPORT_SYMBOL(ceph_monc_do_statfs); 558 559/* 560 * pool ops 561 */ 562static int get_poolop_reply_buf(const char *src, size_t src_len, 563 char *dst, size_t dst_len) 564{ 565 u32 buf_len; 566 567 if (src_len != sizeof(u32) + dst_len) 568 return -EINVAL; 569 570 buf_len = le32_to_cpu(*(u32 *)src); 571 if (buf_len != dst_len) 572 return -EINVAL; 573 574 memcpy(dst, src + sizeof(u32), dst_len); 575 return 0; 576} 577 578static void handle_poolop_reply(struct ceph_mon_client *monc, 579 struct ceph_msg *msg) 580{ 581 struct ceph_mon_generic_request *req; 582 struct ceph_mon_poolop_reply *reply = msg->front.iov_base; 583 u64 tid = le64_to_cpu(msg->hdr.tid); 584 585 if (msg->front.iov_len < sizeof(*reply)) 586 goto bad; 587 dout("handle_poolop_reply %p tid %llu\n", msg, tid); 588 589 mutex_lock(&monc->mutex); 590 req = __lookup_generic_req(monc, tid); 591 if (req) { 592 if (req->buf_len && 593 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), 594 msg->front.iov_len - sizeof(*reply), 595 
req->buf, req->buf_len) < 0) { 596 mutex_unlock(&monc->mutex); 597 goto bad; 598 } 599 req->result = le32_to_cpu(reply->reply_code); 600 get_generic_request(req); 601 } 602 mutex_unlock(&monc->mutex); 603 if (req) { 604 complete(&req->completion); 605 put_generic_request(req); 606 } 607 return; 608 609bad: 610 pr_err("corrupt generic reply, tid %llu\n", tid); 611 ceph_msg_dump(msg); 612} 613 614/* 615 * Do a synchronous pool op. 616 */ 617int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, 618 u32 pool, u64 snapid, 619 char *buf, int len) 620{ 621 struct ceph_mon_generic_request *req; 622 struct ceph_mon_poolop *h; 623 int err; 624 625 req = kzalloc(sizeof(*req), GFP_NOFS); 626 if (!req) 627 return -ENOMEM; 628 629 kref_init(&req->kref); 630 req->buf = buf; 631 req->buf_len = len; 632 init_completion(&req->completion); 633 634 err = -ENOMEM; 635 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS, 636 true); 637 if (!req->request) 638 goto out; 639 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS, 640 true); 641 if (!req->reply) 642 goto out; 643 644 /* fill out request */ 645 req->request->hdr.version = cpu_to_le16(2); 646 h = req->request->front.iov_base; 647 h->monhdr.have_version = 0; 648 h->monhdr.session_mon = cpu_to_le16(-1); 649 h->monhdr.session_mon_tid = 0; 650 h->fsid = monc->monmap->fsid; 651 h->pool = cpu_to_le32(pool); 652 h->op = cpu_to_le32(op); 653 h->auid = 0; 654 h->snapid = cpu_to_le64(snapid); 655 h->name_len = 0; 656 657 err = do_generic_request(monc, req); 658 659out: 660 kref_put(&req->kref, release_generic_request); 661 return err; 662} 663 664int ceph_monc_create_snapid(struct ceph_mon_client *monc, 665 u32 pool, u64 *snapid) 666{ 667 return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 668 pool, 0, (char *)snapid, sizeof(*snapid)); 669 670} 671EXPORT_SYMBOL(ceph_monc_create_snapid); 672 673int ceph_monc_delete_snapid(struct ceph_mon_client *monc, 674 u32 pool, u64 snapid) 675{ 676 return 
ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 677 pool, snapid, 0, 0); 678 679} 680 681/* 682 * Resend pending generic requests. 683 */ 684static void __resend_generic_request(struct ceph_mon_client *monc) 685{ 686 struct ceph_mon_generic_request *req; 687 struct rb_node *p; 688 689 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 690 req = rb_entry(p, struct ceph_mon_generic_request, node); 691 ceph_msg_revoke(req->request); 692 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 693 } 694} 695 696/* 697 * Delayed work. If we haven't mounted yet, retry. Otherwise, 698 * renew/retry subscription as needed (in case it is timing out, or we 699 * got an ENOMEM). And keep the monitor connection alive. 700 */ 701static void delayed_work(struct work_struct *work) 702{ 703 struct ceph_mon_client *monc = 704 container_of(work, struct ceph_mon_client, delayed_work.work); 705 706 dout("monc delayed_work\n"); 707 mutex_lock(&monc->mutex); 708 if (monc->hunting) { 709 __close_session(monc); 710 __open_session(monc); /* continue hunting */ 711 } else { 712 ceph_con_keepalive(&monc->con); 713 714 __validate_auth(monc); 715 716 if (monc->auth->ops->is_authenticated(monc->auth)) 717 __send_subscribe(monc); 718 } 719 __schedule_delayed(monc); 720 mutex_unlock(&monc->mutex); 721} 722 723/* 724 * On startup, we build a temporary monmap populated with the IPs 725 * provided by mount(2). 
/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;
	return 0;
}

/*
 * Initialize the mon_client: build the initial monmap, set up auth,
 * and preallocate the messages we exchange with the monitor.  On
 * failure, everything allocated so far is unwound via the goto chain.
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	monc->cur_mon = -1;	/* no monitor picked yet */
	monc->hunting = true;
	monc->sub_renew_after = jiffies;	/* subscription starts expired */
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;	/* request an osdmap right away */
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

/*
 * Tear down the mon_client: stop the timer, close the session, and
 * release auth state, preallocated messages, and the monmap.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	mutex_unlock(&monc->mutex);

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

/*
 * Process an AUTH_REPLY: feed it to the auth layer, and either report
 * the error, send the next handshake message, or — once newly
 * authenticated — adopt our global_id and kick off subscriptions and
 * pending requests.
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	/* ops is not set until an auth protocol has been negotiated */
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		/* auth layer prepared another message; send it */
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}

/*
 * Rebuild and send an auth request if our tickets need refreshing.
 * Caller must hold monc->mutex.
 */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret; /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	/*
	 * NOTE(review): front_len comes straight off the wire; for the
	 * map messages below it sizes a fresh allocation — presumably
	 * the messenger bounds it, but confirm.
	 */
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}
/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	if (!monc)
		return;

	dout("mon_fault\n");
	mutex_lock(&monc->mutex);
	/* __close_session() clears con->private; recheck under the mutex */
	if (!con->private)
		goto out;

	if (!monc->hunting)
		pr_info("mon%d %s session lost, "
			"hunting for new mon\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));

	__close_session(monc);
	if (!monc->hunting) {
		/* start hunting */
		monc->hunting = true;
		__open_session(monc);
	} else {
		/* already hunting, let's wait a bit */
		__schedule_delayed(monc);
	}
out:
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};