xprt.c revision 34006cee28f7344f9557a4be3816c7891b1bbab1
/*
 * linux/net/sunrpc/xprt.c
 *
 * This is a generic RPC call interface supporting congestion avoidance,
 * and asynchronous calls.
 *
 * The interface works like this:
 *
 * - When a process places a call, it allocates a request slot if
 *   one is available. Otherwise, it sleeps on the backlog queue
 *   (xprt_reserve).
 * - Next, the caller puts together the RPC message, stuffs it into
 *   the request struct, and calls xprt_transmit().
 * - xprt_transmit sends the message and installs the caller on the
 *   transport's wait list. At the same time, if a reply is expected,
 *   it installs a timer that is run after the packet's timeout has
 *   expired.
 * - When a packet arrives, the data_ready handler walks the list of
 *   pending requests for that transport. If a matching XID is found, the
 *   caller is woken up, and the timer removed.
 * - When no reply arrives within the timeout interval, the timer is
 *   fired by the kernel and runs xprt_timer(). It either adjusts the
 *   timeout values (minor timeout) or wakes up the caller with a status
 *   of -ETIMEDOUT.
 * - When the caller receives a notification from RPC that a reply arrived,
 *   it should release the RPC slot, and process the reply.
 *   If the call timed out, it may choose to retry the operation by
 *   adjusting the initial timeout value, and simply calling rpc_call
 *   again.
 *
 * Support for async RPC is done through a set of RPC-specific scheduling
 * primitives that "transparently" work for processes as well as async
 * tasks that rely on callbacks.
 *
 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/ktime.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>

#include "sunrpc.h"

/*
 * Local variables
 */

/* Route this file's dprintk() output through the XPRT debug facility. */
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void xprt_init(struct rpc_xprt *xprt, struct net *net);
static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static void xprt_connect_status(struct rpc_task *task);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

/*
 * Registry of loaded transport classes (struct xprt_class), walked and
 * modified only while holding xprt_list_lock.
 */
static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 * - a reply is received and
 * - a full number of requests are outstanding and
 * - the congestion window hasn't been updated recently.
 *
 * Both xprt->cong (requests in flight) and xprt->cwnd (window) are kept
 * in fixed point: one request counts as RPC_CWNDSCALE (1 << RPC_CWNDSHIFT),
 * which lets the window grow by fractions of a request.
 */
#define RPC_CWNDSHIFT (8U)
#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND RPC_CWNDSCALE
/* Upper bound on cwnd: every slot (max_reqs) in flight, in fixed point. */
#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)

/* True when the in-flight count has filled the congestion window. */
#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
*
 * Two classes are considered the same if their ->ident values match.
 *
 * Returns:
 * 0: transport successfully registered
 * -EEXIST: transport already registered
 * NOTE(review): older docs also listed "-EINVAL: transport module being
 * unloaded", but this function has no path that returns -EINVAL.
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * The class is matched by pointer identity, not by ->ident.
 *
 * Returns:
 * 0: transport successfully unregistered
 * -ENOENT: transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			/* list_del_init leaves ->list valid for re-registration */
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_load_transport - load a transport implementation
 * @transport_name: transport to load
 *
 * If a class whose ->name equals @transport_name is already registered,
 * nothing is done.  Otherwise request_module("xprt<transport_name>") is
 * called.  The list lock is dropped before request_module(), which may
 * sleep.
 *
 * Returns:
 * 0: transport already registered, or module load requested successfully
 * otherwise: the (error) value returned by request_module(); the older
 * doc's "-ENOENT: transport module not available" is not produced here
 * directly.
 */
int xprt_load_transport(const char *transport_name)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (strcmp(t->name, transport_name) == 0) {
			spin_unlock(&xprt_list_lock);
			goto out;
		}
	}
	spin_unlock(&xprt_list_lock);
	result = request_module("xprt%s", transport_name);
out:
	return result;
}
EXPORT_SYMBOL_GPL(xprt_load_transport);

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes. No congestion control
 * is provided.
 *
 * Returns 1 if @task now owns the write lock (XPRT_LOCKED), or already
 * owned it as xprt->snd_task.  Otherwise it sets task->tk_status to
 * -EAGAIN, puts the task to sleep on xprt->sending and returns 0.
 * Retransmissions (rq_ntrans != 0) wait at RPC_PRIORITY_HIGH, first
 * transmissions at RPC_PRIORITY_NORMAL, and tasks without a request at
 * RPC_PRIORITY_LOW.
 */
int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int priority;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		/* Already locked: fine if we are the holder */
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	xprt->snd_task = task;
	if (req != NULL) {
		/* Fresh (re)transmission: restart from the first byte */
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
	}

	return 1;

out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req == NULL)
		priority = RPC_PRIORITY_LOW;
	else if (!req->rq_ntrans)
		priority = RPC_PRIORITY_NORMAL;
	else
		priority = RPC_PRIORITY_HIGH;
	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

/*
 * Drop the transport write lock held by xprt->snd_task.
 *
 * If a close has been requested (XPRT_CLOSE_WAIT) and the transport is
 * not shutting down, XPRT_LOCKED is deliberately left set and the
 * task_cleanup work item is queued instead.  The lock is thereby handed
 * to that work item (presumably xprt_autoclose(), which closes the
 * transport and then releases the lock).
 */
static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
		/* clear_bit() does not imply a memory barrier on its own */
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_clear_bit();
	} else
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed
to be woken up and given access to the transport.
 *
 * Returns 1 if @task now owns (or already owned) the write lock.  For a
 * task with a request, this also requires a congestion slot from
 * __xprt_get_cong().  Otherwise the write lock is dropped again if it
 * was just taken, tk_status is set to -EAGAIN, the task sleeps on
 * xprt->sending and 0 is returned.
 */
int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	int priority;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	/* Tasks without a request (e.g. connect) bypass congestion control */
	if (req == NULL) {
		xprt->snd_task = task;
		return 1;
	}
	if (__xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
		return 1;
	}
	/* Window full: give back the lock we just took, then wait */
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req == NULL)
		priority = RPC_PRIORITY_LOW;
	else if (!req->rq_ntrans)
		priority = RPC_PRIORITY_NORMAL;
	else
		priority = RPC_PRIORITY_HIGH;
	rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

/*
 * Take the write lock through the transport's ->reserve_xprt method,
 * under xprt->transport_lock.  Returns that method's result
 * (1 = lock held, 0 = task queued to wait).
 */
static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

/*
 * If the write lock is free, grab it and hand it straight to the next
 * task waiting on xprt->sending.  If nobody is waiting, release it again.
 * Callers in this file invoke it with xprt->transport_lock held.
 */
static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;
	struct rpc_rqst *req;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;

	task = rpc_wake_up_next(&xprt->sending);
	if (task == NULL)
		goto out_unlock;

	req = task->tk_rqstp;
	xprt->snd_task = task;
	if (req) {
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
	}
	return;

out_unlock:
	xprt_clear_locked(xprt);
}

/*
 * Congestion-aware variant of __xprt_lock_write_next().  The lock is only
 * handed to the next waiter when the window is not full, and a waiter
 * with a request must also obtain a congestion slot.  If it cannot, the
 * woken task keeps running without the lock and the lock is released.
 */
static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	struct rpc_task *task;
	struct rpc_rqst *req;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->sending);
	if (task == NULL)
		goto out_unlock;

	req = task->tk_rqstp;
	if (req == NULL) {
		xprt->snd_task = task;
		return;
	}
	if (__xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
		return;
	}
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. No congestion control is provided.
 * Does nothing unless @task is the current holder (xprt->snd_task).
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL. Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 * Does nothing unless @task is the current holder (xprt->snd_task).
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

/* Release the write lock via ->release_xprt under xprt->transport_lock. */
static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check whether the congestion window
 * has room for the task's request.  Returns 1 if the request already
 * holds a slot, or was just charged one (RPC_CWNDSCALE added to
 * xprt->cong).  Returns 0 if the window is full; this helper does not
 * itself put the task to sleep.
389 */ 390static int 391__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 392{ 393 struct rpc_rqst *req = task->tk_rqstp; 394 395 if (req->rq_cong) 396 return 1; 397 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", 398 task->tk_pid, xprt->cong, xprt->cwnd); 399 if (RPCXPRT_CONGESTED(xprt)) 400 return 0; 401 req->rq_cong = 1; 402 xprt->cong += RPC_CWNDSCALE; 403 return 1; 404} 405 406/* 407 * Adjust the congestion window, and wake up the next task 408 * that has been sleeping due to congestion 409 */ 410static void 411__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 412{ 413 if (!req->rq_cong) 414 return; 415 req->rq_cong = 0; 416 xprt->cong -= RPC_CWNDSCALE; 417 __xprt_lock_write_next_cong(xprt); 418} 419 420/** 421 * xprt_release_rqst_cong - housekeeping when request is complete 422 * @task: RPC request that recently completed 423 * 424 * Useful for transports that require congestion control. 425 */ 426void xprt_release_rqst_cong(struct rpc_task *task) 427{ 428 __xprt_put_cong(task->tk_xprt, task->tk_rqstp); 429} 430EXPORT_SYMBOL_GPL(xprt_release_rqst_cong); 431 432/** 433 * xprt_adjust_cwnd - adjust transport congestion window 434 * @task: recently completed RPC request used to adjust window 435 * @result: result code of completed RPC request 436 * 437 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 438 */ 439void xprt_adjust_cwnd(struct rpc_task *task, int result) 440{ 441 struct rpc_rqst *req = task->tk_rqstp; 442 struct rpc_xprt *xprt = task->tk_xprt; 443 unsigned long cwnd = xprt->cwnd; 444 445 if (result >= 0 && cwnd <= xprt->cong) { 446 /* The (cwnd >> 1) term makes sure 447 * the result gets rounded properly. 
*/ 448 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 449 if (cwnd > RPC_MAXCWND(xprt)) 450 cwnd = RPC_MAXCWND(xprt); 451 __xprt_lock_write_next_cong(xprt); 452 } else if (result == -ETIMEDOUT) { 453 cwnd >>= 1; 454 if (cwnd < RPC_CWNDSCALE) 455 cwnd = RPC_CWNDSCALE; 456 } 457 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 458 xprt->cong, xprt->cwnd, cwnd); 459 xprt->cwnd = cwnd; 460 __xprt_put_cong(xprt, req); 461} 462EXPORT_SYMBOL_GPL(xprt_adjust_cwnd); 463 464/** 465 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue 466 * @xprt: transport with waiting tasks 467 * @status: result code to plant in each task before waking it 468 * 469 */ 470void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status) 471{ 472 if (status < 0) 473 rpc_wake_up_status(&xprt->pending, status); 474 else 475 rpc_wake_up(&xprt->pending); 476} 477EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks); 478 479/** 480 * xprt_wait_for_buffer_space - wait for transport output buffer to clear 481 * @task: task to be put to sleep 482 * @action: function pointer to be executed after wait 483 */ 484void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action) 485{ 486 struct rpc_rqst *req = task->tk_rqstp; 487 struct rpc_xprt *xprt = req->rq_xprt; 488 489 task->tk_timeout = req->rq_timeout; 490 rpc_sleep_on(&xprt->pending, task, action); 491} 492EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space); 493 494/** 495 * xprt_write_space - wake the task waiting for transport output buffer space 496 * @xprt: transport with waiting tasks 497 * 498 * Can be called in a soft IRQ context, so xprt_write_space never sleeps. 
499 */ 500void xprt_write_space(struct rpc_xprt *xprt) 501{ 502 if (unlikely(xprt->shutdown)) 503 return; 504 505 spin_lock_bh(&xprt->transport_lock); 506 if (xprt->snd_task) { 507 dprintk("RPC: write space: waking waiting task on " 508 "xprt %p\n", xprt); 509 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task); 510 } 511 spin_unlock_bh(&xprt->transport_lock); 512} 513EXPORT_SYMBOL_GPL(xprt_write_space); 514 515/** 516 * xprt_set_retrans_timeout_def - set a request's retransmit timeout 517 * @task: task whose timeout is to be set 518 * 519 * Set a request's retransmit timeout based on the transport's 520 * default timeout parameters. Used by transports that don't adjust 521 * the retransmit timeout based on round-trip time estimation. 522 */ 523void xprt_set_retrans_timeout_def(struct rpc_task *task) 524{ 525 task->tk_timeout = task->tk_rqstp->rq_timeout; 526} 527EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def); 528 529/* 530 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout 531 * @task: task whose timeout is to be set 532 * 533 * Set a request's retransmit timeout using the RTT estimator. 
*/
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;

	/* Base RTO for this procedure's timer class, then back off
	 * exponentially: once per recorded timeout (rpc_ntimeo) and once
	 * per minor retransmit of this request (rq_retries). */
	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	/* Clamp; a result of 0 (e.g. shifted out) also falls back to max */
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);

/*
 * Compute the absolute jiffies deadline of the next major timeout.
 * Start from rq_timeout and apply to_retries minor-timeout steps
 * (doubling when to_exponential, else adding to_increment each time).
 * The span is capped at to_maxval (also used if the span is 0) and is
 * measured from now.
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 * Before the major deadline (rq_majortimeo) this is a minor timeout.
 * rq_timeout grows (doubled or incremented, capped at to_maxval when
 * that is non-zero), rq_retries is bumped and 0 is returned.
 *
 * Past the deadline it is a major timeout.  rq_timeout and rq_retries
 * are reset, a new major deadline is computed, the client's RTT
 * estimator is re-initialised ("slow start") and -ETIMEDOUT is returned.
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	/* Defensive: a zero timeout would retransmit in a tight loop */
	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}

/*
 * task_cleanup work item: close the transport, clear XPRT_CLOSE_WAIT and
 * release the write lock that whoever queued this work was holding.
 * NULL is passed as the task because the queuing paths in this file
 * leave xprt->snd_task NULL (xprt_clear_locked() sets it to NULL;
 * xprt_force_disconnect() only sets XPRT_LOCKED).
 */
static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	xprt->ops->close(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 * Clears the connected flag and wakes every task on xprt->pending with
 * -EAGAIN, under xprt->transport_lock.
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 * Sets XPRT_CLOSE_WAIT.  If the write lock is free, it takes the lock
 * and queues the task_cleanup work to close the transport.  Otherwise
 * the current holder's xprt_clear_locked() will queue that work.
 * Pending tasks are woken with -EAGAIN either way.
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -EAGAIN);
	spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
652 * 653 */ 654void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) 655{ 656 /* Don't race with the test_bit() in xprt_clear_locked() */ 657 spin_lock_bh(&xprt->transport_lock); 658 if (cookie != xprt->connect_cookie) 659 goto out; 660 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt)) 661 goto out; 662 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 663 /* Try to schedule an autoclose RPC call */ 664 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) 665 queue_work(rpciod_workqueue, &xprt->task_cleanup); 666 xprt_wake_pending_tasks(xprt, -EAGAIN); 667out: 668 spin_unlock_bh(&xprt->transport_lock); 669} 670 671static void 672xprt_init_autodisconnect(unsigned long data) 673{ 674 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 675 676 spin_lock(&xprt->transport_lock); 677 if (!list_empty(&xprt->recv) || xprt->shutdown) 678 goto out_abort; 679 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) 680 goto out_abort; 681 spin_unlock(&xprt->transport_lock); 682 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 683 queue_work(rpciod_workqueue, &xprt->task_cleanup); 684 return; 685out_abort: 686 spin_unlock(&xprt->transport_lock); 687} 688 689/** 690 * xprt_connect - schedule a transport connect operation 691 * @task: RPC task that is requesting the connect 692 * 693 */ 694void xprt_connect(struct rpc_task *task) 695{ 696 struct rpc_xprt *xprt = task->tk_xprt; 697 698 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid, 699 xprt, (xprt_connected(xprt) ? 
"is" : "is not")); 700 701 if (!xprt_bound(xprt)) { 702 task->tk_status = -EAGAIN; 703 return; 704 } 705 if (!xprt_lock_write(xprt, task)) 706 return; 707 708 if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state)) 709 xprt->ops->close(xprt); 710 711 if (xprt_connected(xprt)) 712 xprt_release_write(xprt, task); 713 else { 714 if (task->tk_rqstp) 715 task->tk_rqstp->rq_bytes_sent = 0; 716 717 task->tk_timeout = task->tk_rqstp->rq_timeout; 718 rpc_sleep_on(&xprt->pending, task, xprt_connect_status); 719 720 if (test_bit(XPRT_CLOSING, &xprt->state)) 721 return; 722 if (xprt_test_and_set_connecting(xprt)) 723 return; 724 xprt->stat.connect_start = jiffies; 725 xprt->ops->connect(task); 726 } 727} 728 729static void xprt_connect_status(struct rpc_task *task) 730{ 731 struct rpc_xprt *xprt = task->tk_xprt; 732 733 if (task->tk_status == 0) { 734 xprt->stat.connect_count++; 735 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start; 736 dprintk("RPC: %5u xprt_connect_status: connection established\n", 737 task->tk_pid); 738 return; 739 } 740 741 switch (task->tk_status) { 742 case -EAGAIN: 743 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid); 744 break; 745 case -ETIMEDOUT: 746 dprintk("RPC: %5u xprt_connect_status: connect attempt timed " 747 "out\n", task->tk_pid); 748 break; 749 default: 750 dprintk("RPC: %5u xprt_connect_status: error %d connecting to " 751 "server %s\n", task->tk_pid, -task->tk_status, 752 task->tk_client->cl_server); 753 xprt_release_write(xprt, task); 754 task->tk_status = -EIO; 755 } 756} 757 758/** 759 * xprt_lookup_rqst - find an RPC request corresponding to an XID 760 * @xprt: transport on which the original request was transmitted 761 * @xid: RPC XID of incoming reply 762 * 763 */ 764struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid) 765{ 766 struct rpc_rqst *entry; 767 768 list_for_each_entry(entry, &xprt->recv, rq_list) 769 if (entry->rq_xid == xid) 770 return entry; 771 772 dprintk("RPC: 
xprt_lookup_rqst did not find xid %08x\n", 773 ntohl(xid)); 774 xprt->stat.bad_xids++; 775 return NULL; 776} 777EXPORT_SYMBOL_GPL(xprt_lookup_rqst); 778 779static void xprt_update_rtt(struct rpc_task *task) 780{ 781 struct rpc_rqst *req = task->tk_rqstp; 782 struct rpc_rtt *rtt = task->tk_client->cl_rtt; 783 unsigned timer = task->tk_msg.rpc_proc->p_timer; 784 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt)); 785 786 if (timer) { 787 if (req->rq_ntrans == 1) 788 rpc_update_rtt(rtt, timer, m); 789 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1); 790 } 791} 792 793/** 794 * xprt_complete_rqst - called when reply processing is complete 795 * @task: RPC request that recently completed 796 * @copied: actual number of bytes received from the transport 797 * 798 * Caller holds transport lock. 799 */ 800void xprt_complete_rqst(struct rpc_task *task, int copied) 801{ 802 struct rpc_rqst *req = task->tk_rqstp; 803 struct rpc_xprt *xprt = req->rq_xprt; 804 805 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n", 806 task->tk_pid, ntohl(req->rq_xid), copied); 807 808 xprt->stat.recvs++; 809 req->rq_rtt = ktime_sub(ktime_get(), req->rq_xtime); 810 if (xprt->ops->timer != NULL) 811 xprt_update_rtt(task); 812 813 list_del_init(&req->rq_list); 814 req->rq_private_buf.len = copied; 815 /* Ensure all writes are done before we update */ 816 /* req->rq_reply_bytes_recvd */ 817 smp_wmb(); 818 req->rq_reply_bytes_recvd = copied; 819 rpc_wake_up_queued_task(&xprt->pending, task); 820} 821EXPORT_SYMBOL_GPL(xprt_complete_rqst); 822 823static void xprt_timer(struct rpc_task *task) 824{ 825 struct rpc_rqst *req = task->tk_rqstp; 826 struct rpc_xprt *xprt = req->rq_xprt; 827 828 if (task->tk_status != -ETIMEDOUT) 829 return; 830 dprintk("RPC: %5u xprt_timer\n", task->tk_pid); 831 832 spin_lock_bh(&xprt->transport_lock); 833 if (!req->rq_reply_bytes_recvd) { 834 if (xprt->ops->timer) 835 xprt->ops->timer(task); 836 } else 837 task->tk_status = 0; 838 
spin_unlock_bh(&xprt->transport_lock); 839} 840 841static inline int xprt_has_timer(struct rpc_xprt *xprt) 842{ 843 return xprt->idle_timeout != 0; 844} 845 846/** 847 * xprt_prepare_transmit - reserve the transport before sending a request 848 * @task: RPC task about to send a request 849 * 850 */ 851int xprt_prepare_transmit(struct rpc_task *task) 852{ 853 struct rpc_rqst *req = task->tk_rqstp; 854 struct rpc_xprt *xprt = req->rq_xprt; 855 int err = 0; 856 857 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid); 858 859 spin_lock_bh(&xprt->transport_lock); 860 if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) { 861 err = req->rq_reply_bytes_recvd; 862 goto out_unlock; 863 } 864 if (!xprt->ops->reserve_xprt(xprt, task)) 865 err = -EAGAIN; 866out_unlock: 867 spin_unlock_bh(&xprt->transport_lock); 868 return err; 869} 870 871void xprt_end_transmit(struct rpc_task *task) 872{ 873 xprt_release_write(task->tk_rqstp->rq_xprt, task); 874} 875 876/** 877 * xprt_transmit - send an RPC request on a transport 878 * @task: controlling RPC task 879 * 880 * We have to copy the iovec because sendmsg fiddles with its contents. 
881 */ 882void xprt_transmit(struct rpc_task *task) 883{ 884 struct rpc_rqst *req = task->tk_rqstp; 885 struct rpc_xprt *xprt = req->rq_xprt; 886 int status; 887 888 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 889 890 if (!req->rq_reply_bytes_recvd) { 891 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) { 892 /* 893 * Add to the list only if we're expecting a reply 894 */ 895 spin_lock_bh(&xprt->transport_lock); 896 /* Update the softirq receive buffer */ 897 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 898 sizeof(req->rq_private_buf)); 899 /* Add request to the receive list */ 900 list_add_tail(&req->rq_list, &xprt->recv); 901 spin_unlock_bh(&xprt->transport_lock); 902 xprt_reset_majortimeo(req); 903 /* Turn off autodisconnect */ 904 del_singleshot_timer_sync(&xprt->timer); 905 } 906 } else if (!req->rq_bytes_sent) 907 return; 908 909 req->rq_connect_cookie = xprt->connect_cookie; 910 req->rq_xtime = ktime_get(); 911 status = xprt->ops->send_request(task); 912 if (status != 0) { 913 task->tk_status = status; 914 return; 915 } 916 917 dprintk("RPC: %5u xmit complete\n", task->tk_pid); 918 task->tk_flags |= RPC_TASK_SENT; 919 spin_lock_bh(&xprt->transport_lock); 920 921 xprt->ops->set_retrans_timeout(task); 922 923 xprt->stat.sends++; 924 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; 925 xprt->stat.bklog_u += xprt->backlog.qlen; 926 927 /* Don't race with disconnect */ 928 if (!xprt_connected(xprt)) 929 task->tk_status = -ENOTCONN; 930 else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) { 931 /* 932 * Sleep on the pending queue since 933 * we're expecting a reply. 
934 */ 935 rpc_sleep_on(&xprt->pending, task, xprt_timer); 936 } 937 spin_unlock_bh(&xprt->transport_lock); 938} 939 940static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags) 941{ 942 struct rpc_rqst *req = ERR_PTR(-EAGAIN); 943 944 if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs)) 945 goto out; 946 req = kzalloc(sizeof(struct rpc_rqst), gfp_flags); 947 if (req != NULL) 948 goto out; 949 atomic_dec(&xprt->num_reqs); 950 req = ERR_PTR(-ENOMEM); 951out: 952 return req; 953} 954 955static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 956{ 957 if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) { 958 kfree(req); 959 return true; 960 } 961 return false; 962} 963 964static void xprt_alloc_slot(struct rpc_task *task) 965{ 966 struct rpc_xprt *xprt = task->tk_xprt; 967 struct rpc_rqst *req; 968 969 if (!list_empty(&xprt->free)) { 970 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 971 list_del(&req->rq_list); 972 goto out_init_req; 973 } 974 req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT); 975 if (!IS_ERR(req)) 976 goto out_init_req; 977 switch (PTR_ERR(req)) { 978 case -ENOMEM: 979 rpc_delay(task, HZ >> 2); 980 dprintk("RPC: dynamic allocation of request slot " 981 "failed! 
Retrying\n"); 982 break; 983 case -EAGAIN: 984 rpc_sleep_on(&xprt->backlog, task, NULL); 985 dprintk("RPC: waiting for request slot\n"); 986 } 987 task->tk_status = -EAGAIN; 988 return; 989out_init_req: 990 task->tk_status = 0; 991 task->tk_rqstp = req; 992 xprt_request_init(task, xprt); 993} 994 995static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req) 996{ 997 if (xprt_dynamic_free_slot(xprt, req)) 998 return; 999 1000 memset(req, 0, sizeof(*req)); /* mark unused */ 1001 1002 spin_lock(&xprt->reserve_lock); 1003 list_add(&req->rq_list, &xprt->free); 1004 rpc_wake_up_next(&xprt->backlog); 1005 spin_unlock(&xprt->reserve_lock); 1006} 1007 1008static void xprt_free_all_slots(struct rpc_xprt *xprt) 1009{ 1010 struct rpc_rqst *req; 1011 while (!list_empty(&xprt->free)) { 1012 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list); 1013 list_del(&req->rq_list); 1014 kfree(req); 1015 } 1016} 1017 1018struct rpc_xprt *xprt_alloc(struct net *net, size_t size, 1019 unsigned int num_prealloc, 1020 unsigned int max_alloc) 1021{ 1022 struct rpc_xprt *xprt; 1023 struct rpc_rqst *req; 1024 int i; 1025 1026 xprt = kzalloc(size, GFP_KERNEL); 1027 if (xprt == NULL) 1028 goto out; 1029 1030 xprt_init(xprt, net); 1031 1032 for (i = 0; i < num_prealloc; i++) { 1033 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL); 1034 if (!req) 1035 break; 1036 list_add(&req->rq_list, &xprt->free); 1037 } 1038 if (i < num_prealloc) 1039 goto out_free; 1040 if (max_alloc > num_prealloc) 1041 xprt->max_reqs = max_alloc; 1042 else 1043 xprt->max_reqs = num_prealloc; 1044 xprt->min_reqs = num_prealloc; 1045 atomic_set(&xprt->num_reqs, num_prealloc); 1046 1047 return xprt; 1048 1049out_free: 1050 xprt_free(xprt); 1051out: 1052 return NULL; 1053} 1054EXPORT_SYMBOL_GPL(xprt_alloc); 1055 1056void xprt_free(struct rpc_xprt *xprt) 1057{ 1058 put_net(xprt->xprt_net); 1059 xprt_free_all_slots(xprt); 1060 kfree(xprt); 1061} 1062EXPORT_SYMBOL_GPL(xprt_free); 1063 1064/** 1065 * 
xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.  On return tk_status is 0 when a slot was assigned (or
 * the task already had one), and -EAGAIN when the task must wait.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp != NULL)
		return;

	/* Note: grabbing the xprt_lock_write() here is not strictly needed,
	 * but ensures that we throttle new slot allocation if the transport
	 * is congested (e.g. if reconnecting or if we're out of socket
	 * write buffer space).
	 */
	task->tk_timeout = 0;
	/* Default outcome; xprt_alloc_slot() overwrites it on this path */
	task->tk_status = -EAGAIN;
	if (!xprt_lock_write(xprt, task))
		return;

	spin_lock(&xprt->reserve_lock);
	xprt_alloc_slot(task);
	spin_unlock(&xprt->reserve_lock);
	xprt_release_write(xprt, task);
}

/* Hand out the next transaction ID (a per-transport counter). */
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return (__force __be32)xprt->xid++;
}

/* Seed the XID counter with a random starting value. */
static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = net_random();
}

/*
 * Initialise a freshly assigned request slot for @task.  This sets the
 * initial timeout, links the request to its task and transport, assigns
 * a new XID and computes the first major-timeout deadline.
 */
static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst *req = task->tk_rqstp;

	INIT_LIST_HEAD(&req->rq_list);
	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_task = task;
	req->rq_xprt = xprt;
	req->rq_buffer = NULL;
	req->rq_xid = xprt_alloc_xid(xprt);
	req->rq_release_snd_buf = NULL;
	xprt_reset_majortimeo(req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 * Under xprt->transport_lock this releases the write lock (if held),
 * runs the transport's ->release_request hook, unlinks the request from
 * the receive list and re-arms the idle timer if no replies are pending.
 * It then frees the buffer and credential, detaches the request from the
 * task and returns the slot (backchannel slots go back to their own pool).
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt *xprt;
	struct rpc_rqst *req;

	if (!(req = task->tk_rqstp))
		return;

	xprt = req->rq_xprt;
	rpc_count_iostats(task);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	/* Last outstanding reply gone: start the idle autodisconnect timer */
	if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
		mod_timer(&xprt->timer,
				xprt->last_used + xprt->idle_timeout);
	spin_unlock_bh(&xprt->transport_lock);
	if (req->rq_buffer)
		xprt->ops->buf_free(req->rq_buffer);
	if (req->rq_cred != NULL)
		put_rpccred(req->rq_cred);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
	if (likely(!bc_prealloc(req)))
		xprt_free_slot(xprt, req);
	else
		xprt_free_bc_request(req);
}

/*
 * Initialise the generic parts of a newly allocated transport: refcount,
 * locks, slot and receive lists, congestion window, wait queues, XID
 * seed and the network namespace reference (released in xprt_free()).
 * NOTE(review): task_cleanup and the idle timer are not set up here;
 * presumably that happens in xprt_create_transport(), confirm.
 */
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
	atomic_set(&xprt->count, 1);

	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	spin_lock_init(&xprt->bc_pa_lock);
	INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	xprt_init_xid(xprt);

	xprt->xprt_net = get_net(net);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt *xprt;
	struct xprt_class
*t; 1199 1200 spin_lock(&xprt_list_lock); 1201 list_for_each_entry(t, &xprt_list, list) { 1202 if (t->ident == args->ident) { 1203 spin_unlock(&xprt_list_lock); 1204 goto found; 1205 } 1206 } 1207 spin_unlock(&xprt_list_lock); 1208 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident); 1209 return ERR_PTR(-EIO); 1210 1211found: 1212 xprt = t->setup(args); 1213 if (IS_ERR(xprt)) { 1214 dprintk("RPC: xprt_create_transport: failed, %ld\n", 1215 -PTR_ERR(xprt)); 1216 goto out; 1217 } 1218 INIT_WORK(&xprt->task_cleanup, xprt_autoclose); 1219 if (xprt_has_timer(xprt)) 1220 setup_timer(&xprt->timer, xprt_init_autodisconnect, 1221 (unsigned long)xprt); 1222 else 1223 init_timer(&xprt->timer); 1224 dprintk("RPC: created transport %p with %u slots\n", xprt, 1225 xprt->max_reqs); 1226out: 1227 return xprt; 1228} 1229 1230/** 1231 * xprt_destroy - destroy an RPC transport, killing off all requests. 1232 * @xprt: transport to destroy 1233 * 1234 */ 1235static void xprt_destroy(struct rpc_xprt *xprt) 1236{ 1237 dprintk("RPC: destroying transport %p\n", xprt); 1238 xprt->shutdown = 1; 1239 del_timer_sync(&xprt->timer); 1240 1241 rpc_destroy_wait_queue(&xprt->binding); 1242 rpc_destroy_wait_queue(&xprt->pending); 1243 rpc_destroy_wait_queue(&xprt->sending); 1244 rpc_destroy_wait_queue(&xprt->backlog); 1245 cancel_work_sync(&xprt->task_cleanup); 1246 /* 1247 * Tear down transport state and free the rpc_xprt 1248 */ 1249 xprt->ops->destroy(xprt); 1250} 1251 1252/** 1253 * xprt_put - release a reference to an RPC transport. 1254 * @xprt: pointer to the transport 1255 * 1256 */ 1257void xprt_put(struct rpc_xprt *xprt) 1258{ 1259 if (atomic_dec_and_test(&xprt->count)) 1260 xprt_destroy(xprt); 1261} 1262 1263/** 1264 * xprt_get - return a reference to an RPC transport. 1265 * @xprt: pointer to the transport 1266 * 1267 */ 1268struct rpc_xprt *xprt_get(struct rpc_xprt *xprt) 1269{ 1270 if (atomic_inc_not_zero(&xprt->count)) 1271 return xprt; 1272 return NULL; 1273} 1274