rpc.c revision 23ebb3fd1dfd70bd167b1c4758c0a23fbdd25650

/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 *
 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
 * - percpt data for service to improve smp performance
 * - code cleanup
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

typedef enum {
	SRPC_STATE_NONE,
	SRPC_STATE_NI_INIT,
	SRPC_STATE_EQ_INIT,
	SRPC_STATE_RUNNING,
	SRPC_STATE_STOPPING,
} srpc_state_t;

struct smoketest_rpc {
	spinlock_t	 rpc_glock;	/* global lock */
	srpc_service_t	*rpc_services[SRPC_SERVICE_MAX_ID + 1];
	lnet_handle_eq_t rpc_lnet_eq;	/* _the_ LNet event queue */
	srpc_state_t	 rpc_state;
	srpc_counters_t	 rpc_counters;
	__u64		 rpc_matchbits;	/* matchbits counter */
} srpc_data;
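
/*
 * State progression (summary of srpc_startup()/srpc_shutdown() below):
 * rpc_state only ever advances NONE -> NI_INIT -> EQ_INIT -> RUNNING,
 * and srpc_shutdown() unwinds whatever subset of those steps completed
 * via the fall-through switch at the bottom of this file.
 */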

static inline int
srpc_serv_portal(int svc_id)
{
	return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
	       SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
}

/* forward ref's */
int srpc_handle_rpc (swi_workitem_t *wi);

void srpc_get_counters (srpc_counters_t *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	*cnt = srpc_data.rpc_counters;
	spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters (const srpc_counters_t *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	srpc_data.rpc_counters = *cnt;
	spin_unlock(&srpc_data.rpc_glock);
}

int
srpc_add_bulk_page(srpc_bulk_t *bk, struct page *pg, int i, int nob)
{
	nob = min(nob, (int)PAGE_CACHE_SIZE);

	LASSERT(nob > 0);
	LASSERT(i >= 0 && i < bk->bk_niov);

	bk->bk_iovs[i].kiov_offset = 0;
	bk->bk_iovs[i].kiov_page   = pg;
	bk->bk_iovs[i].kiov_len    = nob;
	return nob;
}

void
srpc_free_bulk (srpc_bulk_t *bk)
{
	int	     i;
	struct page *pg;

	LASSERT (bk != NULL);

	for (i = 0; i < bk->bk_niov; i++) {
		pg = bk->bk_iovs[i].kiov_page;
		if (pg == NULL)
			break;

		__free_page(pg);
	}

	LIBCFS_FREE(bk, offsetof(srpc_bulk_t, bk_iovs[bk->bk_niov]));
	return;
}

srpc_bulk_t *
srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
	srpc_bulk_t *bk;
	int	     i;

	LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);

	LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
			 offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
	if (bk == NULL) {
		CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
		return NULL;
	}

	memset(bk, 0, offsetof(srpc_bulk_t, bk_iovs[bulk_npg]));
	bk->bk_sink = sink;
	bk->bk_len  = bulk_len;
	bk->bk_niov = bulk_npg;

	for (i = 0; i < bulk_npg; i++) {
		struct page *pg;
		int	     nob;

		pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
				      GFP_IOFS, 0);
		if (pg == NULL) {
			CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
			srpc_free_bulk(bk);
			return NULL;
		}

		nob = srpc_add_bulk_page(bk, pg, i, bulk_len);
		bulk_len -= nob;
	}

	return bk;
}
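
/*
 * Usage sketch (illustrative only, not from the original file): a
 * hypothetical caller setting up a two-page sink buffer on CPT 0 for an
 * incoming bulk PUT, then releasing it:
 *
 *	srpc_bulk_t *bk = srpc_alloc_bulk(0, 2, 2 * PAGE_CACHE_SIZE, 1);
 *
 *	if (bk == NULL)
 *		return -ENOMEM;
 *	...attach bk to an RPC so bk->bk_iovs is used as the RDMA sink...
 *	srpc_free_bulk(bk);
 */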

static inline __u64
srpc_next_id (void)
{
	__u64 id;

	spin_lock(&srpc_data.rpc_glock);
	id = srpc_data.rpc_matchbits++;
	spin_unlock(&srpc_data.rpc_glock);
	return id;
}

void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
		     struct srpc_service_cd *scd,
		     struct srpc_buffer *buffer)
{
	memset(rpc, 0, sizeof(*rpc));
	swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
			  srpc_serv_is_framework(scd->scd_svc) ?
			  lst_sched_serial : lst_sched_test[scd->scd_cpt]);

	rpc->srpc_ev.ev_fired = 1; /* no event expected now */

	rpc->srpc_scd	   = scd;
	rpc->srpc_reqstbuf = buffer;
	rpc->srpc_peer	   = buffer->buf_peer;
	rpc->srpc_self	   = buffer->buf_self;
	LNetInvalidateHandle(&rpc->srpc_replymdh);
}

static void
srpc_service_fini(struct srpc_service *svc)
{
	struct srpc_service_cd	*scd;
	struct srpc_server_rpc	*rpc;
	struct srpc_buffer	*buf;
	struct list_head	*q;
	int			 i;

	if (svc->sv_cpt_data == NULL)
		return;

	cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
		while (1) {
			if (!list_empty(&scd->scd_buf_posted))
				q = &scd->scd_buf_posted;
			else if (!list_empty(&scd->scd_buf_blocked))
				q = &scd->scd_buf_blocked;
			else
				break;

			while (!list_empty(q)) {
				buf = list_entry(q->next,
						 struct srpc_buffer,
						 buf_list);
				list_del(&buf->buf_list);
				LIBCFS_FREE(buf, sizeof(*buf));
			}
		}

		LASSERT(list_empty(&scd->scd_rpc_active));

		while (!list_empty(&scd->scd_rpc_free)) {
			rpc = list_entry(scd->scd_rpc_free.next,
					 struct srpc_server_rpc,
					 srpc_list);
			list_del(&rpc->srpc_list);
			LIBCFS_FREE(rpc, sizeof(*rpc));
		}
	}

	cfs_percpt_free(svc->sv_cpt_data);
	svc->sv_cpt_data = NULL;
}

static int
srpc_service_nrpcs(struct srpc_service *svc)
{
	int nrpcs = svc->sv_wi_total / svc->sv_ncpts;

	return srpc_serv_is_framework(svc) ?
	       max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

int srpc_add_buffer(struct swi_workitem *wi);

static int
srpc_service_init(struct srpc_service *svc)
{
	struct srpc_service_cd	*scd;
	struct srpc_server_rpc	*rpc;
	int			 nrpcs;
	int			 i;
	int			 j;

	svc->sv_shuttingdown = 0;

	svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
					    sizeof(struct srpc_service_cd));
	if (svc->sv_cpt_data == NULL)
		return -ENOMEM;

	svc->sv_ncpts = srpc_serv_is_framework(svc) ?
			1 : cfs_cpt_number(lnet_cpt_table());
	nrpcs = srpc_service_nrpcs(svc);

	cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
		scd->scd_cpt = i;
		scd->scd_svc = svc;
		spin_lock_init(&scd->scd_lock);
		INIT_LIST_HEAD(&scd->scd_rpc_free);
		INIT_LIST_HEAD(&scd->scd_rpc_active);
		INIT_LIST_HEAD(&scd->scd_buf_posted);
		INIT_LIST_HEAD(&scd->scd_buf_blocked);

		scd->scd_ev.ev_data = scd;
		scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

		/* NB: don't use lst_sched_serial for adding buffer,
		 * see details in srpc_service_add_buffers() */
		swi_init_workitem(&scd->scd_buf_wi, scd,
				  srpc_add_buffer, lst_sched_test[i]);

		if (i != 0 && srpc_serv_is_framework(svc)) {
			/* NB: a framework service only needs srpc_service_cd
			 * for one partition, but we allocate for all
			 * partitions to keep the implementation simple; it
			 * wastes a little memory but nobody should care */
			continue;
		}

		for (j = 0; j < nrpcs; j++) {
			LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
					 i, sizeof(*rpc));
			if (rpc == NULL) {
				srpc_service_fini(svc);
				return -ENOMEM;
			}
			list_add(&rpc->srpc_list, &scd->scd_rpc_free);
		}
	}

	return 0;
}
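
/*
 * Service lifecycle (summary; the calling code lives in the selftest
 * framework and is paraphrased here, so treat it as a sketch):
 *
 *	srpc_add_service(sv);			register + per-CPT init
 *	srpc_service_add_buffers(sv, n);	post receive buffers
 *	...serve RPCs...
 *	srpc_shutdown_service(sv);		reject new RPCs, unlink buffers
 *	while (!srpc_finish_service(sv))	wait for active RPCs to drain
 *		...wait and retry...
 *	srpc_remove_service(sv);		unregister
 */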

int
srpc_add_service(struct srpc_service *sv)
{
	int id = sv->sv_id;

	LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);

	if (srpc_service_init(sv) != 0)
		return -ENOMEM;

	spin_lock(&srpc_data.rpc_glock);

	LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

	if (srpc_data.rpc_services[id] != NULL) {
		spin_unlock(&srpc_data.rpc_glock);
		goto failed;
	}

	srpc_data.rpc_services[id] = sv;
	spin_unlock(&srpc_data.rpc_glock);

	CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
	return 0;

 failed:
	srpc_service_fini(sv);
	return -EBUSY;
}

int
srpc_remove_service (srpc_service_t *sv)
{
	int id = sv->sv_id;

	spin_lock(&srpc_data.rpc_glock);

	if (srpc_data.rpc_services[id] != sv) {
		spin_unlock(&srpc_data.rpc_glock);
		return -ENOENT;
	}

	srpc_data.rpc_services[id] = NULL;
	spin_unlock(&srpc_data.rpc_glock);
	return 0;
}

int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
		       int len, int options, lnet_process_id_t peer,
		       lnet_handle_md_t *mdh, srpc_event_t *ev)
{
	int		 rc;
	lnet_md_t	 md;
	lnet_handle_me_t meh;

	rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
			  local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
	if (rc != 0) {
		CERROR ("LNetMEAttach failed: %d\n", rc);
		LASSERT (rc == -ENOMEM);
		return -ENOMEM;
	}

	md.threshold = 1;
	md.user_ptr  = ev;
	md.start     = buf;
	md.length    = len;
	md.options   = options;
	md.eq_handle = srpc_data.rpc_lnet_eq;

	rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
	if (rc != 0) {
		CERROR ("LNetMDAttach failed: %d\n", rc);
		LASSERT (rc == -ENOMEM);

		rc = LNetMEUnlink(meh);
		LASSERT (rc == 0);
		return -ENOMEM;
	}

	CDEBUG (D_NET,
		"Posted passive RDMA: peer %s, portal %d, matchbits "LPX64"\n",
		libcfs_id2str(peer), portal, matchbits);
	return 0;
}

int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
		      int options, lnet_process_id_t peer, lnet_nid_t self,
		      lnet_handle_md_t *mdh, srpc_event_t *ev)
{
	int	  rc;
	lnet_md_t md;

	md.user_ptr  = ev;
	md.start     = buf;
	md.length    = len;
	md.eq_handle = srpc_data.rpc_lnet_eq;
	md.threshold = ((options & LNET_MD_OP_GET) != 0) ? 2 : 1;
	md.options   = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

	rc = LNetMDBind(md, LNET_UNLINK, mdh);
	if (rc != 0) {
		CERROR ("LNetMDBind failed: %d\n", rc);
		LASSERT (rc == -ENOMEM);
		return -ENOMEM;
	}

	/* this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options;
	 * they're only meaningful for MDs attached to an ME (i.e. passive
	 * buffers) */
	if ((options & LNET_MD_OP_PUT) != 0) {
		rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
			     portal, matchbits, 0, 0);
	} else {
		LASSERT ((options & LNET_MD_OP_GET) != 0);

		rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
	}

	if (rc != 0) {
		CERROR ("LNet%s(%s, %d, "LPD64") failed: %d\n",
			((options & LNET_MD_OP_PUT) != 0) ? "Put" : "Get",
			libcfs_id2str(peer), portal, matchbits, rc);

		/* The forthcoming unlink event will complete this operation
		 * with failure, so fall through and return success here.
		 */
		rc = LNetMDUnlink(*mdh);
		LASSERT (rc == 0);
	} else {
		CDEBUG (D_NET,
			"Posted active RDMA: peer %s, portal %u, matchbits "LPX64"\n",
			libcfs_id2str(peer), portal, matchbits);
	}
	return 0;
}
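
/*
 * Note on md.threshold above (derived from LNet event semantics): an
 * active GET generates two events on the bound MD (SEND when the request
 * goes out, REPLY when the data arrives), so the MD must survive two
 * operations before auto-unlink; a PUT with LNET_NOACK_REQ generates
 * only SEND, hence a threshold of 1.
 */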

int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
			int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
	return srpc_post_active_rdma(srpc_serv_portal(service), service,
				     buf, len, LNET_MD_OP_PUT, peer,
				     LNET_NID_ANY, mdh, ev);
}

int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
			 lnet_handle_md_t *mdh, srpc_event_t *ev)
{
	lnet_process_id_t any = {0};

	any.nid = LNET_NID_ANY;
	any.pid = LNET_PID_ANY;

	return srpc_post_passive_rdma(srpc_serv_portal(service),
				      local, service, buf, len,
				      LNET_MD_OP_PUT, any, mdh, ev);
}

int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
{
	struct srpc_service *sv = scd->scd_svc;
	struct srpc_msg	    *msg = &buf->buf_msg;
	int		     rc;

	LNetInvalidateHandle(&buf->buf_mdh);
	list_add(&buf->buf_list, &scd->scd_buf_posted);
	scd->scd_buf_nposted++;
	spin_unlock(&scd->scd_lock);

	rc = srpc_post_passive_rqtbuf(sv->sv_id,
				      !srpc_serv_is_framework(sv),
				      msg, sizeof(*msg), &buf->buf_mdh,
				      &scd->scd_ev);

	/* At this point, an RPC (new or delayed) may have arrived in
	 * msg and its event handler has been called. So we must add
	 * buf to scd_buf_posted _before_ dropping scd_lock */

	spin_lock(&scd->scd_lock);

	if (rc == 0) {
		if (!sv->sv_shuttingdown)
			return 0;

		spin_unlock(&scd->scd_lock);
		/* srpc_shutdown_service might have tried to unlink me
		 * when my buf_mdh was still invalid */
		LNetMDUnlink(buf->buf_mdh);
		spin_lock(&scd->scd_lock);
		return 0;
	}

	scd->scd_buf_nposted--;
	if (sv->sv_shuttingdown)
		return rc; /* don't allow changing scd_buf_posted */

	list_del(&buf->buf_list);
	spin_unlock(&scd->scd_lock);

	LIBCFS_FREE(buf, sizeof(*buf));

	spin_lock(&scd->scd_lock);
	return rc;
}

int
srpc_add_buffer(struct swi_workitem *wi)
{
	struct srpc_service_cd	*scd = wi->swi_workitem.wi_data;
	struct srpc_buffer	*buf;
	int			 rc = 0;

	/* this is called by workitem scheduler threads; these threads
	 * should have had their CPT affinity set, so buffers will be
	 * posted on the CPT-local list of the portal */
	spin_lock(&scd->scd_lock);

	while (scd->scd_buf_adjust > 0 &&
	       !scd->scd_svc->sv_shuttingdown) {
		scd->scd_buf_adjust--; /* consume it */
		scd->scd_buf_posting++;

		spin_unlock(&scd->scd_lock);

		LIBCFS_ALLOC(buf, sizeof(*buf));
		if (buf == NULL) {
			CERROR("Failed to add new buf to service: %s\n",
			       scd->scd_svc->sv_name);
			spin_lock(&scd->scd_lock);
			rc = -ENOMEM;
			break;
		}

		spin_lock(&scd->scd_lock);
		if (scd->scd_svc->sv_shuttingdown) {
			spin_unlock(&scd->scd_lock);
			LIBCFS_FREE(buf, sizeof(*buf));

			spin_lock(&scd->scd_lock);
			rc = -ESHUTDOWN;
			break;
		}

		rc = srpc_service_post_buffer(scd, buf);
		if (rc != 0)
			break; /* buf has been freed inside */

		LASSERT(scd->scd_buf_posting > 0);
		scd->scd_buf_posting--;
		scd->scd_buf_total++;
		scd->scd_buf_low = MAX(2, scd->scd_buf_total / 4);
	}

	if (rc != 0) {
		scd->scd_buf_err_stamp = cfs_time_current_sec();
		scd->scd_buf_err = rc;

		LASSERT(scd->scd_buf_posting > 0);
		scd->scd_buf_posting--;
	}

	spin_unlock(&scd->scd_lock);
	return 0;
}

int
srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
{
	struct srpc_service_cd	*scd;
	int			 rc = 0;
	int			 i;

	LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		scd->scd_buf_err = 0;
		scd->scd_buf_err_stamp = 0;
		scd->scd_buf_posting = 0;
		scd->scd_buf_adjust = nbuffer;
		/* start to post buffers */
		swi_schedule_workitem(&scd->scd_buf_wi);
		spin_unlock(&scd->scd_lock);

		/* framework service only posts buffers for one partition */
		if (srpc_serv_is_framework(sv))
			break;
	}

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);
		/*
		 * NB: srpc_service_add_buffers() can be called inside
		 * thread context of lst_sched_serial, and we normally don't
		 * allow sleeping inside thread context of a WI scheduler
		 * because it will block that scheduler thread from doing
		 * anything else; even worse, it could deadlock if it's
		 * waiting on a result from another WI of the same scheduler.
		 * However, it's safe here because scd_buf_wi is scheduled
		 * by a thread in a different WI scheduler (lst_sched_test),
		 * so we don't have any risk of deadlock, though this could
		 * block all WIs pending on lst_sched_serial for a moment
		 * which is not good but not fatal.
		 */
		lst_wait_until(scd->scd_buf_err != 0 ||
			       (scd->scd_buf_adjust == 0 &&
				scd->scd_buf_posting == 0),
			       scd->scd_lock, "waiting for adding buffer\n");

		if (scd->scd_buf_err != 0 && rc == 0)
			rc = scd->scd_buf_err;

		spin_unlock(&scd->scd_lock);
	}

	return rc;
}
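
/*
 * Bookkeeping example (explanatory): srpc_service_add_buffers(sv, 8)
 * sets scd_buf_adjust = 8 on each CPT; srpc_add_buffer() then converts
 * adjust -> posting -> total one buffer at a time, leaving the low-water
 * mark at scd_buf_low = MAX(2, scd_buf_total / 4), i.e. 2 for 8 buffers.
 */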

void
srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
{
	struct srpc_service_cd	*scd;
	int			 num;
	int			 i;

	LASSERT(!sv->sv_shuttingdown);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		num = scd->scd_buf_total + scd->scd_buf_posting;
		scd->scd_buf_adjust -= min(nbuffer, num);

		spin_unlock(&scd->scd_lock);
	}
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service(struct srpc_service *sv)
{
	struct srpc_service_cd	*scd;
	struct srpc_server_rpc	*rpc;
	int			 i;

	LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);
		if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
			spin_unlock(&scd->scd_lock);
			return 0;
		}

		if (scd->scd_buf_nposted > 0) {
			CDEBUG(D_NET,
			       "waiting for %d posted buffers to unlink\n",
			       scd->scd_buf_nposted);
			spin_unlock(&scd->scd_lock);
			return 0;
		}

		if (list_empty(&scd->scd_rpc_active)) {
			spin_unlock(&scd->scd_lock);
			continue;
		}

		rpc = list_entry(scd->scd_rpc_active.next,
				 struct srpc_server_rpc, srpc_list);
		CNETERR("Active RPC %p on shutdown: sv %s, peer %s, "
			"wi %s scheduled %d running %d, "
			"ev fired %d type %d status %d lnet %d\n",
			rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
			swi_state2str(rpc->srpc_wi.swi_state),
			rpc->srpc_wi.swi_workitem.wi_scheduled,
			rpc->srpc_wi.swi_workitem.wi_running,
			rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
			rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
		spin_unlock(&scd->scd_lock);
		return 0;
	}

	/* no lock needed from now on */
	srpc_service_fini(sv);
	return 1;
}

/* called with scd->scd_lock held */
void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, srpc_buffer_t *buf)
{
	if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
		if (srpc_service_post_buffer(scd, buf) != 0) {
			CWARN("Failed to post %s buffer\n",
			      scd->scd_svc->sv_name);
		}
		return;
	}

	/* service is shutting down, or we want to recycle some buffers */
	scd->scd_buf_total--;

	if (scd->scd_buf_adjust < 0) {
		scd->scd_buf_adjust++;
		if (scd->scd_buf_adjust < 0 &&
		    scd->scd_buf_total == 0 && scd->scd_buf_posting == 0) {
			CDEBUG(D_INFO,
			       "Try to recycle %d buffers but nothing left\n",
			       scd->scd_buf_adjust);
			scd->scd_buf_adjust = 0;
		}
	}

	spin_unlock(&scd->scd_lock);
	LIBCFS_FREE(buf, sizeof(*buf));
	spin_lock(&scd->scd_lock);
}

void
srpc_abort_service(struct srpc_service *sv)
{
	struct srpc_service_cd	*scd;
	struct srpc_server_rpc	*rpc;
	int			 i;

	CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
	       sv->sv_id, sv->sv_name);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		/* schedule in-flight RPCs to notice the abort; NB this
		 * races with incoming RPCs; a complete fix should make
		 * test RPCs carry the session ID in their headers */
		list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
			rpc->srpc_aborted = 1;
			swi_schedule_workitem(&rpc->srpc_wi);
		}

		spin_unlock(&scd->scd_lock);
	}
}

void
srpc_shutdown_service(srpc_service_t *sv)
{
	struct srpc_service_cd	*scd;
	struct srpc_server_rpc	*rpc;
	srpc_buffer_t		*buf;
	int			 i;

	CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
	       sv->sv_id, sv->sv_name);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
		spin_lock(&scd->scd_lock);

	sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
		spin_unlock(&scd->scd_lock);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		/* schedule in-flight RPCs to notice the shutdown */
		list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
			swi_schedule_workitem(&rpc->srpc_wi);

		spin_unlock(&scd->scd_lock);

		/* OK to traverse scd_buf_posted without lock, since no one
		 * touches scd_buf_posted now */
		list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
			LNetMDUnlink(buf->buf_mdh);
	}
}

int
srpc_send_request (srpc_client_rpc_t *rpc)
{
	srpc_event_t *ev = &rpc->crpc_reqstev;
	int	      rc;

	ev->ev_fired = 0;
	ev->ev_data  = rpc;
	ev->ev_type  = SRPC_REQUEST_SENT;

	rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
				     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
				     &rpc->crpc_reqstmdh, ev);
	if (rc != 0) {
		LASSERT (rc == -ENOMEM);
		ev->ev_fired = 1; /* no more event expected */
	}
	return rc;
}

int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
	srpc_event_t *ev = &rpc->crpc_replyev;
	__u64	     *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
	int	      rc;

	ev->ev_fired = 0;
	ev->ev_data  = rpc;
	ev->ev_type  = SRPC_REPLY_RCVD;

	*id = srpc_next_id();

	rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
				    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
				    LNET_MD_OP_PUT, rpc->crpc_dest,
				    &rpc->crpc_replymdh, ev);
	if (rc != 0) {
		LASSERT (rc == -ENOMEM);
		ev->ev_fired = 1; /* no more event expected */
	}
	return rc;
}

int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
	srpc_bulk_t  *bk = &rpc->crpc_bulk;
	srpc_event_t *ev = &rpc->crpc_bulkev;
	__u64	     *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
	int	      rc;
	int	      opt;

	LASSERT (bk->bk_niov <= LNET_MAX_IOV);

	if (bk->bk_niov == 0)
		return 0; /* nothing to do */

	opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
	opt |= LNET_MD_KIOV;

	ev->ev_fired = 0;
	ev->ev_data  = rpc;
	ev->ev_type  = SRPC_BULK_REQ_RCVD;

	*id = srpc_next_id();

	rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
				    &bk->bk_iovs[0], bk->bk_niov, opt,
				    rpc->crpc_dest, &bk->bk_mdh, ev);
	if (rc != 0) {
		LASSERT (rc == -ENOMEM);
		ev->ev_fired = 1; /* no more event expected */
	}
	return rc;
}

int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
	srpc_event_t *ev = &rpc->srpc_ev;
	srpc_bulk_t  *bk = rpc->srpc_bulk;
	__u64	      id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
	int	      rc;
	int	      opt;

	LASSERT (bk != NULL);

	opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
	opt |= LNET_MD_KIOV;

	ev->ev_fired = 0;
	ev->ev_data  = rpc;
	ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

	rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
				   &bk->bk_iovs[0], bk->bk_niov, opt,
				   rpc->srpc_peer, rpc->srpc_self,
				   &bk->bk_mdh, ev);
	if (rc != 0)
		ev->ev_fired = 1; /* no more event expected */
	return rc;
}
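
/*
 * Wire protocol summary (derived from the functions above): the client
 * PUTs the request to the service portal using the service id as
 * matchbits, and pre-posts a reply buffer plus an optional bulk buffer
 * on SRPC_RDMA_PORTAL, each tagged with a fresh matchbits value
 * (rpyid/bulkid) carried inside the request.  The server moves the bulk
 * with srpc_do_bulk() (LNetGet to fill a local sink, LNetPut to send
 * from a local source) and finally PUTs the reply back to rpyid.
 */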

/* only called from srpc_handle_rpc */
void
srpc_server_rpc_done(srpc_server_rpc_t *rpc, int status)
{
	struct srpc_service_cd	*scd = rpc->srpc_scd;
	struct srpc_service	*sv  = scd->scd_svc;
	srpc_buffer_t		*buffer;

	LASSERT (status != 0 || rpc->srpc_wi.swi_state == SWI_STATE_DONE);

	rpc->srpc_status = status;

	CDEBUG_LIMIT (status == 0 ? D_NET : D_NETERROR,
		      "Server RPC %p done: service %s, peer %s, status %s:%d\n",
		      rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
		      swi_state2str(rpc->srpc_wi.swi_state), status);

	if (status != 0) {
		spin_lock(&srpc_data.rpc_glock);
		srpc_data.rpc_counters.rpcs_dropped++;
		spin_unlock(&srpc_data.rpc_glock);
	}

	if (rpc->srpc_done != NULL)
		(*rpc->srpc_done) (rpc);
	LASSERT(rpc->srpc_bulk == NULL);

	spin_lock(&scd->scd_lock);

	if (rpc->srpc_reqstbuf != NULL) {
		/* NB might drop scd_lock in srpc_service_recycle_buffer, but
		 * sv won't go away since scd_rpc_active must not be empty */
		srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
		rpc->srpc_reqstbuf = NULL;
	}

	list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */

	/*
	 * No one can schedule me now since:
	 * - I'm not on scd_rpc_active.
	 * - all LNet events have been fired.
	 * Cancel pending schedules and prevent future schedule attempts:
	 */
	LASSERT(rpc->srpc_ev.ev_fired);
	swi_exit_workitem(&rpc->srpc_wi);

	if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
		buffer = list_entry(scd->scd_buf_blocked.next,
				    srpc_buffer_t, buf_list);
		list_del(&buffer->buf_list);

		srpc_init_server_rpc(rpc, scd, buffer);
		list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
		swi_schedule_workitem(&rpc->srpc_wi);
	} else {
		list_add(&rpc->srpc_list, &scd->scd_rpc_free);
	}

	spin_unlock(&scd->scd_lock);
	return;
}
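
/*
 * Server-side state machine (summary of the switch in srpc_handle_rpc
 * below):
 *
 *	SWI_STATE_NEWBORN	   request unpacked, handler run, bulk started
 *	SWI_STATE_BULK_STARTED	   bulk transfer finished, reply submitted
 *	SWI_STATE_REPLY_SUBMITTED  reply sent, RPC completed
 *	SWI_STATE_DONE		   terminal
 */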

/* handles an incoming RPC */
int
srpc_handle_rpc(swi_workitem_t *wi)
{
	struct srpc_server_rpc	*rpc = wi->swi_workitem.wi_data;
	struct srpc_service_cd	*scd = rpc->srpc_scd;
	struct srpc_service	*sv  = scd->scd_svc;
	srpc_event_t		*ev  = &rpc->srpc_ev;
	int			 rc  = 0;

	LASSERT(wi == &rpc->srpc_wi);

	spin_lock(&scd->scd_lock);

	if (sv->sv_shuttingdown || rpc->srpc_aborted) {
		spin_unlock(&scd->scd_lock);

		if (rpc->srpc_bulk != NULL)
			LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
		LNetMDUnlink(rpc->srpc_replymdh);

		if (ev->ev_fired) { /* no more event, OK to finish */
			srpc_server_rpc_done(rpc, -ESHUTDOWN);
			return 1;
		}
		return 0;
	}

	spin_unlock(&scd->scd_lock);

	switch (wi->swi_state) {
	default:
		LBUG ();
	case SWI_STATE_NEWBORN: {
		srpc_msg_t	     *msg;
		srpc_generic_reply_t *reply;

		msg = &rpc->srpc_reqstbuf->buf_msg;
		reply = &rpc->srpc_replymsg.msg_body.reply;

		if (msg->msg_magic == 0) {
			/* moaned already in srpc_lnet_ev_handler */
			srpc_server_rpc_done(rpc, EBADMSG);
			return 1;
		}

		srpc_unpack_msg_hdr(msg);
		if (msg->msg_version != SRPC_MSG_VERSION) {
			CWARN("Version mismatch: %u, %u expected, from %s\n",
			      msg->msg_version, SRPC_MSG_VERSION,
			      libcfs_id2str(rpc->srpc_peer));
			reply->status = EPROTO;
			/* drop through and send reply */
		} else {
			reply->status = 0;
			rc = (*sv->sv_handler)(rpc);
			LASSERT(reply->status == 0 || !rpc->srpc_bulk);
			if (rc != 0) {
				srpc_server_rpc_done(rpc, rc);
				return 1;
			}
		}

		wi->swi_state = SWI_STATE_BULK_STARTED;

		if (rpc->srpc_bulk != NULL) {
			rc = srpc_do_bulk(rpc);
			if (rc == 0)
				return 0; /* wait for bulk */

			LASSERT (ev->ev_fired);
			ev->ev_status = rc;
		}
	}
	case SWI_STATE_BULK_STARTED:
		LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

		if (rpc->srpc_bulk != NULL) {
			rc = ev->ev_status;

			if (sv->sv_bulk_ready != NULL)
				rc = (*sv->sv_bulk_ready) (rpc, rc);

			if (rc != 0) {
				srpc_server_rpc_done(rpc, rc);
				return 1;
			}
		}

		wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
		rc = srpc_send_reply(rpc);
		if (rc == 0)
			return 0; /* wait for reply */
		srpc_server_rpc_done(rpc, rc);
		return 1;

	case SWI_STATE_REPLY_SUBMITTED:
		if (!ev->ev_fired) {
			CERROR("RPC %p: bulk %p, service %d\n",
			       rpc, rpc->srpc_bulk, sv->sv_id);
			CERROR("Event: status %d, type %d, lnet %d\n",
			       ev->ev_status, ev->ev_type, ev->ev_lnet);
			LASSERT (ev->ev_fired);
		}

		wi->swi_state = SWI_STATE_DONE;
		srpc_server_rpc_done(rpc, ev->ev_status);
		return 1;
	}

	return 0;
}

void
srpc_client_rpc_expired (void *data)
{
	srpc_client_rpc_t *rpc = data;

	CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
	       rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
	       rpc->crpc_timeout);

	spin_lock(&rpc->crpc_lock);

	rpc->crpc_timeout = 0;
	srpc_abort_rpc(rpc, -ETIMEDOUT);

	spin_unlock(&rpc->crpc_lock);

	spin_lock(&srpc_data.rpc_glock);
	srpc_data.rpc_counters.rpcs_expired++;
	spin_unlock(&srpc_data.rpc_glock);
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
	stt_timer_t *timer = &rpc->crpc_timer;

	if (rpc->crpc_timeout == 0)
		return;

	INIT_LIST_HEAD(&timer->stt_list);
	timer->stt_data	   = rpc;
	timer->stt_func	   = srpc_client_rpc_expired;
	timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
					  cfs_time_current_sec());
	stt_add_timer(timer);
	return;
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
	/* timer not planted or already exploded */
	if (rpc->crpc_timeout == 0)
		return;

	/* timer successfully defused */
	if (stt_del_timer(&rpc->crpc_timer))
		return;

	/* timer detonated, wait for it to explode */
	while (rpc->crpc_timeout != 0) {
		spin_unlock(&rpc->crpc_lock);

		schedule();

		spin_lock(&rpc->crpc_lock);
	}
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
	swi_workitem_t *wi = &rpc->crpc_wi;

	LASSERT(status != 0 || wi->swi_state == SWI_STATE_DONE);

	spin_lock(&rpc->crpc_lock);

	rpc->crpc_closed = 1;
	if (rpc->crpc_status == 0)
		rpc->crpc_status = status;

	srpc_del_client_rpc_timer(rpc);

	CDEBUG_LIMIT ((status == 0) ? D_NET : D_NETERROR,
		      "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
		      rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
		      swi_state2str(wi->swi_state), rpc->crpc_aborted, status);

	/*
	 * No one can schedule me now since:
	 * - RPC timer has been defused.
	 * - all LNet events have been fired.
	 * - crpc_closed has been set, preventing srpc_abort_rpc from
	 *   scheduling me.
	 * Cancel pending schedules and prevent future schedule attempts:
	 */
	LASSERT (!srpc_event_pending(rpc));
	swi_exit_workitem(wi);

	spin_unlock(&rpc->crpc_lock);

	(*rpc->crpc_done)(rpc);
	return;
}
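
/*
 * Client-side state machine (summary of the switch in srpc_send_rpc
 * below):
 *
 *	SWI_STATE_NEWBORN	     reply/bulk buffers posted, request sent
 *	SWI_STATE_REQUEST_SUBMITTED  request SEND event fired
 *	SWI_STATE_REQUEST_SENT	     reply received and sanity-checked
 *	SWI_STATE_REPLY_RECEIVED     bulk completed, RPC completed
 *	SWI_STATE_DONE		     terminal
 */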

/* sends an outgoing RPC */
int
srpc_send_rpc (swi_workitem_t *wi)
{
	int		   rc = 0;
	srpc_client_rpc_t *rpc;
	srpc_msg_t	  *reply;
	int		   do_bulk;

	LASSERT(wi != NULL);

	rpc = wi->swi_workitem.wi_data;

	LASSERT (rpc != NULL);
	LASSERT (wi == &rpc->crpc_wi);

	reply = &rpc->crpc_replymsg;
	do_bulk = rpc->crpc_bulk.bk_niov > 0;

	spin_lock(&rpc->crpc_lock);

	if (rpc->crpc_aborted) {
		spin_unlock(&rpc->crpc_lock);
		goto abort;
	}

	spin_unlock(&rpc->crpc_lock);

	switch (wi->swi_state) {
	default:
		LBUG ();
	case SWI_STATE_NEWBORN:
		LASSERT (!srpc_event_pending(rpc));

		rc = srpc_prepare_reply(rpc);
		if (rc != 0) {
			srpc_client_rpc_done(rpc, rc);
			return 1;
		}

		rc = srpc_prepare_bulk(rpc);
		if (rc != 0)
			break;

		wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
		rc = srpc_send_request(rpc);
		break;

	case SWI_STATE_REQUEST_SUBMITTED:
		/* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
		 * order; however, they're processed in a strict order:
		 * rqt, rpy, and bulk. */
		if (!rpc->crpc_reqstev.ev_fired)
			break;

		rc = rpc->crpc_reqstev.ev_status;
		if (rc != 0)
			break;

		wi->swi_state = SWI_STATE_REQUEST_SENT;
		/* perhaps more events, fall thru */
	case SWI_STATE_REQUEST_SENT: {
		srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

		if (!rpc->crpc_replyev.ev_fired)
			break;

		rc = rpc->crpc_replyev.ev_status;
		if (rc != 0)
			break;

		srpc_unpack_msg_hdr(reply);
		if (reply->msg_type != type ||
		    (reply->msg_magic != SRPC_MSG_MAGIC &&
		     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
			CWARN ("Bad message from %s: type %u (%d expected),"
			       " magic %u (%d expected).\n",
			       libcfs_id2str(rpc->crpc_dest),
			       reply->msg_type, type,
			       reply->msg_magic, SRPC_MSG_MAGIC);
			rc = -EBADMSG;
			break;
		}

		if (do_bulk && reply->msg_body.reply.status != 0) {
			CWARN ("Remote error %d at %s, unlink bulk buffer in "
			       "case peer didn't initiate bulk transfer\n",
			       reply->msg_body.reply.status,
			       libcfs_id2str(rpc->crpc_dest));
			LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
		}

		wi->swi_state = SWI_STATE_REPLY_RECEIVED;
	}
	case SWI_STATE_REPLY_RECEIVED:
		if (do_bulk && !rpc->crpc_bulkev.ev_fired)
			break;

		rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

		/* Bulk buffer was unlinked due to remote error. Clear error
		 * since reply buffer still contains valid data.
		 * NB rpc->crpc_done shouldn't look into bulk data in case of
		 * remote error. */
		if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
		    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
			rc = 0;

		wi->swi_state = SWI_STATE_DONE;
		srpc_client_rpc_done(rpc, rc);
		return 1;
	}

	if (rc != 0) {
		spin_lock(&rpc->crpc_lock);
		srpc_abort_rpc(rpc, rc);
		spin_unlock(&rpc->crpc_lock);
	}

abort:
	if (rpc->crpc_aborted) {
		LNetMDUnlink(rpc->crpc_reqstmdh);
		LNetMDUnlink(rpc->crpc_replymdh);
		LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

		if (!srpc_event_pending(rpc)) {
			srpc_client_rpc_done(rpc, -EINTR);
			return 1;
		}
	}
	return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
			int nbulkiov, int bulklen,
			void (*rpc_done)(srpc_client_rpc_t *),
			void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
	srpc_client_rpc_t *rpc;

	LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
				   crpc_bulk.bk_iovs[nbulkiov]));
	if (rpc == NULL)
		return NULL;

	srpc_init_client_rpc(rpc, peer, service, nbulkiov,
			     bulklen, rpc_done, rpc_fini, priv);
	return rpc;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
	LASSERT (why != 0);

	if (rpc->crpc_aborted || /* already aborted */
	    rpc->crpc_closed)	 /* callback imminent */
		return;

	CDEBUG (D_NET,
		"Aborting RPC: service %d, peer %s, state %s, why %d\n",
		rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
		swi_state2str(rpc->crpc_wi.swi_state), why);

	rpc->crpc_aborted = 1;
	rpc->crpc_status  = why;
	swi_schedule_workitem(&rpc->crpc_wi);
	return;
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
	LASSERT (!rpc->crpc_aborted);
	LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

	CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
		libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
		rpc->crpc_timeout);

	srpc_add_client_rpc_timer(rpc);
	swi_schedule_workitem(&rpc->crpc_wi);
	return;
}
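
/*
 * Usage sketch (illustrative only; "example_done" is a hypothetical
 * completion callback, and real callers add their own bookkeeping):
 * create a client RPC with no bulk and post it.
 *
 *	srpc_client_rpc_t *rpc;
 *
 *	rpc = srpc_create_client_rpc(peer, service, 0, 0,
 *				     example_done, NULL, NULL);
 *	if (rpc == NULL)
 *		return -ENOMEM;
 *
 *	spin_lock(&rpc->crpc_lock);
 *	srpc_post_rpc(rpc);
 *	spin_unlock(&rpc->crpc_lock);
 */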

int
srpc_send_reply(struct srpc_server_rpc *rpc)
{
	srpc_event_t		*ev  = &rpc->srpc_ev;
	struct srpc_msg		*msg = &rpc->srpc_replymsg;
	struct srpc_buffer	*buffer = rpc->srpc_reqstbuf;
	struct srpc_service_cd	*scd = rpc->srpc_scd;
	struct srpc_service	*sv  = scd->scd_svc;
	__u64			 rpyid;
	int			 rc;

	LASSERT(buffer != NULL);
	rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

	spin_lock(&scd->scd_lock);

	if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
		/* Repost buffer before replying since the test client
		 * might send me another RPC once it gets the reply */
		if (srpc_service_post_buffer(scd, buffer) != 0)
			CWARN("Failed to repost %s buffer\n", sv->sv_name);
		rpc->srpc_reqstbuf = NULL;
	}

	spin_unlock(&scd->scd_lock);

	ev->ev_fired = 0;
	ev->ev_data  = rpc;
	ev->ev_type  = SRPC_REPLY_SENT;

	msg->msg_magic	 = SRPC_MSG_MAGIC;
	msg->msg_version = SRPC_MSG_VERSION;
	msg->msg_type	 = srpc_service2reply(sv->sv_id);

	rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
				   sizeof(*msg), LNET_MD_OP_PUT,
				   rpc->srpc_peer, rpc->srpc_self,
				   &rpc->srpc_replymdh, ev);
	if (rc != 0)
		ev->ev_fired = 1; /* no more event expected */
	return rc;
}
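
/*
 * Buffer watermark example (explanatory, using the logic in the
 * SRPC_REQUEST_RCVD case below): with scd_buf_total == 64 the low-water
 * mark set by srpc_add_buffer() is MAX(2, 64 / 4) == 16; once fewer than
 * 16 buffers remain posted, the handler sets scd_buf_adjust to
 * MAX(64 / 2, SFW_TEST_WI_MIN) and schedules scd_buf_wi to post more.
 */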

/* when in kernel always called with LNET_LOCK() held, and in thread context */
void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
	struct srpc_service_cd	*scd;
	srpc_event_t		*rpcev = ev->md.user_ptr;
	srpc_client_rpc_t	*crpc;
	srpc_server_rpc_t	*srpc;
	srpc_buffer_t		*buffer;
	srpc_service_t		*sv;
	srpc_msg_t		*msg;
	srpc_msg_type_t		 type;

	LASSERT (!in_interrupt());

	if (ev->status != 0) {
		spin_lock(&srpc_data.rpc_glock);
		srpc_data.rpc_counters.errors++;
		spin_unlock(&srpc_data.rpc_glock);
	}

	rpcev->ev_lnet = ev->type;

	switch (rpcev->ev_type) {
	default:
		CERROR("Unknown event: status %d, type %d, lnet %d\n",
		       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
		LBUG ();
	case SRPC_REQUEST_SENT:
		if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
			spin_lock(&srpc_data.rpc_glock);
			srpc_data.rpc_counters.rpcs_sent++;
			spin_unlock(&srpc_data.rpc_glock);
		}
	case SRPC_REPLY_RCVD:
	case SRPC_BULK_REQ_RCVD:
		crpc = rpcev->ev_data;

		if (rpcev != &crpc->crpc_reqstev &&
		    rpcev != &crpc->crpc_replyev &&
		    rpcev != &crpc->crpc_bulkev) {
			CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
			       rpcev, crpc, &crpc->crpc_reqstev,
			       &crpc->crpc_replyev, &crpc->crpc_bulkev);
			CERROR("Bad event: status %d, type %d, lnet %d\n",
			       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
			LBUG ();
		}

		spin_lock(&crpc->crpc_lock);

		LASSERT(rpcev->ev_fired == 0);
		rpcev->ev_fired  = 1;
		rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
				   -EINTR : ev->status;
		swi_schedule_workitem(&crpc->crpc_wi);

		spin_unlock(&crpc->crpc_lock);
		break;

	case SRPC_REQUEST_RCVD:
		scd = rpcev->ev_data;
		sv  = scd->scd_svc;

		LASSERT(rpcev == &scd->scd_ev);

		spin_lock(&scd->scd_lock);

		LASSERT (ev->unlinked);
		LASSERT (ev->type == LNET_EVENT_PUT ||
			 ev->type == LNET_EVENT_UNLINK);
		LASSERT (ev->type != LNET_EVENT_UNLINK ||
			 sv->sv_shuttingdown);

		buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
		buffer->buf_peer = ev->initiator;
		buffer->buf_self = ev->target.nid;

		LASSERT(scd->scd_buf_nposted > 0);
		scd->scd_buf_nposted--;

		if (sv->sv_shuttingdown) {
			/* Leave buffer on scd->scd_buf_posted since
			 * srpc_finish_service needs to traverse it. */
			spin_unlock(&scd->scd_lock);
			break;
		}

		if (scd->scd_buf_err_stamp != 0 &&
		    scd->scd_buf_err_stamp < cfs_time_current_sec()) {
			/* re-enable adding buffer */
			scd->scd_buf_err_stamp = 0;
			scd->scd_buf_err = 0;
		}

		if (scd->scd_buf_err == 0 && /* adding buffer is enabled */
		    scd->scd_buf_adjust == 0 &&
		    scd->scd_buf_nposted < scd->scd_buf_low) {
			scd->scd_buf_adjust = MAX(scd->scd_buf_total / 2,
						  SFW_TEST_WI_MIN);
			swi_schedule_workitem(&scd->scd_buf_wi);
		}

		list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
		msg = &buffer->buf_msg;
		type = srpc_service2request(sv->sv_id);

		if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
		    (msg->msg_type != type &&
		     msg->msg_type != __swab32(type)) ||
		    (msg->msg_magic != SRPC_MSG_MAGIC &&
		     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
			CERROR ("Dropping RPC (%s) from %s: "
				"status %d mlength %d type %u magic %u.\n",
				sv->sv_name, libcfs_id2str(ev->initiator),
				ev->status, ev->mlength,
				msg->msg_type, msg->msg_magic);

			/* NB can't call srpc_service_recycle_buffer here since
			 * it may call LNetM[DE]Attach. The invalid magic tells
			 * srpc_handle_rpc to drop this RPC */
			msg->msg_magic = 0;
		}

		if (!list_empty(&scd->scd_rpc_free)) {
			srpc = list_entry(scd->scd_rpc_free.next,
					  struct srpc_server_rpc,
					  srpc_list);
			list_del(&srpc->srpc_list);

			srpc_init_server_rpc(srpc, scd, buffer);
			list_add_tail(&srpc->srpc_list,
				      &scd->scd_rpc_active);
			swi_schedule_workitem(&srpc->srpc_wi);
		} else {
			list_add_tail(&buffer->buf_list,
				      &scd->scd_buf_blocked);
		}

		spin_unlock(&scd->scd_lock);

		spin_lock(&srpc_data.rpc_glock);
		srpc_data.rpc_counters.rpcs_rcvd++;
		spin_unlock(&srpc_data.rpc_glock);
		break;

	case SRPC_BULK_GET_RPLD:
		LASSERT (ev->type == LNET_EVENT_SEND ||
			 ev->type == LNET_EVENT_REPLY ||
			 ev->type == LNET_EVENT_UNLINK);

		if (!ev->unlinked)
			break; /* wait for final event */

	case SRPC_BULK_PUT_SENT:
		if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
			spin_lock(&srpc_data.rpc_glock);

			if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
				srpc_data.rpc_counters.bulk_get += ev->mlength;
			else
				srpc_data.rpc_counters.bulk_put += ev->mlength;

			spin_unlock(&srpc_data.rpc_glock);
		}
	case SRPC_REPLY_SENT:
		srpc = rpcev->ev_data;
		scd  = srpc->srpc_scd;

		LASSERT(rpcev == &srpc->srpc_ev);

		spin_lock(&scd->scd_lock);

		rpcev->ev_fired  = 1;
		rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
				   -EINTR : ev->status;
		swi_schedule_workitem(&srpc->srpc_wi);

		spin_unlock(&scd->scd_lock);
		break;
	}
}

int
srpc_startup (void)
{
	int rc;

	memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
	spin_lock_init(&srpc_data.rpc_glock);

	/* 1 second pause to avoid timestamp reuse */
	cfs_pause(cfs_time_seconds(1));
	srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

	srpc_data.rpc_state = SRPC_STATE_NONE;

	rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
	if (rc < 0) {
		CERROR ("LNetNIInit() has failed: %d\n", rc);
		return rc;
	}

	srpc_data.rpc_state = SRPC_STATE_NI_INIT;

	LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
	rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
	if (rc != 0) {
		CERROR("LNetEQAlloc() has failed: %d\n", rc);
		goto bail;
	}

	rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
	LASSERT(rc == 0);
	rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
	LASSERT(rc == 0);

	srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

	rc = stt_startup();

bail:
	if (rc != 0)
		srpc_shutdown();
	else
		srpc_data.rpc_state = SRPC_STATE_RUNNING;

	return rc;
}

void
srpc_shutdown (void)
{
	int i;
	int rc;
	int state;

	state = srpc_data.rpc_state;
	srpc_data.rpc_state = SRPC_STATE_STOPPING;

	switch (state) {
	default:
		LBUG ();
	case SRPC_STATE_RUNNING:
		spin_lock(&srpc_data.rpc_glock);

		for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
			srpc_service_t *sv = srpc_data.rpc_services[i];

			LASSERTF (sv == NULL,
				  "service not empty: id %d, name %s\n",
				  i, sv->sv_name);
		}

		spin_unlock(&srpc_data.rpc_glock);

		stt_shutdown();

	case SRPC_STATE_EQ_INIT:
		rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
		LASSERT (rc == 0);
		rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
		LASSERT (rc == 0);
		rc = LNetEQFree(srpc_data.rpc_lnet_eq);
		LASSERT (rc == 0); /* the EQ should have no user by now */

	case SRPC_STATE_NI_INIT:
		LNetNIFini();
	}

	return;
}