xprt.c revision 49e9a89086b3cae784a4868ca852863e4f4ea3fe
1/* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, it installs a timer that 16 * is run after the packet's timeout has expired. 17 * - When a packet arrives, the data_ready handler walks the list of 18 * pending requests for that transport. If a matching XID is found, the 19 * caller is woken up, and the timer removed. 20 * - When no reply arrives within the timeout interval, the timer is 21 * fired by the kernel and runs xprt_timer(). It either adjusts the 22 * timeout values (minor timeout) or wakes up the caller with a status 23 * of -ETIMEDOUT. 24 * - When the caller receives a notification from RPC that a reply arrived, 25 * it should release the RPC slot, and process the reply. 26 * If the call timed out, it may choose to retry the operation by 27 * adjusting the initial timeout value, and simply calling rpc_call 28 * again. 29 * 30 * Support for async RPC is done through a set of RPC-specific scheduling 31 * primitives that `transparently' work for processes as well as async 32 * tasks that rely on callbacks. 
33 * 34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 35 * 36 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 37 */ 38 39#include <linux/module.h> 40 41#include <linux/types.h> 42#include <linux/interrupt.h> 43#include <linux/workqueue.h> 44#include <linux/random.h> 45 46#include <linux/sunrpc/clnt.h> 47 48/* 49 * Local variables 50 */ 51 52#ifdef RPC_DEBUG 53# undef RPC_DEBUG_DATA 54# define RPCDBG_FACILITY RPCDBG_XPRT 55#endif 56 57/* 58 * Local functions 59 */ 60static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 61static inline void do_xprt_reserve(struct rpc_task *); 62static void xprt_connect_status(struct rpc_task *task); 63static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 64 65static int xprt_clear_backlog(struct rpc_xprt *xprt); 66 67/** 68 * xprt_reserve_xprt - serialize write access to transports 69 * @task: task that is requesting access to the transport 70 * 71 * This prevents mixing the payload of separate requests, and prevents 72 * transport connects from colliding with writes. No congestion control 73 * is provided. 
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        /* XPRT_LOCKED is the transport's send-path lock: only one task
         * (xprt->snd_task) may write to the transport at a time. */
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                /* Recursive acquisition by the current lock holder is OK. */
                if (task == xprt->snd_task)
                        return 1;
                if (task == NULL)
                        return 0;
                goto out_sleep;
        }
        xprt->snd_task = task;
        if (req) {
                /* Start (or restart) the request's transmission from byte 0. */
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return 1;

out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n",
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        /* Requests that have already been transmitted at least once
         * (rq_ntrans != 0) wait on the resend queue, which is drained
         * ahead of the sending queue by __xprt_lock_write_next(). */
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
        /* We hold XPRT_LOCKED; only keep it if the congestion window
         * has room for this request as well. */
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return 1;
        }
        /* Congestion window is full: drop the lock and go to sleep.
         * Barriers pair with the test_and_set_bit() acquire above. */
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
out_sleep:
        dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
        if (req && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
        return 0;
}

/* Take the transport write lock under transport_lock (bh-safe). */
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        int retval;

        spin_lock_bh(&xprt->transport_lock);
        retval = xprt->ops->reserve_xprt(task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
}

/*
 * Hand the transport write lock to the next waiting task: resend
 * queue first, then the sending queue.  No congestion control.
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
        struct rpc_task *task;
        struct rpc_rqst *req;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;

        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }

        req = task->tk_rqstp;
        xprt->snd_task = task;
        if (req) {
                req->rq_bytes_sent = 0;
                req->rq_ntrans++;
        }
        return;

out_unlock:
        /* Nobody was waiting: release the lock we just took. */
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
}

/*
 * As __xprt_lock_write_next, but only wake a waiter if the
 * congestion window still has room.
 */
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
        struct rpc_task *task;

        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
                if (!task)
                        goto out_unlock;
        }
        if (__xprt_get_cong(xprt, task)) {
                struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
                if (req) {
                        req->rq_bytes_sent = 0;
                        req->rq_ntrans++;
                }
                return;
        }
out_unlock:
        /* No eligible waiter (or congested): give the lock back. */
        smp_mb__before_clear_bit();
        clear_bit(XPRT_LOCKED, &xprt->state);
        smp_mb__after_clear_bit();
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
                /* Pass the write lock straight to the next waiter. */
                __xprt_lock_write_next(xprt);
        }
}

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        if (xprt->snd_task == task) {
                xprt->snd_task = NULL;
                smp_mb__before_clear_bit();
                clear_bit(XPRT_LOCKED, &xprt->state);
                smp_mb__after_clear_bit();
                __xprt_lock_write_next_cong(xprt);
        }
}

/* Release the transport write lock under transport_lock (bh-safe). */
static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed.
 * Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;

        /* A request that already holds a congestion slot keeps it. */
        if (req->rq_cong)
                return 1;
        dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
                        task->tk_pid, xprt->cong, xprt->cwnd);
        if (RPCXPRT_CONGESTED(xprt))
                return 0;
        /* Charge one RPC_CWNDSCALE unit of the window to this request. */
        req->rq_cong = 1;
        xprt->cong += RPC_CWNDSCALE;
        return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
        if (!req->rq_cong)
                return;
        req->rq_cong = 0;
        xprt->cong -= RPC_CWNDSCALE;
        /* Freed window space may let a sleeping task send. */
        __xprt_lock_write_next_cong(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
        unsigned long cwnd;

        cwnd = xprt->cwnd;
        /* Only grow the window when it is actually being used
         * (cwnd <= cong) and the RPC succeeded. */
        if (result >= 0 && cwnd <= xprt->cong) {
                /* The (cwnd >> 1) term makes sure
                 * the result gets rounded properly.
                 */
                /* Additive increase of roughly RPC_CWNDSCALE per window. */
                cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
                if (cwnd > RPC_MAXCWND(xprt))
                        cwnd = RPC_MAXCWND(xprt);
                __xprt_lock_write_next_cong(xprt);
        } else if (result == -ETIMEDOUT) {
                /* Multiplicative decrease on timeout, floor of one slot. */
                cwnd >>= 1;
                if (cwnd < RPC_CWNDSCALE)
                        cwnd = RPC_CWNDSCALE;
        }
        dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
                        xprt->cong, xprt->cwnd, cwnd);
        xprt->cwnd = cwnd;
}

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
        if (status < 0)
                rpc_wake_up_status(&xprt->pending, status);
        else
                rpc_wake_up(&xprt->pending);
}

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 *
 */
void xprt_wait_for_buffer_space(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        /* Bound the wait by the request's retransmit timeout. */
        task->tk_timeout = req->rq_timeout;
        rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
        if (unlikely(xprt->shutdown))
                return;

        spin_lock_bh(&xprt->transport_lock);
        /* Only the current lock holder can be blocked on buffer space. */
        if (xprt->snd_task) {
                dprintk("RPC: write space: waking waiting task on xprt %p\n",
                                xprt);
                rpc_wake_up_task(xprt->snd_task);
        }
        spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.
 * Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
        task->tk_timeout = task->tk_rqstp->rq_timeout;
}

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
        int timer = task->tk_msg.rpc_proc->p_timer;
        struct rpc_rtt *rtt = task->tk_client->cl_rtt;
        struct rpc_rqst *req = task->tk_rqstp;
        unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

        /* Estimated RTO, doubled for each prior timeout/retry,
         * clamped to the transport's maximum. */
        task->tk_timeout = rpc_calc_rto(rtt, timer);
        task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
        if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
                task->tk_timeout = max_timeout;
}

/*
 * Compute the deadline (in jiffies) for the request's major timeout,
 * i.e. the point at which the whole retry sequence has expired.
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
        struct rpc_timeout *to = &req->rq_xprt->timeout;

        req->rq_majortimeo = req->rq_timeout;
        if (to->to_exponential)
                req->rq_majortimeo <<= to->to_retries;
        else
                req->rq_majortimeo += to->to_increment * to->to_retries;
        if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
                req->rq_majortimeo = to->to_maxval;
        req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 * Returns 0 while minor timeouts remain, or -ETIMEDOUT once the
 * major timeout has expired and the counters have been reset.
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
        struct rpc_xprt *xprt = req->rq_xprt;
        struct rpc_timeout *to = &xprt->timeout;
        int status = 0;

        if (time_before(jiffies, req->rq_majortimeo)) {
                /* Minor timeout: back off per transport policy. */
                if (to->to_exponential)
                        req->rq_timeout <<= 1;
                else
                        req->rq_timeout += to->to_increment;
                if (to->to_maxval && req->rq_timeout >= to->to_maxval)
                        req->rq_timeout = to->to_maxval;
                req->rq_retries++;
                pprintk("RPC: %lu retrans\n", jiffies);
        } else {
                /* Major timeout: start over with the initial values. */
                req->rq_timeout = to->to_initval;
                req->rq_retries = 0;
                xprt_reset_majortimeo(req);
                /* Reset the RTT counters == "slow start" */
                spin_lock_bh(&xprt->transport_lock);
                rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
                spin_unlock_bh(&xprt->transport_lock);
                pprintk("RPC: %lu timeout\n", jiffies);
                status = -ETIMEDOUT;
        }

        /* Guard against a zero timeout, which would spin the timer. */
        if (req->rq_timeout == 0) {
                printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
                req->rq_timeout = 5 * HZ;
        }
        return status;
}

/*
 * Workqueue callback (xprt->task_cleanup): close an idle transport.
 * The caller holds the transport write lock; release it when done.
 */
static void xprt_autoclose(void *args)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;

        xprt_disconnect(xprt);
        xprt->ops->close(xprt);
        xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect(struct rpc_xprt *xprt)
{
        dprintk("RPC: disconnected transport %p\n", xprt);
        spin_lock_bh(&xprt->transport_lock);
        xprt_clear_connected(xprt);
        /* Pending tasks will see -ENOTCONN and retransmit/reconnect. */
        xprt_wake_pending_tasks(xprt, -ENOTCONN);
        spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Autodisconnect timer: fires after the transport has been idle.
 * Abort if requests are outstanding, we are shutting down, or the
 * write lock is busy; otherwise schedule xprt_autoclose().
 */
static void
xprt_init_autodisconnect(unsigned long data)
{
        struct rpc_xprt *xprt = (struct rpc_xprt *)data;

        spin_lock(&xprt->transport_lock);
        if (!list_empty(&xprt->recv) || xprt->shutdown)
                goto out_abort;
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                goto out_abort;
        spin_unlock(&xprt->transport_lock);
        if (xprt_connecting(xprt))
                /* A connect is already in progress; just drop the lock. */
                xprt_release_write(xprt, NULL);
        else
                schedule_work(&xprt->task_cleanup);
        return;
out_abort:
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        if (xprt->shutdown) {
                task->tk_status = -EIO;
                return;
        }
        /* No destination port means the server address is unusable. */
        if (!xprt->addr.sin_port) {
                task->tk_status = -EIO;
                return;
        }
        if (!xprt_lock_write(xprt, task))
                return;
        if (xprt_connected(xprt))
                xprt_release_write(xprt, task);
        else {
                if (task->tk_rqstp)
                        task->tk_rqstp->rq_bytes_sent = 0;

                /* Sleep on the pending queue until the transport's
                 * connect worker wakes us via xprt_connect_status(). */
                task->tk_timeout = RPC_CONNECT_TIMEOUT;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
                xprt->ops->connect(task);
        }
        return;
}

/*
 * Callback run when the connect attempt completes (or times out).
 * Transient errors fall through so soft mounts can fail the RPC;
 * unknown errors release the write lock and fail with -EIO.
 */
static void xprt_connect_status(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        if (task->tk_status >= 0) {
                dprintk("RPC: %4d xprt_connect_status: connection established\n",
                                task->tk_pid);
                return;
        }

        switch (task->tk_status) {
        case -ECONNREFUSED:
        case -ECONNRESET:
                dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n",
                                task->tk_pid, task->tk_client->cl_server);
                break;
        case -ENOTCONN:
                dprintk("RPC: %4d xprt_connect_status: connection broken\n",
                                task->tk_pid);
                break;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n",
                                task->tk_pid);
                break;
        default:
                dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n",
                                task->tk_pid, -task->tk_status, task->tk_client->cl_server);
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
                return;
        }

        /* if soft mounted, just cause this RPC to fail */
        if (RPC_IS_SOFT(task)) {
                xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt
*xprt, u32 xid)
{
        struct list_head *pos;
        struct rpc_rqst *req = NULL;

        /* Linear scan of requests awaiting a reply.
         * NOTE(review): xprt->recv is manipulated under transport_lock
         * elsewhere in this file; presumably callers hold it here too —
         * verify against the transport data_ready paths. */
        list_for_each(pos, &xprt->recv) {
                struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
                if (entry->rq_xid == xid) {
                        req = entry;
                        break;
                }
        }
        return req;
}

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @xprt: controlling transport
 * @req: RPC request that just completed
 * @copied: actual number of bytes received from the transport
 *
 */
void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
        struct rpc_task *task = req->rq_task;
        struct rpc_clnt *clnt = task->tk_client;

        /* Adjust congestion window */
        if (!xprt->nocong) {
                unsigned timer = task->tk_msg.rpc_proc->p_timer;
                xprt_adjust_cwnd(xprt, copied);
                __xprt_put_cong(xprt, req);
                if (timer) {
                        /* Only update the RTT estimate from replies to a
                         * first transmission; retransmitted requests give
                         * ambiguous samples (Karn's rule). */
                        if (req->rq_ntrans == 1)
                                rpc_update_rtt(clnt->cl_rtt, timer,
                                                (long)jiffies - req->rq_xtime);
                        rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
                }
        }

#ifdef RPC_PROFILE
        /* Profile only reads for now */
        if (copied > 1024) {
                static unsigned long nextstat;
                static unsigned long pkt_rtt, pkt_len, pkt_cnt;

                pkt_cnt++;
                pkt_len += req->rq_slen + copied;
                pkt_rtt += jiffies - req->rq_xtime;
                if (time_before(nextstat, jiffies)) {
                        printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
                        printk("RPC: %ld %ld %ld %ld stat\n",
                                        jiffies, pkt_cnt, pkt_len, pkt_rtt);
                        pkt_rtt = pkt_len = pkt_cnt = 0;
                        nextstat = jiffies + 5 * HZ;
                }
        }
#endif

        dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
        /* Unhook from the receive list and record the reply length. */
        list_del_init(&req->rq_list);
        req->rq_received = req->rq_private_buf.len = copied;

        /* ... and wake up the process. */
        rpc_wake_up_task(task);
        return;
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;

        spin_lock(&xprt->transport_lock);
        /* The reply may have raced in before the timer fired. */
        if (req->rq_received)
                goto out;

        xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
        __xprt_put_cong(xprt, req);

        /* NOTE(review): req was already dereferenced above, so the
         * NULL test in this ternary is dead — the message always
         * prints "pending". */
        dprintk("RPC: %4d xprt_timer (%s request)\n",
                task->tk_pid, req ? "pending" : "backlogged");

        task->tk_status = -ETIMEDOUT;
out:
        task->tk_timeout = 0;
        rpc_wake_up_task(task);
        spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 * Returns 0 on success, a positive rq_received value if a reply has
 * already arrived, or a negative errno.
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int err = 0;

        dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);

        if (xprt->shutdown)
                return -EIO;

        spin_lock_bh(&xprt->transport_lock);
        /* Reply already in and nothing partially sent: no need to
         * (re)transmit at all. */
        if (req->rq_received && !req->rq_bytes_sent) {
                err = req->rq_received;
                goto out_unlock;
        }
        if (!xprt->ops->reserve_xprt(task)) {
                err = -EAGAIN;
                goto out_unlock;
        }

        if (!xprt_connected(xprt)) {
                err = -ENOTCONN;
                goto out_unlock;
        }
out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
        return err;
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
        struct rpc_rqst *req = task->tk_rqstp;
        struct rpc_xprt *xprt = req->rq_xprt;
        int status;

        dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

        /* Pairs with the writer that sets rq_received in the receive
         * path before waking us. */
        smp_rmb();
        if (!req->rq_received) {
                /* First transmission: hook the request onto the
                 * transport's receive list so the reply can find it. */
                if (list_empty(&req->rq_list)) {
                        spin_lock_bh(&xprt->transport_lock);
                        /* Update the softirq receive buffer */
                        memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
                                        sizeof(req->rq_private_buf));
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->transport_lock);
                        xprt_reset_majortimeo(req);
                        /* Turn off autodisconnect */
                        del_singleshot_timer_sync(&xprt->timer);
                }
        } else if (!req->rq_bytes_sent)
                /* Reply already arrived and nothing is half-sent. */
                return;

        status = xprt->ops->send_request(task);
        if (status == 0) {
                dprintk("RPC: %4d xmit complete\n", task->tk_pid);
                spin_lock_bh(&xprt->transport_lock);
                xprt->ops->set_retrans_timeout(task);
                /* Don't race with disconnect */
                if (!xprt_connected(xprt))
                        task->tk_status = -ENOTCONN;
                else if (!req->rq_received)
                        rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
                xprt->ops->release_xprt(xprt, task);
                spin_unlock_bh(&xprt->transport_lock);
                return;
        }

        /* Note: at this point, task->tk_sleeping has not yet been set,
         * hence there is no danger of the waking up task being put on
         * schedq, and being picked up by a parallel run of rpciod().
         */
        task->tk_status = status;

        switch (status) {
        case -ECONNREFUSED:
                /* Wait for the connection to be re-established before
                 * retrying; we keep the transport write lock. */
                task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
                rpc_sleep_on(&xprt->sending, task, NULL, NULL);
                /* fall through */
        case -EAGAIN:
        case -ENOTCONN:
                return;
        default:
                break;
        }
        xprt_release_write(xprt, task);
        return;
}

/*
 * Hand the task a request slot from the free list, or park it on the
 * backlog queue with -EAGAIN if none is available.
 * Caller holds xprt->reserve_lock.
 */
static inline void do_xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = 0;
        if (task->tk_rqstp)
                return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
                task->tk_rqstp = req;
                xprt_request_init(task, xprt);
                return;
        }
        dprintk("RPC: waiting for request slot\n");
        task->tk_status = -EAGAIN;
        task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;

        task->tk_status = -EIO;
        if (!xprt->shutdown) {
                spin_lock(&xprt->reserve_lock);
                do_xprt_reserve(task);
                spin_unlock(&xprt->reserve_lock);
        }
}

/* Next XID: a simple per-transport counter (randomly seeded). */
static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
        return xprt->xid++;
}

/* Seed the XID counter so streams don't start at a predictable value. */
static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
        get_random_bytes(&xprt->xid, sizeof(xprt->xid));
}

/* Initialize a freshly-reserved request slot for @task. */
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
        struct rpc_rqst *req = task->tk_rqstp;

        req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task = task;
        req->rq_xprt = xprt;
        req->rq_xid = xprt_alloc_xid(xprt);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
                        req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
        struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req;

        if (!(req = task->tk_rqstp))
                return;
        spin_lock_bh(&xprt->transport_lock);
        xprt->ops->release_xprt(xprt, task);
        __xprt_put_cong(xprt, req);
        if (!list_empty(&req->rq_list))
                list_del(&req->rq_list);
        xprt->last_used = jiffies;
        /* Transport is now idle: arm the autodisconnect timer. */
        if (list_empty(&xprt->recv) && !xprt->shutdown)
                mod_timer(&xprt->timer,
                                xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT);
        spin_unlock_bh(&xprt->transport_lock);
        task->tk_rqstp = NULL;
        memset(req, 0, sizeof(*req));   /* mark unused */

        dprintk("RPC: %4d release request %p\n", task->tk_pid, req);

        /* Return the slot to the free list and let a backlogged
         * task claim it; reserve_lock taken after transport_lock
         * was dropped, matching xprt_reserve(). */
        spin_lock(&xprt->reserve_lock);
        list_add(&req->rq_list, &xprt->free);
        xprt_clear_backlog(xprt);
        spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_set_timeout - set constant RPC timeout
 * @to: RPC timeout parameters to set up
 * @retr: number of retries
 * @incr: amount
 *        of increase after each retry
 *
 */
void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
        /* Linear backoff: initval == increment == incr. */
        to->to_initval =
        to->to_increment = incr;
        to->to_maxval = to->to_initval + (incr * retr);
        to->to_retries = retr;
        to->to_exponential = 0;
}

/*
 * Allocate and initialize an rpc_xprt for the given protocol,
 * delegating socket setup to the transport implementation.
 * Returns the new transport or an ERR_PTR().
 */
static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{
        int result;
        struct rpc_xprt *xprt;
        struct rpc_rqst *req;

        if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
                return ERR_PTR(-ENOMEM);
        memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */

        xprt->addr = *ap;

        switch (proto) {
        case IPPROTO_UDP:
                result = xs_setup_udp(xprt, to);
                break;
        case IPPROTO_TCP:
                result = xs_setup_tcp(xprt, to);
                break;
        default:
                printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n",
                                proto);
                result = -EIO;
                break;
        }
        if (result) {
                kfree(xprt);
                return ERR_PTR(result);
        }

        spin_lock_init(&xprt->transport_lock);
        spin_lock_init(&xprt->reserve_lock);
        init_waitqueue_head(&xprt->cong_wait);

        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv);
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt);
        init_timer(&xprt->timer);
        xprt->timer.function = xprt_init_autodisconnect;
        xprt->timer.data = (unsigned long) xprt;
        xprt->last_used = jiffies;

        rpc_init_wait_queue(&xprt->pending, "xprt_pending");
        rpc_init_wait_queue(&xprt->sending, "xprt_sending");
        rpc_init_wait_queue(&xprt->resend, "xprt_resend");
        rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

        /* initialize free list: walk slots in reverse so that slot[0]
         * ends up at the head of the list */
        for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
                list_add(&req->rq_list, &xprt->free);

        xprt_init_xid(xprt);

        dprintk("RPC: created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);

        return xprt;
}

/**
 *
 xprt_create_proto - create an RPC client transport
 * @proto: requested transport protocol
 * @sap: remote peer's address
 * @to: timeout parameters for new transport
 *
 */
struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
{
        struct rpc_xprt *xprt;

        xprt = xprt_setup(proto, sap, to);
        if (IS_ERR(xprt))
                dprintk("RPC: xprt_create_proto failed\n");
        else
                dprintk("RPC: xprt_create_proto created xprt %p\n", xprt);
        return xprt;
}

/*
 * Mark the transport shut down and flush every wait queue so no
 * task is left sleeping on a dying transport.
 */
static void xprt_shutdown(struct rpc_xprt *xprt)
{
        xprt->shutdown = 1;
        rpc_wake_up(&xprt->sending);
        rpc_wake_up(&xprt->resend);
        xprt_wake_pending_tasks(xprt, -EIO);
        rpc_wake_up(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        del_timer_sync(&xprt->timer);
}

/* Wake one task waiting for a request slot.  Always returns 1. */
static int xprt_clear_backlog(struct rpc_xprt *xprt) {
        rpc_wake_up_next(&xprt->backlog);
        wake_up(&xprt->cong_wait);
        return 1;
}

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @xprt: transport to destroy
 *
 */
int xprt_destroy(struct rpc_xprt *xprt)
{
        dprintk("RPC: destroying transport %p\n", xprt);
        xprt_shutdown(xprt);
        /* Transport-specific teardown before freeing the generic part. */
        xprt->ops->destroy(xprt);
        kfree(xprt);

        return 0;
}