lib-msg.c revision 2b2843264260d4d6c4afd0fecf6082736ff86b78
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/lib-msg.c
 *
 * Message decoding, parsing and finalizing routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/lnet/lib-lnet.h>

void
lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
{
	memset(ev, 0, sizeof(*ev));

	ev->status = 0;
	ev->unlinked = 1;
	ev->type = LNET_EVENT_UNLINK;
	lnet_md_deconstruct(md, &ev->md);
	lnet_md2handle(&ev->md_handle, md);
}

/*
 * Don't need any lock, must be called after lnet_commit_md
 */
void
lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
{
	lnet_hdr_t *hdr = &msg->msg_hdr;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(!msg->msg_routing);

	ev->type = ev_type;

	if (ev_type == LNET_EVENT_SEND) {
		/* event for active message */
		ev->target.nid = le64_to_cpu(hdr->dest_nid);
		ev->target.pid = le32_to_cpu(hdr->dest_pid);
		ev->initiator.nid = LNET_NID_ANY;
		ev->initiator.pid = the_lnet.ln_pid;
		ev->sender = LNET_NID_ANY;

	} else {
		/* event for passive message */
		ev->target.pid = hdr->dest_pid;
		ev->target.nid = hdr->dest_nid;
		ev->initiator.pid = hdr->src_pid;
		ev->initiator.nid = hdr->src_nid;
		ev->rlength = hdr->payload_length;
		ev->sender = msg->msg_from;
		ev->mlength = msg->msg_wanted;
		ev->offset = msg->msg_offset;
	}

	switch (ev_type) {
	default:
		LBUG();

	case LNET_EVENT_PUT: /* passive PUT */
		ev->pt_index = hdr->msg.put.ptl_index;
		ev->match_bits = hdr->msg.put.match_bits;
		ev->hdr_data = hdr->msg.put.hdr_data;
		return;

	case LNET_EVENT_GET: /* passive GET */
		ev->pt_index = hdr->msg.get.ptl_index;
		ev->match_bits = hdr->msg.get.match_bits;
		ev->hdr_data = 0;
		return;

	case LNET_EVENT_ACK: /* ACK */
		ev->match_bits = hdr->msg.ack.match_bits;
		ev->mlength = hdr->msg.ack.mlength;
		return;

	case LNET_EVENT_REPLY: /* REPLY */
		return;

	case LNET_EVENT_SEND: /* active message */
		if (msg->msg_type == LNET_MSG_PUT) {
			ev->pt_index = le32_to_cpu(hdr->msg.put.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
			ev->offset = le32_to_cpu(hdr->msg.put.offset);
			ev->mlength =
			ev->rlength = le32_to_cpu(hdr->payload_length);
			ev->hdr_data = le64_to_cpu(hdr->msg.put.hdr_data);

		} else {
			LASSERT(msg->msg_type == LNET_MSG_GET);
			ev->pt_index = le32_to_cpu(hdr->msg.get.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
			ev->mlength =
			ev->rlength = le32_to_cpu(hdr->msg.get.sink_length);
			ev->offset = le32_to_cpu(hdr->msg.get.src_offset);
			ev->hdr_data = 0;
		}
		return;
	}
}

void
lnet_msg_commit(lnet_msg_t *msg, int cpt)
{
	struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
	lnet_counters_t *counters = the_lnet.ln_counters[cpt];

	/* routed message can be committed for both receiving and sending */
	LASSERT(!msg->msg_tx_committed);

	if (msg->msg_sending) {
		LASSERT(!msg->msg_receiving);

		msg->msg_tx_cpt = cpt;
		msg->msg_tx_committed = 1;
		if (msg->msg_rx_committed) { /* routed message REPLY */
			LASSERT(msg->msg_onactivelist);
			return;
		}
	} else {
		LASSERT(!msg->msg_sending);
		msg->msg_rx_cpt = cpt;
		msg->msg_rx_committed = 1;
	}

	LASSERT(!msg->msg_onactivelist);
	msg->msg_onactivelist = 1;
	list_add(&msg->msg_activelist, &container->msc_active);

	counters->msgs_alloc++;
	if (counters->msgs_alloc > counters->msgs_max)
		counters->msgs_max = counters->msgs_alloc;
}

static void
lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(msg->msg_tx_committed);
	if (status != 0)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
	switch (ev->type) {
	default: /* routed message */
		LASSERT(msg->msg_routing);
		LASSERT(msg->msg_rx_committed);
		LASSERT(ev->type == 0);

		counters->route_length += msg->msg_len;
		counters->route_count++;
		goto out;

	case LNET_EVENT_PUT:
		/* should have been decommitted */
		LASSERT(!msg->msg_rx_committed);
		/* overwritten while sending ACK */
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		msg->msg_type = LNET_MSG_PUT; /* fix type */
		break;

	case LNET_EVENT_SEND:
		LASSERT(!msg->msg_rx_committed);
		if (msg->msg_type == LNET_MSG_PUT)
			counters->send_length += msg->msg_len;
		break;

	case LNET_EVENT_GET:
		LASSERT(msg->msg_rx_committed);
		/* overwritten while sending reply, we should never be
		 * here for optimized GET */
		LASSERT(msg->msg_type == LNET_MSG_REPLY);
		msg->msg_type = LNET_MSG_GET; /* fix type */
		break;
	}

	counters->send_count++;
 out:
	lnet_return_tx_credits_locked(msg);
	msg->msg_tx_committed = 0;
}

static void
lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
	LASSERT(msg->msg_rx_committed);

	if (status != 0)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
	switch (ev->type) {
	default:
		LASSERT(ev->type == 0);
		LASSERT(msg->msg_routing);
		goto out;

	case LNET_EVENT_ACK:
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		break;

	case LNET_EVENT_GET:
		/* type is "REPLY" if it's an optimized GET on passive side,
		 * because optimized GET will never be committed for sending,
		 * so message type wouldn't be changed back to "GET" by
		 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
			msg->msg_type == LNET_MSG_GET);
		counters->send_length += msg->msg_wanted;
		break;

	case LNET_EVENT_PUT:
		LASSERT(msg->msg_type == LNET_MSG_PUT);
		break;

	case LNET_EVENT_REPLY:
		/* type is "GET" if it's an optimized GET on active side,
		 * see details in lnet_create_reply_msg() */
		LASSERT(msg->msg_type == LNET_MSG_GET ||
			msg->msg_type == LNET_MSG_REPLY);
		break;
	}

	counters->recv_count++;
	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
		counters->recv_length += msg->msg_wanted;

 out:
	lnet_return_rx_credits_locked(msg);
	msg->msg_rx_committed = 0;
}

void
lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
{
	int cpt2 = cpt;

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
	LASSERT(msg->msg_onactivelist);

	if (msg->msg_tx_committed) { /* always decommit for sending first */
		LASSERT(cpt == msg->msg_tx_cpt);
		lnet_msg_decommit_tx(msg, status);
	}

	if (msg->msg_rx_committed) {
		/* forwarding msg committed for both receiving and sending */
		if (cpt != msg->msg_rx_cpt) {
			lnet_net_unlock(cpt);
			cpt2 = msg->msg_rx_cpt;
			lnet_net_lock(cpt2);
		}
		lnet_msg_decommit_rx(msg, status);
	}

	list_del(&msg->msg_activelist);
	msg->msg_onactivelist = 0;

	the_lnet.ln_counters[cpt2]->msgs_alloc--;

	if (cpt2 != cpt) {
		lnet_net_unlock(cpt2);
		lnet_net_lock(cpt);
	}
}

void
lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
		   unsigned int offset, unsigned int mlen)
{
	/* NB: @offset and @mlen are only useful for receiving */
	/* Here we attach the MD to the lnet_msg, mark it busy and
	 * decrement its threshold. Come what may, the lnet_msg "owns"
	 * the MD until a call to lnet_msg_detach_md or lnet_finalize()
	 * signals completion. */
	LASSERT(!msg->msg_routing);

	msg->msg_md = md;
	if (msg->msg_receiving) { /* committed for receiving */
		msg->msg_offset = offset;
		msg->msg_wanted = mlen;
	}

	md->md_refcount++;
	if (md->md_threshold != LNET_MD_THRESH_INF) {
		LASSERT(md->md_threshold > 0);
		md->md_threshold--;
	}

	/* build umd in event */
	lnet_md2handle(&msg->msg_ev.md_handle, md);
	lnet_md_deconstruct(md, &msg->msg_ev.md);
}

void
lnet_msg_detach_md(lnet_msg_t *msg, int status)
{
	lnet_libmd_t *md = msg->msg_md;
	int unlink;

	/* Now it's safe to drop my caller's ref */
	md->md_refcount--;
	LASSERT(md->md_refcount >= 0);

	unlink = lnet_md_unlinkable(md);
	if (md->md_eq != NULL) {
		msg->msg_ev.status = status;
		msg->msg_ev.unlinked = unlink;
		lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
	}

	if (unlink)
		lnet_md_unlink(md);

	msg->msg_md = NULL;
}

static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
	lnet_handle_wire_t ack_wmd;
	int rc;
	int status = msg->msg_ev.status;

	LASSERT(msg->msg_onactivelist);

	if (status == 0 && msg->msg_ack) {
		/* Only send an ACK if the PUT completed successfully */

		lnet_msg_decommit(msg, cpt, 0);

		msg->msg_ack = 0;
		lnet_net_unlock(cpt);

		LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
		LASSERT(!msg->msg_routing);

		ack_wmd = msg->msg_hdr.msg.put.ack_wmd;

		lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);

		msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);

		/* NB: we probably want to use the NID of msg::msg_from as the
		 * 3rd parameter (router NID) if it's a routed message */
		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: the message is committed for sending, so we should
		 * return on success because the LND will finalize this
		 * message later.
		 *
		 * There is also a possibility that the message is committed
		 * for sending and then fails before it is delivered to the
		 * LND (e.g. ENOMEM); in that case we can't fall through
		 * either, because the CPT for sending can be different from
		 * the CPT for receiving, so we should return to
		 * lnet_finalize() to make sure we are locking the correct
		 * partition.
		 */
		return rc;

	} else if (status == 0 &&	/* OK so far */
		   (msg->msg_routing && !msg->msg_sending)) {
		/* not forwarded */
		LASSERT(!msg->msg_receiving);	/* called back recv already */
		lnet_net_unlock(cpt);

		rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: the message is committed for sending, so we should
		 * return on success because the LND will finalize this
		 * message later.
		 *
		 * There is also a possibility that the message is committed
		 * for sending and then fails before it is delivered to the
		 * LND (e.g. ENOMEM); in that case we can't fall through
		 * either:
		 * - The rule is that a message must be decommitted for
		 *   sending first if it's committed for both sending and
		 *   receiving.
		 * - The CPT for sending can be different from the CPT for
		 *   receiving, so we should return to lnet_finalize() to
		 *   make sure we are locking the correct partition.
		 */
		return rc;
	}

	lnet_msg_decommit(msg, cpt, status);
	lnet_msg_free_locked(msg);
	return 0;
}

void
lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
	struct lnet_msg_container *container;
	int my_slot;
	int cpt;
	int rc;
	int i;

	LASSERT(!in_interrupt());

	if (msg == NULL)
		return;
#if 0
	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
	       msg->msg_target_is_router ? "t" : "",
	       msg->msg_routing ? "X" : "",
	       msg->msg_ack ? "A" : "",
	       msg->msg_sending ? "S" : "",
	       msg->msg_receiving ? "R" : "",
	       msg->msg_delayed ? "d" : "",
	       msg->msg_txcredit ? "C" : "",
	       msg->msg_peertxcredit ? "c" : "",
	       msg->msg_rtrcredit ? "F" : "",
	       msg->msg_peerrtrcredit ? "f" : "",
	       msg->msg_onactivelist ? "!" : "",
	       msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
	       msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
#endif
	msg->msg_ev.status = status;

	if (msg->msg_md != NULL) {
		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);

		lnet_res_lock(cpt);
		lnet_msg_detach_md(msg, status);
		lnet_res_unlock(cpt);
	}

 again:
	rc = 0;
	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
		/* not committed to network yet */
		LASSERT(!msg->msg_onactivelist);
		lnet_msg_free(msg);
		return;
	}

	/*
	 * NB: routed message can be committed for both receiving and sending,
	 * we should finalize in LIFO order and keep counters correct.
	 * (finalize sending first then finalize receiving)
	 */
	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	container = the_lnet.ln_msg_containers[cpt];
	list_add_tail(&msg->msg_list, &container->msc_finalizing);

	/* Recursion breaker.  Don't complete the message here if I am (or
	 * enough other threads are) already completing messages */

	my_slot = -1;
	for (i = 0; i < container->msc_nfinalizers; i++) {
		if (container->msc_finalizers[i] == current)
			break;

		if (my_slot < 0 && container->msc_finalizers[i] == NULL)
			my_slot = i;
	}

	if (i < container->msc_nfinalizers || my_slot < 0) {
		lnet_net_unlock(cpt);
		return;
	}

	container->msc_finalizers[my_slot] = current;

	while (!list_empty(&container->msc_finalizing)) {
		msg = list_entry(container->msc_finalizing.next,
				 lnet_msg_t, msg_list);

		list_del(&msg->msg_list);

		/* NB drops and regains the lnet lock if it actually does
		 * anything, so my finalizing friends can chomp along too */
		rc = lnet_complete_msg_locked(msg, cpt);
		if (rc != 0)
			break;
	}

	container->msc_finalizers[my_slot] = NULL;
	lnet_net_unlock(cpt);

	if (rc != 0)
		goto again;
}
EXPORT_SYMBOL(lnet_finalize);

void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
{
	int count = 0;

	if (container->msc_init == 0)
		return;

	while (!list_empty(&container->msc_active)) {
		lnet_msg_t *msg = list_entry(container->msc_active.next,
					     lnet_msg_t, msg_activelist);

		LASSERT(msg->msg_onactivelist);
		msg->msg_onactivelist = 0;
		list_del(&msg->msg_activelist);
		lnet_msg_free(msg);
		count++;
	}

	if (count > 0)
		CERROR("%d active msg on exit\n", count);

	if (container->msc_finalizers != NULL) {
		LIBCFS_FREE(container->msc_finalizers,
			    container->msc_nfinalizers *
			    sizeof(*container->msc_finalizers));
		container->msc_finalizers = NULL;
	}
#ifdef LNET_USE_LIB_FREELIST
	lnet_freelist_fini(&container->msc_freelist);
#endif
	container->msc_init = 0;
}

int
lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
{
	int rc;

	container->msc_init = 1;

	INIT_LIST_HEAD(&container->msc_active);
	INIT_LIST_HEAD(&container->msc_finalizing);

#ifdef LNET_USE_LIB_FREELIST
	memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));

	rc = lnet_freelist_init(&container->msc_freelist,
				LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
	if (rc != 0) {
		CERROR("Failed to init freelist for message container\n");
		lnet_msg_container_cleanup(container);
		return rc;
	}
#else
	rc = 0;
#endif
	/* number of CPUs */
	container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);

	LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
			 container->msc_nfinalizers *
			 sizeof(*container->msc_finalizers));

	if (container->msc_finalizers == NULL) {
		CERROR("Failed to allocate message finalizers\n");
		lnet_msg_container_cleanup(container);
		return -ENOMEM;
	}

	return rc;
}

void
lnet_msg_containers_destroy(void)
{
	struct lnet_msg_container *container;
	int i;

	if (the_lnet.ln_msg_containers == NULL)
		return;

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
		lnet_msg_container_cleanup(container);

	cfs_percpt_free(the_lnet.ln_msg_containers);
	the_lnet.ln_msg_containers = NULL;
}

int
lnet_msg_containers_create(void)
{
	struct lnet_msg_container *container;
	int rc;
	int i;

	the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
						      sizeof(*container));

	if (the_lnet.ln_msg_containers == NULL) {
		CERROR("Failed to allocate cpu-partition data for network\n");
		return -ENOMEM;
	}

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
		rc = lnet_msg_container_setup(container, i);
		if (rc != 0) {
			lnet_msg_containers_destroy();
			return rc;
		}
	}

	return 0;
}