framework.c revision 7264b8a5db30b717b39394234fd3bb7dabdbda92
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lnet/selftest/framework.c 37 * 38 * Author: Isaac Huang <isaac@clusterfs.com> 39 * Author: Liang Zhen <liangzhen@clusterfs.com> 40 */ 41 42#define DEBUG_SUBSYSTEM S_LNET 43 44#include "selftest.h" 45 46lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1}; 47 48static int session_timeout = 100; 49module_param(session_timeout, int, 0444); 50MODULE_PARM_DESC(session_timeout, "test session timeout in seconds (100 by default, 0 == never)"); 51 52static int rpc_timeout = 64; 53module_param(rpc_timeout, int, 0644); 54MODULE_PARM_DESC(rpc_timeout, "rpc timeout in seconds (64 by default, 0 == never)"); 55 56#define sfw_unpack_id(id) \ 57do { \ 58 __swab64s(&(id).nid); \ 59 __swab32s(&(id).pid); \ 60} while (0) 61 62#define sfw_unpack_sid(sid) \ 63do { \ 64 __swab64s(&(sid).ses_nid); \ 65 __swab64s(&(sid).ses_stamp); \ 66} while (0) 67 68#define sfw_unpack_fw_counters(fc) \ 69do { \ 70 __swab32s(&(fc).running_ms); \ 71 __swab32s(&(fc).active_batches); \ 72 __swab32s(&(fc).zombie_sessions); \ 73 __swab32s(&(fc).brw_errors); \ 74 __swab32s(&(fc).ping_errors); \ 75} while (0) 76 77#define sfw_unpack_rpc_counters(rc) \ 78do { \ 79 __swab32s(&(rc).errors); \ 80 __swab32s(&(rc).rpcs_sent); \ 81 __swab32s(&(rc).rpcs_rcvd); \ 82 __swab32s(&(rc).rpcs_dropped); \ 83 __swab32s(&(rc).rpcs_expired); \ 84 __swab64s(&(rc).bulk_get); \ 85 __swab64s(&(rc).bulk_put); \ 86} while (0) 87 88#define sfw_unpack_lnet_counters(lc) \ 89do { \ 90 __swab32s(&(lc).errors); \ 91 __swab32s(&(lc).msgs_max); \ 92 __swab32s(&(lc).msgs_alloc); \ 93 __swab32s(&(lc).send_count); \ 94 __swab32s(&(lc).recv_count); \ 95 __swab32s(&(lc).drop_count); \ 96 __swab32s(&(lc).route_count); \ 97 __swab64s(&(lc).send_length); \ 98 __swab64s(&(lc).recv_length); \ 99 __swab64s(&(lc).drop_length); \ 100 __swab64s(&(lc).route_length); \ 101} while (0) 102 103#define sfw_test_active(t) (atomic_read(&(t)->tsi_nactive) != 0) 104#define sfw_batch_active(b) (atomic_read(&(b)->bat_nactive) != 0) 105 106struct smoketest_framework { 107 struct list_head fw_zombie_rpcs; /* RPCs to be recycled */ 108 struct list_head fw_zombie_sessions; /* stopping sessions */ 109 struct list_head fw_tests; /* registered test cases */ 110 atomic_t fw_nzombies; /* # zombie sessions */ 111 spinlock_t fw_lock; /* serialise */ 112 sfw_session_t *fw_session; /* _the_ session */ 113 int fw_shuttingdown; /* shutdown in progress */ 114 srpc_server_rpc_t *fw_active_srpc; /* running RPC */ 115} sfw_data; 116 117/* forward ref's */ 118int sfw_stop_batch (sfw_batch_t *tsb, int force); 119void sfw_destroy_session (sfw_session_t *sn); 120 121static inline sfw_test_case_t * 122sfw_find_test_case(int id) 123{ 124 sfw_test_case_t *tsc; 125 126 LASSERT (id <= SRPC_SERVICE_MAX_ID); 127 LASSERT (id > SRPC_FRAMEWORK_SERVICE_MAX_ID); 128 129 list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) { 130 if (tsc->tsc_srv_service->sv_id == id) 131 return tsc; 132 } 133 134 return NULL; 135} 136 137static int 138sfw_register_test (srpc_service_t *service, sfw_test_client_ops_t *cliops) 139{ 140 sfw_test_case_t *tsc; 141 142 if (sfw_find_test_case(service->sv_id) != NULL) { 143 CERROR ("Failed to register test %s (%d)\n", 144 service->sv_name, service->sv_id); 145 return -EEXIST; 146 } 147 148 LIBCFS_ALLOC(tsc, sizeof(sfw_test_case_t)); 149 if (tsc == NULL) 150 return -ENOMEM; 151 152 tsc->tsc_cli_ops = cliops; 153 tsc->tsc_srv_service = service; 154 155 list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests); 156 return 0; 157} 158 159void 160sfw_add_session_timer (void) 161{ 162 sfw_session_t *sn = sfw_data.fw_session; 163 stt_timer_t *timer = &sn->sn_timer; 164 165 LASSERT (!sfw_data.fw_shuttingdown); 166 167 if (sn == NULL || sn->sn_timeout == 0) 168 return; 169 170 LASSERT (!sn->sn_timer_active); 171 172 sn->sn_timer_active = 1; 173 timer->stt_expires = cfs_time_add(sn->sn_timeout, 174 get_seconds()); 175 stt_add_timer(timer); 176 return; 177} 178 179int 180sfw_del_session_timer (void) 181{ 182 sfw_session_t *sn = sfw_data.fw_session; 183 184 if (sn == NULL || !sn->sn_timer_active) 185 return 0; 186 187 LASSERT (sn->sn_timeout != 0); 188 189 if (stt_del_timer(&sn->sn_timer)) { /* timer defused */ 190 sn->sn_timer_active = 0; 191 return 0; 192 } 193 194 return EBUSY; /* racing with sfw_session_expired() */ 195} 196 197/* called with sfw_data.fw_lock held */ 198static void 199sfw_deactivate_session (void) 200{ 201 sfw_session_t *sn = sfw_data.fw_session; 202 int nactive = 0; 203 sfw_batch_t *tsb; 204 sfw_test_case_t *tsc; 205 206 if (sn == NULL) return; 207 208 LASSERT (!sn->sn_timer_active); 209 210 sfw_data.fw_session = NULL; 211 atomic_inc(&sfw_data.fw_nzombies); 212 list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions); 213 214 spin_unlock(&sfw_data.fw_lock); 215 216 list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) { 217 srpc_abort_service(tsc->tsc_srv_service); 218 } 219 220 spin_lock(&sfw_data.fw_lock); 221 222 list_for_each_entry (tsb, &sn->sn_batches, bat_list) { 223 if (sfw_batch_active(tsb)) { 224 nactive++; 225 sfw_stop_batch(tsb, 1); 226 } 227 } 228 229 if (nactive != 0) 230 return; /* wait for active batches to stop */ 231 232 list_del_init(&sn->sn_list); 233 spin_unlock(&sfw_data.fw_lock); 234 235 sfw_destroy_session(sn); 236 237 spin_lock(&sfw_data.fw_lock); 238} 239 240 241void 242sfw_session_expired (void *data) 243{ 244 sfw_session_t *sn = data; 245 246 spin_lock(&sfw_data.fw_lock); 247 248 LASSERT (sn->sn_timer_active); 249 LASSERT (sn == sfw_data.fw_session); 250 251 CWARN ("Session expired! sid: %s-"LPU64", name: %s\n", 252 libcfs_nid2str(sn->sn_id.ses_nid), 253 sn->sn_id.ses_stamp, &sn->sn_name[0]); 254 255 sn->sn_timer_active = 0; 256 sfw_deactivate_session(); 257 258 spin_unlock(&sfw_data.fw_lock); 259} 260 261static inline void 262sfw_init_session(sfw_session_t *sn, lst_sid_t sid, 263 unsigned features, const char *name) 264{ 265 stt_timer_t *timer = &sn->sn_timer; 266 267 memset(sn, 0, sizeof(sfw_session_t)); 268 INIT_LIST_HEAD(&sn->sn_list); 269 INIT_LIST_HEAD(&sn->sn_batches); 270 atomic_set(&sn->sn_refcount, 1); /* +1 for caller */ 271 atomic_set(&sn->sn_brw_errors, 0); 272 atomic_set(&sn->sn_ping_errors, 0); 273 strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name)); 274 275 sn->sn_timer_active = 0; 276 sn->sn_id = sid; 277 sn->sn_features = features; 278 sn->sn_timeout = session_timeout; 279 sn->sn_started = cfs_time_current(); 280 281 timer->stt_data = sn; 282 timer->stt_func = sfw_session_expired; 283 INIT_LIST_HEAD(&timer->stt_list); 284} 285 286/* completion handler for incoming framework RPCs */ 287void 288sfw_server_rpc_done(struct srpc_server_rpc *rpc) 289{ 290 struct srpc_service *sv = rpc->srpc_scd->scd_svc; 291 int status = rpc->srpc_status; 292 293 CDEBUG (D_NET, 294 "Incoming framework RPC done: " 295 "service %s, peer %s, status %s:%d\n", 296 sv->sv_name, libcfs_id2str(rpc->srpc_peer), 297 swi_state2str(rpc->srpc_wi.swi_state), 298 status); 299 300 if (rpc->srpc_bulk != NULL) 301 sfw_free_pages(rpc); 302 return; 303} 304 305void 306sfw_client_rpc_fini (srpc_client_rpc_t *rpc) 307{ 308 LASSERT (rpc->crpc_bulk.bk_niov == 0); 309 LASSERT (list_empty(&rpc->crpc_list)); 310 LASSERT (atomic_read(&rpc->crpc_refcount) == 0); 311 312 CDEBUG (D_NET, 313 "Outgoing framework RPC done: " 314 "service %d, peer %s, status %s:%d:%d\n", 315 rpc->crpc_service, libcfs_id2str(rpc->crpc_dest), 316 swi_state2str(rpc->crpc_wi.swi_state), 317 rpc->crpc_aborted, rpc->crpc_status); 318 319 spin_lock(&sfw_data.fw_lock); 320 321 /* my callers must finish all RPCs before shutting me down */ 322 LASSERT(!sfw_data.fw_shuttingdown); 323 list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs); 324 325 spin_unlock(&sfw_data.fw_lock); 326} 327 328sfw_batch_t * 329sfw_find_batch (lst_bid_t bid) 330{ 331 sfw_session_t *sn = sfw_data.fw_session; 332 sfw_batch_t *bat; 333 334 LASSERT (sn != NULL); 335 336 list_for_each_entry (bat, &sn->sn_batches, bat_list) { 337 if (bat->bat_id.bat_id == bid.bat_id) 338 return bat; 339 } 340 341 return NULL; 342} 343 344sfw_batch_t * 345sfw_bid2batch (lst_bid_t bid) 346{ 347 sfw_session_t *sn = sfw_data.fw_session; 348 sfw_batch_t *bat; 349 350 LASSERT (sn != NULL); 351 352 bat = sfw_find_batch(bid); 353 if (bat != NULL) 354 return bat; 355 356 LIBCFS_ALLOC(bat, sizeof(sfw_batch_t)); 357 if (bat == NULL) 358 return NULL; 359 360 bat->bat_error = 0; 361 bat->bat_session = sn; 362 bat->bat_id = bid; 363 atomic_set(&bat->bat_nactive, 0); 364 INIT_LIST_HEAD(&bat->bat_tests); 365 366 list_add_tail(&bat->bat_list, &sn->sn_batches); 367 return bat; 368} 369 370int 371sfw_get_stats (srpc_stat_reqst_t *request, srpc_stat_reply_t *reply) 372{ 373 sfw_session_t *sn = sfw_data.fw_session; 374 sfw_counters_t *cnt = &reply->str_fw; 375 sfw_batch_t *bat; 376 struct timeval tv; 377 378 reply->str_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; 379 380 if (request->str_sid.ses_nid == LNET_NID_ANY) { 381 reply->str_status = EINVAL; 382 return 0; 383 } 384 385 if (sn == NULL || !sfw_sid_equal(request->str_sid, sn->sn_id)) { 386 reply->str_status = ESRCH; 387 return 0; 388 } 389 390 lnet_counters_get(&reply->str_lnet); 391 srpc_get_counters(&reply->str_rpc); 392 393 /* send over the msecs since the session was started 394 - with 32 bits to send, this is ~49 days */ 395 cfs_duration_usec(cfs_time_sub(cfs_time_current(), 396 sn->sn_started), &tv); 397 398 cnt->running_ms = (__u32)(tv.tv_sec * 1000 + tv.tv_usec / 1000); 399 cnt->brw_errors = atomic_read(&sn->sn_brw_errors); 400 cnt->ping_errors = atomic_read(&sn->sn_ping_errors); 401 cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies); 402 403 cnt->active_batches = 0; 404 list_for_each_entry (bat, &sn->sn_batches, bat_list) { 405 if (atomic_read(&bat->bat_nactive) > 0) 406 cnt->active_batches++; 407 } 408 409 reply->str_status = 0; 410 return 0; 411} 412 413int 414sfw_make_session(srpc_mksn_reqst_t *request, srpc_mksn_reply_t *reply) 415{ 416 sfw_session_t *sn = sfw_data.fw_session; 417 srpc_msg_t *msg = container_of(request, srpc_msg_t, 418 msg_body.mksn_reqst); 419 int cplen = 0; 420 421 if (request->mksn_sid.ses_nid == LNET_NID_ANY) { 422 reply->mksn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; 423 reply->mksn_status = EINVAL; 424 return 0; 425 } 426 427 if (sn != NULL) { 428 reply->mksn_status = 0; 429 reply->mksn_sid = sn->sn_id; 430 reply->mksn_timeout = sn->sn_timeout; 431 432 if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) { 433 atomic_inc(&sn->sn_refcount); 434 return 0; 435 } 436 437 if (!request->mksn_force) { 438 reply->mksn_status = EBUSY; 439 cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0], 440 sizeof(reply->mksn_name)); 441 if (cplen >= sizeof(reply->mksn_name)) 442 return -E2BIG; 443 return 0; 444 } 445 } 446 447 /* reject the request if it requires unknown features 448 * NB: old version will always accept all features because it's not 449 * aware of srpc_msg_t::msg_ses_feats, it's a defect but it's also 450 * harmless because it will return zero feature to console, and it's 451 * console's responsibility to make sure all nodes in a session have 452 * same feature mask. */ 453 if ((msg->msg_ses_feats & ~LST_FEATS_MASK) != 0) { 454 reply->mksn_status = EPROTO; 455 return 0; 456 } 457 458 /* brand new or create by force */ 459 LIBCFS_ALLOC(sn, sizeof(sfw_session_t)); 460 if (sn == NULL) { 461 CERROR ("Dropping RPC (mksn) under memory pressure.\n"); 462 return -ENOMEM; 463 } 464 465 sfw_init_session(sn, request->mksn_sid, 466 msg->msg_ses_feats, &request->mksn_name[0]); 467 468 spin_lock(&sfw_data.fw_lock); 469 470 sfw_deactivate_session(); 471 LASSERT(sfw_data.fw_session == NULL); 472 sfw_data.fw_session = sn; 473 474 spin_unlock(&sfw_data.fw_lock); 475 476 reply->mksn_status = 0; 477 reply->mksn_sid = sn->sn_id; 478 reply->mksn_timeout = sn->sn_timeout; 479 return 0; 480} 481 482int 483sfw_remove_session (srpc_rmsn_reqst_t *request, srpc_rmsn_reply_t *reply) 484{ 485 sfw_session_t *sn = sfw_data.fw_session; 486 487 reply->rmsn_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; 488 489 if (request->rmsn_sid.ses_nid == LNET_NID_ANY) { 490 reply->rmsn_status = EINVAL; 491 return 0; 492 } 493 494 if (sn == NULL || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) { 495 reply->rmsn_status = (sn == NULL) ? ESRCH : EBUSY; 496 return 0; 497 } 498 499 if (!atomic_dec_and_test(&sn->sn_refcount)) { 500 reply->rmsn_status = 0; 501 return 0; 502 } 503 504 spin_lock(&sfw_data.fw_lock); 505 sfw_deactivate_session(); 506 spin_unlock(&sfw_data.fw_lock); 507 508 reply->rmsn_status = 0; 509 reply->rmsn_sid = LST_INVALID_SID; 510 LASSERT(sfw_data.fw_session == NULL); 511 return 0; 512} 513 514int 515sfw_debug_session (srpc_debug_reqst_t *request, srpc_debug_reply_t *reply) 516{ 517 sfw_session_t *sn = sfw_data.fw_session; 518 519 if (sn == NULL) { 520 reply->dbg_status = ESRCH; 521 reply->dbg_sid = LST_INVALID_SID; 522 return 0; 523 } 524 525 reply->dbg_status = 0; 526 reply->dbg_sid = sn->sn_id; 527 reply->dbg_timeout = sn->sn_timeout; 528 if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name)) 529 >= sizeof(reply->dbg_name)) 530 return -E2BIG; 531 532 return 0; 533} 534 535void 536sfw_test_rpc_fini (srpc_client_rpc_t *rpc) 537{ 538 sfw_test_unit_t *tsu = rpc->crpc_priv; 539 sfw_test_instance_t *tsi = tsu->tsu_instance; 540 541 /* Called with hold of tsi->tsi_lock */ 542 LASSERT (list_empty(&rpc->crpc_list)); 543 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); 544} 545 546static inline int 547sfw_test_buffers(sfw_test_instance_t *tsi) 548{ 549 struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service); 550 struct srpc_service *svc = tsc->tsc_srv_service; 551 int nbuf; 552 553 nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts; 554 return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA); 555} 556 557int 558sfw_load_test(struct sfw_test_instance *tsi) 559{ 560 struct sfw_test_case *tsc; 561 struct srpc_service *svc; 562 int nbuf; 563 int rc; 564 565 LASSERT(tsi != NULL); 566 tsc = sfw_find_test_case(tsi->tsi_service); 567 nbuf = sfw_test_buffers(tsi); 568 LASSERT(tsc != NULL); 569 svc = tsc->tsc_srv_service; 570 571 if (tsi->tsi_is_client) { 572 tsi->tsi_ops = tsc->tsc_cli_ops; 573 return 0; 574 } 575 576 rc = srpc_service_add_buffers(svc, nbuf); 577 if (rc != 0) { 578 CWARN("Failed to reserve enough buffers: " 579 "service %s, %d needed: %d\n", svc->sv_name, nbuf, rc); 580 /* NB: this error handler is not strictly correct, because 581 * it may release more buffers than already allocated, 582 * but it doesn't matter because request portal should 583 * be lazy portal and will grow buffers if necessary. */ 584 srpc_service_remove_buffers(svc, nbuf); 585 return -ENOMEM; 586 } 587 588 CDEBUG(D_NET, "Reserved %d buffers for test %s\n", 589 nbuf * (srpc_serv_is_framework(svc) ? 590 1 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name); 591 return 0; 592} 593 594void 595sfw_unload_test(struct sfw_test_instance *tsi) 596{ 597 struct sfw_test_case *tsc = sfw_find_test_case(tsi->tsi_service); 598 599 LASSERT(tsc != NULL); 600 601 if (tsi->tsi_is_client) 602 return; 603 604 /* shrink buffers, because request portal is lazy portal 605 * which can grow buffers at runtime so we may leave 606 * some buffers behind, but never mind... */ 607 srpc_service_remove_buffers(tsc->tsc_srv_service, 608 sfw_test_buffers(tsi)); 609 return; 610} 611 612void 613sfw_destroy_test_instance (sfw_test_instance_t *tsi) 614{ 615 srpc_client_rpc_t *rpc; 616 sfw_test_unit_t *tsu; 617 618 if (!tsi->tsi_is_client) goto clean; 619 620 tsi->tsi_ops->tso_fini(tsi); 621 622 LASSERT (!tsi->tsi_stopping); 623 LASSERT (list_empty(&tsi->tsi_active_rpcs)); 624 LASSERT (!sfw_test_active(tsi)); 625 626 while (!list_empty(&tsi->tsi_units)) { 627 tsu = list_entry(tsi->tsi_units.next, 628 sfw_test_unit_t, tsu_list); 629 list_del(&tsu->tsu_list); 630 LIBCFS_FREE(tsu, sizeof(*tsu)); 631 } 632 633 while (!list_empty(&tsi->tsi_free_rpcs)) { 634 rpc = list_entry(tsi->tsi_free_rpcs.next, 635 srpc_client_rpc_t, crpc_list); 636 list_del(&rpc->crpc_list); 637 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc)); 638 } 639 640clean: 641 sfw_unload_test(tsi); 642 LIBCFS_FREE(tsi, sizeof(*tsi)); 643 return; 644} 645 646void 647sfw_destroy_batch (sfw_batch_t *tsb) 648{ 649 sfw_test_instance_t *tsi; 650 651 LASSERT (!sfw_batch_active(tsb)); 652 LASSERT (list_empty(&tsb->bat_list)); 653 654 while (!list_empty(&tsb->bat_tests)) { 655 tsi = list_entry(tsb->bat_tests.next, 656 sfw_test_instance_t, tsi_list); 657 list_del_init(&tsi->tsi_list); 658 sfw_destroy_test_instance(tsi); 659 } 660 661 LIBCFS_FREE(tsb, sizeof(sfw_batch_t)); 662 return; 663} 664 665void 666sfw_destroy_session (sfw_session_t *sn) 667{ 668 sfw_batch_t *batch; 669 670 LASSERT (list_empty(&sn->sn_list)); 671 LASSERT (sn != sfw_data.fw_session); 672 673 while (!list_empty(&sn->sn_batches)) { 674 batch = list_entry(sn->sn_batches.next, 675 sfw_batch_t, bat_list); 676 list_del_init(&batch->bat_list); 677 sfw_destroy_batch(batch); 678 } 679 680 LIBCFS_FREE(sn, sizeof(*sn)); 681 atomic_dec(&sfw_data.fw_nzombies); 682 return; 683} 684 685void 686sfw_unpack_addtest_req(srpc_msg_t *msg) 687{ 688 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst; 689 690 LASSERT (msg->msg_type == SRPC_MSG_TEST_REQST); 691 LASSERT (req->tsr_is_client); 692 693 if (msg->msg_magic == SRPC_MSG_MAGIC) 694 return; /* no flipping needed */ 695 696 LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC)); 697 698 if (req->tsr_service == SRPC_SERVICE_BRW) { 699 if ((msg->msg_ses_feats & LST_FEAT_BULK_LEN) == 0) { 700 test_bulk_req_t *bulk = &req->tsr_u.bulk_v0; 701 702 __swab32s(&bulk->blk_opc); 703 __swab32s(&bulk->blk_npg); 704 __swab32s(&bulk->blk_flags); 705 706 } else { 707 test_bulk_req_v1_t *bulk = &req->tsr_u.bulk_v1; 708 709 __swab16s(&bulk->blk_opc); 710 __swab16s(&bulk->blk_flags); 711 __swab32s(&bulk->blk_offset); 712 __swab32s(&bulk->blk_len); 713 } 714 715 return; 716 } 717 718 if (req->tsr_service == SRPC_SERVICE_PING) { 719 test_ping_req_t *ping = &req->tsr_u.ping; 720 721 __swab32s(&ping->png_size); 722 __swab32s(&ping->png_flags); 723 return; 724 } 725 726 LBUG (); 727 return; 728} 729 730int 731sfw_add_test_instance (sfw_batch_t *tsb, srpc_server_rpc_t *rpc) 732{ 733 srpc_msg_t *msg = &rpc->srpc_reqstbuf->buf_msg; 734 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst; 735 srpc_bulk_t *bk = rpc->srpc_bulk; 736 int ndest = req->tsr_ndest; 737 sfw_test_unit_t *tsu; 738 sfw_test_instance_t *tsi; 739 int i; 740 int rc; 741 742 LIBCFS_ALLOC(tsi, sizeof(*tsi)); 743 if (tsi == NULL) { 744 CERROR ("Can't allocate test instance for batch: "LPU64"\n", 745 tsb->bat_id.bat_id); 746 return -ENOMEM; 747 } 748 749 spin_lock_init(&tsi->tsi_lock); 750 atomic_set(&tsi->tsi_nactive, 0); 751 INIT_LIST_HEAD(&tsi->tsi_units); 752 INIT_LIST_HEAD(&tsi->tsi_free_rpcs); 753 INIT_LIST_HEAD(&tsi->tsi_active_rpcs); 754 755 tsi->tsi_stopping = 0; 756 tsi->tsi_batch = tsb; 757 tsi->tsi_loop = req->tsr_loop; 758 tsi->tsi_concur = req->tsr_concur; 759 tsi->tsi_service = req->tsr_service; 760 tsi->tsi_is_client = !!(req->tsr_is_client); 761 tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr); 762 763 rc = sfw_load_test(tsi); 764 if (rc != 0) { 765 LIBCFS_FREE(tsi, sizeof(*tsi)); 766 return rc; 767 } 768 769 LASSERT (!sfw_batch_active(tsb)); 770 771 if (!tsi->tsi_is_client) { 772 /* it's test server, just add it to tsb */ 773 list_add_tail(&tsi->tsi_list, &tsb->bat_tests); 774 return 0; 775 } 776 777 LASSERT (bk != NULL); 778 LASSERT (bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest); 779 LASSERT((unsigned int)bk->bk_len >= 780 sizeof(lnet_process_id_packed_t) * ndest); 781 782 sfw_unpack_addtest_req(msg); 783 memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u)); 784 785 for (i = 0; i < ndest; i++) { 786 lnet_process_id_packed_t *dests; 787 lnet_process_id_packed_t id; 788 int j; 789 790 dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].kiov_page); 791 LASSERT (dests != NULL); /* my pages are within KVM always */ 792 id = dests[i % SFW_ID_PER_PAGE]; 793 if (msg->msg_magic != SRPC_MSG_MAGIC) 794 sfw_unpack_id(id); 795 796 for (j = 0; j < tsi->tsi_concur; j++) { 797 LIBCFS_ALLOC(tsu, sizeof(sfw_test_unit_t)); 798 if (tsu == NULL) { 799 rc = -ENOMEM; 800 CERROR ("Can't allocate tsu for %d\n", 801 tsi->tsi_service); 802 goto error; 803 } 804 805 tsu->tsu_dest.nid = id.nid; 806 tsu->tsu_dest.pid = id.pid; 807 tsu->tsu_instance = tsi; 808 tsu->tsu_private = NULL; 809 list_add_tail(&tsu->tsu_list, &tsi->tsi_units); 810 } 811 } 812 813 rc = tsi->tsi_ops->tso_init(tsi); 814 if (rc == 0) { 815 list_add_tail(&tsi->tsi_list, &tsb->bat_tests); 816 return 0; 817 } 818 819error: 820 LASSERT (rc != 0); 821 sfw_destroy_test_instance(tsi); 822 return rc; 823} 824 825static void 826sfw_test_unit_done (sfw_test_unit_t *tsu) 827{ 828 sfw_test_instance_t *tsi = tsu->tsu_instance; 829 sfw_batch_t *tsb = tsi->tsi_batch; 830 sfw_session_t *sn = tsb->bat_session; 831 832 LASSERT (sfw_test_active(tsi)); 833 834 if (!atomic_dec_and_test(&tsi->tsi_nactive)) 835 return; 836 837 /* the test instance is done */ 838 spin_lock(&tsi->tsi_lock); 839 840 tsi->tsi_stopping = 0; 841 842 spin_unlock(&tsi->tsi_lock); 843 844 spin_lock(&sfw_data.fw_lock); 845 846 if (!atomic_dec_and_test(&tsb->bat_nactive) ||/* tsb still active */ 847 sn == sfw_data.fw_session) { /* sn also active */ 848 spin_unlock(&sfw_data.fw_lock); 849 return; 850 } 851 852 LASSERT (!list_empty(&sn->sn_list)); /* I'm a zombie! */ 853 854 list_for_each_entry (tsb, &sn->sn_batches, bat_list) { 855 if (sfw_batch_active(tsb)) { 856 spin_unlock(&sfw_data.fw_lock); 857 return; 858 } 859 } 860 861 list_del_init(&sn->sn_list); 862 spin_unlock(&sfw_data.fw_lock); 863 864 sfw_destroy_session(sn); 865 return; 866} 867 868void 869sfw_test_rpc_done (srpc_client_rpc_t *rpc) 870{ 871 sfw_test_unit_t *tsu = rpc->crpc_priv; 872 sfw_test_instance_t *tsi = tsu->tsu_instance; 873 int done = 0; 874 875 tsi->tsi_ops->tso_done_rpc(tsu, rpc); 876 877 spin_lock(&tsi->tsi_lock); 878 879 LASSERT (sfw_test_active(tsi)); 880 LASSERT (!list_empty(&rpc->crpc_list)); 881 882 list_del_init(&rpc->crpc_list); 883 884 /* batch is stopping or loop is done or get error */ 885 if (tsi->tsi_stopping || 886 tsu->tsu_loop == 0 || 887 (rpc->crpc_status != 0 && tsi->tsi_stoptsu_onerr)) 888 done = 1; 889 890 /* dec ref for poster */ 891 srpc_client_rpc_decref(rpc); 892 893 spin_unlock(&tsi->tsi_lock); 894 895 if (!done) { 896 swi_schedule_workitem(&tsu->tsu_worker); 897 return; 898 } 899 900 sfw_test_unit_done(tsu); 901 return; 902} 903 904int 905sfw_create_test_rpc(sfw_test_unit_t *tsu, lnet_process_id_t peer, 906 unsigned features, int nblk, int blklen, 907 srpc_client_rpc_t **rpcpp) 908{ 909 srpc_client_rpc_t *rpc = NULL; 910 sfw_test_instance_t *tsi = tsu->tsu_instance; 911 912 spin_lock(&tsi->tsi_lock); 913 914 LASSERT (sfw_test_active(tsi)); 915 916 if (!list_empty(&tsi->tsi_free_rpcs)) { 917 /* pick request from buffer */ 918 rpc = list_entry(tsi->tsi_free_rpcs.next, 919 srpc_client_rpc_t, crpc_list); 920 LASSERT (nblk == rpc->crpc_bulk.bk_niov); 921 list_del_init(&rpc->crpc_list); 922 } 923 924 spin_unlock(&tsi->tsi_lock); 925 926 if (rpc == NULL) { 927 rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk, 928 blklen, sfw_test_rpc_done, 929 sfw_test_rpc_fini, tsu); 930 } else { 931 srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk, 932 blklen, sfw_test_rpc_done, 933 sfw_test_rpc_fini, tsu); 934 } 935 936 if (rpc == NULL) { 937 CERROR("Can't create rpc for test %d\n", tsi->tsi_service); 938 return -ENOMEM; 939 } 940 941 rpc->crpc_reqstmsg.msg_ses_feats = features; 942 *rpcpp = rpc; 943 944 return 0; 945} 946 947int 948sfw_run_test (swi_workitem_t *wi) 949{ 950 sfw_test_unit_t *tsu = wi->swi_workitem.wi_data; 951 sfw_test_instance_t *tsi = tsu->tsu_instance; 952 srpc_client_rpc_t *rpc = NULL; 953 954 LASSERT (wi == &tsu->tsu_worker); 955 956 if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc) != 0) { 957 LASSERT (rpc == NULL); 958 goto test_done; 959 } 960 961 LASSERT (rpc != NULL); 962 963 spin_lock(&tsi->tsi_lock); 964 965 if (tsi->tsi_stopping) { 966 list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs); 967 spin_unlock(&tsi->tsi_lock); 968 goto test_done; 969 } 970 971 if (tsu->tsu_loop > 0) 972 tsu->tsu_loop--; 973 974 list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs); 975 spin_unlock(&tsi->tsi_lock); 976 977 rpc->crpc_timeout = rpc_timeout; 978 979 spin_lock(&rpc->crpc_lock); 980 srpc_post_rpc(rpc); 981 spin_unlock(&rpc->crpc_lock); 982 return 0; 983 984test_done: 985 /* 986 * No one can schedule me now since: 987 * - previous RPC, if any, has done and 988 * - no new RPC is initiated. 989 * - my batch is still active; no one can run it again now. 990 * Cancel pending schedules and prevent future schedule attempts: 991 */ 992 swi_exit_workitem(wi); 993 sfw_test_unit_done(tsu); 994 return 1; 995} 996 997int 998sfw_run_batch (sfw_batch_t *tsb) 999{ 1000 swi_workitem_t *wi; 1001 sfw_test_unit_t *tsu; 1002 sfw_test_instance_t *tsi; 1003 1004 if (sfw_batch_active(tsb)) { 1005 CDEBUG(D_NET, "Batch already active: "LPU64" (%d)\n", 1006 tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive)); 1007 return 0; 1008 } 1009 1010 list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { 1011 if (!tsi->tsi_is_client) /* skip server instances */ 1012 continue; 1013 1014 LASSERT (!tsi->tsi_stopping); 1015 LASSERT (!sfw_test_active(tsi)); 1016 1017 atomic_inc(&tsb->bat_nactive); 1018 1019 list_for_each_entry (tsu, &tsi->tsi_units, tsu_list) { 1020 atomic_inc(&tsi->tsi_nactive); 1021 tsu->tsu_loop = tsi->tsi_loop; 1022 wi = &tsu->tsu_worker; 1023 swi_init_workitem(wi, tsu, sfw_run_test, 1024 lst_sched_test[\ 1025 lnet_cpt_of_nid(tsu->tsu_dest.nid)]); 1026 swi_schedule_workitem(wi); 1027 } 1028 } 1029 1030 return 0; 1031} 1032 1033int 1034sfw_stop_batch (sfw_batch_t *tsb, int force) 1035{ 1036 sfw_test_instance_t *tsi; 1037 srpc_client_rpc_t *rpc; 1038 1039 if (!sfw_batch_active(tsb)) { 1040 CDEBUG(D_NET, "Batch "LPU64" inactive\n", tsb->bat_id.bat_id); 1041 return 0; 1042 } 1043 1044 list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { 1045 spin_lock(&tsi->tsi_lock); 1046 1047 if (!tsi->tsi_is_client || 1048 !sfw_test_active(tsi) || tsi->tsi_stopping) { 1049 spin_unlock(&tsi->tsi_lock); 1050 continue; 1051 } 1052 1053 tsi->tsi_stopping = 1; 1054 1055 if (!force) { 1056 spin_unlock(&tsi->tsi_lock); 1057 continue; 1058 } 1059 1060 /* abort launched rpcs in the test */ 1061 list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) { 1062 spin_lock(&rpc->crpc_lock); 1063 1064 srpc_abort_rpc(rpc, -EINTR); 1065 1066 spin_unlock(&rpc->crpc_lock); 1067 } 1068 1069 spin_unlock(&tsi->tsi_lock); 1070 } 1071 1072 return 0; 1073} 1074 1075int 1076sfw_query_batch (sfw_batch_t *tsb, int testidx, srpc_batch_reply_t *reply) 1077{ 1078 sfw_test_instance_t *tsi; 1079 1080 if (testidx < 0) 1081 return -EINVAL; 1082 1083 if (testidx == 0) { 1084 reply->bar_active = atomic_read(&tsb->bat_nactive); 1085 return 0; 1086 } 1087 1088 list_for_each_entry (tsi, &tsb->bat_tests, tsi_list) { 1089 if (testidx-- > 1) 1090 continue; 1091 1092 reply->bar_active = atomic_read(&tsi->tsi_nactive); 1093 return 0; 1094 } 1095 1096 return -ENOENT; 1097} 1098 1099void 1100sfw_free_pages (srpc_server_rpc_t *rpc) 1101{ 1102 srpc_free_bulk(rpc->srpc_bulk); 1103 rpc->srpc_bulk = NULL; 1104} 1105 1106int 1107sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len, 1108 int sink) 1109{ 1110 LASSERT(rpc->srpc_bulk == NULL); 1111 LASSERT(npages > 0 && npages <= LNET_MAX_IOV); 1112 1113 rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink); 1114 if (rpc->srpc_bulk == NULL) 1115 return -ENOMEM; 1116 1117 return 0; 1118} 1119 1120int 1121sfw_add_test (srpc_server_rpc_t *rpc) 1122{ 1123 sfw_session_t *sn = sfw_data.fw_session; 1124 srpc_test_reply_t *reply = &rpc->srpc_replymsg.msg_body.tes_reply; 1125 srpc_test_reqst_t *request; 1126 int rc; 1127 sfw_batch_t *bat; 1128 1129 request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst; 1130 reply->tsr_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; 1131 1132 if (request->tsr_loop == 0 || 1133 request->tsr_concur == 0 || 1134 request->tsr_sid.ses_nid == LNET_NID_ANY || 1135 request->tsr_ndest > SFW_MAX_NDESTS || 1136 (request->tsr_is_client && request->tsr_ndest == 0) || 1137 request->tsr_concur > SFW_MAX_CONCUR || 1138 request->tsr_service > SRPC_SERVICE_MAX_ID || 1139 request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) { 1140 reply->tsr_status = EINVAL; 1141 return 0; 1142 } 1143 1144 if (sn == NULL || !sfw_sid_equal(request->tsr_sid, sn->sn_id) || 1145 sfw_find_test_case(request->tsr_service) == NULL) { 1146 reply->tsr_status = ENOENT; 1147 return 0; 1148 } 1149 1150 bat = sfw_bid2batch(request->tsr_bid); 1151 if (bat == NULL) { 1152 CERROR ("Dropping RPC (%s) from %s under memory pressure.\n", 1153 rpc->srpc_scd->scd_svc->sv_name, 1154 libcfs_id2str(rpc->srpc_peer)); 1155 return -ENOMEM; 1156 } 1157 1158 if (sfw_batch_active(bat)) { 1159 reply->tsr_status = EBUSY; 1160 return 0; 1161 } 1162 1163 if (request->tsr_is_client && rpc->srpc_bulk == NULL) { 1164 /* rpc will be resumed later in sfw_bulk_ready */ 1165 int npg = sfw_id_pages(request->tsr_ndest); 1166 int len; 1167 1168 if ((sn->sn_features & LST_FEAT_BULK_LEN) == 0) { 1169 len = npg * PAGE_CACHE_SIZE; 1170 1171 } else { 1172 len = sizeof(lnet_process_id_packed_t) * 1173 request->tsr_ndest; 1174 } 1175 1176 return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1); 1177 } 1178 1179 rc = sfw_add_test_instance(bat, rpc); 1180 CDEBUG (rc == 0 ? D_NET : D_WARNING, 1181 "%s test: sv %d %s, loop %d, concur %d, ndest %d\n", 1182 rc == 0 ? "Added" : "Failed to add", request->tsr_service, 1183 request->tsr_is_client ? "client" : "server", 1184 request->tsr_loop, request->tsr_concur, request->tsr_ndest); 1185 1186 reply->tsr_status = (rc < 0) ? -rc : rc; 1187 return 0; 1188} 1189 1190int 1191sfw_control_batch (srpc_batch_reqst_t *request, srpc_batch_reply_t *reply) 1192{ 1193 sfw_session_t *sn = sfw_data.fw_session; 1194 int rc = 0; 1195 sfw_batch_t *bat; 1196 1197 reply->bar_sid = (sn == NULL) ? LST_INVALID_SID : sn->sn_id; 1198 1199 if (sn == NULL || !sfw_sid_equal(request->bar_sid, sn->sn_id)) { 1200 reply->bar_status = ESRCH; 1201 return 0; 1202 } 1203 1204 bat = sfw_find_batch(request->bar_bid); 1205 if (bat == NULL) { 1206 reply->bar_status = ENOENT; 1207 return 0; 1208 } 1209 1210 switch (request->bar_opc) { 1211 case SRPC_BATCH_OPC_RUN: 1212 rc = sfw_run_batch(bat); 1213 break; 1214 1215 case SRPC_BATCH_OPC_STOP: 1216 rc = sfw_stop_batch(bat, request->bar_arg); 1217 break; 1218 1219 case SRPC_BATCH_OPC_QUERY: 1220 rc = sfw_query_batch(bat, request->bar_testidx, reply); 1221 break; 1222 1223 default: 1224 return -EINVAL; /* drop it */ 1225 } 1226 1227 reply->bar_status = (rc < 0) ? -rc : rc; 1228 return 0; 1229} 1230 1231int 1232sfw_handle_server_rpc(struct srpc_server_rpc *rpc) 1233{ 1234 struct srpc_service *sv = rpc->srpc_scd->scd_svc; 1235 srpc_msg_t *reply = &rpc->srpc_replymsg; 1236 srpc_msg_t *request = &rpc->srpc_reqstbuf->buf_msg; 1237 unsigned features = LST_FEATS_MASK; 1238 int rc = 0; 1239 1240 LASSERT(sfw_data.fw_active_srpc == NULL); 1241 LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID); 1242 1243 spin_lock(&sfw_data.fw_lock); 1244 1245 if (sfw_data.fw_shuttingdown) { 1246 spin_unlock(&sfw_data.fw_lock); 1247 return -ESHUTDOWN; 1248 } 1249 1250 /* Remove timer to avoid racing with it or expiring active session */ 1251 if (sfw_del_session_timer() != 0) { 1252 CERROR("Dropping RPC (%s) from %s: racing with expiry timer.", 1253 sv->sv_name, libcfs_id2str(rpc->srpc_peer)); 1254 spin_unlock(&sfw_data.fw_lock); 1255 return -EAGAIN; 1256 } 1257 1258 sfw_data.fw_active_srpc = rpc; 1259 spin_unlock(&sfw_data.fw_lock); 1260 1261 sfw_unpack_message(request); 1262 LASSERT(request->msg_type == srpc_service2request(sv->sv_id)); 1263 1264 /* rpc module should have checked this */ 1265 LASSERT(request->msg_version == SRPC_MSG_VERSION); 1266 1267 if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION && 1268 sv->sv_id != SRPC_SERVICE_DEBUG) { 1269 sfw_session_t *sn = sfw_data.fw_session; 1270 1271 if (sn != NULL && 1272 sn->sn_features != request->msg_ses_feats) { 1273 CNETERR("Features of framework RPC don't match " 1274 "features of current session: %x/%x\n", 1275 request->msg_ses_feats, sn->sn_features); 1276 reply->msg_body.reply.status = EPROTO; 1277 reply->msg_body.reply.sid = sn->sn_id; 1278 goto out; 1279 } 1280 1281 } else if ((request->msg_ses_feats & ~LST_FEATS_MASK) != 0) { 1282 /* NB: at this point, old version will ignore features and 1283 * create new session anyway, so console should be able 1284 * to handle this */ 1285 reply->msg_body.reply.status = EPROTO; 1286 goto out; 1287 } 1288 1289 switch(sv->sv_id) { 1290 default: 1291 LBUG (); 1292 case SRPC_SERVICE_TEST: 1293 rc = sfw_add_test(rpc); 1294 break; 1295 1296 case SRPC_SERVICE_BATCH: 1297 rc = sfw_control_batch(&request->msg_body.bat_reqst, 1298 &reply->msg_body.bat_reply); 1299 break; 1300 1301 case SRPC_SERVICE_QUERY_STAT: 1302 rc = sfw_get_stats(&request->msg_body.stat_reqst, 1303 &reply->msg_body.stat_reply); 1304 break; 1305 1306 case SRPC_SERVICE_DEBUG: 1307 rc = sfw_debug_session(&request->msg_body.dbg_reqst, 1308 &reply->msg_body.dbg_reply); 1309 break; 1310 1311 case SRPC_SERVICE_MAKE_SESSION: 1312 rc = sfw_make_session(&request->msg_body.mksn_reqst, 1313 &reply->msg_body.mksn_reply); 1314 break; 1315 1316 case SRPC_SERVICE_REMOVE_SESSION: 1317 rc = sfw_remove_session(&request->msg_body.rmsn_reqst, 1318 &reply->msg_body.rmsn_reply); 1319 break; 1320 } 1321 1322 if (sfw_data.fw_session != NULL) 1323 features = sfw_data.fw_session->sn_features; 1324 out: 1325 reply->msg_ses_feats = features; 1326 rpc->srpc_done = sfw_server_rpc_done; 1327 spin_lock(&sfw_data.fw_lock); 1328 1329 if (!sfw_data.fw_shuttingdown) 1330 sfw_add_session_timer(); 1331 1332 sfw_data.fw_active_srpc = NULL; 1333 spin_unlock(&sfw_data.fw_lock); 1334 return rc; 1335} 1336 1337int 1338sfw_bulk_ready(struct srpc_server_rpc *rpc, int status) 1339{ 1340 struct srpc_service *sv = rpc->srpc_scd->scd_svc; 1341 int rc; 1342 1343 LASSERT(rpc->srpc_bulk != NULL); 1344 LASSERT(sv->sv_id == SRPC_SERVICE_TEST); 1345 LASSERT(sfw_data.fw_active_srpc == NULL); 1346 LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client); 1347 1348 spin_lock(&sfw_data.fw_lock); 1349 1350 if (status != 0) { 1351 CERROR("Bulk transfer failed for RPC: " 1352 "service %s, peer %s, status %d\n", 1353 sv->sv_name, libcfs_id2str(rpc->srpc_peer), status); 1354 spin_unlock(&sfw_data.fw_lock); 1355 return -EIO; 1356 } 1357 1358 if (sfw_data.fw_shuttingdown) { 1359 spin_unlock(&sfw_data.fw_lock); 1360 return -ESHUTDOWN; 1361 } 1362 1363 if (sfw_del_session_timer() != 0) { 1364 CERROR("Dropping RPC (%s) from %s: racing with expiry timer", 1365 sv->sv_name, libcfs_id2str(rpc->srpc_peer)); 1366 spin_unlock(&sfw_data.fw_lock); 1367 return -EAGAIN; 1368 } 1369 1370 sfw_data.fw_active_srpc = rpc; 1371 spin_unlock(&sfw_data.fw_lock); 1372 1373 rc = sfw_add_test(rpc); 1374 1375 spin_lock(&sfw_data.fw_lock); 1376 1377 if (!sfw_data.fw_shuttingdown) 1378 sfw_add_session_timer(); 1379 1380 sfw_data.fw_active_srpc = NULL; 1381 spin_unlock(&sfw_data.fw_lock); 1382 return rc; 1383} 1384 1385srpc_client_rpc_t * 1386sfw_create_rpc(lnet_process_id_t peer, int service, 1387 unsigned features, int nbulkiov, int bulklen, 1388 void (*done)(srpc_client_rpc_t *), void *priv) 1389{ 1390 srpc_client_rpc_t *rpc = NULL; 1391 1392 spin_lock(&sfw_data.fw_lock); 1393 1394 LASSERT (!sfw_data.fw_shuttingdown); 1395 LASSERT (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID); 1396 1397 if (nbulkiov == 0 && !list_empty(&sfw_data.fw_zombie_rpcs)) { 1398 rpc = list_entry(sfw_data.fw_zombie_rpcs.next, 1399 srpc_client_rpc_t, crpc_list); 1400 list_del(&rpc->crpc_list); 1401 1402 srpc_init_client_rpc(rpc, peer, service, 0, 0, 1403 done, sfw_client_rpc_fini, priv); 1404 } 1405 1406 spin_unlock(&sfw_data.fw_lock); 1407 1408 if (rpc == NULL) { 1409 rpc = srpc_create_client_rpc(peer, service, 1410 nbulkiov, bulklen, done, 1411 nbulkiov != 0 ? NULL : 1412 sfw_client_rpc_fini, 1413 priv); 1414 } 1415 1416 if (rpc != NULL) /* "session" is concept in framework */ 1417 rpc->crpc_reqstmsg.msg_ses_feats = features; 1418 1419 return rpc; 1420} 1421 1422void 1423sfw_unpack_message (srpc_msg_t *msg) 1424{ 1425 if (msg->msg_magic == SRPC_MSG_MAGIC) 1426 return; /* no flipping needed */ 1427 1428 /* srpc module should guarantee I wouldn't get crap */ 1429 LASSERT (msg->msg_magic == __swab32(SRPC_MSG_MAGIC)); 1430 1431 if (msg->msg_type == SRPC_MSG_STAT_REQST) { 1432 srpc_stat_reqst_t *req = &msg->msg_body.stat_reqst; 1433 1434 __swab32s(&req->str_type); 1435 __swab64s(&req->str_rpyid); 1436 sfw_unpack_sid(req->str_sid); 1437 return; 1438 } 1439 1440 if (msg->msg_type == SRPC_MSG_STAT_REPLY) { 1441 srpc_stat_reply_t *rep = &msg->msg_body.stat_reply; 1442 1443 __swab32s(&rep->str_status); 1444 sfw_unpack_sid(rep->str_sid); 1445 sfw_unpack_fw_counters(rep->str_fw); 1446 sfw_unpack_rpc_counters(rep->str_rpc); 1447 sfw_unpack_lnet_counters(rep->str_lnet); 1448 return; 1449 } 1450 1451 if (msg->msg_type == SRPC_MSG_MKSN_REQST) { 1452 srpc_mksn_reqst_t *req = &msg->msg_body.mksn_reqst; 1453 1454 __swab64s(&req->mksn_rpyid); 1455 __swab32s(&req->mksn_force); 1456 sfw_unpack_sid(req->mksn_sid); 1457 return; 1458 } 1459 1460 if (msg->msg_type == SRPC_MSG_MKSN_REPLY) { 1461 srpc_mksn_reply_t *rep = &msg->msg_body.mksn_reply; 1462 1463 __swab32s(&rep->mksn_status); 1464 __swab32s(&rep->mksn_timeout); 1465 sfw_unpack_sid(rep->mksn_sid); 1466 return; 1467 } 1468 1469 if (msg->msg_type == SRPC_MSG_RMSN_REQST) { 1470 srpc_rmsn_reqst_t *req = &msg->msg_body.rmsn_reqst; 1471 1472 __swab64s(&req->rmsn_rpyid); 1473 sfw_unpack_sid(req->rmsn_sid); 1474 return; 1475 } 1476 1477 if (msg->msg_type == SRPC_MSG_RMSN_REPLY) { 1478 srpc_rmsn_reply_t *rep = &msg->msg_body.rmsn_reply; 1479 1480 __swab32s(&rep->rmsn_status); 1481 sfw_unpack_sid(rep->rmsn_sid); 1482 return; 1483 } 1484 1485 if (msg->msg_type == SRPC_MSG_DEBUG_REQST) { 1486 srpc_debug_reqst_t *req = &msg->msg_body.dbg_reqst; 1487 1488 __swab64s(&req->dbg_rpyid); 1489 __swab32s(&req->dbg_flags); 1490 sfw_unpack_sid(req->dbg_sid); 1491 return; 1492 } 1493 1494 if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) { 1495 srpc_debug_reply_t *rep = &msg->msg_body.dbg_reply; 1496 1497 __swab32s(&rep->dbg_nbatch); 1498 __swab32s(&rep->dbg_timeout); 1499 sfw_unpack_sid(rep->dbg_sid); 1500 return; 1501 } 1502 1503 if (msg->msg_type == SRPC_MSG_BATCH_REQST) { 1504 srpc_batch_reqst_t *req = &msg->msg_body.bat_reqst; 1505 1506 __swab32s(&req->bar_opc); 1507 __swab64s(&req->bar_rpyid); 1508 __swab32s(&req->bar_testidx); 1509 __swab32s(&req->bar_arg); 1510 sfw_unpack_sid(req->bar_sid); 1511 __swab64s(&req->bar_bid.bat_id); 1512 return; 1513 } 1514 1515 if (msg->msg_type == SRPC_MSG_BATCH_REPLY) { 1516 srpc_batch_reply_t *rep = &msg->msg_body.bat_reply; 1517 1518 __swab32s(&rep->bar_status); 1519 sfw_unpack_sid(rep->bar_sid); 1520 return; 1521 } 1522 1523 if (msg->msg_type == SRPC_MSG_TEST_REQST) { 1524 srpc_test_reqst_t *req = &msg->msg_body.tes_reqst; 1525 1526 __swab64s(&req->tsr_rpyid); 1527 __swab64s(&req->tsr_bulkid); 1528 __swab32s(&req->tsr_loop); 1529 __swab32s(&req->tsr_ndest); 1530 __swab32s(&req->tsr_concur); 1531 __swab32s(&req->tsr_service); 1532 sfw_unpack_sid(req->tsr_sid); 1533 __swab64s(&req->tsr_bid.bat_id); 1534 return; 1535 } 1536 1537 if (msg->msg_type == SRPC_MSG_TEST_REPLY) { 1538 srpc_test_reply_t *rep = &msg->msg_body.tes_reply; 1539 1540 __swab32s(&rep->tsr_status); 1541 sfw_unpack_sid(rep->tsr_sid); 1542 return; 1543 } 1544 1545 if (msg->msg_type == SRPC_MSG_JOIN_REQST) { 1546 srpc_join_reqst_t *req = &msg->msg_body.join_reqst; 1547 1548 __swab64s(&req->join_rpyid); 1549 sfw_unpack_sid(req->join_sid); 1550 return; 1551 } 1552 1553 if (msg->msg_type == SRPC_MSG_JOIN_REPLY) { 1554 srpc_join_reply_t *rep = &msg->msg_body.join_reply; 1555 1556 __swab32s(&rep->join_status); 1557 __swab32s(&rep->join_timeout); 1558 sfw_unpack_sid(rep->join_sid); 1559 return; 1560 } 1561 1562 LBUG (); 1563 return; 1564} 1565 1566void 1567sfw_abort_rpc (srpc_client_rpc_t *rpc) 1568{ 1569 LASSERT(atomic_read(&rpc->crpc_refcount) > 0); 1570 LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID); 1571 1572 spin_lock(&rpc->crpc_lock); 1573 srpc_abort_rpc(rpc, -EINTR); 1574 spin_unlock(&rpc->crpc_lock); 1575 return; 1576} 1577 1578void 1579sfw_post_rpc (srpc_client_rpc_t *rpc) 1580{ 1581 spin_lock(&rpc->crpc_lock); 1582 1583 LASSERT (!rpc->crpc_closed); 1584 LASSERT (!rpc->crpc_aborted); 1585 LASSERT (list_empty(&rpc->crpc_list)); 1586 LASSERT (!sfw_data.fw_shuttingdown); 1587 1588 rpc->crpc_timeout = rpc_timeout; 1589 srpc_post_rpc(rpc); 1590 1591 spin_unlock(&rpc->crpc_lock); 1592 return; 1593} 1594 1595static srpc_service_t sfw_services[] = 1596{ 1597 { 1598 /* sv_id */ SRPC_SERVICE_DEBUG, 1599 /* sv_name */ "debug", 1600 0 1601 }, 1602 { 1603 /* sv_id */ SRPC_SERVICE_QUERY_STAT, 1604 /* sv_name */ "query stats", 1605 0 1606 }, 1607 { 1608 /* sv_id */ SRPC_SERVICE_MAKE_SESSION, 1609 /* sv_name */ "make session", 1610 0 1611 }, 1612 { 1613 /* sv_id */ SRPC_SERVICE_REMOVE_SESSION, 1614 /* sv_name */ "remove session", 1615 0 1616 }, 1617 { 1618 /* sv_id */ SRPC_SERVICE_BATCH, 1619 /* sv_name */ "batch service", 1620 0 1621 }, 1622 { 1623 /* sv_id */ SRPC_SERVICE_TEST, 1624 /* sv_name */ "test service", 1625 0 1626 }, 1627 { 1628 /* sv_id */ 0, 1629 /* sv_name */ NULL, 1630 0 1631 } 1632}; 1633 1634extern sfw_test_client_ops_t ping_test_client; 1635extern srpc_service_t ping_test_service; 1636extern void ping_init_test_client(void); 1637extern void ping_init_test_service(void); 1638 1639extern sfw_test_client_ops_t brw_test_client; 1640extern srpc_service_t brw_test_service; 1641extern void brw_init_test_client(void); 1642extern void brw_init_test_service(void); 1643 1644 1645int 1646sfw_startup (void) 1647{ 1648 int i; 1649 int rc; 1650 int error; 1651 srpc_service_t *sv; 1652 sfw_test_case_t *tsc; 1653 1654 1655 if (session_timeout < 0) { 1656 CERROR ("Session timeout must be non-negative: %d\n", 1657 session_timeout); 1658 return -EINVAL; 1659 } 1660 1661 if (rpc_timeout < 0) { 1662 CERROR ("RPC timeout must be non-negative: %d\n", 1663 rpc_timeout); 1664 return -EINVAL; 1665 } 1666 1667 if (session_timeout == 0) 1668 CWARN ("Zero session_timeout specified " 1669 "- test sessions never expire.\n"); 1670 1671 if (rpc_timeout == 0) 1672 CWARN ("Zero rpc_timeout specified " 1673 "- test RPC never expire.\n"); 1674 1675 memset(&sfw_data, 0, sizeof(struct smoketest_framework)); 1676 1677 sfw_data.fw_session = NULL; 1678 sfw_data.fw_active_srpc = NULL; 1679 spin_lock_init(&sfw_data.fw_lock); 1680 atomic_set(&sfw_data.fw_nzombies, 0); 1681 INIT_LIST_HEAD(&sfw_data.fw_tests); 1682 INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs); 1683 INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions); 1684 1685 brw_init_test_client(); 1686 brw_init_test_service(); 1687 rc = sfw_register_test(&brw_test_service, &brw_test_client); 1688 LASSERT (rc == 0); 1689 1690 ping_init_test_client(); 1691 ping_init_test_service(); 1692 rc = sfw_register_test(&ping_test_service, &ping_test_client); 1693 LASSERT (rc == 0); 1694 1695 error = 0; 1696 list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) { 1697 sv = tsc->tsc_srv_service; 1698 1699 rc = srpc_add_service(sv); 1700 LASSERT (rc != -EBUSY); 1701 if (rc != 0) { 1702 CWARN ("Failed to add %s service: %d\n", 1703 sv->sv_name, rc); 1704 error = rc; 1705 } 1706 } 1707 1708 for (i = 0; ; i++) { 1709 sv = &sfw_services[i]; 1710 if (sv->sv_name == NULL) break; 1711 1712 sv->sv_bulk_ready = NULL; 1713 sv->sv_handler = sfw_handle_server_rpc; 1714 sv->sv_wi_total = SFW_FRWK_WI_MAX; 1715 if (sv->sv_id == SRPC_SERVICE_TEST) 1716 sv->sv_bulk_ready = sfw_bulk_ready; 1717 1718 rc = srpc_add_service(sv); 1719 LASSERT (rc != -EBUSY); 1720 if (rc != 0) { 1721 CWARN ("Failed to add %s service: %d\n", 1722 sv->sv_name, rc); 1723 error = rc; 1724 } 1725 1726 /* about to sfw_shutdown, no need to add buffer */ 1727 if (error) continue; 1728 1729 rc = srpc_service_add_buffers(sv, sv->sv_wi_total); 1730 if (rc != 0) { 1731 CWARN("Failed to reserve enough buffers: " 1732 "service %s, %d needed: %d\n", 1733 sv->sv_name, sv->sv_wi_total, rc); 1734 error = -ENOMEM; 1735 } 1736 } 1737 1738 if (error != 0) 1739 sfw_shutdown(); 1740 return error; 1741} 1742 1743void 1744sfw_shutdown (void) 1745{ 1746 srpc_service_t *sv; 1747 sfw_test_case_t *tsc; 1748 int i; 1749 1750 spin_lock(&sfw_data.fw_lock); 1751 1752 sfw_data.fw_shuttingdown = 1; 1753 lst_wait_until(sfw_data.fw_active_srpc == NULL, sfw_data.fw_lock, 1754 "waiting for active RPC to finish.\n"); 1755 1756 if (sfw_del_session_timer() != 0) 1757 lst_wait_until(sfw_data.fw_session == NULL, sfw_data.fw_lock, 1758 "waiting for session timer to explode.\n"); 1759 1760 sfw_deactivate_session(); 1761 lst_wait_until(atomic_read(&sfw_data.fw_nzombies) == 0, 1762 sfw_data.fw_lock, 1763 "waiting for %d zombie sessions to die.\n", 1764 atomic_read(&sfw_data.fw_nzombies)); 1765 1766 spin_unlock(&sfw_data.fw_lock); 1767 1768 for (i = 0; ; i++) { 1769 sv = &sfw_services[i]; 1770 if (sv->sv_name == NULL) 1771 break; 1772 1773 srpc_shutdown_service(sv); 1774 srpc_remove_service(sv); 1775 } 1776 1777 list_for_each_entry (tsc, &sfw_data.fw_tests, tsc_list) { 1778 sv = tsc->tsc_srv_service; 1779 srpc_shutdown_service(sv); 1780 srpc_remove_service(sv); 1781 } 1782 1783 while (!list_empty(&sfw_data.fw_zombie_rpcs)) { 1784 srpc_client_rpc_t *rpc; 1785 1786 rpc = list_entry(sfw_data.fw_zombie_rpcs.next, 1787 srpc_client_rpc_t, crpc_list); 1788 list_del(&rpc->crpc_list); 1789 1790 LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc)); 1791 } 1792 1793 for (i = 0; ; i++) { 1794 sv = &sfw_services[i]; 1795 if (sv->sv_name == NULL) 1796 break; 1797 1798 srpc_wait_service_shutdown(sv); 1799 } 1800 1801 while (!list_empty(&sfw_data.fw_tests)) { 1802 tsc = list_entry(sfw_data.fw_tests.next, 1803 sfw_test_case_t, tsc_list); 1804 1805 srpc_wait_service_shutdown(tsc->tsc_srv_service); 1806 1807 list_del(&tsc->tsc_list); 1808 LIBCFS_FREE(tsc, sizeof(*tsc)); 1809 } 1810 1811 return; 1812} 1813