xprt.c revision 4a0f8c04f2ece949d54a0c4fd7490259cf23a58a
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <linux/sunrpc/clnt.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef RPC_DEBUG_DATA
# define RPCDBG_FACILITY RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF (8)

/*
 * Local functions
 */
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void do_xprt_reserve(struct rpc_task *);
static void xprt_connect_status(struct rpc_task *task);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static int xprt_clear_backlog(struct rpc_xprt *xprt);

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 *
 * Tries to make @task the owner of the XPRT_LOCKED bit in xprt->sockstate.
 * Returns 1 when @task owns the bit on return (it already was
 * xprt->snd_task, or the bit was free and either xprt->nocong is set or
 * __xprt_get_cong() succeeded).  Otherwise sets tk_status to -EAGAIN,
 * clears tk_timeout, puts the task to sleep and returns 0.
 *
 * The wrapper xprt_lock_write() below calls this with
 * xprt->transport_lock held.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
		/* Bit was already set: fine if we are the current owner */
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			/* A fresh transmission attempt starts from byte 0 */
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	/* Congestion window is full: give the bit back before sleeping */
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
out_sleep:
	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	/* Requests that were already transmitted at least once wait on
	 * the resend queue, everything else on the sending queue. */
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	return 0;
}

/*
 * Locked variant of __xprt_lock_write(): takes xprt->transport_lock
 * (bottom halves disabled) around the call and passes its result through.
 */
static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->transport_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}


/*
 * Hand the write lock to the next waiter, if any.
 *
 * Does nothing when XPRT_LOCKED is already set.  Otherwise the bit is
 * taken, and a task is woken from the resend queue first, then from the
 * sending queue.  If the transport is congested, no task is waiting, or
 * the woken task cannot get a congestion slot, the bit is cleared again.
 */
static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
}

/*
 * Releases the socket for use by other requests.
 *
 * Only acts when @task is the current xprt->snd_task (a NULL @task
 * matches a NULL owner): clears the owner and the XPRT_LOCKED bit, then
 * tries to pass the lock on via __xprt_lock_write_next().
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt->snd_task = NULL;
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->sockstate);
		smp_mb__after_clear_bit();
		__xprt_lock_write_next(xprt);
	}
}

/*
 * Locked variant of __xprt_release_write(): takes xprt->transport_lock
 * (bottom halves disabled) around the call.
 */
static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->transport_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
172 */ 173static int 174__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 175{ 176 struct rpc_rqst *req = task->tk_rqstp; 177 178 if (req->rq_cong) 179 return 1; 180 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 181 task->tk_pid, xprt->cong, xprt->cwnd); 182 if (RPCXPRT_CONGESTED(xprt)) 183 return 0; 184 req->rq_cong = 1; 185 xprt->cong += RPC_CWNDSCALE; 186 return 1; 187} 188 189/* 190 * Adjust the congestion window, and wake up the next task 191 * that has been sleeping due to congestion 192 */ 193static void 194__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 195{ 196 if (!req->rq_cong) 197 return; 198 req->rq_cong = 0; 199 xprt->cong -= RPC_CWNDSCALE; 200 __xprt_lock_write_next(xprt); 201} 202 203/* 204 * Adjust RPC congestion window 205 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 206 */ 207static void 208xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 209{ 210 unsigned long cwnd; 211 212 cwnd = xprt->cwnd; 213 if (result >= 0 && cwnd <= xprt->cong) { 214 /* The (cwnd >> 1) term makes sure 215 * the result gets rounded properly. 
*/ 216 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 217 if (cwnd > RPC_MAXCWND(xprt)) 218 cwnd = RPC_MAXCWND(xprt); 219 __xprt_lock_write_next(xprt); 220 } else if (result == -ETIMEDOUT) { 221 cwnd >>= 1; 222 if (cwnd < RPC_CWNDSCALE) 223 cwnd = RPC_CWNDSCALE; 224 } 225 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 226 xprt->cong, xprt->cwnd, cwnd); 227 xprt->cwnd = cwnd; 228} 229 230static void xprt_reset_majortimeo(struct rpc_rqst *req) 231{ 232 struct rpc_timeout *to = &req->rq_xprt->timeout; 233 234 req->rq_majortimeo = req->rq_timeout; 235 if (to->to_exponential) 236 req->rq_majortimeo <<= to->to_retries; 237 else 238 req->rq_majortimeo += to->to_increment * to->to_retries; 239 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 240 req->rq_majortimeo = to->to_maxval; 241 req->rq_majortimeo += jiffies; 242} 243 244/** 245 * xprt_adjust_timeout - adjust timeout values for next retransmit 246 * @req: RPC request containing parameters to use for the adjustment 247 * 248 */ 249int xprt_adjust_timeout(struct rpc_rqst *req) 250{ 251 struct rpc_xprt *xprt = req->rq_xprt; 252 struct rpc_timeout *to = &xprt->timeout; 253 int status = 0; 254 255 if (time_before(jiffies, req->rq_majortimeo)) { 256 if (to->to_exponential) 257 req->rq_timeout <<= 1; 258 else 259 req->rq_timeout += to->to_increment; 260 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 261 req->rq_timeout = to->to_maxval; 262 req->rq_retries++; 263 pprintk("RPC: %lu retrans\n", jiffies); 264 } else { 265 req->rq_timeout = to->to_initval; 266 req->rq_retries = 0; 267 xprt_reset_majortimeo(req); 268 /* Reset the RTT counters == "slow start" */ 269 spin_lock_bh(&xprt->transport_lock); 270 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 271 spin_unlock_bh(&xprt->transport_lock); 272 pprintk("RPC: %lu timeout\n", jiffies); 273 status = -ETIMEDOUT; 274 } 275 276 if (req->rq_timeout == 0) { 277 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 
278 req->rq_timeout = 5 * HZ; 279 } 280 return status; 281} 282 283static void 284xprt_socket_autoclose(void *args) 285{ 286 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 287 288 xprt_disconnect(xprt); 289 xprt->ops->close(xprt); 290 xprt_release_write(xprt, NULL); 291} 292 293/** 294 * xprt_disconnect - mark a transport as disconnected 295 * @xprt: transport to flag for disconnect 296 * 297 */ 298void xprt_disconnect(struct rpc_xprt *xprt) 299{ 300 dprintk("RPC: disconnected transport %p\n", xprt); 301 spin_lock_bh(&xprt->transport_lock); 302 xprt_clear_connected(xprt); 303 rpc_wake_up_status(&xprt->pending, -ENOTCONN); 304 spin_unlock_bh(&xprt->transport_lock); 305} 306 307static void 308xprt_init_autodisconnect(unsigned long data) 309{ 310 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 311 312 spin_lock(&xprt->transport_lock); 313 if (!list_empty(&xprt->recv) || xprt->shutdown) 314 goto out_abort; 315 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 316 goto out_abort; 317 spin_unlock(&xprt->transport_lock); 318 /* Let keventd close the socket */ 319 if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0) 320 xprt_release_write(xprt, NULL); 321 else 322 schedule_work(&xprt->task_cleanup); 323 return; 324out_abort: 325 spin_unlock(&xprt->transport_lock); 326} 327 328/** 329 * xprt_connect - schedule a transport connect operation 330 * @task: RPC task that is requesting the connect 331 * 332 */ 333void xprt_connect(struct rpc_task *task) 334{ 335 struct rpc_xprt *xprt = task->tk_xprt; 336 337 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 338 xprt, (xprt_connected(xprt) ? 
"is" : "is not")); 339 340 if (xprt->shutdown) { 341 task->tk_status = -EIO; 342 return; 343 } 344 if (!xprt->addr.sin_port) { 345 task->tk_status = -EIO; 346 return; 347 } 348 if (!xprt_lock_write(xprt, task)) 349 return; 350 if (xprt_connected(xprt)) 351 xprt_release_write(xprt, task); 352 else { 353 if (task->tk_rqstp) 354 task->tk_rqstp->rq_bytes_sent = 0; 355 356 task->tk_timeout = RPC_CONNECT_TIMEOUT; 357 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 358 xprt->ops->connect(task); 359 } 360 return; 361} 362 363static void xprt_connect_status(struct rpc_task *task) 364{ 365 struct rpc_xprt *xprt = task->tk_xprt; 366 367 if (task->tk_status >= 0) { 368 dprintk("RPC: %4d xprt_connect_status: connection established\n", 369 task->tk_pid); 370 return; 371 } 372 373 switch (task->tk_status) { 374 case -ECONNREFUSED: 375 case -ECONNRESET: 376 dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n", 377 task->tk_pid, task->tk_client->cl_server); 378 break; 379 case -ENOTCONN: 380 dprintk("RPC: %4d xprt_connect_status: connection broken\n", 381 task->tk_pid); 382 break; 383 case -ETIMEDOUT: 384 dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n", 385 task->tk_pid); 386 break; 387 default: 388 dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n", 389 task->tk_pid, -task->tk_status, task->tk_client->cl_server); 390 xprt_release_write(xprt, task); 391 task->tk_status = -EIO; 392 return; 393 } 394 395 /* if soft mounted, just cause this RPC to fail */ 396 if (RPC_IS_SOFT(task)) { 397 xprt_release_write(xprt, task); 398 task->tk_status = -EIO; 399 } 400} 401 402/** 403 * xprt_lookup_rqst - find an RPC request corresponding to an XID 404 * @xprt: transport on which the original request was transmitted 405 * @xid: RPC XID of incoming reply 406 * 407 */ 408struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 409{ 410 struct list_head *pos; 411 struct rpc_rqst *req = NULL; 412 413 
list_for_each(pos, &xprt->recv) { 414 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 415 if (entry->rq_xid == xid) { 416 req = entry; 417 break; 418 } 419 } 420 return req; 421} 422 423/** 424 * xprt_complete_rqst - called when reply processing is complete 425 * @xprt: controlling transport 426 * @req: RPC request that just completed 427 * @copied: actual number of bytes received from the transport 428 * 429 */ 430void xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 431{ 432 struct rpc_task *task = req->rq_task; 433 struct rpc_clnt *clnt = task->tk_client; 434 435 /* Adjust congestion window */ 436 if (!xprt->nocong) { 437 unsigned timer = task->tk_msg.rpc_proc->p_timer; 438 xprt_adjust_cwnd(xprt, copied); 439 __xprt_put_cong(xprt, req); 440 if (timer) { 441 if (req->rq_ntrans == 1) 442 rpc_update_rtt(clnt->cl_rtt, timer, 443 (long)jiffies - req->rq_xtime); 444 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 445 } 446 } 447 448#ifdef RPC_PROFILE 449 /* Profile only reads for now */ 450 if (copied > 1024) { 451 static unsigned long nextstat; 452 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 453 454 pkt_cnt++; 455 pkt_len += req->rq_slen + copied; 456 pkt_rtt += jiffies - req->rq_xtime; 457 if (time_before(nextstat, jiffies)) { 458 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 459 printk("RPC: %ld %ld %ld %ld stat\n", 460 jiffies, pkt_cnt, pkt_len, pkt_rtt); 461 pkt_rtt = pkt_len = pkt_cnt = 0; 462 nextstat = jiffies + 5 * HZ; 463 } 464 } 465#endif 466 467 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 468 list_del_init(&req->rq_list); 469 req->rq_received = req->rq_private_buf.len = copied; 470 471 /* ... and wake up the process. */ 472 rpc_wake_up_task(task); 473 return; 474} 475 476/* 477 * RPC receive timeout handler. 
478 */ 479static void 480xprt_timer(struct rpc_task *task) 481{ 482 struct rpc_rqst *req = task->tk_rqstp; 483 struct rpc_xprt *xprt = req->rq_xprt; 484 485 spin_lock(&xprt->transport_lock); 486 if (req->rq_received) 487 goto out; 488 489 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 490 __xprt_put_cong(xprt, req); 491 492 dprintk("RPC: %4d xprt_timer (%s request)\n", 493 task->tk_pid, req ? "pending" : "backlogged"); 494 495 task->tk_status = -ETIMEDOUT; 496out: 497 task->tk_timeout = 0; 498 rpc_wake_up_task(task); 499 spin_unlock(&xprt->transport_lock); 500} 501 502/** 503 * xprt_prepare_transmit - reserve the transport before sending a request 504 * @task: RPC task about to send a request 505 * 506 */ 507int xprt_prepare_transmit(struct rpc_task *task) 508{ 509 struct rpc_rqst *req = task->tk_rqstp; 510 struct rpc_xprt *xprt = req->rq_xprt; 511 int err = 0; 512 513 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 514 515 if (xprt->shutdown) 516 return -EIO; 517 518 spin_lock_bh(&xprt->transport_lock); 519 if (req->rq_received && !req->rq_bytes_sent) { 520 err = req->rq_received; 521 goto out_unlock; 522 } 523 if (!__xprt_lock_write(xprt, task)) { 524 err = -EAGAIN; 525 goto out_unlock; 526 } 527 528 if (!xprt_connected(xprt)) { 529 err = -ENOTCONN; 530 goto out_unlock; 531 } 532out_unlock: 533 spin_unlock_bh(&xprt->transport_lock); 534 return err; 535} 536 537/** 538 * xprt_transmit - send an RPC request on a transport 539 * @task: controlling RPC task 540 * 541 * We have to copy the iovec because sendmsg fiddles with its contents. 
 *
 * If no reply has been received yet and the request is not on a list,
 * it is added to the transport's receive list (with a fresh copy of the
 * receive buffer), its major timeout is reset and the autodisconnect
 * timer is stopped.  If a reply is already in and nothing has been sent,
 * there is nothing to do.  Otherwise the transport's send_request
 * operation is called:
 *  - 0: the receive timeout is computed and, unless the transport got
 *    disconnected or the reply already arrived, the task sleeps on the
 *    pending queue with xprt_timer() as the last rpc_sleep_on()
 *    argument; the write lock is released.
 *  - -ECONNREFUSED: the task sleeps on the sending queue for
 *    RPC_REESTABLISH_TIMEOUT; the write lock is not released here.
 *  - -EAGAIN, -ENOTCONN: return with the status set; the write lock is
 *    not released here.
 *  - anything else: a stream transport is marked disconnected, and the
 *    write lock is released.
 */
void xprt_transmit(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status;

	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	smp_rmb();
	if (!req->rq_received) {
		if (list_empty(&req->rq_list)) {
			spin_lock_bh(&xprt->transport_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->transport_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
		}
	} else if (!req->rq_bytes_sent)
		return;

	status = xprt->ops->send_request(task);
	if (!status)
		goto out_receive;

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	task->tk_status = status;

	switch (status) {
	case -ECONNREFUSED:
		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
		/* fall through: like the cases below, keep the write lock */
	case -EAGAIN:
	case -ENOTCONN:
		return;
	default:
		if (xprt->stream)
			xprt_disconnect(xprt);
	}
	xprt_release_write(xprt, task);
	return;
 out_receive:
	dprintk("RPC: %4d xmit complete\n", task->tk_pid);
	/* Set the task's receive timeout value */
	spin_lock_bh(&xprt->transport_lock);
	if (!xprt->nocong) {
		/* RTT-estimator based timeout, backed off per retry and
		 * clamped to to_maxval (also when the shift yields zero) */
		int timer = task->tk_msg.rpc_proc->p_timer;
		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
			task->tk_timeout = xprt->timeout.to_maxval;
	} else
		task->tk_timeout = req->rq_timeout;
	/* Don't race with disconnect */
	if (!xprt_connected(xprt))
		task->tk_status = -ENOTCONN;
	else if (!req->rq_received)
		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Give @task a request slot.  Sets tk_status to 0 when the task already
 * has a slot or one could be taken from xprt->free (in which case it is
 * initialised via xprt_request_init()).  With no free slot, tk_status
 * becomes -EAGAIN and the task sleeps on the backlog queue without a
 * timeout.  xprt_reserve() calls this with xprt->xprt_lock held.
 */
static inline void do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (!list_empty(&xprt->free)) {
		struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del_init(&req->rq_list);
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC: waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
639 */ 640void xprt_reserve(struct rpc_task *task) 641{ 642 struct rpc_xprt *xprt = task->tk_xprt; 643 644 task->tk_status = -EIO; 645 if (!xprt->shutdown) { 646 spin_lock(&xprt->xprt_lock); 647 do_xprt_reserve(task); 648 spin_unlock(&xprt->xprt_lock); 649 } 650} 651 652static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 653{ 654 return xprt->xid++; 655} 656 657static inline void xprt_init_xid(struct rpc_xprt *xprt) 658{ 659 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 660} 661 662static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 663{ 664 struct rpc_rqst *req = task->tk_rqstp; 665 666 req->rq_timeout = xprt->timeout.to_initval; 667 req->rq_task = task; 668 req->rq_xprt = xprt; 669 req->rq_xid = xprt_alloc_xid(xprt); 670 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 671 req, ntohl(req->rq_xid)); 672} 673 674/** 675 * xprt_release - release an RPC request slot 676 * @task: task which is finished with the slot 677 * 678 */ 679void xprt_release(struct rpc_task *task) 680{ 681 struct rpc_xprt *xprt = task->tk_xprt; 682 struct rpc_rqst *req; 683 684 if (!(req = task->tk_rqstp)) 685 return; 686 spin_lock_bh(&xprt->transport_lock); 687 __xprt_release_write(xprt, task); 688 __xprt_put_cong(xprt, req); 689 if (!list_empty(&req->rq_list)) 690 list_del(&req->rq_list); 691 xprt->last_used = jiffies; 692 if (list_empty(&xprt->recv) && !xprt->shutdown) 693 mod_timer(&xprt->timer, 694 xprt->last_used + RPC_IDLE_DISCONNECT_TIMEOUT); 695 spin_unlock_bh(&xprt->transport_lock); 696 task->tk_rqstp = NULL; 697 memset(req, 0, sizeof(*req)); /* mark unused */ 698 699 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 700 701 spin_lock(&xprt->xprt_lock); 702 list_add(&req->rq_list, &xprt->free); 703 xprt_clear_backlog(xprt); 704 spin_unlock(&xprt->xprt_lock); 705} 706 707/** 708 * xprt_set_timeout - set constant RPC timeout 709 * @to: RPC timeout parameters to set up 710 * @retr: number of retries 711 * @incr: amount of increase 
after each retry 712 * 713 */ 714void xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 715{ 716 to->to_initval = 717 to->to_increment = incr; 718 to->to_maxval = to->to_initval + (incr * retr); 719 to->to_retries = retr; 720 to->to_exponential = 0; 721} 722 723static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 724{ 725 int result; 726 struct rpc_xprt *xprt; 727 struct rpc_rqst *req; 728 729 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 730 return ERR_PTR(-ENOMEM); 731 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ 732 733 xprt->addr = *ap; 734 735 switch (proto) { 736 case IPPROTO_UDP: 737 result = xs_setup_udp(xprt, to); 738 break; 739 case IPPROTO_TCP: 740 result = xs_setup_tcp(xprt, to); 741 break; 742 default: 743 printk(KERN_ERR "RPC: unrecognized transport protocol: %d\n", 744 proto); 745 result = -EIO; 746 break; 747 } 748 if (result) { 749 kfree(xprt); 750 return ERR_PTR(result); 751 } 752 753 spin_lock_init(&xprt->transport_lock); 754 spin_lock_init(&xprt->xprt_lock); 755 init_waitqueue_head(&xprt->cong_wait); 756 757 INIT_LIST_HEAD(&xprt->free); 758 INIT_LIST_HEAD(&xprt->recv); 759 INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); 760 init_timer(&xprt->timer); 761 xprt->timer.function = xprt_init_autodisconnect; 762 xprt->timer.data = (unsigned long) xprt; 763 xprt->last_used = jiffies; 764 765 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 766 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 767 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 768 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 769 770 /* initialize free list */ 771 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--) 772 list_add(&req->rq_list, &xprt->free); 773 774 xprt_init_xid(xprt); 775 776 dprintk("RPC: created transport %p with %u slots\n", xprt, 777 xprt->max_reqs); 778 779 return xprt; 780} 781 782/** 783 * xprt_create_proto - 
create an RPC client transport 784 * @proto: requested transport protocol 785 * @sap: remote peer's address 786 * @to: timeout parameters for new transport 787 * 788 */ 789struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 790{ 791 struct rpc_xprt *xprt; 792 793 xprt = xprt_setup(proto, sap, to); 794 if (IS_ERR(xprt)) 795 dprintk("RPC: xprt_create_proto failed\n"); 796 else 797 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 798 return xprt; 799} 800 801static void xprt_shutdown(struct rpc_xprt *xprt) 802{ 803 xprt->shutdown = 1; 804 rpc_wake_up(&xprt->sending); 805 rpc_wake_up(&xprt->resend); 806 rpc_wake_up(&xprt->pending); 807 rpc_wake_up(&xprt->backlog); 808 wake_up(&xprt->cong_wait); 809 del_timer_sync(&xprt->timer); 810} 811 812static int xprt_clear_backlog(struct rpc_xprt *xprt) { 813 rpc_wake_up_next(&xprt->backlog); 814 wake_up(&xprt->cong_wait); 815 return 1; 816} 817 818/** 819 * xprt_destroy - destroy an RPC transport, killing off all requests. 820 * @xprt: transport to destroy 821 * 822 */ 823int xprt_destroy(struct rpc_xprt *xprt) 824{ 825 dprintk("RPC: destroying transport %p\n", xprt); 826 xprt_shutdown(xprt); 827 xprt->ops->destroy(xprt); 828 kfree(xprt); 829 830 return 0; 831} 832