xprt.c revision 43118c29dea2b23798bd42a147015cceee7fa885
1/* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_transmit(). 14 * - xprt_transmit sends the message and installs the caller on the 15 * transport's wait list. At the same time, it installs a timer that 16 * is run after the packet's timeout has expired. 17 * - When a packet arrives, the data_ready handler walks the list of 18 * pending requests for that transport. If a matching XID is found, the 19 * caller is woken up, and the timer removed. 20 * - When no reply arrives within the timeout interval, the timer is 21 * fired by the kernel and runs xprt_timer(). It either adjusts the 22 * timeout values (minor timeout) or wakes up the caller with a status 23 * of -ETIMEDOUT. 24 * - When the caller receives a notification from RPC that a reply arrived, 25 * it should release the RPC slot, and process the reply. 26 * If the call timed out, it may choose to retry the operation by 27 * adjusting the initial timeout value, and simply calling rpc_call 28 * again. 29 * 30 * Support for async RPC is done through a set of RPC-specific scheduling 31 * primitives that `transparently' work for processes as well as async 32 * tasks that rely on callbacks. 33 * 34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 35 * 36 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> 37 */ 38 39#include <linux/module.h> 40 41#include <linux/types.h> 42#include <linux/interrupt.h> 43#include <linux/workqueue.h> 44#include <linux/random.h> 45 46#include <linux/sunrpc/clnt.h> 47 48/* 49 * Local variables 50 */ 51 52#ifdef RPC_DEBUG 53# undef RPC_DEBUG_DATA 54# define RPCDBG_FACILITY RPCDBG_XPRT 55#endif 56 57/* 58 * Local functions 59 */ 60static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 61static inline void do_xprt_reserve(struct rpc_task *); 62static void xprt_connect_status(struct rpc_task *task); 63static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 64 65static int xprt_clear_backlog(struct rpc_xprt *xprt); 66 67/* 68 * Serialize write access to transports, in order to prevent different 69 * requests from interfering with each other. 70 * Also prevents transport connects from colliding with writes. 71 */ 72static int 73__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 74{ 75 struct rpc_rqst *req = task->tk_rqstp; 76 77 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { 78 if (task == xprt->snd_task) 79 return 1; 80 goto out_sleep; 81 } 82 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 83 xprt->snd_task = task; 84 if (req) { 85 req->rq_bytes_sent = 0; 86 req->rq_ntrans++; 87 } 88 return 1; 89 } 90 smp_mb__before_clear_bit(); 91 clear_bit(XPRT_LOCKED, &xprt->state); 92 smp_mb__after_clear_bit(); 93out_sleep: 94 dprintk("RPC: %4d failed to lock transport %p\n", task->tk_pid, xprt); 95 task->tk_timeout = 0; 96 task->tk_status = -EAGAIN; 97 if (req && req->rq_ntrans) 98 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 99 else 100 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 101 return 0; 102} 103 104static inline int 105xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 106{ 107 int retval; 108 109 spin_lock_bh(&xprt->transport_lock); 110 retval = __xprt_lock_write(xprt, task); 111 spin_unlock_bh(&xprt->transport_lock); 112 return retval; 113} 114 115 116static void 117__xprt_lock_write_next(struct rpc_xprt *xprt) 118{ 119 struct rpc_task *task; 120 121 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 122 return; 123 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) 124 goto out_unlock; 125 task = rpc_wake_up_next(&xprt->resend); 126 if (!task) { 127 task = rpc_wake_up_next(&xprt->sending); 128 if (!task) 129 goto out_unlock; 130 } 131 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 132 struct rpc_rqst *req = task->tk_rqstp; 133 xprt->snd_task = task; 134 if (req) { 135 req->rq_bytes_sent = 0; 136 req->rq_ntrans++; 137 } 138 return; 139 } 140out_unlock: 141 smp_mb__before_clear_bit(); 142 clear_bit(XPRT_LOCKED, &xprt->state); 143 smp_mb__after_clear_bit(); 144} 145 146/* 147 * Releases the transport for use by other requests. 148 */ 149static void 150__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 151{ 152 if (xprt->snd_task == task) { 153 xprt->snd_task = NULL; 154 smp_mb__before_clear_bit(); 155 clear_bit(XPRT_LOCKED, &xprt->state); 156 smp_mb__after_clear_bit(); 157 __xprt_lock_write_next(xprt); 158 } 159} 160 161static inline void 162xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 163{ 164 spin_lock_bh(&xprt->transport_lock); 165 __xprt_release_write(xprt, task); 166 spin_unlock_bh(&xprt->transport_lock); 167} 168 169/* 170 * Van Jacobson congestion avoidance. Check if the congestion window 171 * overflowed. Put the task to sleep if this is the case. 172 */ 173static int 174__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 175{ 176 struct rpc_rqst *req = task->tk_rqstp; 177 178 if (req->rq_cong) 179 return 1; 180 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 181 task->tk_pid, xprt->cong, xprt->cwnd); 182 if (RPCXPRT_CONGESTED(xprt)) 183 return 0; 184 req->rq_cong = 1; 185 xprt->cong += RPC_CWNDSCALE; 186 return 1; 187} 188 189/* 190 * Adjust the congestion window, and wake up the next task 191 * that has been sleeping due to congestion 192 */ 193static void 194__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 195{ 196 if (!req->rq_cong) 197 return; 198 req->rq_cong = 0; 199 xprt->cong -= RPC_CWNDSCALE; 200 __xprt_lock_write_next(xprt); 201} 202 203/* 204 * Adjust RPC congestion window 205 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 206 */ 207static void 208xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 209{ 210 unsigned long cwnd; 211 212 cwnd = xprt->cwnd; 213 if (result >= 0 && cwnd <= xprt->cong) { 214 /* The (cwnd >> 1) term makes sure 215 * the result gets rounded properly. */ 216 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 217 if (cwnd > RPC_MAXCWND(xprt)) 218 cwnd = RPC_MAXCWND(xprt); 219 __xprt_lock_write_next(xprt); 220 } else if (result == -ETIMEDOUT) { 221 cwnd >>= 1; 222 if (cwnd < RPC_CWNDSCALE) 223 cwnd = RPC_CWNDSCALE; 224 } 225 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 226 xprt->cong, xprt->cwnd, cwnd); 227 xprt->cwnd = cwnd; 228} 229 230/** 231 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 232 * @xprt: transport with waiting tasks 233 * @status: result code to plant in each task before waking it 234 * 235 */ 236void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 237{ 238 if (status < 0) 239 rpc_wake_up_status(&xprt->pending, status); 240 else 241 rpc_wake_up(&xprt->pending); 242} 243 244/** 245 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 246 * @task: task to be put to sleep 247 * 248 */ 249void xprt_wait_for_buffer_space(struct rpc_task *task) 250{ 251 struct rpc_rqst *req = task->tk_rqstp; 252 struct rpc_xprt *xprt = req->rq_xprt; 253 254 task->tk_timeout = req->rq_timeout; 255 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 256} 257 258/** 259 * xprt_write_space - wake the task waiting for transport output buffer space 260 * @xprt: transport with waiting tasks 261 * 262 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 263 */ 264void xprt_write_space(struct rpc_xprt *xprt) 265{ 266 if (unlikely(xprt->shutdown)) 267 return; 268 269 spin_lock_bh(&xprt->transport_lock); 270 if (xprt->snd_task) { 271 dprintk("RPC: write space: waking waiting task on xprt %p\n", 272 xprt); 273 rpc_wake_up_task(xprt->snd_task); 274 } 275 spin_unlock_bh(&xprt->transport_lock); 276} 277 278static void xprt_reset_majortimeo(struct rpc_rqst *req) 279{ 280 struct rpc_timeout *to = &req->rq_xprt->timeout; 281 282 req->rq_majortimeo = req->rq_timeout; 283 if (to->to_exponential) 284 req->rq_majortimeo <<= to->to_retries; 285 else 286 req->rq_majortimeo += to->to_increment * to->to_retries; 287 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 288 req->rq_majortimeo = to->to_maxval; 289 req->rq_majortimeo += jiffies; 290} 291 292/** 293 * xprt_adjust_timeout - adjust timeout values for next retransmit 294 * @req: RPC request containing parameters to use for the adjustment 295 * 296 */ 297int xprt_adjust_timeout(struct rpc_rqst *req) 298{ 299 struct rpc_xprt *xprt = req->rq_xprt; 300 struct rpc_timeout *to = &xprt->timeout; 301 int status = 0; 302 303 if (time_before(jiffies, req->rq_majortimeo)) { 304 if (to->to_exponential) 305 req->rq_timeout <<= 1; 306 else 307 req->rq_timeout += to->to_increment; 308 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 309 req->rq_timeout = to->to_maxval; 310 req->rq_retries++; 311 pprintk("RPC: %lu retrans\n", jiffies); 312 } else { 313 req->rq_timeout = to->to_initval; 314 req->rq_retries = 0; 315 xprt_reset_majortimeo(req); 316 /* Reset the RTT counters == "slow start" */ 317 spin_lock_bh(&xprt->transport_lock); 318 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 319 spin_unlock_bh(&xprt->transport_lock); 320 pprintk("RPC: %lu timeout\n", jiffies); 321 status = -ETIMEDOUT; 322 } 323 324 if (req->rq_timeout == 0) { 325 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 326 req->rq_timeout = 5 * HZ; 327 } 328 return status; 329} 330 331static void xprt_autoclose(void *args) 332{ 333 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 334 335 xprt_disconnect(xprt); 336 xprt->ops->close(xprt); 337 xprt_release_write(xprt, NULL); 338} 339 340/** 341 * xprt_disconnect - mark a transport as disconnected 342 * @xprt: transport to flag for disconnect 343 * 344 */ 345void xprt_disconnect(struct rpc_xprt *xprt) 346{ 347 dprintk("RPC: disconnected transport %p\n", xprt); 348 spin_lock_bh(&xprt->transport_lock); 349 xprt_clear_connected(xprt); 350 xprt_wake_pending_tasks(xprt, -ENOTCONN); 351 spin_unlock_bh(&xprt->transport_lock); 352} 353 354static void 355xprt_init_autodisconnect(unsigned long data) 356{ 357 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 358 359 spin_lock(&xprt->transport_lock); 360 if (!list_empty(&xprt->recv) || xprt->shutdown) 361 goto out_abort; 362 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 363 goto out_abort; 364 spin_unlock(&xprt->transport_lock); 365 if (xprt_connecting(xprt)) 366 xprt_release_write(xprt, NULL); 367 else 368 schedule_work(&xprt->task_cleanup); 369 return; 370out_abort: 371 spin_unlock(&xprt->transport_lock); 372} 373 374/** 375 * xprt_connect - schedule a transport connect operation 376 * @task: RPC task that is requesting the connect 377 * 378 */ 379void xprt_connect(struct rpc_task *task) 380{ 381 struct rpc_xprt *xprt = task->tk_xprt; 382 383 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 384 xprt, (xprt_connected(xprt) ? "is" : "is not")); 385 386 if (xprt->shutdown) { 387 task->tk_status = -EIO; 388 return; 389 } 390 if (!xprt->addr.sin_port) { 391 task->tk_status = -EIO; 392 return; 393 } 394 if (!xprt_lock_write(xprt, task)) 395 return; 396 if (xprt_connected(xprt)) 397 xprt_release_write(xprt, task); 398 else { 399 if (task->tk_rqstp) 400 task->tk_rqstp->rq_bytes_sent = 0; 401 402 task->tk_timeout = RPC_CONNECT_TIMEOUT; 403 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 404 xprt->ops->connect(task); 405 } 406 return; 407} 408 409static void xprt_connect_status(struct rpc_task *task) 410{ 411 struct rpc_xprt *xprt = task->tk_xprt; 412 413 if (task->tk_status >= 0) { 414 dprintk("RPC: %4d xprt_connect_status: connection established\n", 415 task->tk_pid); 416 return; 417 } 418 419 switch (task->tk_status) { 420 case -ECONNREFUSED: 421 case -ECONNRESET: 422 dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n", 423 task->tk_pid, task->tk_client->cl_server); 424 break; 425 case -ENOTCONN: 426 dprintk("RPC: %4d xprt_connect_status: connection broken\n", 427 task->tk_pid); 428 break; 429 case -ETIMEDOUT: 430 dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n", 431 task->tk_pid); 432 break; 433 default: 434 dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n", 435 task->tk_pid, -task->tk_status, task->tk_client->cl_server); 436 xprt_release_write(xprt, task); 437 task->tk_status = -EIO; 438 return; 439 } 440 441 /* if soft mounted, just cause this RPC to fail */ 442 if (RPC_IS_SOFT(task)) { 443 xprt_release_write(xprt, task); 444 task->tk_status = -EIO; 445 } 446} 447 448/** 449 * xprt_lookup_rqst - find an RPC request corresponding to an XID 450 * @xprt: transport on which the original request was transmitted 451 * @xid: RPC XID of incoming reply 452 * 453 */ 454struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 455{ 456 struct list_head *pos; 457 struct rpc_rqst *req = NULL; 458 459 list_for_each(pos, &xprt->recv) { 460 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 461 if (entry->rq_xid == xid) { 462 req = entry; 463 break; 464 } 465 } 466 return req; 467} 468 469/** 470 * xprt_complete_rqst - called when reply processing is complete 471 * @xprt: controlling transport 472 * @req: RPC request that just completed 473 * @copied: actual number of bytes received from the transport 474 * 475 */ 476void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 477{ 478 struct rpc_task *task = req->rq_task; 479 struct rpc_clnt *clnt = task->tk_client; 480 481 /* Adjust congestion window */ 482 if (!xprt->nocong) { 483 unsigned timer = task->tk_msg.rpc_proc->p_timer; 484 xprt_adjust_cwnd(xprt, copied); 485 __xprt_put_cong(xprt, req); 486 if (timer) { 487 if (req->rq_ntrans == 1) 488 rpc_update_rtt(clnt->cl_rtt, timer, 489 (long)jiffies - req->rq_xtime); 490 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 491 } 492 } 493 494#ifdef RPC_PROFILE 495 /* Profile only reads for now */ 496 if (copied > 1024) { 497 static unsigned long nextstat; 498 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 499 500 pkt_cnt++; 501 pkt_len += req->rq_slen + copied; 502 pkt_rtt += jiffies - req->rq_xtime; 503 if (time_before(nextstat, jiffies)) { 504 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 505 printk("RPC: %ld %ld %ld %ld stat\n", 506 jiffies, pkt_cnt, pkt_len, pkt_rtt); 507 pkt_rtt = pkt_len = pkt_cnt = 0; 508 nextstat = jiffies + 5 * HZ; 509 } 510 } 511#endif 512 513 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 514 list_del_init(&req->rq_list); 515 req->rq_received = req->rq_private_buf.len = copied; 516 517 /* ... and wake up the process. */ 518 rpc_wake_up_task(task); 519 return; 520} 521 522/* 523 * RPC receive timeout handler. 524 */ 525static void 526xprt_timer(struct rpc_task *task) 527{ 528 struct rpc_rqst *req = task->tk_rqstp; 529 struct rpc_xprt *xprt = req->rq_xprt; 530 531 spin_lock(&xprt->transport_lock); 532 if (req->rq_received) 533 goto out; 534 535 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 536 __xprt_put_cong(xprt, req); 537 538 dprintk("RPC: %4d xprt_timer (%s request)\n", 539 task->tk_pid, req ? "pending" : "backlogged"); 540 541 task->tk_status = -ETIMEDOUT; 542out: 543 task->tk_timeout = 0; 544 rpc_wake_up_task(task); 545 spin_unlock(&xprt->transport_lock); 546} 547 548/** 549 * xprt_prepare_transmit - reserve the transport before sending a request 550 * @task: RPC task about to send a request 551 * 552 */ 553int xprt_prepare_transmit(struct rpc_task *task) 554{ 555 struct rpc_rqst *req = task->tk_rqstp; 556 struct rpc_xprt *xprt = req->rq_xprt; 557 int err = 0; 558 559 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 560 561 if (xprt->shutdown) 562 return -EIO; 563 564 spin_lock_bh(&xprt->transport_lock); 565 if (req->rq_received && !req->rq_bytes_sent) { 566 err = req->rq_received; 567 goto out_unlock; 568 } 569 if (!__xprt_lock_write(xprt, task)) { 570 err = -EAGAIN; 571 goto out_unlock; 572 } 573 574 if (!xprt_connected(xprt)) { 575 err = -ENOTCONN; 576 goto out_unlock; 577 } 578out_unlock: 579 spin_unlock_bh(&xprt->transport_lock); 580 return err; 581} 582 583/** 584 * xprt_transmit - send an RPC request on a transport 585 * @task: controlling RPC task 586 * 587 * We have to copy the iovec because sendmsg fiddles with its contents. 588 */ 589void xprt_transmit(struct rpc_task *task) 590{ 591 struct rpc_clnt *clnt = task->tk_client; 592 struct rpc_rqst *req = task->tk_rqstp; 593 struct rpc_xprt *xprt = req->rq_xprt; 594 int status; 595 596 dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 597 598 smp_rmb(); 599 if (!req->rq_received) { 600 if (list_empty(&req->rq_list)) { 601 spin_lock_bh(&xprt->transport_lock); 602 /* Update the softirq receive buffer */ 603 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 604 sizeof(req->rq_private_buf)); 605 /* Add request to the receive list */ 606 list_add_tail(&req->rq_list, &xprt->recv); 607 spin_unlock_bh(&xprt->transport_lock); 608 xprt_reset_majortimeo(req); 609 /* Turn off autodisconnect */ 610 del_singleshot_timer_sync(&xprt->timer); 611 } 612 } else if (!req->rq_bytes_sent) 613 return; 614 615 status = xprt->ops->send_request(task); 616 if (!status) 617 goto out_receive; 618 619 /* Note: at this point, task->tk_sleeping has not yet been set, 620 * hence there is no danger of the waking up task being put on 621 * schedq, and being picked up by a parallel run of rpciod(). 622 */ 623 task->tk_status = status; 624 625 switch (status) { 626 case -ECONNREFUSED: 627 task->tk_timeout = RPC_REESTABLISH_TIMEOUT; 628 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 629 case -EAGAIN: 630 case -ENOTCONN: 631 return; 632 default: 633 break; 634 } 635 xprt_release_write(xprt, task); 636 return; 637 out_receive: 638 dprintk("RPC: %4d xmit complete\n", task->tk_pid); 639 /* Set the task's receive timeout value */ 640 spin_lock_bh(&xprt->transport_lock); 641 if (!xprt->nocong) { 642 int timer = task->tk_msg.rpc_proc->p_timer; 643 task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); 644 task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; 645 if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) 646 task->tk_timeout = xprt->timeout.to_maxval; 647 } else 648 task->tk_timeout = req->rq_timeout; 649 /* Don't race with disconnect */ 650 if (!xprt_connected(xprt)) 651 task->tk_status = -ENOTCONN; 652 else if (!req->rq_received) 653 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 654 __xprt_release_write(xprt, task); 655 spin_unlock_bh(&xprt->transport_lock); 656} 657 658static inline void do_xprt_reserve(struct rpc_task *task) 659{ 660 struct rpc_xprt *xprt = task->tk_xprt; 661 662 task->tk_status = 0; 663 if (task->tk_rqstp) 664 return; 665 if (!list_empty(&xprt->free)) { 666 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 667 list_del_init(&req->rq_list); 668 task->tk_rqstp = req; 669 xprt_request_init(task, xprt); 670 return; 671 } 672 dprintk("RPC: waiting for request slot\n"); 673 task->tk_status = -EAGAIN; 674 task->tk_timeout = 0; 675 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 676} 677 678/** 679 * xprt_reserve - allocate an RPC request slot 680 * @task: RPC task requesting a slot allocation 681 * 682 * If no more slots are available, place the task on the transport's 683 * backlog queue. 684 */ 685void xprt_reserve(struct rpc_task *task) 686{ 687 struct rpc_xprt *xprt = task->tk_xprt; 688 689 task->tk_status = -EIO; 690 if (!xprt->shutdown) { 691 spin_lock(&xprt->reserve_lock); 692 do_xprt_reserve(task); 693 spin_unlock(&xprt->reserve_lock); 694 } 695} 696 697static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 698{ 699 return xprt->xid++; 700} 701 702static inline void xprt_init_xid(struct rpc_xprt *xprt) 703{ 704 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 705} 706 707static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 708{ 709 struct rpc_rqst *req = task->tk_rqstp; 710 711 req->rq_timeout = xprt->timeout.to_initval; 712 req->rq_task = task; 713 req->rq_xprt = xprt; 714 req->rq_xid = xprt_alloc_xid(xprt); 715 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 716 req, ntohl(req->rq_xid)); 717} 718 719/** 720 * xprt_release - release an RPC request slot 721 * @task: task which is finished with the slot 722 * 723 */ 724void xprt_release(struct rpc_task *task) 725{ 726 struct rpc_xprt *xprt = task->tk_xprt; 727 struct rpc_rqst *req; 728 729 if (!(req = task->tk_rqstp)) 730 return; 731 spin_lock_bh(&xprt->transport_lock); 732 __xprt_release_write(xprt, task); 733 __xprt_put_cong(xprt, req); 734 if (!list_empty(&req->rq_list)) 735 list_del(&req->rq_list); 736 xprt->last_used = jiffies; 737 if (list_empty(&xprt->recv) && !xprt->shutdown) 738 mod_timer(&xprt->timer, 739 xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT); 740 spin_unlock_bh(&xprt->transport_lock); 741 task->tk_rqstp = NULL; 742 memset(req, 0, sizeof(*req)); /* mark unused */ 743 744 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 745 746 spin_lock(&xprt->reserve_lock); 747 list_add(&req->rq_list, &xprt->free); 748 xprt_clear_backlog(xprt); 749 spin_unlock(&xprt->reserve_lock); 750} 751 752/** 753 * xprt_set_timeout - set constant RPC timeout 754 * @to: RPC timeout parameters to set up 755 * @retr: number of retries 756 * @incr: amount of increase after each retry 757 * 758 */ 759void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 760{ 761 to->to_initval = 762 to->to_increment = incr; 763 to->to_maxval = to->to_initval + (incr * retr); 764 to->to_retries = retr; 765 to->to_exponential = 0; 766} 767 768static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 769{ 770 int result; 771 struct rpc_xprt *xprt; 772 struct rpc_rqst *req; 773 774 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 775 return ERR_PTR(-ENOMEM); 776 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ 777 778 xprt->addr = *ap; 779 780 switch (proto) { 781 case IPPROTO_UDP: 782 result = xs_setup_udp(xprt, to); 783 break; 784 case IPPROTO_TCP: 785 result = xs_setup_tcp(xprt, to); 786 break; 787 default: 788 printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", 789 proto); 790 result = -EIO; 791 break; 792 } 793 if (result) { 794 kfree(xprt); 795 return ERR_PTR(result); 796 } 797 798 spin_lock_init(&xprt->transport_lock); 799 spin_lock_init(&xprt->reserve_lock); 800 init_waitqueue_head(&xprt->cong_wait); 801 802 INIT_LIST_HEAD(&xprt->free); 803 INIT_LIST_HEAD(&xprt->recv); 804 INIT_WORK(&xprt->task_cleanup, xprt_autoclose, xprt); 805 init_timer(&xprt->timer); 806 xprt->timer.function = xprt_init_autodisconnect; 807 xprt->timer.data = (unsigned long) xprt; 808 xprt->last_used = jiffies; 809 810 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 811 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 812 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 813 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 814 815 /* initialize free list */ 816 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) 817 list_add(&req->rq_list, &xprt->free); 818 819 xprt_init_xid(xprt); 820 821 dprintk("RPC: created transport %p with %u slots\n", xprt, 822 xprt->max_reqs); 823 824 return xprt; 825} 826 827/** 828 * xprt_create_proto - create an RPC client transport 829 * @proto: requested transport protocol 830 * @sap: remote peer's address 831 * @to: timeout parameters for new transport 832 * 833 */ 834struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 835{ 836 struct rpc_xprt *xprt; 837 838 xprt = xprt_setup(proto, sap, to); 839 if (IS_ERR(xprt)) 840 dprintk("RPC: xprt_create_proto failed\n"); 841 else 842 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 843 return xprt; 844} 845 846static void xprt_shutdown(struct rpc_xprt *xprt) 847{ 848 xprt->shutdown = 1; 849 rpc_wake_up(&xprt->sending); 850 rpc_wake_up(&xprt->resend); 851 xprt_wake_pending_tasks(xprt, -EIO); 852 rpc_wake_up(&xprt->backlog); 853 wake_up(&xprt->cong_wait); 854 del_timer_sync(&xprt->timer); 855} 856 857static int xprt_clear_backlog(struct rpc_xprt *xprt) { 858 rpc_wake_up_next(&xprt->backlog); 859 wake_up(&xprt->cong_wait); 860 return 1; 861} 862 863/** 864 * xprt_destroy - destroy an RPC transport, killing off all requests. 865 * @xprt: transport to destroy 866 * 867 */ 868int xprt_destroy(struct rpc_xprt *xprt) 869{ 870 dprintk("RPC: destroying transport %p\n", xprt); 871 xprt_shutdown(xprt); 872 xprt->ops->destroy(xprt); 873 kfree(xprt); 874 875 return 0; 876} 877