xprt.c revision eab5c084b858fd95a873fc2b97de9a9ad937b4ed
1/* 2 * linux/net/sunrpc/xprt.c 3 * 4 * This is a generic RPC call interface supporting congestion avoidance, 5 * and asynchronous calls. 6 * 7 * The interface works like this: 8 * 9 * - When a process places a call, it allocates a request slot if 10 * one is available. Otherwise, it sleeps on the backlog queue 11 * (xprt_reserve). 12 * - Next, the caller puts together the RPC message, stuffs it into 13 * the request struct, and calls xprt_call(). 14 * - xprt_call transmits the message and installs the caller on the 15 * socket's wait list. At the same time, it installs a timer that 16 * is run after the packet's timeout has expired. 17 * - When a packet arrives, the data_ready handler walks the list of 18 * pending requests for that socket. If a matching XID is found, the 19 * caller is woken up, and the timer removed. 20 * - When no reply arrives within the timeout interval, the timer is 21 * fired by the kernel and runs xprt_timer(). It either adjusts the 22 * timeout values (minor timeout) or wakes up the caller with a status 23 * of -ETIMEDOUT. 24 * - When the caller receives a notification from RPC that a reply arrived, 25 * it should release the RPC slot, and process the reply. 26 * If the call timed out, it may choose to retry the operation by 27 * adjusting the initial timeout value, and simply calling rpc_call 28 * again. 29 * 30 * Support for async RPC is done through a set of RPC-specific scheduling 31 * primitives that `transparently' work for processes as well as async 32 * tasks that rely on callbacks. 33 * 34 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> 35 * 36 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> 37 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> 38 * TCP NFS related read + write fixes 39 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> 40 * 41 * Rewrite of larges part of the code in order to stabilize TCP stuff. 42 * Fix behaviour when socket buffer is full. 43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 44 */ 45 46#include <linux/types.h> 47#include <linux/slab.h> 48#include <linux/capability.h> 49#include <linux/sched.h> 50#include <linux/errno.h> 51#include <linux/socket.h> 52#include <linux/in.h> 53#include <linux/net.h> 54#include <linux/mm.h> 55#include <linux/udp.h> 56#include <linux/tcp.h> 57#include <linux/sunrpc/clnt.h> 58#include <linux/file.h> 59#include <linux/workqueue.h> 60#include <linux/random.h> 61 62#include <net/sock.h> 63#include <net/checksum.h> 64#include <net/udp.h> 65#include <net/tcp.h> 66 67/* 68 * Local variables 69 */ 70 71#ifdef RPC_DEBUG 72# undef RPC_DEBUG_DATA 73# define RPCDBG_FACILITY RPCDBG_XPRT 74#endif 75 76#define XPRT_MAX_BACKOFF (8) 77#define XPRT_IDLE_TIMEOUT (5*60*HZ) 78#define XPRT_MAX_RESVPORT (800) 79 80/* 81 * Local functions 82 */ 83static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 84static inline void do_xprt_reserve(struct rpc_task *); 85static void xprt_disconnect(struct rpc_xprt *); 86static void xprt_connect_status(struct rpc_task *task); 87static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap, 88 struct rpc_timeout *to); 89static struct socket *xprt_create_socket(struct rpc_xprt *, int, int); 90static void xprt_bind_socket(struct rpc_xprt *, struct socket *); 91static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 92 93static int xprt_clear_backlog(struct rpc_xprt *xprt); 94 95#ifdef RPC_DEBUG_DATA 96/* 97 * Print the buffer contents (first 128 bytes only--just enough for 98 * diropres return). 99 */ 100static void 101xprt_pktdump(char *msg, u32 *packet, unsigned int count) 102{ 103 u8 *buf = (u8 *) packet; 104 int j; 105 106 dprintk("RPC: %s\n", msg); 107 for (j = 0; j < count && j < 128; j += 4) { 108 if (!(j & 31)) { 109 if (j) 110 dprintk("\n"); 111 dprintk("0x%04x ", j); 112 } 113 dprintk("%02x%02x%02x%02x ", 114 buf[j], buf[j+1], buf[j+2], buf[j+3]); 115 } 116 dprintk("\n"); 117} 118#else 119static inline void 120xprt_pktdump(char *msg, u32 *packet, unsigned int count) 121{ 122 /* NOP */ 123} 124#endif 125 126/* 127 * Look up RPC transport given an INET socket 128 */ 129static inline struct rpc_xprt * 130xprt_from_sock(struct sock *sk) 131{ 132 return (struct rpc_xprt *) sk->sk_user_data; 133} 134 135/* 136 * Serialize write access to sockets, in order to prevent different 137 * requests from interfering with each other. 138 * Also prevents TCP socket connects from colliding with writes. 139 */ 140static int 141__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 142{ 143 struct rpc_rqst *req = task->tk_rqstp; 144 145 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) { 146 if (task == xprt->snd_task) 147 return 1; 148 goto out_sleep; 149 } 150 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 151 xprt->snd_task = task; 152 if (req) { 153 req->rq_bytes_sent = 0; 154 req->rq_ntrans++; 155 } 156 return 1; 157 } 158 smp_mb__before_clear_bit(); 159 clear_bit(XPRT_LOCKED, &xprt->sockstate); 160 smp_mb__after_clear_bit(); 161out_sleep: 162 dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt); 163 task->tk_timeout = 0; 164 task->tk_status = -EAGAIN; 165 if (req && req->rq_ntrans) 166 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 167 else 168 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 169 return 0; 170} 171 172static inline int 173xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 174{ 175 int retval; 176 177 spin_lock_bh(&xprt->sock_lock); 178 retval = __xprt_lock_write(xprt, task); 179 spin_unlock_bh(&xprt->sock_lock); 180 return retval; 181} 182 183 184static void 185__xprt_lock_write_next(struct rpc_xprt *xprt) 186{ 187 struct rpc_task *task; 188 189 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 190 return; 191 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) 192 goto out_unlock; 193 task = rpc_wake_up_next(&xprt->resend); 194 if (!task) { 195 task = rpc_wake_up_next(&xprt->sending); 196 if (!task) 197 goto out_unlock; 198 } 199 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 200 struct rpc_rqst *req = task->tk_rqstp; 201 xprt->snd_task = task; 202 if (req) { 203 req->rq_bytes_sent = 0; 204 req->rq_ntrans++; 205 } 206 return; 207 } 208out_unlock: 209 smp_mb__before_clear_bit(); 210 clear_bit(XPRT_LOCKED, &xprt->sockstate); 211 smp_mb__after_clear_bit(); 212} 213 214/* 215 * Releases the socket for use by other requests. 216 */ 217static void 218__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 219{ 220 if (xprt->snd_task == task) { 221 xprt->snd_task = NULL; 222 smp_mb__before_clear_bit(); 223 clear_bit(XPRT_LOCKED, &xprt->sockstate); 224 smp_mb__after_clear_bit(); 225 __xprt_lock_write_next(xprt); 226 } 227} 228 229static inline void 230xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 231{ 232 spin_lock_bh(&xprt->sock_lock); 233 __xprt_release_write(xprt, task); 234 spin_unlock_bh(&xprt->sock_lock); 235} 236 237/* 238 * Write data to socket. 239 */ 240static inline int 241xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) 242{ 243 struct socket *sock = xprt->sock; 244 struct xdr_buf *xdr = &req->rq_snd_buf; 245 struct sockaddr *addr = NULL; 246 int addrlen = 0; 247 unsigned int skip; 248 int result; 249 250 if (!sock) 251 return -ENOTCONN; 252 253 xprt_pktdump("packet data:", 254 req->rq_svec->iov_base, 255 req->rq_svec->iov_len); 256 257 /* For UDP, we need to provide an address */ 258 if (!xprt->stream) { 259 addr = (struct sockaddr *) &xprt->addr; 260 addrlen = sizeof(xprt->addr); 261 } 262 /* Dont repeat bytes */ 263 skip = req->rq_bytes_sent; 264 265 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 266 result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); 267 268 dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); 269 270 if (result >= 0) 271 return result; 272 273 switch (result) { 274 case -ECONNREFUSED: 275 /* When the server has died, an ICMP port unreachable message 276 * prompts ECONNREFUSED. 277 */ 278 case -EAGAIN: 279 break; 280 case -ECONNRESET: 281 case -ENOTCONN: 282 case -EPIPE: 283 /* connection broken */ 284 if (xprt->stream) 285 result = -ENOTCONN; 286 break; 287 default: 288 printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); 289 } 290 return result; 291} 292 293/* 294 * Van Jacobson congestion avoidance. Check if the congestion window 295 * overflowed. Put the task to sleep if this is the case. 296 */ 297static int 298__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 299{ 300 struct rpc_rqst *req = task->tk_rqstp; 301 302 if (req->rq_cong) 303 return 1; 304 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 305 task->tk_pid, xprt->cong, xprt->cwnd); 306 if (RPCXPRT_CONGESTED(xprt)) 307 return 0; 308 req->rq_cong = 1; 309 xprt->cong += RPC_CWNDSCALE; 310 return 1; 311} 312 313/* 314 * Adjust the congestion window, and wake up the next task 315 * that has been sleeping due to congestion 316 */ 317static void 318__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 319{ 320 if (!req->rq_cong) 321 return; 322 req->rq_cong = 0; 323 xprt->cong -= RPC_CWNDSCALE; 324 __xprt_lock_write_next(xprt); 325} 326 327/* 328 * Adjust RPC congestion window 329 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 330 */ 331static void 332xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 333{ 334 unsigned long cwnd; 335 336 cwnd = xprt->cwnd; 337 if (result >= 0 && cwnd <= xprt->cong) { 338 /* The (cwnd >> 1) term makes sure 339 * the result gets rounded properly. */ 340 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 341 if (cwnd > RPC_MAXCWND(xprt)) 342 cwnd = RPC_MAXCWND(xprt); 343 __xprt_lock_write_next(xprt); 344 } else if (result == -ETIMEDOUT) { 345 cwnd >>= 1; 346 if (cwnd < RPC_CWNDSCALE) 347 cwnd = RPC_CWNDSCALE; 348 } 349 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 350 xprt->cong, xprt->cwnd, cwnd); 351 xprt->cwnd = cwnd; 352} 353 354/* 355 * Reset the major timeout value 356 */ 357static void xprt_reset_majortimeo(struct rpc_rqst *req) 358{ 359 struct rpc_timeout *to = &req->rq_xprt->timeout; 360 361 req->rq_majortimeo = req->rq_timeout; 362 if (to->to_exponential) 363 req->rq_majortimeo <<= to->to_retries; 364 else 365 req->rq_majortimeo += to->to_increment * to->to_retries; 366 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 367 req->rq_majortimeo = to->to_maxval; 368 req->rq_majortimeo += jiffies; 369} 370 371/* 372 * Adjust timeout values etc for next retransmit 373 */ 374int xprt_adjust_timeout(struct rpc_rqst *req) 375{ 376 struct rpc_xprt *xprt = req->rq_xprt; 377 struct rpc_timeout *to = &xprt->timeout; 378 int status = 0; 379 380 if (time_before(jiffies, req->rq_majortimeo)) { 381 if (to->to_exponential) 382 req->rq_timeout <<= 1; 383 else 384 req->rq_timeout += to->to_increment; 385 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 386 req->rq_timeout = to->to_maxval; 387 req->rq_retries++; 388 pprintk("RPC: %lu retrans\n", jiffies); 389 } else { 390 req->rq_timeout = to->to_initval; 391 req->rq_retries = 0; 392 xprt_reset_majortimeo(req); 393 /* Reset the RTT counters == "slow start" */ 394 spin_lock_bh(&xprt->sock_lock); 395 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 396 spin_unlock_bh(&xprt->sock_lock); 397 pprintk("RPC: %lu timeout\n", jiffies); 398 status = -ETIMEDOUT; 399 } 400 401 if (req->rq_timeout == 0) { 402 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 403 req->rq_timeout = 5 * HZ; 404 } 405 return status; 406} 407 408/* 409 * Close down a transport socket 410 */ 411static void 412xprt_close(struct rpc_xprt *xprt) 413{ 414 struct socket *sock = xprt->sock; 415 struct sock *sk = xprt->inet; 416 417 if (!sk) 418 return; 419 420 write_lock_bh(&sk->sk_callback_lock); 421 xprt->inet = NULL; 422 xprt->sock = NULL; 423 424 sk->sk_user_data = NULL; 425 sk->sk_data_ready = xprt->old_data_ready; 426 sk->sk_state_change = xprt->old_state_change; 427 sk->sk_write_space = xprt->old_write_space; 428 write_unlock_bh(&sk->sk_callback_lock); 429 430 sk->sk_no_check = 0; 431 432 sock_release(sock); 433} 434 435static void 436xprt_socket_autoclose(void *args) 437{ 438 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 439 440 xprt_disconnect(xprt); 441 xprt_close(xprt); 442 xprt_release_write(xprt, NULL); 443} 444 445/* 446 * Mark a transport as disconnected 447 */ 448static void 449xprt_disconnect(struct rpc_xprt *xprt) 450{ 451 dprintk("RPC: disconnected transport %p\n", xprt); 452 spin_lock_bh(&xprt->sock_lock); 453 xprt_clear_connected(xprt); 454 rpc_wake_up_status(&xprt->pending, -ENOTCONN); 455 spin_unlock_bh(&xprt->sock_lock); 456} 457 458/* 459 * Used to allow disconnection when we've been idle 460 */ 461static void 462xprt_init_autodisconnect(unsigned long data) 463{ 464 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 465 466 spin_lock(&xprt->sock_lock); 467 if (!list_empty(&xprt->recv) || xprt->shutdown) 468 goto out_abort; 469 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 470 goto out_abort; 471 spin_unlock(&xprt->sock_lock); 472 /* Let keventd close the socket */ 473 if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0) 474 xprt_release_write(xprt, NULL); 475 else 476 schedule_work(&xprt->task_cleanup); 477 return; 478out_abort: 479 spin_unlock(&xprt->sock_lock); 480} 481 482static void xprt_socket_connect(void *args) 483{ 484 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 485 struct socket *sock = xprt->sock; 486 int status = -EIO; 487 488 if (xprt->shutdown || xprt->addr.sin_port == 0) 489 goto out; 490 491 /* 492 * Start by resetting any existing state 493 */ 494 xprt_close(xprt); 495 sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); 496 if (sock == NULL) { 497 /* couldn't create socket or bind to reserved port; 498 * this is likely a permanent error, so cause an abort */ 499 goto out; 500 } 501 xprt_bind_socket(xprt, sock); 502 xprt_sock_setbufsize(xprt); 503 504 status = 0; 505 if (!xprt->stream) 506 goto out; 507 508 /* 509 * Tell the socket layer to start connecting... 510 */ 511 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, 512 sizeof(xprt->addr), O_NONBLOCK); 513 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 514 xprt, -status, xprt_connected(xprt), sock->sk->sk_state); 515 if (status < 0) { 516 switch (status) { 517 case -EINPROGRESS: 518 case -EALREADY: 519 goto out_clear; 520 } 521 } 522out: 523 if (status < 0) 524 rpc_wake_up_status(&xprt->pending, status); 525 else 526 rpc_wake_up(&xprt->pending); 527out_clear: 528 smp_mb__before_clear_bit(); 529 clear_bit(XPRT_CONNECTING, &xprt->sockstate); 530 smp_mb__after_clear_bit(); 531} 532 533/* 534 * Attempt to connect a TCP socket. 535 * 536 */ 537void xprt_connect(struct rpc_task *task) 538{ 539 struct rpc_xprt *xprt = task->tk_xprt; 540 541 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 542 xprt, (xprt_connected(xprt) ? "is" : "is not")); 543 544 if (xprt->shutdown) { 545 task->tk_status = -EIO; 546 return; 547 } 548 if (!xprt->addr.sin_port) { 549 task->tk_status = -EIO; 550 return; 551 } 552 if (!xprt_lock_write(xprt, task)) 553 return; 554 if (xprt_connected(xprt)) 555 goto out_write; 556 557 if (task->tk_rqstp) 558 task->tk_rqstp->rq_bytes_sent = 0; 559 560 task->tk_timeout = RPC_CONNECT_TIMEOUT; 561 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 562 if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { 563 /* Note: if we are here due to a dropped connection 564 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ 565 * seconds 566 */ 567 if (xprt->sock != NULL) 568 schedule_delayed_work(&xprt->sock_connect, 569 RPC_REESTABLISH_TIMEOUT); 570 else { 571 schedule_work(&xprt->sock_connect); 572 if (!RPC_IS_ASYNC(task)) 573 flush_scheduled_work(); 574 } 575 } 576 return; 577 out_write: 578 xprt_release_write(xprt, task); 579} 580 581/* 582 * We arrive here when awoken from waiting on connection establishment. 583 */ 584static void 585xprt_connect_status(struct rpc_task *task) 586{ 587 struct rpc_xprt *xprt = task->tk_xprt; 588 589 if (task->tk_status >= 0) { 590 dprintk("RPC: %4d xprt_connect_status: connection established\n", 591 task->tk_pid); 592 return; 593 } 594 595 switch (task->tk_status) { 596 case -ECONNREFUSED: 597 case -ECONNRESET: 598 dprintk("RPC: %4d xprt_connect_status: server %s refused connection\n", 599 task->tk_pid, task->tk_client->cl_server); 600 break; 601 case -ENOTCONN: 602 dprintk("RPC: %4d xprt_connect_status: connection broken\n", 603 task->tk_pid); 604 break; 605 case -ETIMEDOUT: 606 dprintk("RPC: %4d xprt_connect_status: connect attempt timed out\n", 607 task->tk_pid); 608 break; 609 default: 610 dprintk("RPC: %4d xprt_connect_status: error %d connecting to server %s\n", 611 task->tk_pid, -task->tk_status, task->tk_client->cl_server); 612 xprt_release_write(xprt, task); 613 task->tk_status = -EIO; 614 return; 615 } 616 617 /* if soft mounted, just cause this RPC to fail */ 618 if (RPC_IS_SOFT(task)) { 619 xprt_release_write(xprt, task); 620 task->tk_status = -EIO; 621 } 622} 623 624/* 625 * Look up the RPC request corresponding to a reply, and then lock it. 626 */ 627static inline struct rpc_rqst * 628xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 629{ 630 struct list_head *pos; 631 struct rpc_rqst *req = NULL; 632 633 list_for_each(pos, &xprt->recv) { 634 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 635 if (entry->rq_xid == xid) { 636 req = entry; 637 break; 638 } 639 } 640 return req; 641} 642 643/* 644 * Complete reply received. 645 * The TCP code relies on us to remove the request from xprt->pending. 646 */ 647static void 648xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 649{ 650 struct rpc_task *task = req->rq_task; 651 struct rpc_clnt *clnt = task->tk_client; 652 653 /* Adjust congestion window */ 654 if (!xprt->nocong) { 655 unsigned timer = task->tk_msg.rpc_proc->p_timer; 656 xprt_adjust_cwnd(xprt, copied); 657 __xprt_put_cong(xprt, req); 658 if (timer) { 659 if (req->rq_ntrans == 1) 660 rpc_update_rtt(clnt->cl_rtt, timer, 661 (long)jiffies - req->rq_xtime); 662 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 663 } 664 } 665 666#ifdef RPC_PROFILE 667 /* Profile only reads for now */ 668 if (copied > 1024) { 669 static unsigned long nextstat; 670 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 671 672 pkt_cnt++; 673 pkt_len += req->rq_slen + copied; 674 pkt_rtt += jiffies - req->rq_xtime; 675 if (time_before(nextstat, jiffies)) { 676 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 677 printk("RPC: %ld %ld %ld %ld stat\n", 678 jiffies, pkt_cnt, pkt_len, pkt_rtt); 679 pkt_rtt = pkt_len = pkt_cnt = 0; 680 nextstat = jiffies + 5 * HZ; 681 } 682 } 683#endif 684 685 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 686 list_del_init(&req->rq_list); 687 req->rq_received = req->rq_private_buf.len = copied; 688 689 /* ... and wake up the process. */ 690 rpc_wake_up_task(task); 691 return; 692} 693 694static size_t 695skb_read_bits(skb_reader_t *desc, void *to, size_t len) 696{ 697 if (len > desc->count) 698 len = desc->count; 699 if (skb_copy_bits(desc->skb, desc->offset, to, len)) 700 return 0; 701 desc->count -= len; 702 desc->offset += len; 703 return len; 704} 705 706static size_t 707skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len) 708{ 709 unsigned int csum2, pos; 710 711 if (len > desc->count) 712 len = desc->count; 713 pos = desc->offset; 714 csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0); 715 desc->csum = csum_block_add(desc->csum, csum2, pos); 716 desc->count -= len; 717 desc->offset += len; 718 return len; 719} 720 721/* 722 * We have set things up such that we perform the checksum of the UDP 723 * packet in parallel with the copies into the RPC client iovec. -DaveM 724 */ 725int 726csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) 727{ 728 skb_reader_t desc; 729 730 desc.skb = skb; 731 desc.offset = sizeof(struct udphdr); 732 desc.count = skb->len - desc.offset; 733 734 if (skb->ip_summed == CHECKSUM_UNNECESSARY) 735 goto no_checksum; 736 737 desc.csum = csum_partial(skb->data, desc.offset, skb->csum); 738 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0) 739 return -1; 740 if (desc.offset != skb->len) { 741 unsigned int csum2; 742 csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0); 743 desc.csum = csum_block_add(desc.csum, csum2, desc.offset); 744 } 745 if (desc.count) 746 return -1; 747 if ((unsigned short)csum_fold(desc.csum)) 748 return -1; 749 return 0; 750no_checksum: 751 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0) 752 return -1; 753 if (desc.count) 754 return -1; 755 return 0; 756} 757 758/* 759 * Input handler for RPC replies. Called from a bottom half and hence 760 * atomic. 761 */ 762static void 763udp_data_ready(struct sock *sk, int len) 764{ 765 struct rpc_task *task; 766 struct rpc_xprt *xprt; 767 struct rpc_rqst *rovr; 768 struct sk_buff *skb; 769 int err, repsize, copied; 770 u32 _xid, *xp; 771 772 read_lock(&sk->sk_callback_lock); 773 dprintk("RPC: udp_data_ready...\n"); 774 if (!(xprt = xprt_from_sock(sk))) { 775 printk("RPC: udp_data_ready request not found!\n"); 776 goto out; 777 } 778 779 dprintk("RPC: udp_data_ready client %p\n", xprt); 780 781 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 782 goto out; 783 784 if (xprt->shutdown) 785 goto dropit; 786 787 repsize = skb->len - sizeof(struct udphdr); 788 if (repsize < 4) { 789 printk("RPC: impossible RPC reply size %d!\n", repsize); 790 goto dropit; 791 } 792 793 /* Copy the XID from the skb... */ 794 xp = skb_header_pointer(skb, sizeof(struct udphdr), 795 sizeof(_xid), &_xid); 796 if (xp == NULL) 797 goto dropit; 798 799 /* Look up and lock the request corresponding to the given XID */ 800 spin_lock(&xprt->sock_lock); 801 rovr = xprt_lookup_rqst(xprt, *xp); 802 if (!rovr) 803 goto out_unlock; 804 task = rovr->rq_task; 805 806 dprintk("RPC: %4d received reply\n", task->tk_pid); 807 808 if ((copied = rovr->rq_private_buf.buflen) > repsize) 809 copied = repsize; 810 811 /* Suck it into the iovec, verify checksum if not done by hw. */ 812 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) 813 goto out_unlock; 814 815 /* Something worked... */ 816 dst_confirm(skb->dst); 817 818 xprt_complete_rqst(xprt, rovr, copied); 819 820 out_unlock: 821 spin_unlock(&xprt->sock_lock); 822 dropit: 823 skb_free_datagram(sk, skb); 824 out: 825 read_unlock(&sk->sk_callback_lock); 826} 827 828/* 829 * Copy from an skb into memory and shrink the skb. 830 */ 831static inline size_t 832tcp_copy_data(skb_reader_t *desc, void *p, size_t len) 833{ 834 if (len > desc->count) 835 len = desc->count; 836 if (skb_copy_bits(desc->skb, desc->offset, p, len)) { 837 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", 838 len, desc->count); 839 return 0; 840 } 841 desc->offset += len; 842 desc->count -= len; 843 dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", 844 len, desc->count); 845 return len; 846} 847 848/* 849 * TCP read fragment marker 850 */ 851static inline void 852tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) 853{ 854 size_t len, used; 855 char *p; 856 857 p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; 858 len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; 859 used = tcp_copy_data(desc, p, len); 860 xprt->tcp_offset += used; 861 if (used != len) 862 return; 863 xprt->tcp_reclen = ntohl(xprt->tcp_recm); 864 if (xprt->tcp_reclen & 0x80000000) 865 xprt->tcp_flags |= XPRT_LAST_FRAG; 866 else 867 xprt->tcp_flags &= ~XPRT_LAST_FRAG; 868 xprt->tcp_reclen &= 0x7fffffff; 869 xprt->tcp_flags &= ~XPRT_COPY_RECM; 870 xprt->tcp_offset = 0; 871 /* Sanity check of the record length */ 872 if (xprt->tcp_reclen < 4) { 873 printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); 874 xprt_disconnect(xprt); 875 } 876 dprintk("RPC: reading TCP record fragment of length %d\n", 877 xprt->tcp_reclen); 878} 879 880static void 881tcp_check_recm(struct rpc_xprt *xprt) 882{ 883 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", 884 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); 885 if (xprt->tcp_offset == xprt->tcp_reclen) { 886 xprt->tcp_flags |= XPRT_COPY_RECM; 887 xprt->tcp_offset = 0; 888 if (xprt->tcp_flags & XPRT_LAST_FRAG) { 889 xprt->tcp_flags &= ~XPRT_COPY_DATA; 890 xprt->tcp_flags |= XPRT_COPY_XID; 891 xprt->tcp_copied = 0; 892 } 893 } 894} 895 896/* 897 * TCP read xid 898 */ 899static inline void 900tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) 901{ 902 size_t len, used; 903 char *p; 904 905 len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; 906 dprintk("RPC: reading XID (%Zu bytes)\n", len); 907 p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; 908 used = tcp_copy_data(desc, p, len); 909 xprt->tcp_offset += used; 910 if (used != len) 911 return; 912 xprt->tcp_flags &= ~XPRT_COPY_XID; 913 xprt->tcp_flags |= XPRT_COPY_DATA; 914 xprt->tcp_copied = 4; 915 dprintk("RPC: reading reply for XID %08x\n", 916 ntohl(xprt->tcp_xid)); 917 tcp_check_recm(xprt); 918} 919 920/* 921 * TCP read and complete request 922 */ 923static inline void 924tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) 925{ 926 struct rpc_rqst *req; 927 struct xdr_buf *rcvbuf; 928 size_t len; 929 ssize_t r; 930 931 /* Find and lock the request corresponding to this xid */ 932 spin_lock(&xprt->sock_lock); 933 req = xprt_lookup_rqst(xprt, xprt->tcp_xid); 934 if (!req) { 935 xprt->tcp_flags &= ~XPRT_COPY_DATA; 936 dprintk("RPC: XID %08x request not found!\n", 937 ntohl(xprt->tcp_xid)); 938 spin_unlock(&xprt->sock_lock); 939 return; 940 } 941 942 rcvbuf = &req->rq_private_buf; 943 len = desc->count; 944 if (len > xprt->tcp_reclen - xprt->tcp_offset) { 945 skb_reader_t my_desc; 946 947 len = xprt->tcp_reclen - xprt->tcp_offset; 948 memcpy(&my_desc, desc, sizeof(my_desc)); 949 my_desc.count = len; 950 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 951 &my_desc, tcp_copy_data); 952 desc->count -= r; 953 desc->offset += r; 954 } else 955 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 956 desc, tcp_copy_data); 957 958 if (r > 0) { 959 xprt->tcp_copied += r; 960 xprt->tcp_offset += r; 961 } 962 if (r != len) { 963 /* Error when copying to the receive buffer, 964 * usually because we weren't able to allocate 965 * additional buffer pages. All we can do now 966 * is turn off XPRT_COPY_DATA, so the request 967 * will not receive any additional updates, 968 * and time out. 969 * Any remaining data from this record will 970 * be discarded. 971 */ 972 xprt->tcp_flags &= ~XPRT_COPY_DATA; 973 dprintk("RPC: XID %08x truncated request\n", 974 ntohl(xprt->tcp_xid)); 975 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 976 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 977 goto out; 978 } 979 980 dprintk("RPC: XID %08x read %Zd bytes\n", 981 ntohl(xprt->tcp_xid), r); 982 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 983 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 984 985 if (xprt->tcp_copied == req->rq_private_buf.buflen) 986 xprt->tcp_flags &= ~XPRT_COPY_DATA; 987 else if (xprt->tcp_offset == xprt->tcp_reclen) { 988 if (xprt->tcp_flags & XPRT_LAST_FRAG) 989 xprt->tcp_flags &= ~XPRT_COPY_DATA; 990 } 991 992out: 993 if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { 994 dprintk("RPC: %4d received reply complete\n", 995 req->rq_task->tk_pid); 996 xprt_complete_rqst(xprt, req, xprt->tcp_copied); 997 } 998 spin_unlock(&xprt->sock_lock); 999 tcp_check_recm(xprt); 1000} 1001 1002/* 1003 * TCP discard extra bytes from a short read 1004 */ 1005static inline void 1006tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) 1007{ 1008 size_t len; 1009 1010 len = xprt->tcp_reclen - xprt->tcp_offset; 1011 if (len > desc->count) 1012 len = desc->count; 1013 desc->count -= len; 1014 desc->offset += len; 1015 xprt->tcp_offset += len; 1016 dprintk("RPC: discarded %Zu bytes\n", len); 1017 tcp_check_recm(xprt); 1018} 1019 1020/* 1021 * TCP record receive routine 1022 * We first have to grab the record marker, then the XID, then the data. 1023 */ 1024static int 1025tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, 1026 unsigned int offset, size_t len) 1027{ 1028 struct rpc_xprt *xprt = rd_desc->arg.data; 1029 skb_reader_t desc = { 1030 .skb = skb, 1031 .offset = offset, 1032 .count = len, 1033 .csum = 0 1034 }; 1035 1036 dprintk("RPC: tcp_data_recv\n"); 1037 do { 1038 /* Read in a new fragment marker if necessary */ 1039 /* Can we ever really expect to get completely empty fragments? */ 1040 if (xprt->tcp_flags & XPRT_COPY_RECM) { 1041 tcp_read_fraghdr(xprt, &desc); 1042 continue; 1043 } 1044 /* Read in the xid if necessary */ 1045 if (xprt->tcp_flags & XPRT_COPY_XID) { 1046 tcp_read_xid(xprt, &desc); 1047 continue; 1048 } 1049 /* Read in the request data */ 1050 if (xprt->tcp_flags & XPRT_COPY_DATA) { 1051 tcp_read_request(xprt, &desc); 1052 continue; 1053 } 1054 /* Skip over any trailing bytes on short reads */ 1055 tcp_read_discard(xprt, &desc); 1056 } while (desc.count); 1057 dprintk("RPC: tcp_data_recv done\n"); 1058 return len - desc.count; 1059} 1060 1061static void tcp_data_ready(struct sock *sk, int bytes) 1062{ 1063 struct rpc_xprt *xprt; 1064 read_descriptor_t rd_desc; 1065 1066 read_lock(&sk->sk_callback_lock); 1067 dprintk("RPC: tcp_data_ready...\n"); 1068 if (!(xprt = xprt_from_sock(sk))) { 1069 printk("RPC: tcp_data_ready socket info not found!\n"); 1070 goto out; 1071 } 1072 if (xprt->shutdown) 1073 goto out; 1074 1075 /* We use rd_desc to pass struct xprt to tcp_data_recv */ 1076 rd_desc.arg.data = xprt; 1077 rd_desc.count = 65536; 1078 tcp_read_sock(sk, &rd_desc, tcp_data_recv); 1079out: 1080 read_unlock(&sk->sk_callback_lock); 1081} 1082 1083static void 1084tcp_state_change(struct sock *sk) 1085{ 1086 struct rpc_xprt *xprt; 1087 1088 read_lock(&sk->sk_callback_lock); 1089 if (!(xprt = xprt_from_sock(sk))) 1090 goto out; 1091 dprintk("RPC: tcp_state_change client %p...\n", xprt); 1092 dprintk("RPC: state %x conn %d dead %d zapped %d\n", 1093 sk->sk_state, xprt_connected(xprt), 1094 sock_flag(sk, SOCK_DEAD), 1095 sock_flag(sk, SOCK_ZAPPED)); 1096 1097 switch (sk->sk_state) { 1098 case TCP_ESTABLISHED: 1099 spin_lock_bh(&xprt->sock_lock); 1100 if (!xprt_test_and_set_connected(xprt)) { 1101 /* Reset TCP record info */ 1102 xprt->tcp_offset = 0; 1103 xprt->tcp_reclen = 0; 1104 xprt->tcp_copied = 0; 1105 xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; 1106 rpc_wake_up(&xprt->pending); 1107 } 1108 spin_unlock_bh(&xprt->sock_lock); 1109 break; 1110 case TCP_SYN_SENT: 1111 case TCP_SYN_RECV: 1112 break; 1113 default: 1114 xprt_disconnect(xprt); 1115 break; 1116 } 1117 out: 1118 read_unlock(&sk->sk_callback_lock); 1119} 1120 1121/* 1122 * Called when more output buffer space is available for this socket. 1123 * We try not to wake our writers until they can make "significant" 1124 * progress, otherwise we'll waste resources thrashing sock_sendmsg 1125 * with a bunch of small requests. 1126 */ 1127static void 1128xprt_write_space(struct sock *sk) 1129{ 1130 struct rpc_xprt *xprt; 1131 struct socket *sock; 1132 1133 read_lock(&sk->sk_callback_lock); 1134 if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) 1135 goto out; 1136 if (xprt->shutdown) 1137 goto out; 1138 1139 /* Wait until we have enough socket memory */ 1140 if (xprt->stream) { 1141 /* from net/core/stream.c:sk_stream_write_space */ 1142 if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) 1143 goto out; 1144 } else { 1145 /* from net/core/sock.c:sock_def_write_space */ 1146 if (!sock_writeable(sk)) 1147 goto out; 1148 } 1149 1150 if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) 1151 goto out; 1152 1153 spin_lock_bh(&xprt->sock_lock); 1154 if (xprt->snd_task) 1155 rpc_wake_up_task(xprt->snd_task); 1156 spin_unlock_bh(&xprt->sock_lock); 1157out: 1158 read_unlock(&sk->sk_callback_lock); 1159} 1160 1161/* 1162 * RPC receive timeout handler. 1163 */ 1164static void 1165xprt_timer(struct rpc_task *task) 1166{ 1167 struct rpc_rqst *req = task->tk_rqstp; 1168 struct rpc_xprt *xprt = req->rq_xprt; 1169 1170 spin_lock(&xprt->sock_lock); 1171 if (req->rq_received) 1172 goto out; 1173 1174 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 1175 __xprt_put_cong(xprt, req); 1176 1177 dprintk("RPC: %4d xprt_timer (%s request)\n", 1178 task->tk_pid, req ? "pending" : "backlogged"); 1179 1180 task->tk_status = -ETIMEDOUT; 1181out: 1182 task->tk_timeout = 0; 1183 rpc_wake_up_task(task); 1184 spin_unlock(&xprt->sock_lock); 1185} 1186 1187/* 1188 * Place the actual RPC call. 1189 * We have to copy the iovec because sendmsg fiddles with its contents. 1190 */ 1191int 1192xprt_prepare_transmit(struct rpc_task *task) 1193{ 1194 struct rpc_rqst *req = task->tk_rqstp; 1195 struct rpc_xprt *xprt = req->rq_xprt; 1196 int err = 0; 1197 1198 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 1199 1200 if (xprt->shutdown) 1201 return -EIO; 1202 1203 spin_lock_bh(&xprt->sock_lock); 1204 if (req->rq_received && !req->rq_bytes_sent) { 1205 err = req->rq_received; 1206 goto out_unlock; 1207 } 1208 if (!__xprt_lock_write(xprt, task)) { 1209 err = -EAGAIN; 1210 goto out_unlock; 1211 } 1212 1213 if (!xprt_connected(xprt)) { 1214 err = -ENOTCONN; 1215 goto out_unlock; 1216 } 1217out_unlock: 1218 spin_unlock_bh(&xprt->sock_lock); 1219 return err; 1220} 1221 1222void 1223xprt_transmit(struct rpc_task *task) 1224{ 1225 struct rpc_clnt *clnt = task->tk_client; 1226 struct rpc_rqst *req = task->tk_rqstp; 1227 struct rpc_xprt *xprt = req->rq_xprt; 1228 int status, retry = 0; 1229 1230 1231 dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 1232 1233 /* set up everything as needed. */ 1234 /* Write the record marker */ 1235 if (xprt->stream) { 1236 u32 *marker = req->rq_svec[0].iov_base; 1237 1238 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 1239 } 1240 1241 smp_rmb(); 1242 if (!req->rq_received) { 1243 if (list_empty(&req->rq_list)) { 1244 spin_lock_bh(&xprt->sock_lock); 1245 /* Update the softirq receive buffer */ 1246 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1247 sizeof(req->rq_private_buf)); 1248 /* Add request to the receive list */ 1249 list_add_tail(&req->rq_list, &xprt->recv); 1250 spin_unlock_bh(&xprt->sock_lock); 1251 xprt_reset_majortimeo(req); 1252 /* Turn off autodisconnect */ 1253 del_singleshot_timer_sync(&xprt->timer); 1254 } 1255 } else if (!req->rq_bytes_sent) 1256 return; 1257 1258 /* Continue transmitting the packet/record. We must be careful 1259 * to cope with writespace callbacks arriving _after_ we have 1260 * called xprt_sendmsg(). 1261 */ 1262 while (1) { 1263 req->rq_xtime = jiffies; 1264 status = xprt_sendmsg(xprt, req); 1265 1266 if (status < 0) 1267 break; 1268 1269 if (xprt->stream) { 1270 req->rq_bytes_sent += status; 1271 1272 /* If we've sent the entire packet, immediately 1273 * reset the count of bytes sent. */ 1274 if (req->rq_bytes_sent >= req->rq_slen) { 1275 req->rq_bytes_sent = 0; 1276 goto out_receive; 1277 } 1278 } else { 1279 if (status >= req->rq_slen) 1280 goto out_receive; 1281 status = -EAGAIN; 1282 break; 1283 } 1284 1285 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", 1286 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 1287 req->rq_slen); 1288 1289 status = -EAGAIN; 1290 if (retry++ > 50) 1291 break; 1292 } 1293 1294 /* Note: at this point, task->tk_sleeping has not yet been set, 1295 * hence there is no danger of the waking up task being put on 1296 * schedq, and being picked up by a parallel run of rpciod(). 1297 */ 1298 task->tk_status = status; 1299 1300 switch (status) { 1301 case -EAGAIN: 1302 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 1303 /* Protect against races with xprt_write_space */ 1304 spin_lock_bh(&xprt->sock_lock); 1305 /* Don't race with disconnect */ 1306 if (!xprt_connected(xprt)) 1307 task->tk_status = -ENOTCONN; 1308 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { 1309 task->tk_timeout = req->rq_timeout; 1310 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 1311 } 1312 spin_unlock_bh(&xprt->sock_lock); 1313 return; 1314 } 1315 /* Keep holding the socket if it is blocked */ 1316 rpc_delay(task, HZ>>4); 1317 return; 1318 case -ECONNREFUSED: 1319 task->tk_timeout = RPC_REESTABLISH_TIMEOUT; 1320 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 1321 case -ENOTCONN: 1322 return; 1323 default: 1324 if (xprt->stream) 1325 xprt_disconnect(xprt); 1326 } 1327 xprt_release_write(xprt, task); 1328 return; 1329 out_receive: 1330 dprintk("RPC: %4d xmit complete\n", task->tk_pid); 1331 /* Set the task's receive timeout value */ 1332 spin_lock_bh(&xprt->sock_lock); 1333 if (!xprt->nocong) { 1334 int timer = task->tk_msg.rpc_proc->p_timer; 1335 task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); 1336 task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; 1337 if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) 1338 task->tk_timeout = xprt->timeout.to_maxval; 1339 } else 1340 task->tk_timeout = req->rq_timeout; 1341 /* Don't race with disconnect */ 1342 if (!xprt_connected(xprt)) 1343 task->tk_status = -ENOTCONN; 1344 else if (!req->rq_received) 1345 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 1346 __xprt_release_write(xprt, task); 1347 spin_unlock_bh(&xprt->sock_lock); 1348} 1349 1350/* 1351 * Reserve an RPC call slot. 1352 */ 1353static inline void 1354do_xprt_reserve(struct rpc_task *task) 1355{ 1356 struct rpc_xprt *xprt = task->tk_xprt; 1357 1358 task->tk_status = 0; 1359 if (task->tk_rqstp) 1360 return; 1361 if (!list_empty(&xprt->free)) { 1362 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1363 list_del_init(&req->rq_list); 1364 task->tk_rqstp = req; 1365 xprt_request_init(task, xprt); 1366 return; 1367 } 1368 dprintk("RPC: waiting for request slot\n"); 1369 task->tk_status = -EAGAIN; 1370 task->tk_timeout = 0; 1371 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 1372} 1373 1374void 1375xprt_reserve(struct rpc_task *task) 1376{ 1377 struct rpc_xprt *xprt = task->tk_xprt; 1378 1379 task->tk_status = -EIO; 1380 if (!xprt->shutdown) { 1381 spin_lock(&xprt->xprt_lock); 1382 do_xprt_reserve(task); 1383 spin_unlock(&xprt->xprt_lock); 1384 } 1385} 1386 1387/* 1388 * Allocate a 'unique' XID 1389 */ 1390static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 1391{ 1392 return xprt->xid++; 1393} 1394 1395static inline void xprt_init_xid(struct rpc_xprt *xprt) 1396{ 1397 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 1398} 1399 1400/* 1401 * Initialize RPC request 1402 */ 1403static void 1404xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1405{ 1406 struct rpc_rqst *req = task->tk_rqstp; 1407 1408 req->rq_timeout = xprt->timeout.to_initval; 1409 req->rq_task = task; 1410 req->rq_xprt = xprt; 1411 req->rq_xid = xprt_alloc_xid(xprt); 1412 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 1413 req, ntohl(req->rq_xid)); 1414} 1415 1416/* 1417 * Release an RPC call slot 1418 */ 1419void 1420xprt_release(struct rpc_task *task) 1421{ 1422 struct rpc_xprt *xprt = task->tk_xprt; 1423 struct rpc_rqst *req; 1424 1425 if (!(req = task->tk_rqstp)) 1426 return; 1427 spin_lock_bh(&xprt->sock_lock); 1428 __xprt_release_write(xprt, task); 1429 __xprt_put_cong(xprt, req); 1430 if (!list_empty(&req->rq_list)) 1431 list_del(&req->rq_list); 1432 xprt->last_used = jiffies; 1433 if (list_empty(&xprt->recv) && !xprt->shutdown) 1434 mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); 1435 spin_unlock_bh(&xprt->sock_lock); 1436 task->tk_rqstp = NULL; 1437 memset(req, 0, sizeof(*req)); /* mark unused */ 1438 1439 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 1440 1441 spin_lock(&xprt->xprt_lock); 1442 list_add(&req->rq_list, &xprt->free); 1443 xprt_clear_backlog(xprt); 1444 spin_unlock(&xprt->xprt_lock); 1445} 1446 1447/* 1448 * Set default timeout parameters 1449 */ 1450static void 1451xprt_default_timeout(struct rpc_timeout *to, int proto) 1452{ 1453 if (proto == IPPROTO_UDP) 1454 xprt_set_timeout(to, 5, 5 * HZ); 1455 else 1456 xprt_set_timeout(to, 2, 60 * HZ); 1457} 1458 1459/* 1460 * Set constant timeout 1461 */ 1462void 1463xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 1464{ 1465 to->to_initval = 1466 to->to_increment = incr; 1467 to->to_maxval = to->to_initval + (incr * retr); 1468 to->to_retries = retr; 1469 to->to_exponential = 0; 1470} 1471 1472unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1473unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1474 1475/* 1476 * Initialize an RPC client 1477 */ 1478static struct rpc_xprt * 1479xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 1480{ 1481 struct rpc_xprt *xprt; 1482 unsigned int entries; 1483 size_t slot_table_size; 1484 struct rpc_rqst *req; 1485 1486 dprintk("RPC: setting up %s transport...\n", 1487 proto == IPPROTO_UDP? "UDP" : "TCP"); 1488 1489 entries = (proto == IPPROTO_TCP)? 1490 xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; 1491 1492 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 1493 return ERR_PTR(-ENOMEM); 1494 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ 1495 xprt->max_reqs = entries; 1496 slot_table_size = entries * sizeof(xprt->slot[0]); 1497 xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); 1498 if (xprt->slot == NULL) { 1499 kfree(xprt); 1500 return ERR_PTR(-ENOMEM); 1501 } 1502 memset(xprt->slot, 0, slot_table_size); 1503 1504 xprt->addr = *ap; 1505 xprt->prot = proto; 1506 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; 1507 if (xprt->stream) { 1508 xprt->cwnd = RPC_MAXCWND(xprt); 1509 xprt->nocong = 1; 1510 xprt->max_payload = (1U << 31) - 1; 1511 } else { 1512 xprt->cwnd = RPC_INITCWND; 1513 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1514 } 1515 spin_lock_init(&xprt->sock_lock); 1516 spin_lock_init(&xprt->xprt_lock); 1517 init_waitqueue_head(&xprt->cong_wait); 1518 1519 INIT_LIST_HEAD(&xprt->free); 1520 INIT_LIST_HEAD(&xprt->recv); 1521 INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); 1522 INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); 1523 init_timer(&xprt->timer); 1524 xprt->timer.function = xprt_init_autodisconnect; 1525 xprt->timer.data = (unsigned long) xprt; 1526 xprt->last_used = jiffies; 1527 xprt->port = XPRT_MAX_RESVPORT; 1528 1529 /* Set timeout parameters */ 1530 if (to) { 1531 xprt->timeout = *to; 1532 } else 1533 xprt_default_timeout(&xprt->timeout, xprt->prot); 1534 1535 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1536 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1537 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 1538 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1539 1540 /* initialize free list */ 1541 for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) 1542 list_add(&req->rq_list, &xprt->free); 1543 1544 xprt_init_xid(xprt); 1545 1546 /* Check whether we want to use a reserved port */ 1547 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; 1548 1549 dprintk("RPC: created transport %p with %u slots\n", xprt, 1550 xprt->max_reqs); 1551 1552 return xprt; 1553} 1554 1555/* 1556 * Bind to a reserved port 1557 */ 1558static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) 1559{ 1560 struct sockaddr_in myaddr = { 1561 .sin_family = AF_INET, 1562 }; 1563 int err, port; 1564 1565 /* Were we already bound to a given port? Try to reuse it */ 1566 port = xprt->port; 1567 do { 1568 myaddr.sin_port = htons(port); 1569 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, 1570 sizeof(myaddr)); 1571 if (err == 0) { 1572 xprt->port = port; 1573 return 0; 1574 } 1575 if (--port == 0) 1576 port = XPRT_MAX_RESVPORT; 1577 } while (err == -EADDRINUSE && port != xprt->port); 1578 1579 printk("RPC: Can't bind to reserved port (%d).\n", -err); 1580 return err; 1581} 1582 1583static void 1584xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) 1585{ 1586 struct sock *sk = sock->sk; 1587 1588 if (xprt->inet) 1589 return; 1590 1591 write_lock_bh(&sk->sk_callback_lock); 1592 sk->sk_user_data = xprt; 1593 xprt->old_data_ready = sk->sk_data_ready; 1594 xprt->old_state_change = sk->sk_state_change; 1595 xprt->old_write_space = sk->sk_write_space; 1596 if (xprt->prot == IPPROTO_UDP) { 1597 sk->sk_data_ready = udp_data_ready; 1598 sk->sk_no_check = UDP_CSUM_NORCV; 1599 xprt_set_connected(xprt); 1600 } else { 1601 tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ 1602 sk->sk_data_ready = tcp_data_ready; 1603 sk->sk_state_change = tcp_state_change; 1604 xprt_clear_connected(xprt); 1605 } 1606 sk->sk_write_space = xprt_write_space; 1607 1608 /* Reset to new socket */ 1609 xprt->sock = sock; 1610 xprt->inet = sk; 1611 write_unlock_bh(&sk->sk_callback_lock); 1612 1613 return; 1614} 1615 1616/* 1617 * Set socket buffer length 1618 */ 1619void 1620xprt_sock_setbufsize(struct rpc_xprt *xprt) 1621{ 1622 struct sock *sk = xprt->inet; 1623 1624 if (xprt->stream) 1625 return; 1626 if (xprt->rcvsize) { 1627 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1628 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; 1629 } 1630 if (xprt->sndsize) { 1631 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1632 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; 1633 sk->sk_write_space(sk); 1634 } 1635} 1636 1637/* 1638 * Datastream sockets are created here, but xprt_connect will create 1639 * and connect stream sockets. 1640 */ 1641static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) 1642{ 1643 struct socket *sock; 1644 int type, err; 1645 1646 dprintk("RPC: xprt_create_socket(%s %d)\n", 1647 (proto == IPPROTO_UDP)? "udp" : "tcp", proto); 1648 1649 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; 1650 1651 if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { 1652 printk("RPC: can't create socket (%d).\n", -err); 1653 return NULL; 1654 } 1655 1656 /* If the caller has the capability, bind to a reserved port */ 1657 if (resvport && xprt_bindresvport(xprt, sock) < 0) { 1658 printk("RPC: can't bind to reserved port.\n"); 1659 goto failed; 1660 } 1661 1662 return sock; 1663 1664failed: 1665 sock_release(sock); 1666 return NULL; 1667} 1668 1669/* 1670 * Create an RPC client transport given the protocol and peer address. 1671 */ 1672struct rpc_xprt * 1673xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 1674{ 1675 struct rpc_xprt *xprt; 1676 1677 xprt = xprt_setup(proto, sap, to); 1678 if (IS_ERR(xprt)) 1679 dprintk("RPC: xprt_create_proto failed\n"); 1680 else 1681 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 1682 return xprt; 1683} 1684 1685/* 1686 * Prepare for transport shutdown. 1687 */ 1688static void 1689xprt_shutdown(struct rpc_xprt *xprt) 1690{ 1691 xprt->shutdown = 1; 1692 rpc_wake_up(&xprt->sending); 1693 rpc_wake_up(&xprt->resend); 1694 rpc_wake_up(&xprt->pending); 1695 rpc_wake_up(&xprt->backlog); 1696 wake_up(&xprt->cong_wait); 1697 del_timer_sync(&xprt->timer); 1698 1699 /* synchronously wait for connect worker to finish */ 1700 cancel_delayed_work(&xprt->sock_connect); 1701 flush_scheduled_work(); 1702} 1703 1704/* 1705 * Clear the xprt backlog queue 1706 */ 1707static int 1708xprt_clear_backlog(struct rpc_xprt *xprt) { 1709 rpc_wake_up_next(&xprt->backlog); 1710 wake_up(&xprt->cong_wait); 1711 return 1; 1712} 1713 1714/* 1715 * Destroy an RPC transport, killing off all requests. 1716 */ 1717int 1718xprt_destroy(struct rpc_xprt *xprt) 1719{ 1720 dprintk("RPC: destroying transport %p\n", xprt); 1721 xprt_shutdown(xprt); 1722 xprt_disconnect(xprt); 1723 xprt_close(xprt); 1724 kfree(xprt->slot); 1725 kfree(xprt); 1726 1727 return 0; 1728} 1729