mon_client.c revision 1bfd89f4e6e1adc6a782d94aa5d4c53be1e404d7
1#include <linux/ceph/ceph_debug.h> 2 3#include <linux/module.h> 4#include <linux/types.h> 5#include <linux/slab.h> 6#include <linux/random.h> 7#include <linux/sched.h> 8 9#include <linux/ceph/mon_client.h> 10#include <linux/ceph/libceph.h> 11#include <linux/ceph/debugfs.h> 12#include <linux/ceph/decode.h> 13#include <linux/ceph/auth.h> 14 15/* 16 * Interact with Ceph monitor cluster. Handle requests for new map 17 * versions, and periodically resend as needed. Also implement 18 * statfs() and umount(). 19 * 20 * A small cluster of Ceph "monitors" are responsible for managing critical 21 * cluster configuration and state information. An odd number (e.g., 3, 5) 22 * of cmon daemons use a modified version of the Paxos part-time parliament 23 * algorithm to manage the MDS map (mds cluster membership), OSD map, and 24 * list of clients who have mounted the file system. 25 * 26 * We maintain an open, active session with a monitor at all times in order to 27 * receive timely MDSMap updates. We periodically send a keepalive byte on the 28 * TCP socket to ensure we detect a failure. If the connection does break, we 29 * randomly hunt for a new monitor. Once the connection is reestablished, we 30 * resend any outstanding requests. 31 */ 32 33static const struct ceph_connection_operations mon_con_ops; 34 35static int __validate_auth(struct ceph_mon_client *monc); 36 37/* 38 * Decode a monmap blob (e.g., during mount). 
39 */ 40struct ceph_monmap *ceph_monmap_decode(void *p, void *end) 41{ 42 struct ceph_monmap *m = NULL; 43 int i, err = -EINVAL; 44 struct ceph_fsid fsid; 45 u32 epoch, num_mon; 46 u16 version; 47 u32 len; 48 49 ceph_decode_32_safe(&p, end, len, bad); 50 ceph_decode_need(&p, end, len, bad); 51 52 dout("monmap_decode %p %p len %d\n", p, end, (int)(end-p)); 53 54 ceph_decode_16_safe(&p, end, version, bad); 55 56 ceph_decode_need(&p, end, sizeof(fsid) + 2*sizeof(u32), bad); 57 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 58 epoch = ceph_decode_32(&p); 59 60 num_mon = ceph_decode_32(&p); 61 ceph_decode_need(&p, end, num_mon*sizeof(m->mon_inst[0]), bad); 62 63 if (num_mon >= CEPH_MAX_MON) 64 goto bad; 65 m = kmalloc(sizeof(*m) + sizeof(m->mon_inst[0])*num_mon, GFP_NOFS); 66 if (m == NULL) 67 return ERR_PTR(-ENOMEM); 68 m->fsid = fsid; 69 m->epoch = epoch; 70 m->num_mon = num_mon; 71 ceph_decode_copy(&p, m->mon_inst, num_mon*sizeof(m->mon_inst[0])); 72 for (i = 0; i < num_mon; i++) 73 ceph_decode_addr(&m->mon_inst[i].addr); 74 75 dout("monmap_decode epoch %d, num_mon %d\n", m->epoch, 76 m->num_mon); 77 for (i = 0; i < m->num_mon; i++) 78 dout("monmap_decode mon%d is %s\n", i, 79 ceph_pr_addr(&m->mon_inst[i].addr.in_addr)); 80 return m; 81 82bad: 83 dout("monmap_decode failed with %d\n", err); 84 kfree(m); 85 return ERR_PTR(err); 86} 87 88/* 89 * return true if *addr is included in the monmap. 90 */ 91int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr) 92{ 93 int i; 94 95 for (i = 0; i < m->num_mon; i++) 96 if (memcmp(addr, &m->mon_inst[i].addr, sizeof(*addr)) == 0) 97 return 1; 98 return 0; 99} 100 101/* 102 * Send an auth request. 
103 */ 104static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len) 105{ 106 monc->pending_auth = 1; 107 monc->m_auth->front.iov_len = len; 108 monc->m_auth->hdr.front_len = cpu_to_le32(len); 109 ceph_con_revoke(&monc->con, monc->m_auth); 110 ceph_msg_get(monc->m_auth); /* keep our ref */ 111 ceph_con_send(&monc->con, monc->m_auth); 112} 113 114/* 115 * Close monitor session, if any. 116 */ 117static void __close_session(struct ceph_mon_client *monc) 118{ 119 dout("__close_session closing mon%d\n", monc->cur_mon); 120 ceph_con_revoke(&monc->con, monc->m_auth); 121 ceph_con_close(&monc->con); 122 monc->con.private = NULL; 123 monc->cur_mon = -1; 124 monc->pending_auth = 0; 125 ceph_auth_reset(monc->auth); 126} 127 128/* 129 * Open a session with a (new) monitor. 130 */ 131static int __open_session(struct ceph_mon_client *monc) 132{ 133 char r; 134 int ret; 135 136 if (monc->cur_mon < 0) { 137 get_random_bytes(&r, 1); 138 monc->cur_mon = r % monc->monmap->num_mon; 139 dout("open_session num=%d r=%d -> mon%d\n", 140 monc->monmap->num_mon, r, monc->cur_mon); 141 monc->sub_sent = 0; 142 monc->sub_renew_after = jiffies; /* i.e., expired */ 143 monc->want_next_osdmap = !!monc->want_next_osdmap; 144 145 ceph_con_init(&monc->con, monc, &mon_con_ops, 146 &monc->client->msgr, 147 CEPH_ENTITY_TYPE_MON, monc->cur_mon); 148 149 dout("open_session mon%d opening\n", monc->cur_mon); 150 ceph_con_open(&monc->con, 151 &monc->monmap->mon_inst[monc->cur_mon].addr); 152 153 /* initiatiate authentication handshake */ 154 ret = ceph_auth_build_hello(monc->auth, 155 monc->m_auth->front.iov_base, 156 monc->m_auth->front_max); 157 __send_prepared_auth_request(monc, ret); 158 } else { 159 dout("open_session mon%d already open\n", monc->cur_mon); 160 } 161 return 0; 162} 163 164static bool __sub_expired(struct ceph_mon_client *monc) 165{ 166 return time_after_eq(jiffies, monc->sub_renew_after); 167} 168 169/* 170 * Reschedule delayed work timer. 
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned delay;

	/* poll faster while we have no session or the subscription has
	 * lapsed; otherwise a slower keepalive cadence is enough */
	if (monc->cur_mon < 0 || __sub_expired(monc))
		delay = 10 * HZ;
	else
		delay = 20 * HZ;
	dout("__schedule_delayed after %u\n", delay);
	schedule_delayed_work(&monc->delayed_work, delay);
}

/*
 * Send subscribe request for mdsmap and/or osdmap.
 *
 * Called with monc->mutex held (all callers in this file take it).
 * Only (re)sends if the previous subscription expired without an
 * outstanding request, or if a one-shot osdmap request is pending
 * (want_next_osdmap == 1).
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
	     (unsigned)monc->sub_sent, __sub_expired(monc),
	     monc->want_next_osdmap);
	if ((__sub_expired(monc) && !monc->sub_sent) ||
	    monc->want_next_osdmap == 1) {
		struct ceph_msg *msg = monc->m_subscribe;
		struct ceph_mon_subscribe_item *i;
		void *p, *end;
		int num;

		p = msg->front.iov_base;
		end = p + msg->front_max;

		/* monmap is always wanted; osdmap/mdsmap are optional */
		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
		ceph_encode_32(&p, num);

		if (monc->want_next_osdmap) {
			dout("__send_subscribe to 'osdmap' %u\n",
			     (unsigned)monc->have_osdmap);
			ceph_encode_string(&p, end, "osdmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_osdmap);
			i->onetime = 1;
			p += sizeof(*i);
			monc->want_next_osdmap = 2;  /* requested */
		}
		if (monc->want_mdsmap) {
			dout("__send_subscribe to 'mdsmap' %u+\n",
			     (unsigned)monc->have_mdsmap);
			ceph_encode_string(&p, end, "mdsmap", 6);
			i = p;
			i->have = cpu_to_le64(monc->have_mdsmap);
			i->onetime = 0;
			p += sizeof(*i);
		}
		ceph_encode_string(&p, end, "monmap", 6);
		i = p;
		i->have = 0;
		i->onetime = 0;
		p += sizeof(*i);

		msg->front.iov_len = p - msg->front.iov_base;
		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
		ceph_con_revoke(&monc->con, msg);
		ceph_con_send(&monc->con, ceph_msg_get(msg));

		monc->sub_sent = jiffies | 1;  /* never 0 */
	}
}

static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		/* first ack on a new session ends the hunt */
		pr_info("mon%d %s session established\n",
			monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr.in_addr));
		monc->hunting = false;
	}
	dout("handle_subscribe_ack after %d seconds\n", seconds);
	/* renew halfway through the granted duration so we never lapse */
	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
	monc->sub_sent = 0;
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Keep track of which maps we have
 */
int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_mdsmap = got;
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_got_mdsmap);

int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
{
	mutex_lock(&monc->mutex);
	monc->have_osdmap = got;
	/* the one-shot osdmap request (if any) is now satisfied */
	monc->want_next_osdmap = 0;
	mutex_unlock(&monc->mutex);
	return 0;
}

/*
 * Register interest in the next osdmap
 */
void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
{
	dout("request_next_osdmap have %u\n", monc->have_osdmap);
	mutex_lock(&monc->mutex);
	if (!monc->want_next_osdmap)
		monc->want_next_osdmap = 1;
	/* < 2 means not yet requested from the monitor; send it now */
	if (monc->want_next_osdmap < 2)
		__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * Open a session with a monitor (if not already open) and arm the
 * periodic delayed work that keeps it alive.
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

/*
 * The monitor responds with a mount ack to indicate mount success.  The
 * included client ticket allows the client to talk to MDSs and OSDs.
 */
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap = NULL, *old = monc->monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(p, end);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		goto out;
	}

	/* reject a monmap from a different cluster than we mounted */
	if (ceph_check_fsid(monc->client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	client->monc.monmap = monmap;
	kfree(old);

	if (!client->have_fsid) {
		client->have_fsid = true;
		mutex_unlock(&monc->mutex);
		/*
		 * do debugfs initialization without mutex to avoid
		 * creating a locking dependency
		 */
		ceph_debugfs_client_init(client);
		goto out_unlocked;
	}
out:
	mutex_unlock(&monc->mutex);
out_unlocked:
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (e.g., statfs, poolop)
 *
 * Pending requests live in an rbtree keyed by tid; caller must hold
 * monc->mutex for all tree lookups/updates.
 */
static struct ceph_mon_generic_request *__lookup_generic_req(
	struct ceph_mon_client *monc, u64 tid)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *n = monc->generic_request_tree.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_mon_generic_request, node);
		if (tid < req->tid)
			n = n->rb_left;
		else if (tid > req->tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static void __insert_generic_request(struct ceph_mon_client *monc,
				     struct ceph_mon_generic_request *new)
{
	struct rb_node **p = &monc->generic_request_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_mon_generic_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_mon_generic_request, node);
		if (new->tid < req->tid)
			p = &(*p)->rb_left;
		else if (new->tid > req->tid)
			p = &(*p)->rb_right;
		else
			BUG();	/* tids are unique; a duplicate is a bug */
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &monc->generic_request_tree);
}

/* kref release: drop msg refs and free the request */
static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

/*
 * alloc_msg hook for generic replies: hand the messenger the
 * preallocated reply buffer for the matching tid, or skip the message
 * if no request is pending.
 */
static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * Register a request in the tid tree, send it, and block until the
 * reply handler completes it (or the wait is interrupted).
 */
static int do_generic_request(struct ceph_mon_client *monc,
			      struct ceph_mon_generic_request *req)
{
	int err;

	/* register request */
	mutex_lock(&monc->mutex);
	req->tid = ++monc->last_tid;
	req->request->hdr.tid = cpu_to_le64(req->tid);
	__insert_generic_request(monc, req);
	monc->num_generic_requests++;
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
	mutex_unlock(&monc->mutex);

	err = wait_for_completion_interruptible(&req->completion);

	mutex_lock(&monc->mutex);
	rb_erase(&req->node, &monc->generic_request_tree);
	monc->num_generic_requests--;
	mutex_unlock(&monc->mutex);

	if (!err)
		err = req->result;
	return err;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;
	dout("handle_statfs_reply %p tid %llu\n", msg, tid);

	mutex_lock(&monc->mutex);
	req = __lookup_generic_req(monc, tid);
	if (req) {
		*(struct ceph_statfs *)req->buf = reply->st;
		req->result = 0;
		/* hold a ref so the waiter can't free req under us */
		get_generic_request(req);
	}
	mutex_unlock(&monc->mutex);
	if (req) {
		complete_all(&req->completion);
		put_generic_request(req);
	}
	return;

bad:
	pr_err("corrupt generic reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
517 */ 518int ceph_monc_do_statfs(struct ceph_mon_client *monc, struct ceph_statfs *buf) 519{ 520 struct ceph_mon_generic_request *req; 521 struct ceph_mon_statfs *h; 522 int err; 523 524 req = kzalloc(sizeof(*req), GFP_NOFS); 525 if (!req) 526 return -ENOMEM; 527 528 kref_init(&req->kref); 529 req->buf = buf; 530 req->buf_len = sizeof(*buf); 531 init_completion(&req->completion); 532 533 err = -ENOMEM; 534 req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS, 535 true); 536 if (!req->request) 537 goto out; 538 req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS, 539 true); 540 if (!req->reply) 541 goto out; 542 543 /* fill out request */ 544 h = req->request->front.iov_base; 545 h->monhdr.have_version = 0; 546 h->monhdr.session_mon = cpu_to_le16(-1); 547 h->monhdr.session_mon_tid = 0; 548 h->fsid = monc->monmap->fsid; 549 550 err = do_generic_request(monc, req); 551 552out: 553 kref_put(&req->kref, release_generic_request); 554 return err; 555} 556EXPORT_SYMBOL(ceph_monc_do_statfs); 557 558/* 559 * pool ops 560 */ 561static int get_poolop_reply_buf(const char *src, size_t src_len, 562 char *dst, size_t dst_len) 563{ 564 u32 buf_len; 565 566 if (src_len != sizeof(u32) + dst_len) 567 return -EINVAL; 568 569 buf_len = le32_to_cpu(*(u32 *)src); 570 if (buf_len != dst_len) 571 return -EINVAL; 572 573 memcpy(dst, src + sizeof(u32), dst_len); 574 return 0; 575} 576 577static void handle_poolop_reply(struct ceph_mon_client *monc, 578 struct ceph_msg *msg) 579{ 580 struct ceph_mon_generic_request *req; 581 struct ceph_mon_poolop_reply *reply = msg->front.iov_base; 582 u64 tid = le64_to_cpu(msg->hdr.tid); 583 584 if (msg->front.iov_len < sizeof(*reply)) 585 goto bad; 586 dout("handle_poolop_reply %p tid %llu\n", msg, tid); 587 588 mutex_lock(&monc->mutex); 589 req = __lookup_generic_req(monc, tid); 590 if (req) { 591 if (req->buf_len && 592 get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply), 593 msg->front.iov_len - sizeof(*reply), 594 
req->buf, req->buf_len) < 0) { 595 mutex_unlock(&monc->mutex); 596 goto bad; 597 } 598 req->result = le32_to_cpu(reply->reply_code); 599 get_generic_request(req); 600 } 601 mutex_unlock(&monc->mutex); 602 if (req) { 603 complete(&req->completion); 604 put_generic_request(req); 605 } 606 return; 607 608bad: 609 pr_err("corrupt generic reply, tid %llu\n", tid); 610 ceph_msg_dump(msg); 611} 612 613/* 614 * Do a synchronous pool op. 615 */ 616int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op, 617 u32 pool, u64 snapid, 618 char *buf, int len) 619{ 620 struct ceph_mon_generic_request *req; 621 struct ceph_mon_poolop *h; 622 int err; 623 624 req = kzalloc(sizeof(*req), GFP_NOFS); 625 if (!req) 626 return -ENOMEM; 627 628 kref_init(&req->kref); 629 req->buf = buf; 630 req->buf_len = len; 631 init_completion(&req->completion); 632 633 err = -ENOMEM; 634 req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS, 635 true); 636 if (!req->request) 637 goto out; 638 req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS, 639 true); 640 if (!req->reply) 641 goto out; 642 643 /* fill out request */ 644 req->request->hdr.version = cpu_to_le16(2); 645 h = req->request->front.iov_base; 646 h->monhdr.have_version = 0; 647 h->monhdr.session_mon = cpu_to_le16(-1); 648 h->monhdr.session_mon_tid = 0; 649 h->fsid = monc->monmap->fsid; 650 h->pool = cpu_to_le32(pool); 651 h->op = cpu_to_le32(op); 652 h->auid = 0; 653 h->snapid = cpu_to_le64(snapid); 654 h->name_len = 0; 655 656 err = do_generic_request(monc, req); 657 658out: 659 kref_put(&req->kref, release_generic_request); 660 return err; 661} 662 663int ceph_monc_create_snapid(struct ceph_mon_client *monc, 664 u32 pool, u64 *snapid) 665{ 666 return ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 667 pool, 0, (char *)snapid, sizeof(*snapid)); 668 669} 670EXPORT_SYMBOL(ceph_monc_create_snapid); 671 672int ceph_monc_delete_snapid(struct ceph_mon_client *monc, 673 u32 pool, u64 snapid) 674{ 675 return 
ceph_monc_do_poolop(monc, POOL_OP_CREATE_UNMANAGED_SNAP, 676 pool, snapid, 0, 0); 677 678} 679 680/* 681 * Resend pending generic requests. 682 */ 683static void __resend_generic_request(struct ceph_mon_client *monc) 684{ 685 struct ceph_mon_generic_request *req; 686 struct rb_node *p; 687 688 for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) { 689 req = rb_entry(p, struct ceph_mon_generic_request, node); 690 ceph_con_revoke(&monc->con, req->request); 691 ceph_con_send(&monc->con, ceph_msg_get(req->request)); 692 } 693} 694 695/* 696 * Delayed work. If we haven't mounted yet, retry. Otherwise, 697 * renew/retry subscription as needed (in case it is timing out, or we 698 * got an ENOMEM). And keep the monitor connection alive. 699 */ 700static void delayed_work(struct work_struct *work) 701{ 702 struct ceph_mon_client *monc = 703 container_of(work, struct ceph_mon_client, delayed_work.work); 704 705 dout("monc delayed_work\n"); 706 mutex_lock(&monc->mutex); 707 if (monc->hunting) { 708 __close_session(monc); 709 __open_session(monc); /* continue hunting */ 710 } else { 711 ceph_con_keepalive(&monc->con); 712 713 __validate_auth(monc); 714 715 if (monc->auth->ops->is_authenticated(monc->auth)) 716 __send_subscribe(monc); 717 } 718 __schedule_delayed(monc); 719 mutex_unlock(&monc->mutex); 720} 721 722/* 723 * On startup, we build a temporary monmap populated with the IPs 724 * provided by mount(2). 
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(sizeof(*monc->monmap) +
			       num_mon*sizeof(monc->monmap->mon_inst[0]),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		/* nonce 0: real nonces come from a decoded monmap */
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	monc->have_fsid = false;
	return 0;
}

/*
 * Initialize the mon client: build the initial monmap, set up auth,
 * and preallocate the messages we exchange with the monitor.
 * Undone by ceph_monc_stop().
 */
int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs: preallocated so replies can't fail on ENOMEM */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_NOFS, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
					 true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
					  true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	monc->cur_mon = -1;
	monc->hunting = true;	/* no session yet; first open is a "hunt" */
	monc->sub_renew_after = jiffies;
	monc->sub_sent = 0;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->num_generic_requests = 0;
	monc->last_tid = 0;

	monc->have_mdsmap = 0;
	monc->have_osdmap = 0;
	monc->want_next_osdmap = 1;
	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

/*
 * Tear down the mon client: stop the timer, close the session, and
 * release everything ceph_monc_init() allocated.
 */
void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	/* must cancel before closing so delayed_work can't reopen */
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);

	mutex_unlock(&monc->mutex);

	ceph_auth_destroy(monc->auth);

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

/*
 * Handle one step of the auth handshake: ret < 0 is a hard failure,
 * ret > 0 means another request round-trip is needed, ret == 0 with a
 * newly-authenticated session kicks off subscriptions and resends.
 */
static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	int ret;
	int was_auth = 0;

	mutex_lock(&monc->mutex);
	if (monc->auth->ops)
		was_auth = monc->auth->ops->is_authenticated(monc->auth);
	monc->pending_auth = 0;
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_max);
	if (ret < 0) {
		monc->client->auth_err = ret;
		wake_up_all(&monc->client->auth_wq);
	} else if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
		dout("authenticated, starting session\n");

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);
	}
	mutex_unlock(&monc->mutex);
}

/*
 * Build and send a new auth request if the auth layer says one is
 * needed (e.g., ticket rotation).  Caller holds monc->mutex.
 */
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_max);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	if (!monc)
		return;

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_POOLOP_REPLY:
		handle_poolop_reply(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		/* preallocated in ceph_monc_init() */
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_POOLOP_REPLY:
	case CEPH_MSG_STATFS_REPLY:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
		/* maps vary in size; allocate per message */
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	}
	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
998 */ 999static void mon_fault(struct ceph_connection *con) 1000{ 1001 struct ceph_mon_client *monc = con->private; 1002 1003 if (!monc) 1004 return; 1005 1006 dout("mon_fault\n"); 1007 mutex_lock(&monc->mutex); 1008 if (!con->private) 1009 goto out; 1010 1011 if (!monc->hunting) 1012 pr_info("mon%d %s session lost, " 1013 "hunting for new mon\n", monc->cur_mon, 1014 ceph_pr_addr(&monc->con.peer_addr.in_addr)); 1015 1016 __close_session(monc); 1017 if (!monc->hunting) { 1018 /* start hunting */ 1019 monc->hunting = true; 1020 __open_session(monc); 1021 } else { 1022 /* already hunting, let's wait a bit */ 1023 __schedule_delayed(monc); 1024 } 1025out: 1026 mutex_unlock(&monc->mutex); 1027} 1028 1029/* 1030 * We can ignore refcounting on the connection struct, as all references 1031 * will come from the messenger workqueue, which is drained prior to 1032 * mon_client destruction. 1033 */ 1034static struct ceph_connection *con_get(struct ceph_connection *con) 1035{ 1036 return con; 1037} 1038 1039static void con_put(struct ceph_connection *con) 1040{ 1041} 1042 1043static const struct ceph_connection_operations mon_con_ops = { 1044 .get = con_get, 1045 .put = con_put, 1046 .dispatch = dispatch, 1047 .fault = mon_fault, 1048 .alloc_msg = mon_alloc_msg, 1049}; 1050