xprt.c revision c54d7e03c3a21b38c587f671704c5a12aa3987fc
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance,
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_call().
 *  -	xprt_call transmits the message and installs the caller on the
 *	socket's wait list. At the same time, it installs a timer that
 *	is run after the packet's timeout has expired.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that socket. If a matching XID is found, the
 *	caller is woken up, and the timer removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot, and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
43 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 44 */ 45 46#include <linux/types.h> 47#include <linux/slab.h> 48#include <linux/capability.h> 49#include <linux/sched.h> 50#include <linux/errno.h> 51#include <linux/socket.h> 52#include <linux/in.h> 53#include <linux/net.h> 54#include <linux/mm.h> 55#include <linux/udp.h> 56#include <linux/tcp.h> 57#include <linux/sunrpc/clnt.h> 58#include <linux/file.h> 59#include <linux/workqueue.h> 60#include <linux/random.h> 61 62#include <net/sock.h> 63#include <net/checksum.h> 64#include <net/udp.h> 65#include <net/tcp.h> 66 67/* 68 * Local variables 69 */ 70 71#ifdef RPC_DEBUG 72# undef RPC_DEBUG_DATA 73# define RPCDBG_FACILITY RPCDBG_XPRT 74#endif 75 76#define XPRT_MAX_BACKOFF (8) 77#define XPRT_IDLE_TIMEOUT (5*60*HZ) 78#define XPRT_MAX_RESVPORT (800) 79 80/* 81 * Local functions 82 */ 83static void xprt_request_init(struct rpc_task *, struct rpc_xprt *); 84static inline void do_xprt_reserve(struct rpc_task *); 85static void xprt_disconnect(struct rpc_xprt *); 86static void xprt_connect_status(struct rpc_task *task); 87static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap, 88 struct rpc_timeout *to); 89static struct socket *xprt_create_socket(struct rpc_xprt *, int, int); 90static void xprt_bind_socket(struct rpc_xprt *, struct socket *); 91static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *); 92 93static int xprt_clear_backlog(struct rpc_xprt *xprt); 94 95#ifdef RPC_DEBUG_DATA 96/* 97 * Print the buffer contents (first 128 bytes only--just enough for 98 * diropres return). 
99 */ 100static void 101xprt_pktdump(char *msg, u32 *packet, unsigned int count) 102{ 103 u8 *buf = (u8 *) packet; 104 int j; 105 106 dprintk("RPC: %s\n", msg); 107 for (j = 0; j < count && j < 128; j += 4) { 108 if (!(j & 31)) { 109 if (j) 110 dprintk("\n"); 111 dprintk("0x%04x ", j); 112 } 113 dprintk("%02x%02x%02x%02x ", 114 buf[j], buf[j+1], buf[j+2], buf[j+3]); 115 } 116 dprintk("\n"); 117} 118#else 119static inline void 120xprt_pktdump(char *msg, u32 *packet, unsigned int count) 121{ 122 /* NOP */ 123} 124#endif 125 126/* 127 * Look up RPC transport given an INET socket 128 */ 129static inline struct rpc_xprt * 130xprt_from_sock(struct sock *sk) 131{ 132 return (struct rpc_xprt *) sk->sk_user_data; 133} 134 135/* 136 * Serialize write access to sockets, in order to prevent different 137 * requests from interfering with each other. 138 * Also prevents TCP socket connects from colliding with writes. 139 */ 140static int 141__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 142{ 143 struct rpc_rqst *req = task->tk_rqstp; 144 145 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) { 146 if (task == xprt->snd_task) 147 return 1; 148 if (task == NULL) 149 return 0; 150 goto out_sleep; 151 } 152 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 153 xprt->snd_task = task; 154 if (req) { 155 req->rq_bytes_sent = 0; 156 req->rq_ntrans++; 157 } 158 return 1; 159 } 160 smp_mb__before_clear_bit(); 161 clear_bit(XPRT_LOCKED, &xprt->sockstate); 162 smp_mb__after_clear_bit(); 163out_sleep: 164 dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt); 165 task->tk_timeout = 0; 166 task->tk_status = -EAGAIN; 167 if (req && req->rq_ntrans) 168 rpc_sleep_on(&xprt->resend, task, NULL, NULL); 169 else 170 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 171 return 0; 172} 173 174static inline int 175xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) 176{ 177 int retval; 178 179 spin_lock_bh(&xprt->sock_lock); 180 retval = __xprt_lock_write(xprt, 
task); 181 spin_unlock_bh(&xprt->sock_lock); 182 return retval; 183} 184 185 186static void 187__xprt_lock_write_next(struct rpc_xprt *xprt) 188{ 189 struct rpc_task *task; 190 191 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 192 return; 193 if (!xprt->nocong && RPCXPRT_CONGESTED(xprt)) 194 goto out_unlock; 195 task = rpc_wake_up_next(&xprt->resend); 196 if (!task) { 197 task = rpc_wake_up_next(&xprt->sending); 198 if (!task) 199 goto out_unlock; 200 } 201 if (xprt->nocong || __xprt_get_cong(xprt, task)) { 202 struct rpc_rqst *req = task->tk_rqstp; 203 xprt->snd_task = task; 204 if (req) { 205 req->rq_bytes_sent = 0; 206 req->rq_ntrans++; 207 } 208 return; 209 } 210out_unlock: 211 smp_mb__before_clear_bit(); 212 clear_bit(XPRT_LOCKED, &xprt->sockstate); 213 smp_mb__after_clear_bit(); 214} 215 216/* 217 * Releases the socket for use by other requests. 218 */ 219static void 220__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 221{ 222 if (xprt->snd_task == task) { 223 xprt->snd_task = NULL; 224 smp_mb__before_clear_bit(); 225 clear_bit(XPRT_LOCKED, &xprt->sockstate); 226 smp_mb__after_clear_bit(); 227 __xprt_lock_write_next(xprt); 228 } 229} 230 231static inline void 232xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task) 233{ 234 spin_lock_bh(&xprt->sock_lock); 235 __xprt_release_write(xprt, task); 236 spin_unlock_bh(&xprt->sock_lock); 237} 238 239/* 240 * Write data to socket. 
241 */ 242static inline int 243xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req) 244{ 245 struct socket *sock = xprt->sock; 246 struct xdr_buf *xdr = &req->rq_snd_buf; 247 struct sockaddr *addr = NULL; 248 int addrlen = 0; 249 unsigned int skip; 250 int result; 251 252 if (!sock) 253 return -ENOTCONN; 254 255 xprt_pktdump("packet data:", 256 req->rq_svec->iov_base, 257 req->rq_svec->iov_len); 258 259 /* For UDP, we need to provide an address */ 260 if (!xprt->stream) { 261 addr = (struct sockaddr *) &xprt->addr; 262 addrlen = sizeof(xprt->addr); 263 } 264 /* Dont repeat bytes */ 265 skip = req->rq_bytes_sent; 266 267 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 268 result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT); 269 270 dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result); 271 272 if (result >= 0) 273 return result; 274 275 switch (result) { 276 case -ECONNREFUSED: 277 /* When the server has died, an ICMP port unreachable message 278 * prompts ECONNREFUSED. 279 */ 280 case -EAGAIN: 281 break; 282 case -ECONNRESET: 283 case -ENOTCONN: 284 case -EPIPE: 285 /* connection broken */ 286 if (xprt->stream) 287 result = -ENOTCONN; 288 break; 289 default: 290 printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result); 291 } 292 return result; 293} 294 295/* 296 * Van Jacobson congestion avoidance. Check if the congestion window 297 * overflowed. Put the task to sleep if this is the case. 
298 */ 299static int 300__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task) 301{ 302 struct rpc_rqst *req = task->tk_rqstp; 303 304 if (req->rq_cong) 305 return 1; 306 dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n", 307 task->tk_pid, xprt->cong, xprt->cwnd); 308 if (RPCXPRT_CONGESTED(xprt)) 309 return 0; 310 req->rq_cong = 1; 311 xprt->cong += RPC_CWNDSCALE; 312 return 1; 313} 314 315/* 316 * Adjust the congestion window, and wake up the next task 317 * that has been sleeping due to congestion 318 */ 319static void 320__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) 321{ 322 if (!req->rq_cong) 323 return; 324 req->rq_cong = 0; 325 xprt->cong -= RPC_CWNDSCALE; 326 __xprt_lock_write_next(xprt); 327} 328 329/* 330 * Adjust RPC congestion window 331 * We use a time-smoothed congestion estimator to avoid heavy oscillation. 332 */ 333static void 334xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) 335{ 336 unsigned long cwnd; 337 338 cwnd = xprt->cwnd; 339 if (result >= 0 && cwnd <= xprt->cong) { 340 /* The (cwnd >> 1) term makes sure 341 * the result gets rounded properly. 
*/ 342 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; 343 if (cwnd > RPC_MAXCWND(xprt)) 344 cwnd = RPC_MAXCWND(xprt); 345 __xprt_lock_write_next(xprt); 346 } else if (result == -ETIMEDOUT) { 347 cwnd >>= 1; 348 if (cwnd < RPC_CWNDSCALE) 349 cwnd = RPC_CWNDSCALE; 350 } 351 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", 352 xprt->cong, xprt->cwnd, cwnd); 353 xprt->cwnd = cwnd; 354} 355 356/* 357 * Reset the major timeout value 358 */ 359static void xprt_reset_majortimeo(struct rpc_rqst *req) 360{ 361 struct rpc_timeout *to = &req->rq_xprt->timeout; 362 363 req->rq_majortimeo = req->rq_timeout; 364 if (to->to_exponential) 365 req->rq_majortimeo <<= to->to_retries; 366 else 367 req->rq_majortimeo += to->to_increment * to->to_retries; 368 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) 369 req->rq_majortimeo = to->to_maxval; 370 req->rq_majortimeo += jiffies; 371} 372 373/* 374 * Adjust timeout values etc for next retransmit 375 */ 376int xprt_adjust_timeout(struct rpc_rqst *req) 377{ 378 struct rpc_xprt *xprt = req->rq_xprt; 379 struct rpc_timeout *to = &xprt->timeout; 380 int status = 0; 381 382 if (time_before(jiffies, req->rq_majortimeo)) { 383 if (to->to_exponential) 384 req->rq_timeout <<= 1; 385 else 386 req->rq_timeout += to->to_increment; 387 if (to->to_maxval && req->rq_timeout >= to->to_maxval) 388 req->rq_timeout = to->to_maxval; 389 req->rq_retries++; 390 pprintk("RPC: %lu retrans\n", jiffies); 391 } else { 392 req->rq_timeout = to->to_initval; 393 req->rq_retries = 0; 394 xprt_reset_majortimeo(req); 395 /* Reset the RTT counters == "slow start" */ 396 spin_lock_bh(&xprt->sock_lock); 397 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); 398 spin_unlock_bh(&xprt->sock_lock); 399 pprintk("RPC: %lu timeout\n", jiffies); 400 status = -ETIMEDOUT; 401 } 402 403 if (req->rq_timeout == 0) { 404 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n"); 405 req->rq_timeout = 5 * HZ; 406 } 407 return status; 408} 
409 410/* 411 * Close down a transport socket 412 */ 413static void 414xprt_close(struct rpc_xprt *xprt) 415{ 416 struct socket *sock = xprt->sock; 417 struct sock *sk = xprt->inet; 418 419 if (!sk) 420 return; 421 422 write_lock_bh(&sk->sk_callback_lock); 423 xprt->inet = NULL; 424 xprt->sock = NULL; 425 426 sk->sk_user_data = NULL; 427 sk->sk_data_ready = xprt->old_data_ready; 428 sk->sk_state_change = xprt->old_state_change; 429 sk->sk_write_space = xprt->old_write_space; 430 write_unlock_bh(&sk->sk_callback_lock); 431 432 sk->sk_no_check = 0; 433 434 sock_release(sock); 435} 436 437static void 438xprt_socket_autoclose(void *args) 439{ 440 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 441 442 xprt_disconnect(xprt); 443 xprt_close(xprt); 444 xprt_release_write(xprt, NULL); 445} 446 447/* 448 * Mark a transport as disconnected 449 */ 450static void 451xprt_disconnect(struct rpc_xprt *xprt) 452{ 453 dprintk("RPC: disconnected transport %p\n", xprt); 454 spin_lock_bh(&xprt->sock_lock); 455 xprt_clear_connected(xprt); 456 rpc_wake_up_status(&xprt->pending, -ENOTCONN); 457 spin_unlock_bh(&xprt->sock_lock); 458} 459 460/* 461 * Used to allow disconnection when we've been idle 462 */ 463static void 464xprt_init_autodisconnect(unsigned long data) 465{ 466 struct rpc_xprt *xprt = (struct rpc_xprt *)data; 467 468 spin_lock(&xprt->sock_lock); 469 if (!list_empty(&xprt->recv) || xprt->shutdown) 470 goto out_abort; 471 if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) 472 goto out_abort; 473 spin_unlock(&xprt->sock_lock); 474 /* Let keventd close the socket */ 475 if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0) 476 xprt_release_write(xprt, NULL); 477 else 478 schedule_work(&xprt->task_cleanup); 479 return; 480out_abort: 481 spin_unlock(&xprt->sock_lock); 482} 483 484static void xprt_socket_connect(void *args) 485{ 486 struct rpc_xprt *xprt = (struct rpc_xprt *)args; 487 struct socket *sock = xprt->sock; 488 int status = -EIO; 489 490 if (xprt->shutdown || 
xprt->addr.sin_port == 0) 491 goto out; 492 493 /* 494 * Start by resetting any existing state 495 */ 496 xprt_close(xprt); 497 sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); 498 if (sock == NULL) { 499 /* couldn't create socket or bind to reserved port; 500 * this is likely a permanent error, so cause an abort */ 501 goto out; 502 } 503 xprt_bind_socket(xprt, sock); 504 xprt_sock_setbufsize(xprt); 505 506 status = 0; 507 if (!xprt->stream) 508 goto out; 509 510 /* 511 * Tell the socket layer to start connecting... 512 */ 513 status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, 514 sizeof(xprt->addr), O_NONBLOCK); 515 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 516 xprt, -status, xprt_connected(xprt), sock->sk->sk_state); 517 if (status < 0) { 518 switch (status) { 519 case -EINPROGRESS: 520 case -EALREADY: 521 goto out_clear; 522 } 523 } 524out: 525 if (status < 0) 526 rpc_wake_up_status(&xprt->pending, status); 527 else 528 rpc_wake_up(&xprt->pending); 529out_clear: 530 smp_mb__before_clear_bit(); 531 clear_bit(XPRT_CONNECTING, &xprt->sockstate); 532 smp_mb__after_clear_bit(); 533} 534 535/* 536 * Attempt to connect a TCP socket. 537 * 538 */ 539void xprt_connect(struct rpc_task *task) 540{ 541 struct rpc_xprt *xprt = task->tk_xprt; 542 543 dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid, 544 xprt, (xprt_connected(xprt) ? 
"is" : "is not")); 545 546 if (xprt->shutdown) { 547 task->tk_status = -EIO; 548 return; 549 } 550 if (!xprt->addr.sin_port) { 551 task->tk_status = -EIO; 552 return; 553 } 554 if (!xprt_lock_write(xprt, task)) 555 return; 556 if (xprt_connected(xprt)) 557 goto out_write; 558 559 if (task->tk_rqstp) 560 task->tk_rqstp->rq_bytes_sent = 0; 561 562 task->tk_timeout = RPC_CONNECT_TIMEOUT; 563 rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL); 564 if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { 565 /* Note: if we are here due to a dropped connection 566 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ 567 * seconds 568 */ 569 if (xprt->sock != NULL) 570 schedule_delayed_work(&xprt->sock_connect, 571 RPC_REESTABLISH_TIMEOUT); 572 else { 573 schedule_work(&xprt->sock_connect); 574 if (!RPC_IS_ASYNC(task)) 575 flush_scheduled_work(); 576 } 577 } 578 return; 579 out_write: 580 xprt_release_write(xprt, task); 581} 582 583/* 584 * We arrive here when awoken from waiting on connection establishment. 585 */ 586static void 587xprt_connect_status(struct rpc_task *task) 588{ 589 struct rpc_xprt *xprt = task->tk_xprt; 590 591 if (task->tk_status >= 0) { 592 dprintk("RPC: %4d xprt_connect_status: connection established\n", 593 task->tk_pid); 594 return; 595 } 596 597 /* if soft mounted, just cause this RPC to fail */ 598 if (RPC_IS_SOFT(task)) 599 task->tk_status = -EIO; 600 601 switch (task->tk_status) { 602 case -ECONNREFUSED: 603 case -ECONNRESET: 604 case -ENOTCONN: 605 return; 606 case -ETIMEDOUT: 607 dprintk("RPC: %4d xprt_connect_status: timed out\n", 608 task->tk_pid); 609 break; 610 default: 611 printk(KERN_ERR "RPC: error %d connecting to server %s\n", 612 -task->tk_status, task->tk_client->cl_server); 613 } 614 xprt_release_write(xprt, task); 615} 616 617/* 618 * Look up the RPC request corresponding to a reply, and then lock it. 
619 */ 620static inline struct rpc_rqst * 621xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid) 622{ 623 struct list_head *pos; 624 struct rpc_rqst *req = NULL; 625 626 list_for_each(pos, &xprt->recv) { 627 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list); 628 if (entry->rq_xid == xid) { 629 req = entry; 630 break; 631 } 632 } 633 return req; 634} 635 636/* 637 * Complete reply received. 638 * The TCP code relies on us to remove the request from xprt->pending. 639 */ 640static void 641xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied) 642{ 643 struct rpc_task *task = req->rq_task; 644 struct rpc_clnt *clnt = task->tk_client; 645 646 /* Adjust congestion window */ 647 if (!xprt->nocong) { 648 unsigned timer = task->tk_msg.rpc_proc->p_timer; 649 xprt_adjust_cwnd(xprt, copied); 650 __xprt_put_cong(xprt, req); 651 if (timer) { 652 if (req->rq_ntrans == 1) 653 rpc_update_rtt(clnt->cl_rtt, timer, 654 (long)jiffies - req->rq_xtime); 655 rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1); 656 } 657 } 658 659#ifdef RPC_PROFILE 660 /* Profile only reads for now */ 661 if (copied > 1024) { 662 static unsigned long nextstat; 663 static unsigned long pkt_rtt, pkt_len, pkt_cnt; 664 665 pkt_cnt++; 666 pkt_len += req->rq_slen + copied; 667 pkt_rtt += jiffies - req->rq_xtime; 668 if (time_before(nextstat, jiffies)) { 669 printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd); 670 printk("RPC: %ld %ld %ld %ld stat\n", 671 jiffies, pkt_cnt, pkt_len, pkt_rtt); 672 pkt_rtt = pkt_len = pkt_cnt = 0; 673 nextstat = jiffies + 5 * HZ; 674 } 675 } 676#endif 677 678 dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied); 679 list_del_init(&req->rq_list); 680 req->rq_received = req->rq_private_buf.len = copied; 681 682 /* ... and wake up the process. 
*/ 683 rpc_wake_up_task(task); 684 return; 685} 686 687static size_t 688skb_read_bits(skb_reader_t *desc, void *to, size_t len) 689{ 690 if (len > desc->count) 691 len = desc->count; 692 if (skb_copy_bits(desc->skb, desc->offset, to, len)) 693 return 0; 694 desc->count -= len; 695 desc->offset += len; 696 return len; 697} 698 699static size_t 700skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len) 701{ 702 unsigned int csum2, pos; 703 704 if (len > desc->count) 705 len = desc->count; 706 pos = desc->offset; 707 csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0); 708 desc->csum = csum_block_add(desc->csum, csum2, pos); 709 desc->count -= len; 710 desc->offset += len; 711 return len; 712} 713 714/* 715 * We have set things up such that we perform the checksum of the UDP 716 * packet in parallel with the copies into the RPC client iovec. -DaveM 717 */ 718int 719csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) 720{ 721 skb_reader_t desc; 722 723 desc.skb = skb; 724 desc.offset = sizeof(struct udphdr); 725 desc.count = skb->len - desc.offset; 726 727 if (skb->ip_summed == CHECKSUM_UNNECESSARY) 728 goto no_checksum; 729 730 desc.csum = csum_partial(skb->data, desc.offset, skb->csum); 731 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits) < 0) 732 return -1; 733 if (desc.offset != skb->len) { 734 unsigned int csum2; 735 csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0); 736 desc.csum = csum_block_add(desc.csum, csum2, desc.offset); 737 } 738 if (desc.count) 739 return -1; 740 if ((unsigned short)csum_fold(desc.csum)) 741 return -1; 742 return 0; 743no_checksum: 744 if (xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits) < 0) 745 return -1; 746 if (desc.count) 747 return -1; 748 return 0; 749} 750 751/* 752 * Input handler for RPC replies. Called from a bottom half and hence 753 * atomic. 
754 */ 755static void 756udp_data_ready(struct sock *sk, int len) 757{ 758 struct rpc_task *task; 759 struct rpc_xprt *xprt; 760 struct rpc_rqst *rovr; 761 struct sk_buff *skb; 762 int err, repsize, copied; 763 u32 _xid, *xp; 764 765 read_lock(&sk->sk_callback_lock); 766 dprintk("RPC: udp_data_ready...\n"); 767 if (!(xprt = xprt_from_sock(sk))) { 768 printk("RPC: udp_data_ready request not found!\n"); 769 goto out; 770 } 771 772 dprintk("RPC: udp_data_ready client %p\n", xprt); 773 774 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 775 goto out; 776 777 if (xprt->shutdown) 778 goto dropit; 779 780 repsize = skb->len - sizeof(struct udphdr); 781 if (repsize < 4) { 782 printk("RPC: impossible RPC reply size %d!\n", repsize); 783 goto dropit; 784 } 785 786 /* Copy the XID from the skb... */ 787 xp = skb_header_pointer(skb, sizeof(struct udphdr), 788 sizeof(_xid), &_xid); 789 if (xp == NULL) 790 goto dropit; 791 792 /* Look up and lock the request corresponding to the given XID */ 793 spin_lock(&xprt->sock_lock); 794 rovr = xprt_lookup_rqst(xprt, *xp); 795 if (!rovr) 796 goto out_unlock; 797 task = rovr->rq_task; 798 799 dprintk("RPC: %4d received reply\n", task->tk_pid); 800 801 if ((copied = rovr->rq_private_buf.buflen) > repsize) 802 copied = repsize; 803 804 /* Suck it into the iovec, verify checksum if not done by hw. */ 805 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) 806 goto out_unlock; 807 808 /* Something worked... */ 809 dst_confirm(skb->dst); 810 811 xprt_complete_rqst(xprt, rovr, copied); 812 813 out_unlock: 814 spin_unlock(&xprt->sock_lock); 815 dropit: 816 skb_free_datagram(sk, skb); 817 out: 818 read_unlock(&sk->sk_callback_lock); 819} 820 821/* 822 * Copy from an skb into memory and shrink the skb. 
823 */ 824static inline size_t 825tcp_copy_data(skb_reader_t *desc, void *p, size_t len) 826{ 827 if (len > desc->count) 828 len = desc->count; 829 if (skb_copy_bits(desc->skb, desc->offset, p, len)) { 830 dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n", 831 len, desc->count); 832 return 0; 833 } 834 desc->offset += len; 835 desc->count -= len; 836 dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n", 837 len, desc->count); 838 return len; 839} 840 841/* 842 * TCP read fragment marker 843 */ 844static inline void 845tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc) 846{ 847 size_t len, used; 848 char *p; 849 850 p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset; 851 len = sizeof(xprt->tcp_recm) - xprt->tcp_offset; 852 used = tcp_copy_data(desc, p, len); 853 xprt->tcp_offset += used; 854 if (used != len) 855 return; 856 xprt->tcp_reclen = ntohl(xprt->tcp_recm); 857 if (xprt->tcp_reclen & 0x80000000) 858 xprt->tcp_flags |= XPRT_LAST_FRAG; 859 else 860 xprt->tcp_flags &= ~XPRT_LAST_FRAG; 861 xprt->tcp_reclen &= 0x7fffffff; 862 xprt->tcp_flags &= ~XPRT_COPY_RECM; 863 xprt->tcp_offset = 0; 864 /* Sanity check of the record length */ 865 if (xprt->tcp_reclen < 4) { 866 printk(KERN_ERR "RPC: Invalid TCP record fragment length\n"); 867 xprt_disconnect(xprt); 868 } 869 dprintk("RPC: reading TCP record fragment of length %d\n", 870 xprt->tcp_reclen); 871} 872 873static void 874tcp_check_recm(struct rpc_xprt *xprt) 875{ 876 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n", 877 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags); 878 if (xprt->tcp_offset == xprt->tcp_reclen) { 879 xprt->tcp_flags |= XPRT_COPY_RECM; 880 xprt->tcp_offset = 0; 881 if (xprt->tcp_flags & XPRT_LAST_FRAG) { 882 xprt->tcp_flags &= ~XPRT_COPY_DATA; 883 xprt->tcp_flags |= XPRT_COPY_XID; 884 xprt->tcp_copied = 0; 885 } 886 } 887} 888 889/* 890 * TCP read xid 891 */ 892static inline void 
893tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc) 894{ 895 size_t len, used; 896 char *p; 897 898 len = sizeof(xprt->tcp_xid) - xprt->tcp_offset; 899 dprintk("RPC: reading XID (%Zu bytes)\n", len); 900 p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset; 901 used = tcp_copy_data(desc, p, len); 902 xprt->tcp_offset += used; 903 if (used != len) 904 return; 905 xprt->tcp_flags &= ~XPRT_COPY_XID; 906 xprt->tcp_flags |= XPRT_COPY_DATA; 907 xprt->tcp_copied = 4; 908 dprintk("RPC: reading reply for XID %08x\n", 909 ntohl(xprt->tcp_xid)); 910 tcp_check_recm(xprt); 911} 912 913/* 914 * TCP read and complete request 915 */ 916static inline void 917tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc) 918{ 919 struct rpc_rqst *req; 920 struct xdr_buf *rcvbuf; 921 size_t len; 922 ssize_t r; 923 924 /* Find and lock the request corresponding to this xid */ 925 spin_lock(&xprt->sock_lock); 926 req = xprt_lookup_rqst(xprt, xprt->tcp_xid); 927 if (!req) { 928 xprt->tcp_flags &= ~XPRT_COPY_DATA; 929 dprintk("RPC: XID %08x request not found!\n", 930 ntohl(xprt->tcp_xid)); 931 spin_unlock(&xprt->sock_lock); 932 return; 933 } 934 935 rcvbuf = &req->rq_private_buf; 936 len = desc->count; 937 if (len > xprt->tcp_reclen - xprt->tcp_offset) { 938 skb_reader_t my_desc; 939 940 len = xprt->tcp_reclen - xprt->tcp_offset; 941 memcpy(&my_desc, desc, sizeof(my_desc)); 942 my_desc.count = len; 943 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 944 &my_desc, tcp_copy_data); 945 desc->count -= r; 946 desc->offset += r; 947 } else 948 r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied, 949 desc, tcp_copy_data); 950 951 if (r > 0) { 952 xprt->tcp_copied += r; 953 xprt->tcp_offset += r; 954 } 955 if (r != len) { 956 /* Error when copying to the receive buffer, 957 * usually because we weren't able to allocate 958 * additional buffer pages. All we can do now 959 * is turn off XPRT_COPY_DATA, so the request 960 * will not receive any additional updates, 961 * and time out. 
962 * Any remaining data from this record will 963 * be discarded. 964 */ 965 xprt->tcp_flags &= ~XPRT_COPY_DATA; 966 dprintk("RPC: XID %08x truncated request\n", 967 ntohl(xprt->tcp_xid)); 968 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 969 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 970 goto out; 971 } 972 973 dprintk("RPC: XID %08x read %Zd bytes\n", 974 ntohl(xprt->tcp_xid), r); 975 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n", 976 xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen); 977 978 if (xprt->tcp_copied == req->rq_private_buf.buflen) 979 xprt->tcp_flags &= ~XPRT_COPY_DATA; 980 else if (xprt->tcp_offset == xprt->tcp_reclen) { 981 if (xprt->tcp_flags & XPRT_LAST_FRAG) 982 xprt->tcp_flags &= ~XPRT_COPY_DATA; 983 } 984 985out: 986 if (!(xprt->tcp_flags & XPRT_COPY_DATA)) { 987 dprintk("RPC: %4d received reply complete\n", 988 req->rq_task->tk_pid); 989 xprt_complete_rqst(xprt, req, xprt->tcp_copied); 990 } 991 spin_unlock(&xprt->sock_lock); 992 tcp_check_recm(xprt); 993} 994 995/* 996 * TCP discard extra bytes from a short read 997 */ 998static inline void 999tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc) 1000{ 1001 size_t len; 1002 1003 len = xprt->tcp_reclen - xprt->tcp_offset; 1004 if (len > desc->count) 1005 len = desc->count; 1006 desc->count -= len; 1007 desc->offset += len; 1008 xprt->tcp_offset += len; 1009 dprintk("RPC: discarded %Zu bytes\n", len); 1010 tcp_check_recm(xprt); 1011} 1012 1013/* 1014 * TCP record receive routine 1015 * We first have to grab the record marker, then the XID, then the data. 
1016 */ 1017static int 1018tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, 1019 unsigned int offset, size_t len) 1020{ 1021 struct rpc_xprt *xprt = rd_desc->arg.data; 1022 skb_reader_t desc = { 1023 .skb = skb, 1024 .offset = offset, 1025 .count = len, 1026 .csum = 0 1027 }; 1028 1029 dprintk("RPC: tcp_data_recv\n"); 1030 do { 1031 /* Read in a new fragment marker if necessary */ 1032 /* Can we ever really expect to get completely empty fragments? */ 1033 if (xprt->tcp_flags & XPRT_COPY_RECM) { 1034 tcp_read_fraghdr(xprt, &desc); 1035 continue; 1036 } 1037 /* Read in the xid if necessary */ 1038 if (xprt->tcp_flags & XPRT_COPY_XID) { 1039 tcp_read_xid(xprt, &desc); 1040 continue; 1041 } 1042 /* Read in the request data */ 1043 if (xprt->tcp_flags & XPRT_COPY_DATA) { 1044 tcp_read_request(xprt, &desc); 1045 continue; 1046 } 1047 /* Skip over any trailing bytes on short reads */ 1048 tcp_read_discard(xprt, &desc); 1049 } while (desc.count); 1050 dprintk("RPC: tcp_data_recv done\n"); 1051 return len - desc.count; 1052} 1053 1054static void tcp_data_ready(struct sock *sk, int bytes) 1055{ 1056 struct rpc_xprt *xprt; 1057 read_descriptor_t rd_desc; 1058 1059 read_lock(&sk->sk_callback_lock); 1060 dprintk("RPC: tcp_data_ready...\n"); 1061 if (!(xprt = xprt_from_sock(sk))) { 1062 printk("RPC: tcp_data_ready socket info not found!\n"); 1063 goto out; 1064 } 1065 if (xprt->shutdown) 1066 goto out; 1067 1068 /* We use rd_desc to pass struct xprt to tcp_data_recv */ 1069 rd_desc.arg.data = xprt; 1070 rd_desc.count = 65536; 1071 tcp_read_sock(sk, &rd_desc, tcp_data_recv); 1072out: 1073 read_unlock(&sk->sk_callback_lock); 1074} 1075 1076static void 1077tcp_state_change(struct sock *sk) 1078{ 1079 struct rpc_xprt *xprt; 1080 1081 read_lock(&sk->sk_callback_lock); 1082 if (!(xprt = xprt_from_sock(sk))) 1083 goto out; 1084 dprintk("RPC: tcp_state_change client %p...\n", xprt); 1085 dprintk("RPC: state %x conn %d dead %d zapped %d\n", 1086 sk->sk_state, 
xprt_connected(xprt), 1087 sock_flag(sk, SOCK_DEAD), 1088 sock_flag(sk, SOCK_ZAPPED)); 1089 1090 switch (sk->sk_state) { 1091 case TCP_ESTABLISHED: 1092 spin_lock_bh(&xprt->sock_lock); 1093 if (!xprt_test_and_set_connected(xprt)) { 1094 /* Reset TCP record info */ 1095 xprt->tcp_offset = 0; 1096 xprt->tcp_reclen = 0; 1097 xprt->tcp_copied = 0; 1098 xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID; 1099 rpc_wake_up(&xprt->pending); 1100 } 1101 spin_unlock_bh(&xprt->sock_lock); 1102 break; 1103 case TCP_SYN_SENT: 1104 case TCP_SYN_RECV: 1105 break; 1106 default: 1107 xprt_disconnect(xprt); 1108 break; 1109 } 1110 out: 1111 read_unlock(&sk->sk_callback_lock); 1112} 1113 1114/* 1115 * Called when more output buffer space is available for this socket. 1116 * We try not to wake our writers until they can make "significant" 1117 * progress, otherwise we'll waste resources thrashing sock_sendmsg 1118 * with a bunch of small requests. 1119 */ 1120static void 1121xprt_write_space(struct sock *sk) 1122{ 1123 struct rpc_xprt *xprt; 1124 struct socket *sock; 1125 1126 read_lock(&sk->sk_callback_lock); 1127 if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) 1128 goto out; 1129 if (xprt->shutdown) 1130 goto out; 1131 1132 /* Wait until we have enough socket memory */ 1133 if (xprt->stream) { 1134 /* from net/core/stream.c:sk_stream_write_space */ 1135 if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) 1136 goto out; 1137 } else { 1138 /* from net/core/sock.c:sock_def_write_space */ 1139 if (!sock_writeable(sk)) 1140 goto out; 1141 } 1142 1143 if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) 1144 goto out; 1145 1146 spin_lock_bh(&xprt->sock_lock); 1147 if (xprt->snd_task) 1148 rpc_wake_up_task(xprt->snd_task); 1149 spin_unlock_bh(&xprt->sock_lock); 1150out: 1151 read_unlock(&sk->sk_callback_lock); 1152} 1153 1154/* 1155 * RPC receive timeout handler. 
1156 */ 1157static void 1158xprt_timer(struct rpc_task *task) 1159{ 1160 struct rpc_rqst *req = task->tk_rqstp; 1161 struct rpc_xprt *xprt = req->rq_xprt; 1162 1163 spin_lock(&xprt->sock_lock); 1164 if (req->rq_received) 1165 goto out; 1166 1167 xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT); 1168 __xprt_put_cong(xprt, req); 1169 1170 dprintk("RPC: %4d xprt_timer (%s request)\n", 1171 task->tk_pid, req ? "pending" : "backlogged"); 1172 1173 task->tk_status = -ETIMEDOUT; 1174out: 1175 task->tk_timeout = 0; 1176 rpc_wake_up_task(task); 1177 spin_unlock(&xprt->sock_lock); 1178} 1179 1180/* 1181 * Place the actual RPC call. 1182 * We have to copy the iovec because sendmsg fiddles with its contents. 1183 */ 1184int 1185xprt_prepare_transmit(struct rpc_task *task) 1186{ 1187 struct rpc_rqst *req = task->tk_rqstp; 1188 struct rpc_xprt *xprt = req->rq_xprt; 1189 int err = 0; 1190 1191 dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid); 1192 1193 if (xprt->shutdown) 1194 return -EIO; 1195 1196 spin_lock_bh(&xprt->sock_lock); 1197 if (req->rq_received && !req->rq_bytes_sent) { 1198 err = req->rq_received; 1199 goto out_unlock; 1200 } 1201 if (!__xprt_lock_write(xprt, task)) { 1202 err = -EAGAIN; 1203 goto out_unlock; 1204 } 1205 1206 if (!xprt_connected(xprt)) { 1207 err = -ENOTCONN; 1208 goto out_unlock; 1209 } 1210out_unlock: 1211 spin_unlock_bh(&xprt->sock_lock); 1212 return err; 1213} 1214 1215void 1216xprt_transmit(struct rpc_task *task) 1217{ 1218 struct rpc_clnt *clnt = task->tk_client; 1219 struct rpc_rqst *req = task->tk_rqstp; 1220 struct rpc_xprt *xprt = req->rq_xprt; 1221 int status, retry = 0; 1222 1223 1224 dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); 1225 1226 /* set up everything as needed. 
*/ 1227 /* Write the record marker */ 1228 if (xprt->stream) { 1229 u32 *marker = req->rq_svec[0].iov_base; 1230 1231 *marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker))); 1232 } 1233 1234 smp_rmb(); 1235 if (!req->rq_received) { 1236 if (list_empty(&req->rq_list)) { 1237 spin_lock_bh(&xprt->sock_lock); 1238 /* Update the softirq receive buffer */ 1239 memcpy(&req->rq_private_buf, &req->rq_rcv_buf, 1240 sizeof(req->rq_private_buf)); 1241 /* Add request to the receive list */ 1242 list_add_tail(&req->rq_list, &xprt->recv); 1243 spin_unlock_bh(&xprt->sock_lock); 1244 xprt_reset_majortimeo(req); 1245 /* Turn off autodisconnect */ 1246 del_singleshot_timer_sync(&xprt->timer); 1247 } 1248 } else if (!req->rq_bytes_sent) 1249 return; 1250 1251 /* Continue transmitting the packet/record. We must be careful 1252 * to cope with writespace callbacks arriving _after_ we have 1253 * called xprt_sendmsg(). 1254 */ 1255 while (1) { 1256 req->rq_xtime = jiffies; 1257 status = xprt_sendmsg(xprt, req); 1258 1259 if (status < 0) 1260 break; 1261 1262 if (xprt->stream) { 1263 req->rq_bytes_sent += status; 1264 1265 /* If we've sent the entire packet, immediately 1266 * reset the count of bytes sent. */ 1267 if (req->rq_bytes_sent >= req->rq_slen) { 1268 req->rq_bytes_sent = 0; 1269 goto out_receive; 1270 } 1271 } else { 1272 if (status >= req->rq_slen) 1273 goto out_receive; 1274 status = -EAGAIN; 1275 break; 1276 } 1277 1278 dprintk("RPC: %4d xmit incomplete (%d left of %d)\n", 1279 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 1280 req->rq_slen); 1281 1282 status = -EAGAIN; 1283 if (retry++ > 50) 1284 break; 1285 } 1286 1287 /* Note: at this point, task->tk_sleeping has not yet been set, 1288 * hence there is no danger of the waking up task being put on 1289 * schedq, and being picked up by a parallel run of rpciod(). 
1290 */ 1291 task->tk_status = status; 1292 1293 switch (status) { 1294 case -EAGAIN: 1295 if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) { 1296 /* Protect against races with xprt_write_space */ 1297 spin_lock_bh(&xprt->sock_lock); 1298 /* Don't race with disconnect */ 1299 if (!xprt_connected(xprt)) 1300 task->tk_status = -ENOTCONN; 1301 else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) { 1302 task->tk_timeout = req->rq_timeout; 1303 rpc_sleep_on(&xprt->pending, task, NULL, NULL); 1304 } 1305 spin_unlock_bh(&xprt->sock_lock); 1306 return; 1307 } 1308 /* Keep holding the socket if it is blocked */ 1309 rpc_delay(task, HZ>>4); 1310 return; 1311 case -ECONNREFUSED: 1312 task->tk_timeout = RPC_REESTABLISH_TIMEOUT; 1313 rpc_sleep_on(&xprt->sending, task, NULL, NULL); 1314 case -ENOTCONN: 1315 return; 1316 default: 1317 if (xprt->stream) 1318 xprt_disconnect(xprt); 1319 } 1320 xprt_release_write(xprt, task); 1321 return; 1322 out_receive: 1323 dprintk("RPC: %4d xmit complete\n", task->tk_pid); 1324 /* Set the task's receive timeout value */ 1325 spin_lock_bh(&xprt->sock_lock); 1326 if (!xprt->nocong) { 1327 int timer = task->tk_msg.rpc_proc->p_timer; 1328 task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer); 1329 task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries; 1330 if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0) 1331 task->tk_timeout = xprt->timeout.to_maxval; 1332 } else 1333 task->tk_timeout = req->rq_timeout; 1334 /* Don't race with disconnect */ 1335 if (!xprt_connected(xprt)) 1336 task->tk_status = -ENOTCONN; 1337 else if (!req->rq_received) 1338 rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer); 1339 __xprt_release_write(xprt, task); 1340 spin_unlock_bh(&xprt->sock_lock); 1341} 1342 1343/* 1344 * Reserve an RPC call slot. 
1345 */ 1346static inline void 1347do_xprt_reserve(struct rpc_task *task) 1348{ 1349 struct rpc_xprt *xprt = task->tk_xprt; 1350 1351 task->tk_status = 0; 1352 if (task->tk_rqstp) 1353 return; 1354 if (!list_empty(&xprt->free)) { 1355 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list); 1356 list_del_init(&req->rq_list); 1357 task->tk_rqstp = req; 1358 xprt_request_init(task, xprt); 1359 return; 1360 } 1361 dprintk("RPC: waiting for request slot\n"); 1362 task->tk_status = -EAGAIN; 1363 task->tk_timeout = 0; 1364 rpc_sleep_on(&xprt->backlog, task, NULL, NULL); 1365} 1366 1367void 1368xprt_reserve(struct rpc_task *task) 1369{ 1370 struct rpc_xprt *xprt = task->tk_xprt; 1371 1372 task->tk_status = -EIO; 1373 if (!xprt->shutdown) { 1374 spin_lock(&xprt->xprt_lock); 1375 do_xprt_reserve(task); 1376 spin_unlock(&xprt->xprt_lock); 1377 } 1378} 1379 1380/* 1381 * Allocate a 'unique' XID 1382 */ 1383static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt) 1384{ 1385 return xprt->xid++; 1386} 1387 1388static inline void xprt_init_xid(struct rpc_xprt *xprt) 1389{ 1390 get_random_bytes(&xprt->xid, sizeof(xprt->xid)); 1391} 1392 1393/* 1394 * Initialize RPC request 1395 */ 1396static void 1397xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) 1398{ 1399 struct rpc_rqst *req = task->tk_rqstp; 1400 1401 req->rq_timeout = xprt->timeout.to_initval; 1402 req->rq_task = task; 1403 req->rq_xprt = xprt; 1404 req->rq_xid = xprt_alloc_xid(xprt); 1405 dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, 1406 req, ntohl(req->rq_xid)); 1407} 1408 1409/* 1410 * Release an RPC call slot 1411 */ 1412void 1413xprt_release(struct rpc_task *task) 1414{ 1415 struct rpc_xprt *xprt = task->tk_xprt; 1416 struct rpc_rqst *req; 1417 1418 if (!(req = task->tk_rqstp)) 1419 return; 1420 spin_lock_bh(&xprt->sock_lock); 1421 __xprt_release_write(xprt, task); 1422 __xprt_put_cong(xprt, req); 1423 if (!list_empty(&req->rq_list)) 1424 list_del(&req->rq_list); 1425 
xprt->last_used = jiffies; 1426 if (list_empty(&xprt->recv) && !xprt->shutdown) 1427 mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT); 1428 spin_unlock_bh(&xprt->sock_lock); 1429 task->tk_rqstp = NULL; 1430 memset(req, 0, sizeof(*req)); /* mark unused */ 1431 1432 dprintk("RPC: %4d release request %p\n", task->tk_pid, req); 1433 1434 spin_lock(&xprt->xprt_lock); 1435 list_add(&req->rq_list, &xprt->free); 1436 xprt_clear_backlog(xprt); 1437 spin_unlock(&xprt->xprt_lock); 1438} 1439 1440/* 1441 * Set default timeout parameters 1442 */ 1443static void 1444xprt_default_timeout(struct rpc_timeout *to, int proto) 1445{ 1446 if (proto == IPPROTO_UDP) 1447 xprt_set_timeout(to, 5, 5 * HZ); 1448 else 1449 xprt_set_timeout(to, 5, 60 * HZ); 1450} 1451 1452/* 1453 * Set constant timeout 1454 */ 1455void 1456xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) 1457{ 1458 to->to_initval = 1459 to->to_increment = incr; 1460 to->to_maxval = incr * retr; 1461 to->to_retries = retr; 1462 to->to_exponential = 0; 1463} 1464 1465unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1466unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE; 1467 1468/* 1469 * Initialize an RPC client 1470 */ 1471static struct rpc_xprt * 1472xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) 1473{ 1474 struct rpc_xprt *xprt; 1475 unsigned int entries; 1476 size_t slot_table_size; 1477 struct rpc_rqst *req; 1478 1479 dprintk("RPC: setting up %s transport...\n", 1480 proto == IPPROTO_UDP? "UDP" : "TCP"); 1481 1482 entries = (proto == IPPROTO_TCP)? 1483 xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries; 1484 1485 if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) 1486 return ERR_PTR(-ENOMEM); 1487 memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! 
*/ 1488 xprt->max_reqs = entries; 1489 slot_table_size = entries * sizeof(xprt->slot[0]); 1490 xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); 1491 if (xprt->slot == NULL) { 1492 kfree(xprt); 1493 return ERR_PTR(-ENOMEM); 1494 } 1495 memset(xprt->slot, 0, slot_table_size); 1496 1497 xprt->addr = *ap; 1498 xprt->prot = proto; 1499 xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; 1500 if (xprt->stream) { 1501 xprt->cwnd = RPC_MAXCWND(xprt); 1502 xprt->nocong = 1; 1503 xprt->max_payload = (1U << 31) - 1; 1504 } else { 1505 xprt->cwnd = RPC_INITCWND; 1506 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 1507 } 1508 spin_lock_init(&xprt->sock_lock); 1509 spin_lock_init(&xprt->xprt_lock); 1510 init_waitqueue_head(&xprt->cong_wait); 1511 1512 INIT_LIST_HEAD(&xprt->free); 1513 INIT_LIST_HEAD(&xprt->recv); 1514 INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); 1515 INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); 1516 init_timer(&xprt->timer); 1517 xprt->timer.function = xprt_init_autodisconnect; 1518 xprt->timer.data = (unsigned long) xprt; 1519 xprt->last_used = jiffies; 1520 xprt->port = XPRT_MAX_RESVPORT; 1521 1522 /* Set timeout parameters */ 1523 if (to) { 1524 xprt->timeout = *to; 1525 } else 1526 xprt_default_timeout(&xprt->timeout, xprt->prot); 1527 1528 rpc_init_wait_queue(&xprt->pending, "xprt_pending"); 1529 rpc_init_wait_queue(&xprt->sending, "xprt_sending"); 1530 rpc_init_wait_queue(&xprt->resend, "xprt_resend"); 1531 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog"); 1532 1533 /* initialize free list */ 1534 for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--) 1535 list_add(&req->rq_list, &xprt->free); 1536 1537 xprt_init_xid(xprt); 1538 1539 /* Check whether we want to use a reserved port */ 1540 xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 
1 : 0; 1541 1542 dprintk("RPC: created transport %p with %u slots\n", xprt, 1543 xprt->max_reqs); 1544 1545 return xprt; 1546} 1547 1548/* 1549 * Bind to a reserved port 1550 */ 1551static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) 1552{ 1553 struct sockaddr_in myaddr = { 1554 .sin_family = AF_INET, 1555 }; 1556 int err, port; 1557 1558 /* Were we already bound to a given port? Try to reuse it */ 1559 port = xprt->port; 1560 do { 1561 myaddr.sin_port = htons(port); 1562 err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, 1563 sizeof(myaddr)); 1564 if (err == 0) { 1565 xprt->port = port; 1566 return 0; 1567 } 1568 if (--port == 0) 1569 port = XPRT_MAX_RESVPORT; 1570 } while (err == -EADDRINUSE && port != xprt->port); 1571 1572 printk("RPC: Can't bind to reserved port (%d).\n", -err); 1573 return err; 1574} 1575 1576static void 1577xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) 1578{ 1579 struct sock *sk = sock->sk; 1580 1581 if (xprt->inet) 1582 return; 1583 1584 write_lock_bh(&sk->sk_callback_lock); 1585 sk->sk_user_data = xprt; 1586 xprt->old_data_ready = sk->sk_data_ready; 1587 xprt->old_state_change = sk->sk_state_change; 1588 xprt->old_write_space = sk->sk_write_space; 1589 if (xprt->prot == IPPROTO_UDP) { 1590 sk->sk_data_ready = udp_data_ready; 1591 sk->sk_no_check = UDP_CSUM_NORCV; 1592 xprt_set_connected(xprt); 1593 } else { 1594 tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ 1595 sk->sk_data_ready = tcp_data_ready; 1596 sk->sk_state_change = tcp_state_change; 1597 xprt_clear_connected(xprt); 1598 } 1599 sk->sk_write_space = xprt_write_space; 1600 1601 /* Reset to new socket */ 1602 xprt->sock = sock; 1603 xprt->inet = sk; 1604 write_unlock_bh(&sk->sk_callback_lock); 1605 1606 return; 1607} 1608 1609/* 1610 * Set socket buffer length 1611 */ 1612void 1613xprt_sock_setbufsize(struct rpc_xprt *xprt) 1614{ 1615 struct sock *sk = xprt->inet; 1616 1617 if (xprt->stream) 1618 return; 1619 if 
(xprt->rcvsize) { 1620 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1621 sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; 1622 } 1623 if (xprt->sndsize) { 1624 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1625 sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; 1626 sk->sk_write_space(sk); 1627 } 1628} 1629 1630/* 1631 * Datastream sockets are created here, but xprt_connect will create 1632 * and connect stream sockets. 1633 */ 1634static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) 1635{ 1636 struct socket *sock; 1637 int type, err; 1638 1639 dprintk("RPC: xprt_create_socket(%s %d)\n", 1640 (proto == IPPROTO_UDP)? "udp" : "tcp", proto); 1641 1642 type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM; 1643 1644 if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { 1645 printk("RPC: can't create socket (%d).\n", -err); 1646 return NULL; 1647 } 1648 1649 /* If the caller has the capability, bind to a reserved port */ 1650 if (resvport && xprt_bindresvport(xprt, sock) < 0) { 1651 printk("RPC: can't bind to reserved port.\n"); 1652 goto failed; 1653 } 1654 1655 return sock; 1656 1657failed: 1658 sock_release(sock); 1659 return NULL; 1660} 1661 1662/* 1663 * Create an RPC client transport given the protocol and peer address. 1664 */ 1665struct rpc_xprt * 1666xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to) 1667{ 1668 struct rpc_xprt *xprt; 1669 1670 xprt = xprt_setup(proto, sap, to); 1671 if (IS_ERR(xprt)) 1672 dprintk("RPC: xprt_create_proto failed\n"); 1673 else 1674 dprintk("RPC: xprt_create_proto created xprt %p\n", xprt); 1675 return xprt; 1676} 1677 1678/* 1679 * Prepare for transport shutdown. 
1680 */ 1681static void 1682xprt_shutdown(struct rpc_xprt *xprt) 1683{ 1684 xprt->shutdown = 1; 1685 rpc_wake_up(&xprt->sending); 1686 rpc_wake_up(&xprt->resend); 1687 rpc_wake_up(&xprt->pending); 1688 rpc_wake_up(&xprt->backlog); 1689 wake_up(&xprt->cong_wait); 1690 del_timer_sync(&xprt->timer); 1691 1692 /* synchronously wait for connect worker to finish */ 1693 cancel_delayed_work(&xprt->sock_connect); 1694 flush_scheduled_work(); 1695} 1696 1697/* 1698 * Clear the xprt backlog queue 1699 */ 1700static int 1701xprt_clear_backlog(struct rpc_xprt *xprt) { 1702 rpc_wake_up_next(&xprt->backlog); 1703 wake_up(&xprt->cong_wait); 1704 return 1; 1705} 1706 1707/* 1708 * Destroy an RPC transport, killing off all requests. 1709 */ 1710int 1711xprt_destroy(struct rpc_xprt *xprt) 1712{ 1713 dprintk("RPC: destroying transport %p\n", xprt); 1714 xprt_shutdown(xprt); 1715 xprt_disconnect(xprt); 1716 xprt_close(xprt); 1717 kfree(xprt->slot); 1718 kfree(xprt); 1719 1720 return 0; 1721} 1722