xprt.c revision fe3aca290f17ae4978bd73d02aa4029f1c9c024c
1/* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, it installs a timer that 16 * is run after the packet's timeout has expired. 17 * - When a packet arrives, the data_ready handler walks the list of 18 * pending requests for that transport. If a matching XID is found, the 19 * caller is woken up, and the timer removed. 20 * - When no reply arrives within the timeout interval, the timer is 21 * fired by the kernel and runs xprt_timer(). It either adjusts the 22 * timeout values (minor timeout) or wakes up the caller with a status 23 * of -ETIMEDOUT. 24 * - When the caller receives a notification from RPC that a reply arrived, 25 * it should release the RPC slot, and process the reply. 26 * If the call timed out, it may choose to retry the operation by 27 * adjusting the initial timeout value, and simply calling rpc_call 28 * again. 29 * 30 * Support for async RPC is done through a set of RPC-specific scheduling 31 * primitives that `transparently' work for processes as well as async 32 * tasks that rely on callbacks. 33 * 34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 35 * 36 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 37 */ 38 39#include <linux/module.h> 40 41#include <linux/types.h> 42#include <linux/interrupt.h> 43#include <linux/workqueue.h> 44#include <linux/random.h> 45 46#include <linux/sunrpc/clnt.h> 47 48/* 49 * Local variables 50 */ 51 52#ifdef RPC_DEBUG 53# undef RPC_DEBUG_DATA 54# define RPCDBG_FACILITY RPCDBG_XPRT 55#endif 56 57/* 58 * Local functions 59 */ 60static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 61static inline void do_xprt_reserve(struct rpc_task *); 62static void xprt_connect_status(struct rpc_task *task); 63static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 64 65static int xprt_clear_backlog(struct rpc_xprt *xprt); 66 67/* 68 * Serialize write access to transports, in order to prevent different 69 * requests from interfering with each other. 70 * Also prevents transport connects from colliding with writes. 71 */ 72static int 73__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 74{ 75 struct rpc_rqst *req = task->tk_rqstp; 76 77 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 78 if (task == xprt->snd_task) 79 return 1; 80 goto out_sleep; 81 } 82 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 83 xprt->snd_task = task; 84 if (req) { 85 req->rq_bytes_sent = 0; 86 req->rq_ntrans++; 87 } 88 return 1; 89 } 90 smp_mb__before_clear_bit(); 91 clear_bit(XPRT_LOCKED, &xprt->state); 92 smp_mb__after_clear_bit(); 93out_sleep: 94 dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt); 95 task->tk_timeout = 0; 96 task->tk_status = -EAGAIN; 97 if (req && req->rq_ntrans) 98 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 99 else 100 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 101 return 0; 102} 103 104static inline int 105xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 106{ 107 int retval; 108 109 spin_lock_bh(&xprt->transport_lock); 110 retval = __xprt_lock_write(xprt, task); 111 spin_unlock_bh(&xprt->transport_lock); 112 return retval; 113} 114 115 116static void 117__xprt_lock_write_next(struct rpc_xprt *xprt) 118{ 119 struct rpc_task *task; 120 121 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 122 return; 123 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) 124 goto out_unlock; 125 task = rpc_wake_up_next(&xprt->resend); 126 if (!task) { 127 task = rpc_wake_up_next(&xprt->sending); 128 if (!task) 129 goto out_unlock; 130 } 131 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 132 struct rpc_rqst *req = task->tk_rqstp; 133 xprt->snd_task = task; 134 if (req) { 135 req->rq_bytes_sent = 0; 136 req->rq_ntrans++; 137 } 138 return; 139 } 140out_unlock: 141 smp_mb__before_clear_bit(); 142 clear_bit(XPRT_LOCKED, &xprt->state); 143 smp_mb__after_clear_bit(); 144} 145 146/* 147 * Releases the transport for use by other requests. 148 */ 149static void 150__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 151{ 152 if (xprt->snd_task == task) { 153 xprt->snd_task = NULL; 154 smp_mb__before_clear_bit(); 155 clear_bit(XPRT_LOCKED, &xprt->state); 156 smp_mb__after_clear_bit(); 157 __xprt_lock_write_next(xprt); 158 } 159} 160 161static inline void 162xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 163{ 164 spin_lock_bh(&xprt->transport_lock); 165 __xprt_release_write(xprt, task); 166 spin_unlock_bh(&xprt->transport_lock); 167} 168 169/* 170 * Van Jacobson congestion avoidance. Check if the congestion window 171 * overflowed. Put the task to sleep if this is the case. 172 */ 173static int 174__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 175{ 176 struct rpc_rqst *req = task->tk_rqstp; 177 178 if (req->rq_cong) 179 return 1; 180 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 181 task->tk_pid, xprt->cong, xprt->cwnd); 182 if (RPCXPRT_CONGESTED(xprt)) 183 return 0; 184 req->rq_cong = 1; 185 xprt->cong += RPC_CWNDSCALE; 186 return 1; 187} 188 189/* 190 * Adjust the congestion window, and wake up the next task 191 * that has been sleeping due to congestion 192 */ 193static void 194__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 195{ 196 if (!req->rq_cong) 197 return; 198 req->rq_cong = 0; 199 xprt->cong -= RPC_CWNDSCALE; 200 __xprt_lock_write_next(xprt); 201} 202 203/* 204 * Adjust RPC congestion window 205 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 206 */ 207static void 208xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 209{ 210 unsigned long cwnd; 211 212 cwnd = xprt->cwnd; 213 if (result >= 0 && cwnd <= xprt->cong) { 214 /* The (cwnd >> 1) term makes sure 215 * the result gets rounded properly. */ 216 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 217 if (cwnd > RPC_MAXCWND(xprt)) 218 cwnd = RPC_MAXCWND(xprt); 219 __xprt_lock_write_next(xprt); 220 } else if (result == -ETIMEDOUT) { 221 cwnd >>= 1; 222 if (cwnd < RPC_CWNDSCALE) 223 cwnd = RPC_CWNDSCALE; 224 } 225 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 226 xprt->cong, xprt->cwnd, cwnd); 227 xprt->cwnd = cwnd; 228} 229 230/** 231 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 232 * @xprt: transport with waiting tasks 233 * @status: result code to plant in each task before waking it 234 * 235 */ 236void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 237{ 238 if (status < 0) 239 rpc_wake_up_status(&xprt->pending, status); 240 else 241 rpc_wake_up(&xprt->pending); 242} 243 244/** 245 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 246 * @task: task to be put to sleep 247 * 248 */ 249void xprt_wait_for_buffer_space(struct rpc_task *task) 250{ 251 struct rpc_rqst *req = task->tk_rqstp; 252 struct rpc_xprt *xprt = req->rq_xprt; 253 254 task->tk_timeout = req->rq_timeout; 255 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 256} 257 258/** 259 * xprt_write_space - wake the task waiting for transport output buffer space 260 * @xprt: transport with waiting tasks 261 * 262 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 263 */ 264void xprt_write_space(struct rpc_xprt *xprt) 265{ 266 if (unlikely(xprt->shutdown)) 267 return; 268 269 spin_lock_bh(&xprt->transport_lock); 270 if (xprt->snd_task) { 271 dprintk("RPC: write space: waking waiting task on xprt %p\n", 272 xprt); 273 rpc_wake_up_task(xprt->snd_task); 274 } 275 spin_unlock_bh(&xprt->transport_lock); 276} 277 278/** 279 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 280 * @task: task whose timeout is to be set 281 * 282 * Set a request's retransmit timeout based on the transport's 283 * default timeout parameters. Used by transports that don't adjust 284 * the retransmit timeout based on round-trip time estimation. 285 */ 286void xprt_set_retrans_timeout_def(struct rpc_task *task) 287{ 288 task->tk_timeout = task->tk_rqstp->rq_timeout; 289} 290 291/* 292 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 293 * @task: task whose timeout is to be set 294 * 295 * Set a request's retransmit timeout using the RTT estimator. 296 */ 297void xprt_set_retrans_timeout_rtt(struct rpc_task *task) 298{ 299 int timer = task->tk_msg.rpc_proc->p_timer; 300 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 301 struct rpc_rqst *req = task->tk_rqstp; 302 unsigned long max_timeout = req->rq_xprt->timeout.to_maxval; 303 304 task->tk_timeout = rpc_calc_rto(rtt, timer); 305 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries; 306 if (task->tk_timeout > max_timeout || task->tk_timeout == 0) 307 task->tk_timeout = max_timeout; 308} 309 310static void xprt_reset_majortimeo(struct rpc_rqst *req) 311{ 312 struct rpc_timeout *to = &req->rq_xprt->timeout; 313 314 req->rq_majortimeo = req->rq_timeout; 315 if (to->to_exponential) 316 req->rq_majortimeo <<= to->to_retries; 317 else 318 req->rq_majortimeo += to->to_increment * to->to_retries; 319 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 320 req->rq_majortimeo = to->to_maxval; 321 req->rq_majortimeo += jiffies; 322} 323 324/** 325 * xprt_adjust_timeout - adjust timeout values for next retransmit 326 * @req: RPC request containing parameters to use for the adjustment 327 * 328 */ 329int xprt_adjust_timeout(struct rpc_rqst *req) 330{ 331 struct rpc_xprt *xprt = req->rq_xprt; 332 struct rpc_timeout *to = &xprt->timeout; 333 int status = 0; 334 335 if (time_before(jiffies, req->rq_majortimeo)) { 336 if (to->to_exponential) 337 req->rq_timeout <<= 1; 338 else 339 req->rq_timeout += to->to_increment; 340 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 341 req->rq_timeout = to->to_maxval; 342 req->rq_retries++; 343 pprintk("RPC: %lu retrans\n", jiffies); 344 } else { 345 req->rq_timeout = to->to_initval; 346 req->rq_retries = 0; 347 xprt_reset_majortimeo(req); 348 /* Reset the RTT counters == "slow start" */ 349 spin_lock_bh(&xprt->transport_lock); 350 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 351 spin_unlock_bh(&xprt->transport_lock); 352 pprintk("RPC: %lu timeout\n", jiffies); 353 status = -ETIMEDOUT; 354 } 355 356 if (req->rq_timeout == 0) { 357 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 358 req->rq_timeout = 5 * HZ; 359 } 360 return status; 361} 362 363static void xprt_autoclose(void *args) 364{ 365 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 366 367 xprt_disconnect(xprt); 368 xprt->ops->close(xprt); 369 xprt_release_write(xprt, NULL); 370} 371 372/** 373 * xprt_disconnect - mark a transport as disconnected 374 * @xprt: transport to flag for disconnect 375 * 376 */ 377void xprt_disconnect(struct rpc_xprt *xprt) 378{ 379 dprintk("RPC: disconnected transport %p\n", xprt); 380 spin_lock_bh(&xprt->transport_lock); 381 xprt_clear_connected(xprt); 382 xprt_wake_pending_tasks(xprt, -ENOTCONN); 383 spin_unlock_bh(&xprt->transport_lock); 384} 385 386static void 387xprt_init_autodisconnect(unsigned long data) 388{ 389 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 390 391 spin_lock(&xprt->transport_lock); 392 if (!list_empty(&xprt->recv) || xprt->shutdown) 393 goto out_abort; 394 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 395 goto out_abort; 396 spin_unlock(&xprt->transport_lock); 397 if (xprt_connecting(xprt)) 398 xprt_release_write(xprt, NULL); 399 else 400 schedule_work(&xprt->task_cleanup); 401 return; 402out_abort: 403 spin_unlock(&xprt->transport_lock); 404} 405 406/** 407 * xprt_connect - schedule a transport connect operation 408 * @task: RPC task that is requesting the connect 409 * 410 */ 411void xprt_connect(struct rpc_task *task) 412{ 413 struct rpc_xprt *xprt = task->tk_xprt; 414 415 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 416 xprt, (xprt_connected(xprt) ? "is" : "is not")); 417 418 if (xprt->shutdown) { 419 task->tk_status = -EIO; 420 return; 421 } 422 if (!xprt->addr.sin_port) { 423 task->tk_status = -EIO; 424 return; 425 } 426 if (!xprt_lock_write(xprt, task)) 427 return; 428 if (xprt_connected(xprt)) 429 xprt_release_write(xprt, task); 430 else { 431 if (task->tk_rqstp) 432 task->tk_rqstp->rq_bytes_sent = 0; 433 434 task->tk_timeout = RPC_CONNECT_TIMEOUT; 435 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 436 xprt->ops->connect(task); 437 } 438 return; 439} 440 441static void xprt_connect_status(struct rpc_task *task) 442{ 443 struct rpc_xprt *xprt = task->tk_xprt; 444 445 if (task->tk_status >= 0) { 446 dprintk("RPC: %4d xprt_connect_status: connection established\n", 447 task->tk_pid); 448 return; 449 } 450 451 switch (task->tk_status) { 452 case -ECONNREFUSED: 453 case -ECONNRESET: 454 dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n", 455 task->tk_pid, task->tk_client->cl_server); 456 break; 457 case -ENOTCONN: 458 dprintk("RPC: %4d xprt_connect_status: connection broken\n", 459 task->tk_pid); 460 break; 461 case -ETIMEDOUT: 462 dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n", 463 task->tk_pid); 464 break; 465 default: 466 dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n", 467 task->tk_pid, -task->tk_status, task->tk_client->cl_server); 468 xprt_release_write(xprt, task); 469 task->tk_status = -EIO; 470 return; 471 } 472 473 /* if soft mounted, just cause this RPC to fail */ 474 if (RPC_IS_SOFT(task)) { 475 xprt_release_write(xprt, task); 476 task->tk_status = -EIO; 477 } 478} 479 480/** 481 * xprt_lookup_rqst - find an RPC request corresponding to an XID 482 * @xprt: transport on which the original request was transmitted 483 * @xid: RPC XID of incoming reply 484 * 485 */ 486struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 487{ 488 struct list_head *pos; 489 struct rpc_rqst *req = NULL; 490 491 list_for_each(pos, &xprt->recv) { 492 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 493 if (entry->rq_xid == xid) { 494 req = entry; 495 break; 496 } 497 } 498 return req; 499} 500 501/** 502 * xprt_complete_rqst - called when reply processing is complete 503 * @xprt: controlling transport 504 * @req: RPC request that just completed 505 * @copied: actual number of bytes received from the transport 506 * 507 */ 508void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 509{ 510 struct rpc_task *task = req->rq_task; 511 struct rpc_clnt *clnt = task->tk_client; 512 513 /* Adjust congestion window */ 514 if (!xprt->nocong) { 515 unsigned timer = task->tk_msg.rpc_proc->p_timer; 516 xprt_adjust_cwnd(xprt, copied); 517 __xprt_put_cong(xprt, req); 518 if (timer) { 519 if (req->rq_ntrans == 1) 520 rpc_update_rtt(clnt->cl_rtt, timer, 521 (long)jiffies - req->rq_xtime); 522 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 523 } 524 } 525 526#ifdef RPC_PROFILE 527 /* Profile only reads for now */ 528 if (copied > 1024) { 529 static unsigned long nextstat; 530 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 531 532 pkt_cnt++; 533 pkt_len += req->rq_slen + copied; 534 pkt_rtt += jiffies - req->rq_xtime; 535 if (time_before(nextstat, jiffies)) { 536 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 537 printk("RPC: %ld %ld %ld %ld stat\n", 538 jiffies, pkt_cnt, pkt_len, pkt_rtt); 539 pkt_rtt = pkt_len = pkt_cnt = 0; 540 nextstat = jiffies + 5 * HZ; 541 } 542 } 543#endif 544 545 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 546 list_del_init(&req->rq_list); 547 req->rq_received = req->rq_private_buf.len = copied; 548 549 /* ... and wake up the process. */ 550 rpc_wake_up_task(task); 551 return; 552} 553 554/* 555 * RPC receive timeout handler. 556 */ 557static void 558xprt_timer(struct rpc_task *task) 559{ 560 struct rpc_rqst *req = task->tk_rqstp; 561 struct rpc_xprt *xprt = req->rq_xprt; 562 563 spin_lock(&xprt->transport_lock); 564 if (req->rq_received) 565 goto out; 566 567 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 568 __xprt_put_cong(xprt, req); 569 570 dprintk("RPC: %4d xprt_timer (%s request)\n", 571 task->tk_pid, req ? "pending" : "backlogged"); 572 573 task->tk_status = -ETIMEDOUT; 574out: 575 task->tk_timeout = 0; 576 rpc_wake_up_task(task); 577 spin_unlock(&xprt->transport_lock); 578} 579 580/** 581 * xprt_prepare_transmit - reserve the transport before sending a request 582 * @task: RPC task about to send a request 583 * 584 */ 585int xprt_prepare_transmit(struct rpc_task *task) 586{ 587 struct rpc_rqst *req = task->tk_rqstp; 588 struct rpc_xprt *xprt = req->rq_xprt; 589 int err = 0; 590 591 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 592 593 if (xprt->shutdown) 594 return -EIO; 595 596 spin_lock_bh(&xprt->transport_lock); 597 if (req->rq_received && !req->rq_bytes_sent) { 598 err = req->rq_received; 599 goto out_unlock; 600 } 601 if (!__xprt_lock_write(xprt, task)) { 602 err = -EAGAIN; 603 goto out_unlock; 604 } 605 606 if (!xprt_connected(xprt)) { 607 err = -ENOTCONN; 608 goto out_unlock; 609 } 610out_unlock: 611 spin_unlock_bh(&xprt->transport_lock); 612 return err; 613} 614 615/** 616 * xprt_transmit - send an RPC request on a transport 617 * @task: controlling RPC task 618 * 619 * We have to copy the iovec because sendmsg fiddles with its contents. 620 */ 621void xprt_transmit(struct rpc_task *task) 622{ 623 struct rpc_rqst *req = task->tk_rqstp; 624 struct rpc_xprt *xprt = req->rq_xprt; 625 int status; 626 627 dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 628 629 smp_rmb(); 630 if (!req->rq_received) { 631 if (list_empty(&req->rq_list)) { 632 spin_lock_bh(&xprt->transport_lock); 633 /* Update the softirq receive buffer */ 634 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 635 sizeof(req->rq_private_buf)); 636 /* Add request to the receive list */ 637 list_add_tail(&req->rq_list, &xprt->recv); 638 spin_unlock_bh(&xprt->transport_lock); 639 xprt_reset_majortimeo(req); 640 /* Turn off autodisconnect */ 641 del_singleshot_timer_sync(&xprt->timer); 642 } 643 } else if (!req->rq_bytes_sent) 644 return; 645 646 status = xprt->ops->send_request(task); 647 if (status == 0) { 648 dprintk("RPC: %4d xmit complete\n", task->tk_pid); 649 spin_lock_bh(&xprt->transport_lock); 650 xprt->ops->set_retrans_timeout(task); 651 /* Don't race with disconnect */ 652 if (!xprt_connected(xprt)) 653 task->tk_status = -ENOTCONN; 654 else if (!req->rq_received) 655 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 656 __xprt_release_write(xprt, task); 657 spin_unlock_bh(&xprt->transport_lock); 658 return; 659 } 660 661 /* Note: at this point, task->tk_sleeping has not yet been set, 662 * hence there is no danger of the waking up task being put on 663 * schedq, and being picked up by a parallel run of rpciod(). 664 */ 665 task->tk_status = status; 666 667 switch (status) { 668 case -ECONNREFUSED: 669 task->tk_timeout = RPC_REESTABLISH_TIMEOUT; 670 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 671 case -EAGAIN: 672 case -ENOTCONN: 673 return; 674 default: 675 break; 676 } 677 xprt_release_write(xprt, task); 678 return; 679} 680 681static inline void do_xprt_reserve(struct rpc_task *task) 682{ 683 struct rpc_xprt *xprt = task->tk_xprt; 684 685 task->tk_status = 0; 686 if (task->tk_rqstp) 687 return; 688 if (!list_empty(&xprt->free)) { 689 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 690 list_del_init(&req->rq_list); 691 task->tk_rqstp = req; 692 xprt_request_init(task, xprt); 693 return; 694 } 695 dprintk("RPC: waiting for request slot\n"); 696 task->tk_status = -EAGAIN; 697 task->tk_timeout = 0; 698 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 699} 700 701/** 702 * xprt_reserve - allocate an RPC request slot 703 * @task: RPC task requesting a slot allocation 704 * 705 * If no more slots are available, place the task on the transport's 706 * backlog queue. 707 */ 708void xprt_reserve(struct rpc_task *task) 709{ 710 struct rpc_xprt *xprt = task->tk_xprt; 711 712 task->tk_status = -EIO; 713 if (!xprt->shutdown) { 714 spin_lock(&xprt->reserve_lock); 715 do_xprt_reserve(task); 716 spin_unlock(&xprt->reserve_lock); 717 } 718} 719 720static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 721{ 722 return xprt->xid++; 723} 724 725static inline void xprt_init_xid(struct rpc_xprt *xprt) 726{ 727 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 728} 729 730static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 731{ 732 struct rpc_rqst *req = task->tk_rqstp; 733 734 req->rq_timeout = xprt->timeout.to_initval; 735 req->rq_task = task; 736 req->rq_xprt = xprt; 737 req->rq_xid = xprt_alloc_xid(xprt); 738 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 739 req, ntohl(req->rq_xid)); 740} 741 742/** 743 * xprt_release - release an RPC request slot 744 * @task: task which is finished with the slot 745 * 746 */ 747void xprt_release(struct rpc_task *task) 748{ 749 struct rpc_xprt *xprt = task->tk_xprt; 750 struct rpc_rqst *req; 751 752 if (!(req = task->tk_rqstp)) 753 return; 754 spin_lock_bh(&xprt->transport_lock); 755 __xprt_release_write(xprt, task); 756 __xprt_put_cong(xprt, req); 757 if (!list_empty(&req->rq_list)) 758 list_del(&req->rq_list); 759 xprt->last_used = jiffies; 760 if (list_empty(&xprt->recv) && !xprt->shutdown) 761 mod_timer(&xprt->timer, 762 xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT); 763 spin_unlock_bh(&xprt->transport_lock); 764 task->tk_rqstp = NULL; 765 memset(req, 0, sizeof(*req)); /* mark unused */ 766 767 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 768 769 spin_lock(&xprt->reserve_lock); 770 list_add(&req->rq_list, &xprt->free); 771 xprt_clear_backlog(xprt); 772 spin_unlock(&xprt->reserve_lock); 773} 774 775/** 776 * xprt_set_timeout - set constant RPC timeout 777 * @to: RPC timeout parameters to set up 778 * @retr: number of retries 779 * @incr: amount of increase after each retry 780 * 781 */ 782void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 783{ 784 to->to_initval = 785 to->to_increment = incr; 786 to->to_maxval = to->to_initval + (incr * retr); 787 to->to_retries = retr; 788 to->to_exponential = 0; 789} 790 791static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 792{ 793 int result; 794 struct rpc_xprt *xprt; 795 struct rpc_rqst *req; 796 797 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 798 return ERR_PTR(-ENOMEM); 799 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ 800 801 xprt->addr = *ap; 802 803 switch (proto) { 804 case IPPROTO_UDP: 805 result = xs_setup_udp(xprt, to); 806 break; 807 case IPPROTO_TCP: 808 result = xs_setup_tcp(xprt, to); 809 break; 810 default: 811 printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", 812 proto); 813 result = -EIO; 814 break; 815 } 816 if (result) { 817 kfree(xprt); 818 return ERR_PTR(result); 819 } 820 821 spin_lock_init(&xprt->transport_lock); 822 spin_lock_init(&xprt->reserve_lock); 823 init_waitqueue_head(&xprt->cong_wait); 824 825 INIT_LIST_HEAD(&xprt->free); 826 INIT_LIST_HEAD(&xprt->recv); 827 INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt); 828 init_timer(&xprt->timer); 829 xprt->timer.function = xprt_init_autodisconnect; 830 xprt->timer.data = (unsigned long) xprt; 831 xprt->last_used = jiffies; 832 833 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 834 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 835 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 836 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 837 838 /* initialize free list */ 839 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) 840 list_add(&req->rq_list, &xprt->free); 841 842 xprt_init_xid(xprt); 843 844 dprintk("RPC: created transport %p with %u slots\n", xprt, 845 xprt->max_reqs); 846 847 return xprt; 848} 849 850/** 851 * xprt_create_proto - create an RPC client transport 852 * @proto: requested transport protocol 853 * @sap: remote peer's address 854 * @to: timeout parameters for new transport 855 * 856 */ 857struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 858{ 859 struct rpc_xprt *xprt; 860 861 xprt = xprt_setup(proto, sap, to); 862 if (IS_ERR(xprt)) 863 dprintk("RPC: xprt_create_proto failed\n"); 864 else 865 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 866 return xprt; 867} 868 869static void xprt_shutdown(struct rpc_xprt *xprt) 870{ 871 xprt->shutdown = 1; 872 rpc_wake_up(&xprt->sending); 873 rpc_wake_up(&xprt->resend); 874 xprt_wake_pending_tasks(xprt, -EIO); 875 rpc_wake_up(&xprt->backlog); 876 wake_up(&xprt->cong_wait); 877 del_timer_sync(&xprt->timer); 878} 879 880static int xprt_clear_backlog(struct rpc_xprt *xprt) { 881 rpc_wake_up_next(&xprt->backlog); 882 wake_up(&xprt->cong_wait); 883 return 1; 884} 885 886/** 887 * xprt_destroy - destroy an RPC transport, killing off all requests. 888 * @xprt: transport to destroy 889 * 890 */ 891int xprt_destroy(struct rpc_xprt *xprt) 892{ 893 dprintk("RPC: destroying transport %p\n", xprt); 894 xprt_shutdown(xprt); 895 xprt->ops->destroy(xprt); 896 kfree(xprt); 897 898 return 0; 899} 900