/* conrpc.c revision 4d72b5afc0bd3e97bb5871b52fd7904c78961ff0 */
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 
35 * 36 * lnet/selftest/conctl.c 37 * 38 * Console framework rpcs 39 * 40 * Author: Liang Zhen <liang@whamcloud.com> 41 */ 42 43 44#include <linux/libcfs/libcfs.h> 45#include <linux/lnet/lib-lnet.h> 46#include "timer.h" 47#include "conrpc.h" 48#include "console.h" 49 50void lstcon_rpc_stat_reply(lstcon_rpc_trans_t *, srpc_msg_t *, 51 lstcon_node_t *, lstcon_trans_stat_t *); 52 53static void 54lstcon_rpc_done(srpc_client_rpc_t *rpc) 55{ 56 lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv; 57 58 LASSERT(crpc != NULL && rpc == crpc->crp_rpc); 59 LASSERT(crpc->crp_posted && !crpc->crp_finished); 60 61 spin_lock(&rpc->crpc_lock); 62 63 if (crpc->crp_trans == NULL) { 64 /* Orphan RPC is not in any transaction, 65 * I'm just a poor body and nobody loves me */ 66 spin_unlock(&rpc->crpc_lock); 67 68 /* release it */ 69 lstcon_rpc_put(crpc); 70 return; 71 } 72 73 /* not an orphan RPC */ 74 crpc->crp_finished = 1; 75 76 if (crpc->crp_stamp == 0) { 77 /* not aborted */ 78 LASSERT(crpc->crp_status == 0); 79 80 crpc->crp_stamp = cfs_time_current(); 81 crpc->crp_status = rpc->crpc_status; 82 } 83 84 /* wakeup (transaction)thread if I'm the last RPC in the transaction */ 85 if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining)) 86 wake_up(&crpc->crp_trans->tas_waitq); 87 88 spin_unlock(&rpc->crpc_lock); 89} 90 91int 92lstcon_rpc_init(lstcon_node_t *nd, int service, unsigned feats, 93 int bulk_npg, int bulk_len, int embedded, lstcon_rpc_t *crpc) 94{ 95 crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service, 96 feats, bulk_npg, bulk_len, 97 lstcon_rpc_done, (void *)crpc); 98 if (crpc->crp_rpc == NULL) 99 return -ENOMEM; 100 101 crpc->crp_trans = NULL; 102 crpc->crp_node = nd; 103 crpc->crp_posted = 0; 104 crpc->crp_finished = 0; 105 crpc->crp_unpacked = 0; 106 crpc->crp_status = 0; 107 crpc->crp_stamp = 0; 108 crpc->crp_embedded = embedded; 109 INIT_LIST_HEAD(&crpc->crp_link); 110 111 atomic_inc(&console_session.ses_rpc_counter); 112 113 return 0; 114} 115 116int 
117lstcon_rpc_prep(lstcon_node_t *nd, int service, unsigned feats, 118 int bulk_npg, int bulk_len, lstcon_rpc_t **crpcpp) 119{ 120 lstcon_rpc_t *crpc = NULL; 121 int rc; 122 123 spin_lock(&console_session.ses_rpc_lock); 124 125 if (!list_empty(&console_session.ses_rpc_freelist)) { 126 crpc = list_entry(console_session.ses_rpc_freelist.next, 127 lstcon_rpc_t, crp_link); 128 list_del_init(&crpc->crp_link); 129 } 130 131 spin_unlock(&console_session.ses_rpc_lock); 132 133 if (crpc == NULL) { 134 LIBCFS_ALLOC(crpc, sizeof(*crpc)); 135 if (crpc == NULL) 136 return -ENOMEM; 137 } 138 139 rc = lstcon_rpc_init(nd, service, feats, bulk_npg, bulk_len, 0, crpc); 140 if (rc == 0) { 141 *crpcpp = crpc; 142 return 0; 143 } 144 145 LIBCFS_FREE(crpc, sizeof(*crpc)); 146 147 return rc; 148} 149 150void 151lstcon_rpc_put(lstcon_rpc_t *crpc) 152{ 153 srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk; 154 int i; 155 156 LASSERT(list_empty(&crpc->crp_link)); 157 158 for (i = 0; i < bulk->bk_niov; i++) { 159 if (bulk->bk_iovs[i].kiov_page == NULL) 160 continue; 161 162 __free_page(bulk->bk_iovs[i].kiov_page); 163 } 164 165 srpc_client_rpc_decref(crpc->crp_rpc); 166 167 if (crpc->crp_embedded) { 168 /* embedded RPC, don't recycle it */ 169 memset(crpc, 0, sizeof(*crpc)); 170 crpc->crp_embedded = 1; 171 172 } else { 173 spin_lock(&console_session.ses_rpc_lock); 174 175 list_add(&crpc->crp_link, 176 &console_session.ses_rpc_freelist); 177 178 spin_unlock(&console_session.ses_rpc_lock); 179 } 180 181 /* RPC is not alive now */ 182 atomic_dec(&console_session.ses_rpc_counter); 183} 184 185void 186lstcon_rpc_post(lstcon_rpc_t *crpc) 187{ 188 lstcon_rpc_trans_t *trans = crpc->crp_trans; 189 190 LASSERT(trans != NULL); 191 192 atomic_inc(&trans->tas_remaining); 193 crpc->crp_posted = 1; 194 195 sfw_post_rpc(crpc->crp_rpc); 196} 197 198static char * 199lstcon_rpc_trans_name(int transop) 200{ 201 if (transop == LST_TRANS_SESNEW) 202 return "SESNEW"; 203 204 if (transop == LST_TRANS_SESEND) 205 return 
"SESEND"; 206 207 if (transop == LST_TRANS_SESQRY) 208 return "SESQRY"; 209 210 if (transop == LST_TRANS_SESPING) 211 return "SESPING"; 212 213 if (transop == LST_TRANS_TSBCLIADD) 214 return "TSBCLIADD"; 215 216 if (transop == LST_TRANS_TSBSRVADD) 217 return "TSBSRVADD"; 218 219 if (transop == LST_TRANS_TSBRUN) 220 return "TSBRUN"; 221 222 if (transop == LST_TRANS_TSBSTOP) 223 return "TSBSTOP"; 224 225 if (transop == LST_TRANS_TSBCLIQRY) 226 return "TSBCLIQRY"; 227 228 if (transop == LST_TRANS_TSBSRVQRY) 229 return "TSBSRVQRY"; 230 231 if (transop == LST_TRANS_STATQRY) 232 return "STATQRY"; 233 234 return "Unknown"; 235} 236 237int 238lstcon_rpc_trans_prep(struct list_head *translist, 239 int transop, lstcon_rpc_trans_t **transpp) 240{ 241 lstcon_rpc_trans_t *trans; 242 243 if (translist != NULL) { 244 list_for_each_entry(trans, translist, tas_link) { 245 /* Can't enqueue two private transaction on 246 * the same object */ 247 if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE) 248 return -EPERM; 249 } 250 } 251 252 /* create a trans group */ 253 LIBCFS_ALLOC(trans, sizeof(*trans)); 254 if (trans == NULL) 255 return -ENOMEM; 256 257 trans->tas_opc = transop; 258 259 if (translist == NULL) 260 INIT_LIST_HEAD(&trans->tas_olink); 261 else 262 list_add_tail(&trans->tas_olink, translist); 263 264 list_add_tail(&trans->tas_link, &console_session.ses_trans_list); 265 266 INIT_LIST_HEAD(&trans->tas_rpcs_list); 267 atomic_set(&trans->tas_remaining, 0); 268 init_waitqueue_head(&trans->tas_waitq); 269 270 spin_lock(&console_session.ses_rpc_lock); 271 trans->tas_features = console_session.ses_features; 272 spin_unlock(&console_session.ses_rpc_lock); 273 274 *transpp = trans; 275 return 0; 276} 277 278void 279lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc) 280{ 281 list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list); 282 crpc->crp_trans = trans; 283} 284 285void 286lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error) 287{ 288 srpc_client_rpc_t 
*rpc; 289 lstcon_rpc_t *crpc; 290 lstcon_node_t *nd; 291 292 list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) { 293 rpc = crpc->crp_rpc; 294 295 spin_lock(&rpc->crpc_lock); 296 297 if (!crpc->crp_posted || /* not posted */ 298 crpc->crp_stamp != 0) { /* rpc done or aborted already */ 299 if (crpc->crp_stamp == 0) { 300 crpc->crp_stamp = cfs_time_current(); 301 crpc->crp_status = -EINTR; 302 } 303 spin_unlock(&rpc->crpc_lock); 304 continue; 305 } 306 307 crpc->crp_stamp = cfs_time_current(); 308 crpc->crp_status = error; 309 310 spin_unlock(&rpc->crpc_lock); 311 312 sfw_abort_rpc(rpc); 313 314 if (error != ETIMEDOUT) 315 continue; 316 317 nd = crpc->crp_node; 318 if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp)) 319 continue; 320 321 nd->nd_stamp = crpc->crp_stamp; 322 nd->nd_state = LST_NODE_DOWN; 323 } 324} 325 326static int 327lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans) 328{ 329 if (console_session.ses_shutdown && 330 !list_empty(&trans->tas_olink)) /* Not an end session RPC */ 331 return 1; 332 333 return (atomic_read(&trans->tas_remaining) == 0) ? 1: 0; 334} 335 336int 337lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout) 338{ 339 lstcon_rpc_t *crpc; 340 int rc; 341 342 if (list_empty(&trans->tas_rpcs_list)) 343 return 0; 344 345 if (timeout < LST_TRANS_MIN_TIMEOUT) 346 timeout = LST_TRANS_MIN_TIMEOUT; 347 348 CDEBUG(D_NET, "Transaction %s started\n", 349 lstcon_rpc_trans_name(trans->tas_opc)); 350 351 /* post all requests */ 352 list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) { 353 LASSERT(!crpc->crp_posted); 354 355 lstcon_rpc_post(crpc); 356 } 357 358 mutex_unlock(&console_session.ses_mutex); 359 360 rc = wait_event_interruptible_timeout(trans->tas_waitq, 361 lstcon_rpc_trans_check(trans), 362 cfs_time_seconds(timeout)); 363 rc = (rc > 0) ? 0 : ((rc < 0) ? 
-EINTR : -ETIMEDOUT); 364 365 mutex_lock(&console_session.ses_mutex); 366 367 if (console_session.ses_shutdown) 368 rc = -ESHUTDOWN; 369 370 if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) { 371 /* treat short timeout as canceled */ 372 if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2) 373 rc = -EINTR; 374 375 lstcon_rpc_trans_abort(trans, rc); 376 } 377 378 CDEBUG(D_NET, "Transaction %s stopped: %d\n", 379 lstcon_rpc_trans_name(trans->tas_opc), rc); 380 381 lstcon_rpc_trans_stat(trans, lstcon_trans_stat()); 382 383 return rc; 384} 385 386int 387lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp) 388{ 389 lstcon_node_t *nd = crpc->crp_node; 390 srpc_client_rpc_t *rpc = crpc->crp_rpc; 391 srpc_generic_reply_t *rep; 392 393 LASSERT(nd != NULL && rpc != NULL); 394 LASSERT(crpc->crp_stamp != 0); 395 396 if (crpc->crp_status != 0) { 397 *msgpp = NULL; 398 return crpc->crp_status; 399 } 400 401 *msgpp = &rpc->crpc_replymsg; 402 if (!crpc->crp_unpacked) { 403 sfw_unpack_message(*msgpp); 404 crpc->crp_unpacked = 1; 405 } 406 407 if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp)) 408 return 0; 409 410 nd->nd_stamp = crpc->crp_stamp; 411 rep = &(*msgpp)->msg_body.reply; 412 413 if (rep->sid.ses_nid == LNET_NID_ANY) 414 nd->nd_state = LST_NODE_UNKNOWN; 415 else if (lstcon_session_match(rep->sid)) 416 nd->nd_state = LST_NODE_ACTIVE; 417 else 418 nd->nd_state = LST_NODE_BUSY; 419 420 return 0; 421} 422 423void 424lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat) 425{ 426 lstcon_rpc_t *crpc; 427 srpc_msg_t *rep; 428 int error; 429 430 LASSERT(stat != NULL); 431 432 memset(stat, 0, sizeof(*stat)); 433 434 list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) { 435 lstcon_rpc_stat_total(stat, 1); 436 437 LASSERT(crpc->crp_stamp != 0); 438 439 error = lstcon_rpc_get_reply(crpc, &rep); 440 if (error != 0) { 441 lstcon_rpc_stat_failure(stat, 1); 442 if (stat->trs_rpc_errno == 0) 443 stat->trs_rpc_errno = -error; 444 445 
continue; 446 } 447 448 lstcon_rpc_stat_success(stat, 1); 449 450 lstcon_rpc_stat_reply(trans, rep, crpc->crp_node, stat); 451 } 452 453 if (trans->tas_opc == LST_TRANS_SESNEW && stat->trs_fwk_errno == 0) { 454 stat->trs_fwk_errno = 455 lstcon_session_feats_check(trans->tas_features); 456 } 457 458 CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, " 459 "RPC error(%d), Framework error(%d)\n", 460 lstcon_rpc_trans_name(trans->tas_opc), 461 lstcon_rpc_stat_success(stat, 0), 462 lstcon_rpc_stat_failure(stat, 0), 463 lstcon_rpc_stat_total(stat, 0), 464 stat->trs_rpc_errno, stat->trs_fwk_errno); 465 466 return; 467} 468 469int 470lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans, 471 struct list_head *head_up, 472 lstcon_rpc_readent_func_t readent) 473{ 474 struct list_head tmp; 475 struct list_head *next; 476 lstcon_rpc_ent_t *ent; 477 srpc_generic_reply_t *rep; 478 lstcon_rpc_t *crpc; 479 srpc_msg_t *msg; 480 lstcon_node_t *nd; 481 cfs_duration_t dur; 482 struct timeval tv; 483 int error; 484 485 LASSERT(head_up != NULL); 486 487 next = head_up; 488 489 list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) { 490 if (copy_from_user(&tmp, next, 491 sizeof(struct list_head))) 492 return -EFAULT; 493 494 if (tmp.next == head_up) 495 return 0; 496 497 next = tmp.next; 498 499 ent = list_entry(next, lstcon_rpc_ent_t, rpe_link); 500 501 LASSERT(crpc->crp_stamp != 0); 502 503 error = lstcon_rpc_get_reply(crpc, &msg); 504 505 nd = crpc->crp_node; 506 507 dur = (cfs_duration_t)cfs_time_sub(crpc->crp_stamp, 508 (cfs_time_t)console_session.ses_id.ses_stamp); 509 cfs_duration_usec(dur, &tv); 510 511 if (copy_to_user(&ent->rpe_peer, 512 &nd->nd_id, sizeof(lnet_process_id_t)) || 513 copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) || 514 copy_to_user(&ent->rpe_state, 515 &nd->nd_state, sizeof(nd->nd_state)) || 516 copy_to_user(&ent->rpe_rpc_errno, &error, 517 sizeof(error))) 518 return -EFAULT; 519 520 if (error != 0) 521 continue; 522 523 /* RPC is done */ 
524 rep = (srpc_generic_reply_t *)&msg->msg_body.reply; 525 526 if (copy_to_user(&ent->rpe_sid, 527 &rep->sid, sizeof(lst_sid_t)) || 528 copy_to_user(&ent->rpe_fwk_errno, 529 &rep->status, sizeof(rep->status))) 530 return -EFAULT; 531 532 if (readent == NULL) 533 continue; 534 535 if ((error = readent(trans->tas_opc, msg, ent)) != 0) 536 return error; 537 } 538 539 return 0; 540} 541 542void 543lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans) 544{ 545 srpc_client_rpc_t *rpc; 546 lstcon_rpc_t *crpc; 547 lstcon_rpc_t *tmp; 548 int count = 0; 549 550 list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list, 551 crp_link) { 552 rpc = crpc->crp_rpc; 553 554 spin_lock(&rpc->crpc_lock); 555 556 /* free it if not posted or finished already */ 557 if (!crpc->crp_posted || crpc->crp_finished) { 558 spin_unlock(&rpc->crpc_lock); 559 560 list_del_init(&crpc->crp_link); 561 lstcon_rpc_put(crpc); 562 563 continue; 564 } 565 566 /* rpcs can be still not callbacked (even LNetMDUnlink is called) 567 * because huge timeout for inaccessible network, don't make 568 * user wait for them, just abandon them, they will be recycled 569 * in callback */ 570 571 LASSERT(crpc->crp_status != 0); 572 573 crpc->crp_node = NULL; 574 crpc->crp_trans = NULL; 575 list_del_init(&crpc->crp_link); 576 count ++; 577 578 spin_unlock(&rpc->crpc_lock); 579 580 atomic_dec(&trans->tas_remaining); 581 } 582 583 LASSERT(atomic_read(&trans->tas_remaining) == 0); 584 585 list_del(&trans->tas_link); 586 if (!list_empty(&trans->tas_olink)) 587 list_del(&trans->tas_olink); 588 589 CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n", 590 lstcon_rpc_trans_name(trans->tas_opc), count); 591 592 LIBCFS_FREE(trans, sizeof(*trans)); 593 594 return; 595} 596 597int 598lstcon_sesrpc_prep(lstcon_node_t *nd, int transop, 599 unsigned feats, lstcon_rpc_t **crpc) 600{ 601 srpc_mksn_reqst_t *msrq; 602 srpc_rmsn_reqst_t *rsrq; 603 int rc; 604 605 switch (transop) { 606 case LST_TRANS_SESNEW: 607 rc = 
lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION, 608 feats, 0, 0, crpc); 609 if (rc != 0) 610 return rc; 611 612 msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst; 613 msrq->mksn_sid = console_session.ses_id; 614 msrq->mksn_force = console_session.ses_force; 615 strncpy(msrq->mksn_name, console_session.ses_name, 616 strlen(console_session.ses_name)); 617 break; 618 619 case LST_TRANS_SESEND: 620 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION, 621 feats, 0, 0, crpc); 622 if (rc != 0) 623 return rc; 624 625 rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst; 626 rsrq->rmsn_sid = console_session.ses_id; 627 break; 628 629 default: 630 LBUG(); 631 } 632 633 return 0; 634} 635 636int 637lstcon_dbgrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc) 638{ 639 srpc_debug_reqst_t *drq; 640 int rc; 641 642 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, feats, 0, 0, crpc); 643 if (rc != 0) 644 return rc; 645 646 drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst; 647 648 drq->dbg_sid = console_session.ses_id; 649 drq->dbg_flags = 0; 650 651 return rc; 652} 653 654int 655lstcon_batrpc_prep(lstcon_node_t *nd, int transop, unsigned feats, 656 lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc) 657{ 658 lstcon_batch_t *batch; 659 srpc_batch_reqst_t *brq; 660 int rc; 661 662 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, feats, 0, 0, crpc); 663 if (rc != 0) 664 return rc; 665 666 brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst; 667 668 brq->bar_sid = console_session.ses_id; 669 brq->bar_bid = tsb->tsb_id; 670 brq->bar_testidx = tsb->tsb_index; 671 brq->bar_opc = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN : 672 (transop == LST_TRANS_TSBSTOP ? 
SRPC_BATCH_OPC_STOP: 673 SRPC_BATCH_OPC_QUERY); 674 675 if (transop != LST_TRANS_TSBRUN && 676 transop != LST_TRANS_TSBSTOP) 677 return 0; 678 679 LASSERT(tsb->tsb_index == 0); 680 681 batch = (lstcon_batch_t *)tsb; 682 brq->bar_arg = batch->bat_arg; 683 684 return 0; 685} 686 687int 688lstcon_statrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc) 689{ 690 srpc_stat_reqst_t *srq; 691 int rc; 692 693 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, feats, 0, 0, crpc); 694 if (rc != 0) 695 return rc; 696 697 srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst; 698 699 srq->str_sid = console_session.ses_id; 700 srq->str_type = 0; /* XXX remove it */ 701 702 return 0; 703} 704 705lnet_process_id_packed_t * 706lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov) 707{ 708 lnet_process_id_packed_t *pid; 709 int i; 710 711 i = idx / SFW_ID_PER_PAGE; 712 713 LASSERT(i < nkiov); 714 715 pid = (lnet_process_id_packed_t *)page_address(kiov[i].kiov_page); 716 717 return &pid[idx % SFW_ID_PER_PAGE]; 718} 719 720int 721lstcon_dstnodes_prep(lstcon_group_t *grp, int idx, 722 int dist, int span, int nkiov, lnet_kiov_t *kiov) 723{ 724 lnet_process_id_packed_t *pid; 725 lstcon_ndlink_t *ndl; 726 lstcon_node_t *nd; 727 int start; 728 int end; 729 int i = 0; 730 731 LASSERT(dist >= 1); 732 LASSERT(span >= 1); 733 LASSERT(grp->grp_nnode >= 1); 734 735 if (span > grp->grp_nnode) 736 return -EINVAL; 737 738 start = ((idx / dist) * span) % grp->grp_nnode; 739 end = ((idx / dist) * span + span - 1) % grp->grp_nnode; 740 741 list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) { 742 nd = ndl->ndl_node; 743 if (i < start) { 744 i ++; 745 continue; 746 } 747 748 if (i > (end >= start ? 
end: grp->grp_nnode)) 749 break; 750 751 pid = lstcon_next_id((i - start), nkiov, kiov); 752 pid->nid = nd->nd_id.nid; 753 pid->pid = nd->nd_id.pid; 754 i++; 755 } 756 757 if (start <= end) /* done */ 758 return 0; 759 760 list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) { 761 if (i > grp->grp_nnode + end) 762 break; 763 764 nd = ndl->ndl_node; 765 pid = lstcon_next_id((i - start), nkiov, kiov); 766 pid->nid = nd->nd_id.nid; 767 pid->pid = nd->nd_id.pid; 768 i++; 769 } 770 771 return 0; 772} 773 774int 775lstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req) 776{ 777 test_ping_req_t *prq = &req->tsr_u.ping; 778 779 prq->png_size = param->png_size; 780 prq->png_flags = param->png_flags; 781 /* TODO dest */ 782 return 0; 783} 784 785int 786lstcon_bulkrpc_v0_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req) 787{ 788 test_bulk_req_t *brq = &req->tsr_u.bulk_v0; 789 790 brq->blk_opc = param->blk_opc; 791 brq->blk_npg = (param->blk_size + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE; 792 brq->blk_flags = param->blk_flags; 793 794 return 0; 795} 796 797int 798lstcon_bulkrpc_v1_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req) 799{ 800 test_bulk_req_v1_t *brq = &req->tsr_u.bulk_v1; 801 802 brq->blk_opc = param->blk_opc; 803 brq->blk_flags = param->blk_flags; 804 brq->blk_len = param->blk_size; 805 brq->blk_offset = 0; /* reserved */ 806 807 return 0; 808} 809 810int 811lstcon_testrpc_prep(lstcon_node_t *nd, int transop, unsigned feats, 812 lstcon_test_t *test, lstcon_rpc_t **crpc) 813{ 814 lstcon_group_t *sgrp = test->tes_src_grp; 815 lstcon_group_t *dgrp = test->tes_dst_grp; 816 srpc_test_reqst_t *trq; 817 srpc_bulk_t *bulk; 818 int i; 819 int npg = 0; 820 int nob = 0; 821 int rc = 0; 822 823 if (transop == LST_TRANS_TSBCLIADD) { 824 npg = sfw_id_pages(test->tes_span); 825 nob = (feats & LST_FEAT_BULK_LEN) == 0 ? 
826 npg * PAGE_CACHE_SIZE : 827 sizeof(lnet_process_id_packed_t) * test->tes_span; 828 } 829 830 rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, feats, npg, nob, crpc); 831 if (rc != 0) 832 return rc; 833 834 trq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst; 835 836 if (transop == LST_TRANS_TSBSRVADD) { 837 int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist; 838 int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span; 839 int nmax = (ndist + nspan - 1) / nspan; 840 841 trq->tsr_ndest = 0; 842 trq->tsr_loop = nmax * test->tes_dist * test->tes_concur; 843 844 } else { 845 bulk = &(*crpc)->crp_rpc->crpc_bulk; 846 847 for (i = 0; i < npg; i++) { 848 int len; 849 850 LASSERT(nob > 0); 851 852 len = (feats & LST_FEAT_BULK_LEN) == 0 ? 853 PAGE_CACHE_SIZE : min_t(int, nob, PAGE_CACHE_SIZE); 854 nob -= len; 855 856 bulk->bk_iovs[i].kiov_offset = 0; 857 bulk->bk_iovs[i].kiov_len = len; 858 bulk->bk_iovs[i].kiov_page = 859 alloc_page(GFP_IOFS); 860 861 if (bulk->bk_iovs[i].kiov_page == NULL) { 862 lstcon_rpc_put(*crpc); 863 return -ENOMEM; 864 } 865 } 866 867 bulk->bk_sink = 0; 868 869 LASSERT(transop == LST_TRANS_TSBCLIADD); 870 871 rc = lstcon_dstnodes_prep(test->tes_dst_grp, 872 test->tes_cliidx++, 873 test->tes_dist, 874 test->tes_span, 875 npg, &bulk->bk_iovs[0]); 876 if (rc != 0) { 877 lstcon_rpc_put(*crpc); 878 return rc; 879 } 880 881 trq->tsr_ndest = test->tes_span; 882 trq->tsr_loop = test->tes_loop; 883 } 884 885 trq->tsr_sid = console_session.ses_id; 886 trq->tsr_bid = test->tes_hdr.tsb_id; 887 trq->tsr_concur = test->tes_concur; 888 trq->tsr_is_client = (transop == LST_TRANS_TSBCLIADD) ? 
1 : 0; 889 trq->tsr_stop_onerr = !!test->tes_stop_onerr; 890 891 switch (test->tes_type) { 892 case LST_TEST_PING: 893 trq->tsr_service = SRPC_SERVICE_PING; 894 rc = lstcon_pingrpc_prep((lst_test_ping_param_t *) 895 &test->tes_param[0], trq); 896 break; 897 898 case LST_TEST_BULK: 899 trq->tsr_service = SRPC_SERVICE_BRW; 900 if ((feats & LST_FEAT_BULK_LEN) == 0) { 901 rc = lstcon_bulkrpc_v0_prep((lst_test_bulk_param_t *) 902 &test->tes_param[0], trq); 903 } else { 904 rc = lstcon_bulkrpc_v1_prep((lst_test_bulk_param_t *) 905 &test->tes_param[0], trq); 906 } 907 908 break; 909 default: 910 LBUG(); 911 break; 912 } 913 914 return rc; 915} 916 917int 918lstcon_sesnew_stat_reply(lstcon_rpc_trans_t *trans, 919 lstcon_node_t *nd, srpc_msg_t *reply) 920{ 921 srpc_mksn_reply_t *mksn_rep = &reply->msg_body.mksn_reply; 922 int status = mksn_rep->mksn_status; 923 924 if (status == 0 && 925 (reply->msg_ses_feats & ~LST_FEATS_MASK) != 0) { 926 mksn_rep->mksn_status = EPROTO; 927 status = EPROTO; 928 } 929 930 if (status == EPROTO) { 931 CNETERR("session protocol error from %s: %u\n", 932 libcfs_nid2str(nd->nd_id.nid), 933 reply->msg_ses_feats); 934 } 935 936 if (status != 0) 937 return status; 938 939 if (!trans->tas_feats_updated) { 940 trans->tas_feats_updated = 1; 941 trans->tas_features = reply->msg_ses_feats; 942 } 943 944 if (reply->msg_ses_feats != trans->tas_features) { 945 CNETERR("Framework features %x from %s is different with " 946 "features on this transaction: %x\n", 947 reply->msg_ses_feats, libcfs_nid2str(nd->nd_id.nid), 948 trans->tas_features); 949 status = mksn_rep->mksn_status = EPROTO; 950 } 951 952 if (status == 0) { 953 /* session timeout on remote node */ 954 nd->nd_timeout = mksn_rep->mksn_timeout; 955 } 956 957 return status; 958} 959 960void 961lstcon_rpc_stat_reply(lstcon_rpc_trans_t *trans, srpc_msg_t *msg, 962 lstcon_node_t *nd, lstcon_trans_stat_t *stat) 963{ 964 srpc_rmsn_reply_t *rmsn_rep; 965 srpc_debug_reply_t *dbg_rep; 966 srpc_batch_reply_t 
*bat_rep; 967 srpc_test_reply_t *test_rep; 968 srpc_stat_reply_t *stat_rep; 969 int rc = 0; 970 971 switch (trans->tas_opc) { 972 case LST_TRANS_SESNEW: 973 rc = lstcon_sesnew_stat_reply(trans, nd, msg); 974 if (rc == 0) { 975 lstcon_sesop_stat_success(stat, 1); 976 return; 977 } 978 979 lstcon_sesop_stat_failure(stat, 1); 980 break; 981 982 case LST_TRANS_SESEND: 983 rmsn_rep = &msg->msg_body.rmsn_reply; 984 /* ESRCH is not an error for end session */ 985 if (rmsn_rep->rmsn_status == 0 || 986 rmsn_rep->rmsn_status == ESRCH) { 987 lstcon_sesop_stat_success(stat, 1); 988 return; 989 } 990 991 lstcon_sesop_stat_failure(stat, 1); 992 rc = rmsn_rep->rmsn_status; 993 break; 994 995 case LST_TRANS_SESQRY: 996 case LST_TRANS_SESPING: 997 dbg_rep = &msg->msg_body.dbg_reply; 998 999 if (dbg_rep->dbg_status == ESRCH) { 1000 lstcon_sesqry_stat_unknown(stat, 1); 1001 return; 1002 } 1003 1004 if (lstcon_session_match(dbg_rep->dbg_sid)) 1005 lstcon_sesqry_stat_active(stat, 1); 1006 else 1007 lstcon_sesqry_stat_busy(stat, 1); 1008 return; 1009 1010 case LST_TRANS_TSBRUN: 1011 case LST_TRANS_TSBSTOP: 1012 bat_rep = &msg->msg_body.bat_reply; 1013 1014 if (bat_rep->bar_status == 0) { 1015 lstcon_tsbop_stat_success(stat, 1); 1016 return; 1017 } 1018 1019 if (bat_rep->bar_status == EPERM && 1020 trans->tas_opc == LST_TRANS_TSBSTOP) { 1021 lstcon_tsbop_stat_success(stat, 1); 1022 return; 1023 } 1024 1025 lstcon_tsbop_stat_failure(stat, 1); 1026 rc = bat_rep->bar_status; 1027 break; 1028 1029 case LST_TRANS_TSBCLIQRY: 1030 case LST_TRANS_TSBSRVQRY: 1031 bat_rep = &msg->msg_body.bat_reply; 1032 1033 if (bat_rep->bar_active != 0) 1034 lstcon_tsbqry_stat_run(stat, 1); 1035 else 1036 lstcon_tsbqry_stat_idle(stat, 1); 1037 1038 if (bat_rep->bar_status == 0) 1039 return; 1040 1041 lstcon_tsbqry_stat_failure(stat, 1); 1042 rc = bat_rep->bar_status; 1043 break; 1044 1045 case LST_TRANS_TSBCLIADD: 1046 case LST_TRANS_TSBSRVADD: 1047 test_rep = &msg->msg_body.tes_reply; 1048 1049 if 
(test_rep->tsr_status == 0) { 1050 lstcon_tsbop_stat_success(stat, 1); 1051 return; 1052 } 1053 1054 lstcon_tsbop_stat_failure(stat, 1); 1055 rc = test_rep->tsr_status; 1056 break; 1057 1058 case LST_TRANS_STATQRY: 1059 stat_rep = &msg->msg_body.stat_reply; 1060 1061 if (stat_rep->str_status == 0) { 1062 lstcon_statqry_stat_success(stat, 1); 1063 return; 1064 } 1065 1066 lstcon_statqry_stat_failure(stat, 1); 1067 rc = stat_rep->str_status; 1068 break; 1069 1070 default: 1071 LBUG(); 1072 } 1073 1074 if (stat->trs_fwk_errno == 0) 1075 stat->trs_fwk_errno = rc; 1076 1077 return; 1078} 1079 1080int 1081lstcon_rpc_trans_ndlist(struct list_head *ndlist, 1082 struct list_head *translist, int transop, 1083 void *arg, lstcon_rpc_cond_func_t condition, 1084 lstcon_rpc_trans_t **transpp) 1085{ 1086 lstcon_rpc_trans_t *trans; 1087 lstcon_ndlink_t *ndl; 1088 lstcon_node_t *nd; 1089 lstcon_rpc_t *rpc; 1090 unsigned feats; 1091 int rc; 1092 1093 /* Creating session RPG for list of nodes */ 1094 1095 rc = lstcon_rpc_trans_prep(translist, transop, &trans); 1096 if (rc != 0) { 1097 CERROR("Can't create transaction %d: %d\n", transop, rc); 1098 return rc; 1099 } 1100 1101 feats = trans->tas_features; 1102 list_for_each_entry(ndl, ndlist, ndl_link) { 1103 rc = condition == NULL ? 
1 : 1104 condition(transop, ndl->ndl_node, arg); 1105 1106 if (rc == 0) 1107 continue; 1108 1109 if (rc < 0) { 1110 CDEBUG(D_NET, "Condition error while creating RPC " 1111 " for transaction %d: %d\n", transop, rc); 1112 break; 1113 } 1114 1115 nd = ndl->ndl_node; 1116 1117 switch (transop) { 1118 case LST_TRANS_SESNEW: 1119 case LST_TRANS_SESEND: 1120 rc = lstcon_sesrpc_prep(nd, transop, feats, &rpc); 1121 break; 1122 case LST_TRANS_SESQRY: 1123 case LST_TRANS_SESPING: 1124 rc = lstcon_dbgrpc_prep(nd, feats, &rpc); 1125 break; 1126 case LST_TRANS_TSBCLIADD: 1127 case LST_TRANS_TSBSRVADD: 1128 rc = lstcon_testrpc_prep(nd, transop, feats, 1129 (lstcon_test_t *)arg, &rpc); 1130 break; 1131 case LST_TRANS_TSBRUN: 1132 case LST_TRANS_TSBSTOP: 1133 case LST_TRANS_TSBCLIQRY: 1134 case LST_TRANS_TSBSRVQRY: 1135 rc = lstcon_batrpc_prep(nd, transop, feats, 1136 (lstcon_tsb_hdr_t *)arg, &rpc); 1137 break; 1138 case LST_TRANS_STATQRY: 1139 rc = lstcon_statrpc_prep(nd, feats, &rpc); 1140 break; 1141 default: 1142 rc = -EINVAL; 1143 break; 1144 } 1145 1146 if (rc != 0) { 1147 CERROR("Failed to create RPC for transaction %s: %d\n", 1148 lstcon_rpc_trans_name(transop), rc); 1149 break; 1150 } 1151 1152 lstcon_rpc_trans_addreq(trans, rpc); 1153 } 1154 1155 if (rc == 0) { 1156 *transpp = trans; 1157 return 0; 1158 } 1159 1160 lstcon_rpc_trans_destroy(trans); 1161 1162 return rc; 1163} 1164 1165void 1166lstcon_rpc_pinger(void *arg) 1167{ 1168 stt_timer_t *ptimer = (stt_timer_t *)arg; 1169 lstcon_rpc_trans_t *trans; 1170 lstcon_rpc_t *crpc; 1171 srpc_msg_t *rep; 1172 srpc_debug_reqst_t *drq; 1173 lstcon_ndlink_t *ndl; 1174 lstcon_node_t *nd; 1175 time_t intv; 1176 int count = 0; 1177 int rc; 1178 1179 /* RPC pinger is a special case of transaction, 1180 * it's called by timer at 8 seconds interval. 
1181 */ 1182 mutex_lock(&console_session.ses_mutex); 1183 1184 if (console_session.ses_shutdown || console_session.ses_expired) { 1185 mutex_unlock(&console_session.ses_mutex); 1186 return; 1187 } 1188 1189 if (!console_session.ses_expired && 1190 cfs_time_current_sec() - console_session.ses_laststamp > 1191 (time_t)console_session.ses_timeout) 1192 console_session.ses_expired = 1; 1193 1194 trans = console_session.ses_ping; 1195 1196 LASSERT(trans != NULL); 1197 1198 list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) { 1199 nd = ndl->ndl_node; 1200 1201 if (console_session.ses_expired) { 1202 /* idle console, end session on all nodes */ 1203 if (nd->nd_state != LST_NODE_ACTIVE) 1204 continue; 1205 1206 rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND, 1207 trans->tas_features, &crpc); 1208 if (rc != 0) { 1209 CERROR("Out of memory\n"); 1210 break; 1211 } 1212 1213 lstcon_rpc_trans_addreq(trans, crpc); 1214 lstcon_rpc_post(crpc); 1215 1216 continue; 1217 } 1218 1219 crpc = &nd->nd_ping; 1220 1221 if (crpc->crp_rpc != NULL) { 1222 LASSERT(crpc->crp_trans == trans); 1223 LASSERT(!list_empty(&crpc->crp_link)); 1224 1225 spin_lock(&crpc->crp_rpc->crpc_lock); 1226 1227 LASSERT(crpc->crp_posted); 1228 1229 if (!crpc->crp_finished) { 1230 /* in flight */ 1231 spin_unlock(&crpc->crp_rpc->crpc_lock); 1232 continue; 1233 } 1234 1235 spin_unlock(&crpc->crp_rpc->crpc_lock); 1236 1237 lstcon_rpc_get_reply(crpc, &rep); 1238 1239 list_del_init(&crpc->crp_link); 1240 1241 lstcon_rpc_put(crpc); 1242 } 1243 1244 if (nd->nd_state != LST_NODE_ACTIVE) 1245 continue; 1246 1247 intv = cfs_duration_sec(cfs_time_sub(cfs_time_current(), 1248 nd->nd_stamp)); 1249 if (intv < (time_t)nd->nd_timeout / 2) 1250 continue; 1251 1252 rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG, 1253 trans->tas_features, 0, 0, 1, crpc); 1254 if (rc != 0) { 1255 CERROR("Out of memory\n"); 1256 break; 1257 } 1258 1259 drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst; 1260 1261 drq->dbg_sid = 
console_session.ses_id; 1262 drq->dbg_flags = 0; 1263 1264 lstcon_rpc_trans_addreq(trans, crpc); 1265 lstcon_rpc_post(crpc); 1266 1267 count ++; 1268 } 1269 1270 if (console_session.ses_expired) { 1271 mutex_unlock(&console_session.ses_mutex); 1272 return; 1273 } 1274 1275 CDEBUG(D_NET, "Ping %d nodes in session\n", count); 1276 1277 ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL); 1278 stt_add_timer(ptimer); 1279 1280 mutex_unlock(&console_session.ses_mutex); 1281} 1282 1283int 1284lstcon_rpc_pinger_start(void) 1285{ 1286 stt_timer_t *ptimer; 1287 int rc; 1288 1289 LASSERT(list_empty(&console_session.ses_rpc_freelist)); 1290 LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0); 1291 1292 rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING, 1293 &console_session.ses_ping); 1294 if (rc != 0) { 1295 CERROR("Failed to create console pinger\n"); 1296 return rc; 1297 } 1298 1299 ptimer = &console_session.ses_ping_timer; 1300 ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL); 1301 1302 stt_add_timer(ptimer); 1303 1304 return 0; 1305} 1306 1307void 1308lstcon_rpc_pinger_stop(void) 1309{ 1310 LASSERT(console_session.ses_shutdown); 1311 1312 stt_del_timer(&console_session.ses_ping_timer); 1313 1314 lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN); 1315 lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat()); 1316 lstcon_rpc_trans_destroy(console_session.ses_ping); 1317 1318 memset(lstcon_trans_stat(), 0, sizeof(lstcon_trans_stat_t)); 1319 1320 console_session.ses_ping = NULL; 1321} 1322 1323void 1324lstcon_rpc_cleanup_wait(void) 1325{ 1326 lstcon_rpc_trans_t *trans; 1327 lstcon_rpc_t *crpc; 1328 struct list_head *pacer; 1329 struct list_head zlist; 1330 1331 /* Called with hold of global mutex */ 1332 1333 LASSERT(console_session.ses_shutdown); 1334 1335 while (!list_empty(&console_session.ses_trans_list)) { 1336 list_for_each(pacer, &console_session.ses_trans_list) { 1337 trans = 
list_entry(pacer, lstcon_rpc_trans_t, 1338 tas_link); 1339 1340 CDEBUG(D_NET, "Session closed, wakeup transaction %s\n", 1341 lstcon_rpc_trans_name(trans->tas_opc)); 1342 1343 wake_up(&trans->tas_waitq); 1344 } 1345 1346 mutex_unlock(&console_session.ses_mutex); 1347 1348 CWARN("Session is shutting down, " 1349 "waiting for termination of transactions\n"); 1350 cfs_pause(cfs_time_seconds(1)); 1351 1352 mutex_lock(&console_session.ses_mutex); 1353 } 1354 1355 spin_lock(&console_session.ses_rpc_lock); 1356 1357 lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0), 1358 console_session.ses_rpc_lock, 1359 "Network is not accessible or target is down, " 1360 "waiting for %d console RPCs to being recycled\n", 1361 atomic_read(&console_session.ses_rpc_counter)); 1362 1363 list_add(&zlist, &console_session.ses_rpc_freelist); 1364 list_del_init(&console_session.ses_rpc_freelist); 1365 1366 spin_unlock(&console_session.ses_rpc_lock); 1367 1368 while (!list_empty(&zlist)) { 1369 crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link); 1370 1371 list_del(&crpc->crp_link); 1372 LIBCFS_FREE(crpc, sizeof(lstcon_rpc_t)); 1373 } 1374} 1375 1376int 1377lstcon_rpc_module_init(void) 1378{ 1379 INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list); 1380 console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger; 1381 console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer; 1382 1383 console_session.ses_ping = NULL; 1384 1385 spin_lock_init(&console_session.ses_rpc_lock); 1386 atomic_set(&console_session.ses_rpc_counter, 0); 1387 INIT_LIST_HEAD(&console_session.ses_rpc_freelist); 1388 1389 return 0; 1390} 1391 1392void 1393lstcon_rpc_module_fini(void) 1394{ 1395 LASSERT(list_empty(&console_session.ses_rpc_freelist)); 1396 LASSERT(atomic_read(&console_session.ses_rpc_counter) == 0); 1397} 1398