svc_xprt.c revision f16b6e8d838b2e2bb4561201311c66ac02ad67df
/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>

#define RPCDBG_FACILITY RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *      svc_pool->sp_lock protects most of the fields of that pool.
 *      svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *      when both need to be taken (rare), svc_serv->sv_lock is first.
 *      BKL protects svc_serv->sv_nrthread.
 *      svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *      and the ->sk_info_authunix cache.
 *
 *      The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *      enqueued multiply. During normal transport processing this bit
 *      is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *      Providers should not manipulate this bit directly.
 *
 *      Some flags can be set to certain values at any time
 *      providing that certain rules are followed:
 *
 *      XPT_CONN, XPT_DATA:
 *              - Can be set or cleared at any time.
 *              - After a set, svc_xprt_enqueue must be called to enqueue
 *                the transport for processing.
 *              - After a clear, the transport must be read/accepted.
 *                If this succeeds, it must be set again.
 *      XPT_CLOSE:
 *              - Can be set at any time. It is never cleared.
 *      XPT_DEAD:
 *              - Can only be set while XPT_BUSY is held which ensures
 *                that no other thread will be using the transport or will
 *                try to set XPT_DEAD.
 */
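
/*
 * Editorial illustration (not part of the original file): a transport
 * provider's data-ready callback follows the XPT_DATA rule above by
 * setting the bit and then enqueueing the transport, roughly:
 *
 *      static void example_data_ready(struct svc_xprt *xprt)   // hypothetical helper
 *      {
 *              set_bit(XPT_DATA, &xprt->xpt_flags);
 *              svc_xprt_enqueue(xprt);
 *      }
 *
 * Per the rules above, XPT_DATA is cleared only once a read attempt finds
 * no (or insufficient) data, and must be set again if a later read succeeds.
 */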

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
        struct svc_xprt_class *cl;
        int res = -EEXIST;

        dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

        INIT_LIST_HEAD(&xcl->xcl_list);
        spin_lock(&svc_xprt_class_lock);
        /* Make sure there isn't already a class with the same name */
        list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
                if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
                        goto out;
        }
        list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
        res = 0;
out:
        spin_unlock(&svc_xprt_class_lock);
        return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
        dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
        spin_lock(&svc_xprt_class_lock);
        list_del_init(&xcl->xcl_list);
        spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
        struct list_head *le;
        char tmpstr[80];
        int len = 0;
        buf[0] = '\0';

        spin_lock(&svc_xprt_class_lock);
        list_for_each(le, &svc_xprt_class_list) {
                int slen;
                struct svc_xprt_class *xcl =
                        list_entry(le, struct svc_xprt_class, xcl_list);

                sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
                slen = strlen(tmpstr);
                if (len + slen > maxlen)
                        break;
                len += slen;
                strcat(buf, tmpstr);
        }
        spin_unlock(&svc_xprt_class_lock);

        return len;
}

static void svc_xprt_free(struct kref *kref)
{
        struct svc_xprt *xprt =
                container_of(kref, struct svc_xprt, xpt_ref);
        struct module *owner = xprt->xpt_class->xcl_owner;
        if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags) &&
            xprt->xpt_auth_cache != NULL)
                svcauth_unix_info_release(xprt->xpt_auth_cache);
        xprt->xpt_ops->xpo_free(xprt);
        module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
        kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
                   struct svc_serv *serv)
{
        memset(xprt, 0, sizeof(*xprt));
        xprt->xpt_class = xcl;
        xprt->xpt_ops = xcl->xcl_ops;
        kref_init(&xprt->xpt_ref);
        xprt->xpt_server = serv;
        INIT_LIST_HEAD(&xprt->xpt_list);
        INIT_LIST_HEAD(&xprt->xpt_ready);
        INIT_LIST_HEAD(&xprt->xpt_deferred);
        mutex_init(&xprt->xpt_mutex);
        spin_lock_init(&xprt->xpt_lock);
        set_bit(XPT_BUSY, &xprt->xpt_flags);
        rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
}
EXPORT_SYMBOL_GPL(svc_xprt_init);
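
/*
 * Editorial sketch (not part of the original file): a transport class's
 * xpo_create method typically allocates its private structure, embeds a
 * struct svc_xprt in it, and hands that embedded field to svc_xprt_init(),
 * e.g. (names are hypothetical):
 *
 *      struct example_xprt {
 *              struct svc_xprt xprt;           // transport-independent part
 *              int             example_state;  // provider-private state
 *      };
 *
 *      svc_xprt_init(&example_xprt_class, &new->xprt, serv);
 *
 * Note that svc_xprt_init() leaves XPT_BUSY set; the bit is cleared later,
 * for example by svc_create_xprt() for permanent sockets or via
 * svc_xprt_received() for accepted connections, once the transport may be
 * enqueued.
 */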

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
                                         struct svc_serv *serv,
                                         const int family,
                                         const unsigned short port,
                                         int flags)
{
        struct sockaddr_in sin = {
                .sin_family             = AF_INET,
                .sin_addr.s_addr        = htonl(INADDR_ANY),
                .sin_port               = htons(port),
        };
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
        struct sockaddr_in6 sin6 = {
                .sin6_family            = AF_INET6,
                .sin6_addr              = IN6ADDR_ANY_INIT,
                .sin6_port              = htons(port),
        };
#endif  /* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
        struct sockaddr *sap;
        size_t len;

        switch (family) {
        case PF_INET:
                sap = (struct sockaddr *)&sin;
                len = sizeof(sin);
                break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
        case PF_INET6:
                sap = (struct sockaddr *)&sin6;
                len = sizeof(sin6);
                break;
#endif  /* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
        default:
                return ERR_PTR(-EAFNOSUPPORT);
        }

        return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
                    const int family, const unsigned short port,
                    int flags)
{
        struct svc_xprt_class *xcl;

        dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
        spin_lock(&svc_xprt_class_lock);
        list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
                struct svc_xprt *newxprt;

                if (strcmp(xprt_name, xcl->xcl_name))
                        continue;

                if (!try_module_get(xcl->xcl_owner))
                        goto err;

                spin_unlock(&svc_xprt_class_lock);
                newxprt = __svc_xpo_create(xcl, serv, family, port, flags);
                if (IS_ERR(newxprt)) {
                        module_put(xcl->xcl_owner);
                        return PTR_ERR(newxprt);
                }

                clear_bit(XPT_TEMP, &newxprt->xpt_flags);
                spin_lock_bh(&serv->sv_lock);
                list_add(&newxprt->xpt_list, &serv->sv_permsocks);
                spin_unlock_bh(&serv->sv_lock);
                clear_bit(XPT_BUSY, &newxprt->xpt_flags);
                return svc_xprt_local_port(newxprt);
        }
 err:
        spin_unlock(&svc_xprt_class_lock);
        dprintk("svc: transport %s not found\n", xprt_name);

        /* This errno is exposed to user space.  Provide a reasonable
         * perror msg for a bad transport. */
        return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
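
/*
 * Usage sketch (editorial addition, not part of the original file): a
 * service typically creates its listeners like this and treats a negative
 * return as an errno:
 *
 *      err = svc_create_xprt(serv, "tcp", PF_INET, 2049, SVC_SOCK_DEFAULTS);
 *      if (err < 0)
 *              goto out_error;   // e.g. -EPROTONOSUPPORT for an unknown class
 *
 * On success the return value is the local port to which the new permanent
 * transport is bound.
 */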

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
        struct sockaddr *sin;

        memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
        rqstp->rq_addrlen = xprt->xpt_remotelen;

        /*
         * Destination address in request is needed for binding the
         * source address in RPC replies/callbacks later.
         */
        sin = (struct sockaddr *)&xprt->xpt_local;
        switch (sin->sa_family) {
        case AF_INET:
                rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
                break;
        case AF_INET6:
                rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
                break;
        }
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
        return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
        list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
        list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
        struct svc_serv *serv = xprt->xpt_server;
        struct svc_pool *pool;
        struct svc_rqst *rqstp;
        int cpu;

        if (!(xprt->xpt_flags &
              ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
                return;

        cpu = get_cpu();
        pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
        put_cpu();

        spin_lock_bh(&pool->sp_lock);

        if (!list_empty(&pool->sp_threads) &&
            !list_empty(&pool->sp_sockets))
                printk(KERN_ERR
                       "svc_xprt_enqueue: "
                       "threads and transports both waiting??\n");

        if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
                /* Don't enqueue dead transports */
                dprintk("svc: transport %p is dead, not enqueued\n", xprt);
                goto out_unlock;
        }

        pool->sp_stats.packets++;

        /* Mark transport as busy. It will remain in this state until
         * the provider calls svc_xprt_received. We update XPT_BUSY
         * atomically because it also guards against trying to enqueue
         * the transport twice.
         */
        if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
                /* Don't enqueue transport while already enqueued */
                dprintk("svc: transport %p busy, not enqueued\n", xprt);
                goto out_unlock;
        }
        BUG_ON(xprt->xpt_pool != NULL);
        xprt->xpt_pool = pool;

        /* Handle pending connection */
        if (test_bit(XPT_CONN, &xprt->xpt_flags))
                goto process;

        /* Handle close in-progress */
        if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                goto process;

        /* Check if we have space to reply to a request */
        if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
                /* Don't enqueue while not enough space for reply */
                dprintk("svc: no write space, transport %p not enqueued\n",
                        xprt);
                xprt->xpt_pool = NULL;
                clear_bit(XPT_BUSY, &xprt->xpt_flags);
                goto out_unlock;
        }

 process:
        if (!list_empty(&pool->sp_threads)) {
                rqstp = list_entry(pool->sp_threads.next,
                                   struct svc_rqst,
                                   rq_list);
                dprintk("svc: transport %p served by daemon %p\n",
                        xprt, rqstp);
                svc_thread_dequeue(pool, rqstp);
                if (rqstp->rq_xprt)
                        printk(KERN_ERR
                               "svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
                               rqstp, rqstp->rq_xprt);
                rqstp->rq_xprt = xprt;
                svc_xprt_get(xprt);
                rqstp->rq_reserved = serv->sv_max_mesg;
                atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
                pool->sp_stats.threads_woken++;
                BUG_ON(xprt->xpt_pool != pool);
                wake_up(&rqstp->rq_wait);
        } else {
                dprintk("svc: transport %p put into queue\n", xprt);
                list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
                pool->sp_stats.sockets_queued++;
                BUG_ON(xprt->xpt_pool != pool);
        }

out_unlock:
        spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
        struct svc_xprt *xprt;

        if (list_empty(&pool->sp_sockets))
                return NULL;

        xprt = list_entry(pool->sp_sockets.next,
                          struct svc_xprt, xpt_ready);
        list_del_init(&xprt->xpt_ready);

        dprintk("svc: transport %p dequeued, inuse=%d\n",
                xprt, atomic_read(&xprt->xpt_ref.refcount));

        return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
        BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
        xprt->xpt_pool = NULL;
        clear_bit(XPT_BUSY, &xprt->xpt_flags);
        svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
        space += rqstp->rq_res.head[0].iov_len;

        if (space < rqstp->rq_reserved) {
                struct svc_xprt *xprt = rqstp->rq_xprt;
                atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
                rqstp->rq_reserved = space;

                svc_xprt_enqueue(xprt);
        }
}
EXPORT_SYMBOL_GPL(svc_reserve);
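
/*
 * Worked example (editorial addition, not from the original file): a
 * request initially reserves serv->sv_max_mesg bytes.  If the RPC program
 * knows its reply needs at most 512 bytes beyond what is already in
 * rq_res.head[0] (say 100 bytes), then
 *
 *      svc_reserve(rqstp, 512);
 *
 * lowers rq_reserved from the initial maximum to 612, releases the
 * difference from xpt_reserved, and re-enqueues the transport, which may
 * unblock it if it was waiting for write space.
 */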

static void svc_xprt_release(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;

        rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

        kfree(rqstp->rq_deferred);
        rqstp->rq_deferred = NULL;

        svc_free_res_pages(rqstp);
        rqstp->rq_res.page_len = 0;
        rqstp->rq_res.page_base = 0;

        /* Reset response buffer and release
         * the reservation.
         * But first, check that enough space was reserved
         * for the reply, otherwise we have a bug!
         */
        if ((rqstp->rq_res.len) > rqstp->rq_reserved)
                printk(KERN_ERR "RPC request reserved %d but used %d\n",
                       rqstp->rq_reserved,
                       rqstp->rq_res.len);

        rqstp->rq_res.head[0].iov_len = 0;
        svc_reserve(rqstp, 0);
        rqstp->rq_xprt = NULL;

        svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
        struct svc_rqst *rqstp;
        unsigned int i;
        struct svc_pool *pool;

        for (i = 0; i < serv->sv_nrpools; i++) {
                pool = &serv->sv_pools[i];

                spin_lock_bh(&pool->sp_lock);
                if (!list_empty(&pool->sp_threads)) {
                        rqstp = list_entry(pool->sp_threads.next,
                                           struct svc_rqst,
                                           rq_list);
                        dprintk("svc: daemon %p woken up.\n", rqstp);
                        /*
                        svc_thread_dequeue(pool, rqstp);
                        rqstp->rq_xprt = NULL;
                         */
                        wake_up(&rqstp->rq_wait);
                }
                spin_unlock_bh(&pool->sp_lock);
        }
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
        switch (sin->sa_family) {
        case AF_INET:
                return ntohs(((struct sockaddr_in *)sin)->sin_port)
                        < PROT_SOCK;
        case AF_INET6:
                return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
                        < PROT_SOCK;
        default:
                return 0;
        }
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
        unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
                                (serv->sv_nrthreads+3) * 20;

        if (serv->sv_tmpcnt > limit) {
                struct svc_xprt *xprt = NULL;
                spin_lock_bh(&serv->sv_lock);
                if (!list_empty(&serv->sv_tempsocks)) {
                        if (net_ratelimit()) {
                                /* Try to help the admin */
                                printk(KERN_NOTICE "%s: too many open "
                                       "connections, consider increasing %s\n",
                                       serv->sv_name, serv->sv_maxconn ?
                                       "the max number of connections." :
                                       "the number of threads.");
                        }
                        /*
                         * Always select the oldest connection. It's not fair,
                         * but so is life
                         */
                        xprt = list_entry(serv->sv_tempsocks.prev,
                                          struct svc_xprt,
                                          xpt_list);
                        set_bit(XPT_CLOSE, &xprt->xpt_flags);
                        svc_xprt_get(xprt);
                }
                spin_unlock_bh(&serv->sv_lock);

                if (xprt) {
                        svc_xprt_enqueue(xprt);
                        svc_xprt_put(xprt);
                }
        }
}
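
/*
 * Example of the default limit above (editorial note, not in the original
 * file): with sv_maxconn unset and 8 server threads, the cap works out to
 * (8 + 3) * 20 = 220 temporary connections; once sv_tmpcnt exceeds that,
 * the oldest temporary transport is marked XPT_CLOSE and enqueued so that
 * a server thread closes it.
 */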

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
        struct svc_xprt *xprt = NULL;
        struct svc_serv *serv = rqstp->rq_server;
        struct svc_pool *pool = rqstp->rq_pool;
        int len, i;
        int pages;
        struct xdr_buf *arg;
        DECLARE_WAITQUEUE(wait, current);
        long time_left;

        dprintk("svc: server %p waiting for data (to = %ld)\n",
                rqstp, timeout);

        if (rqstp->rq_xprt)
                printk(KERN_ERR
                       "svc_recv: service %p, transport not NULL!\n",
                       rqstp);
        if (waitqueue_active(&rqstp->rq_wait))
                printk(KERN_ERR
                       "svc_recv: service %p, wait queue active!\n",
                       rqstp);

        /* now allocate needed pages.  If we get a failure, sleep briefly */
        pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
        for (i = 0; i < pages ; i++)
                while (rqstp->rq_pages[i] == NULL) {
                        struct page *p = alloc_page(GFP_KERNEL);
                        if (!p) {
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (signalled() || kthread_should_stop()) {
                                        set_current_state(TASK_RUNNING);
                                        return -EINTR;
                                }
                                schedule_timeout(msecs_to_jiffies(500));
                        }
                        rqstp->rq_pages[i] = p;
                }
        rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
        BUG_ON(pages >= RPCSVC_MAXPAGES);

        /* Make arg->head point to first page and arg->pages point to rest */
        arg = &rqstp->rq_arg;
        arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
        arg->head[0].iov_len = PAGE_SIZE;
        arg->pages = rqstp->rq_pages + 1;
        arg->page_base = 0;
        /* save at least one page for response */
        arg->page_len = (pages-2)*PAGE_SIZE;
        arg->len = (pages-1)*PAGE_SIZE;
        arg->tail[0].iov_len = 0;

        try_to_freeze();
        cond_resched();
        if (signalled() || kthread_should_stop())
                return -EINTR;

        /* Normally we will wait up to 5 seconds for any required
         * cache information to be provided.
         */
        rqstp->rq_chandle.thread_wait = 5*HZ;

        spin_lock_bh(&pool->sp_lock);
        xprt = svc_xprt_dequeue(pool);
        if (xprt) {
                rqstp->rq_xprt = xprt;
                svc_xprt_get(xprt);
                rqstp->rq_reserved = serv->sv_max_mesg;
                atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);

                /* As there is a shortage of threads and this request
                 * had to be queued, don't allow the thread to wait so
                 * long for cache updates.
                 */
                rqstp->rq_chandle.thread_wait = 1*HZ;
        } else {
                /* No data pending. Go to sleep */
                svc_thread_enqueue(pool, rqstp);

                /*
                 * We have to be able to interrupt this wait
                 * to bring down the daemons ...
                 */
                set_current_state(TASK_INTERRUPTIBLE);

                /*
                 * checking kthread_should_stop() here allows us to avoid
                 * locking and signalling when stopping kthreads that call
                 * svc_recv. If the thread has already been woken up, then
                 * we can exit here without sleeping. If not, it will be
                 * woken up quickly during the schedule_timeout.
                 */
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        spin_unlock_bh(&pool->sp_lock);
                        return -EINTR;
                }

                add_wait_queue(&rqstp->rq_wait, &wait);
                spin_unlock_bh(&pool->sp_lock);

                time_left = schedule_timeout(timeout);

                try_to_freeze();

                spin_lock_bh(&pool->sp_lock);
                remove_wait_queue(&rqstp->rq_wait, &wait);
                if (!time_left)
                        pool->sp_stats.threads_timedout++;

                xprt = rqstp->rq_xprt;
                if (!xprt) {
                        svc_thread_dequeue(pool, rqstp);
                        spin_unlock_bh(&pool->sp_lock);
                        dprintk("svc: server %p, no data yet\n", rqstp);
                        if (signalled() || kthread_should_stop())
                                return -EINTR;
                        else
                                return -EAGAIN;
                }
        }
        spin_unlock_bh(&pool->sp_lock);

        len = 0;
        if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
                dprintk("svc_recv: found XPT_CLOSE\n");
                svc_delete_xprt(xprt);
        } else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
                struct svc_xprt *newxpt;
                newxpt = xprt->xpt_ops->xpo_accept(xprt);
                if (newxpt) {
                        /*
                         * We know this module_get will succeed because the
                         * listener holds a reference too
                         */
                        __module_get(newxpt->xpt_class->xcl_owner);
                        svc_check_conn_limits(xprt->xpt_server);
                        spin_lock_bh(&serv->sv_lock);
                        set_bit(XPT_TEMP, &newxpt->xpt_flags);
                        list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
                        serv->sv_tmpcnt++;
                        if (serv->sv_temptimer.function == NULL) {
                                /* setup timer to age temp transports */
                                setup_timer(&serv->sv_temptimer,
                                            svc_age_temp_xprts,
                                            (unsigned long)serv);
                                mod_timer(&serv->sv_temptimer,
                                          jiffies + svc_conn_age_period * HZ);
                        }
                        spin_unlock_bh(&serv->sv_lock);
                        svc_xprt_received(newxpt);
                }
                svc_xprt_received(xprt);
        } else {
                dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
                        rqstp, pool->sp_id, xprt,
                        atomic_read(&xprt->xpt_ref.refcount));
                rqstp->rq_deferred = svc_deferred_dequeue(xprt);
                if (rqstp->rq_deferred) {
                        svc_xprt_received(xprt);
                        len = svc_deferred_recv(rqstp);
                } else {
                        len = xprt->xpt_ops->xpo_recvfrom(rqstp);
                        svc_xprt_received(xprt);
                }
                dprintk("svc: got len=%d\n", len);
        }

        /* No data, incomplete (TCP) read, or accept() */
        if (len == 0 || len == -EAGAIN) {
                rqstp->rq_res.len = 0;
                svc_xprt_release(rqstp);
                return -EAGAIN;
        }
        clear_bit(XPT_OLD, &xprt->xpt_flags);

        rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
        rqstp->rq_chandle.defer = svc_defer;

        if (serv->sv_stats)
                serv->sv_stats->netcnt++;
        return len;
}
EXPORT_SYMBOL_GPL(svc_recv);
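
/*
 * For context (editorial illustration, not part of the original file): a
 * service thread such as nfsd or lockd typically drives svc_recv() from a
 * loop along these lines:
 *
 *      while (!kthread_should_stop()) {
 *              err = svc_recv(rqstp, timeout);
 *              if (err == -EAGAIN || err == -EINTR)
 *                      continue;       // nothing arrived, or we are exiting
 *              svc_process(rqstp);     // dispatch the RPC and send the reply
 *      }
 *
 * so -EAGAIN from svc_recv() simply means "try again", while a positive
 * return is the length of the received request.
 */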

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
        dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
        svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt;
        int len;
        struct xdr_buf *xb;

        xprt = rqstp->rq_xprt;
        if (!xprt)
                return -EFAULT;

        /* release the receive skb before sending the reply */
        rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

        /* calculate over-all length */
        xb = &rqstp->rq_res;
        xb->len = xb->head[0].iov_len +
                xb->page_len +
                xb->tail[0].iov_len;

        /* Grab mutex to serialize outgoing data. */
        mutex_lock(&xprt->xpt_mutex);
        if (test_bit(XPT_DEAD, &xprt->xpt_flags))
                len = -ENOTCONN;
        else
                len = xprt->xpt_ops->xpo_sendto(rqstp);
        mutex_unlock(&xprt->xpt_mutex);
        rpc_wake_up(&xprt->xpt_bc_pending);
        svc_xprt_release(rqstp);

        if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
                return 0;
        return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
        struct svc_serv *serv = (struct svc_serv *)closure;
        struct svc_xprt *xprt;
        struct list_head *le, *next;
        LIST_HEAD(to_be_aged);

        dprintk("svc_age_temp_xprts\n");

        if (!spin_trylock_bh(&serv->sv_lock)) {
                /* busy, try again 1 sec later */
                dprintk("svc_age_temp_xprts: busy\n");
                mod_timer(&serv->sv_temptimer, jiffies + HZ);
                return;
        }

        list_for_each_safe(le, next, &serv->sv_tempsocks) {
                xprt = list_entry(le, struct svc_xprt, xpt_list);

                /* First time through, just mark it OLD. Second time
                 * through, close it. */
                if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
                        continue;
                if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
                    test_bit(XPT_BUSY, &xprt->xpt_flags))
                        continue;
                svc_xprt_get(xprt);
                list_move(le, &to_be_aged);
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
                set_bit(XPT_DETACHED, &xprt->xpt_flags);
        }
        spin_unlock_bh(&serv->sv_lock);

        while (!list_empty(&to_be_aged)) {
                le = to_be_aged.next;
                /* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
                list_del_init(le);
                xprt = list_entry(le, struct svc_xprt, xpt_list);

                dprintk("queuing xprt %p for closing\n", xprt);

                /* a thread will dequeue and close it soon */
                svc_xprt_enqueue(xprt);
                svc_xprt_put(xprt);
        }

        mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
        struct svc_serv *serv = xprt->xpt_server;
        struct svc_deferred_req *dr;

        /* Only do this once */
        if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
                return;

        dprintk("svc: svc_delete_xprt(%p)\n", xprt);
        xprt->xpt_ops->xpo_detach(xprt);

        spin_lock_bh(&serv->sv_lock);
        if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
                list_del_init(&xprt->xpt_list);
        /*
         * We used to delete the transport from whichever list its
         * sk_xprt.xpt_ready node was on, but we don't actually need
         * to.  This is because the only time we're called while still
         * attached to a queue is when the queue itself is about to be
         * destroyed (in svc_destroy).
         */
        if (test_bit(XPT_TEMP, &xprt->xpt_flags))
                serv->sv_tmpcnt--;
        spin_unlock_bh(&serv->sv_lock);

        while ((dr = svc_deferred_dequeue(xprt)) != NULL)
                kfree(dr);

        svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
        set_bit(XPT_CLOSE, &xprt->xpt_flags);
        if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
                /* someone else will have to effect the close */
                return;

        svc_xprt_get(xprt);
        svc_delete_xprt(xprt);
        clear_bit(XPT_BUSY, &xprt->xpt_flags);
        svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
        struct svc_xprt *xprt;
        struct svc_xprt *tmp;

        list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
                if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
                        /* Waiting to be processed, but no threads left,
                         * so just remove it from the waiting list
                         */
                        list_del_init(&xprt->xpt_ready);
                        clear_bit(XPT_BUSY, &xprt->xpt_flags);
                }
                svc_close_xprt(xprt);
        }
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
        struct svc_deferred_req *dr =
                container_of(dreq, struct svc_deferred_req, handle);
        struct svc_xprt *xprt = dr->xprt;

        spin_lock(&xprt->xpt_lock);
        set_bit(XPT_DEFERRED, &xprt->xpt_flags);
        if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
                spin_unlock(&xprt->xpt_lock);
                dprintk("revisit canceled\n");
                svc_xprt_put(xprt);
                kfree(dr);
                return;
        }
        dprintk("revisit queued\n");
        dr->xprt = NULL;
        list_add(&dr->handle.recent, &xprt->xpt_deferred);
        spin_unlock(&xprt->xpt_lock);
        svc_xprt_enqueue(xprt);
        svc_xprt_put(xprt);
}
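
/*
 * Lifecycle note with a small example (editorial addition, not in the
 * original file): a cache lookup that cannot complete immediately defers
 * the request through the handle set up in svc_recv(), roughly:
 *
 *      dreq = rqstp->rq_chandle.defer(&rqstp->rq_chandle);     // calls svc_defer()
 *      // ... later, when the cache entry is filled in or gives up:
 *      dreq->revisit(dreq, too_many);                          // calls svc_revisit()
 *
 * svc_revisit() re-queues the saved request on the transport's
 * xpt_deferred list; a server thread later picks it up via
 * svc_deferred_dequeue() and svc_deferred_recv() from svc_recv().
 */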

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
        struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
        struct svc_deferred_req *dr;

        if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
                return NULL; /* if more than a page, give up FIXME */
        if (rqstp->rq_deferred) {
                dr = rqstp->rq_deferred;
                rqstp->rq_deferred = NULL;
        } else {
                size_t skip;
                size_t size;
                /* FIXME maybe discard if size too large */
                size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
                dr = kmalloc(size, GFP_KERNEL);
                if (dr == NULL)
                        return NULL;

                dr->handle.owner = rqstp->rq_server;
                dr->prot = rqstp->rq_prot;
                memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
                dr->addrlen = rqstp->rq_addrlen;
                dr->daddr = rqstp->rq_daddr;
                dr->argslen = rqstp->rq_arg.len >> 2;
                dr->xprt_hlen = rqstp->rq_xprt_hlen;

                /* back up head to the start of the buffer and copy */
                skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
                memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
                       dr->argslen << 2);
        }
        svc_xprt_get(rqstp->rq_xprt);
        dr->xprt = rqstp->rq_xprt;

        dr->handle.revisit = svc_revisit;
        return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
        struct svc_deferred_req *dr = rqstp->rq_deferred;

        /* setup iov_base past transport header */
        rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
        /* The iov_len does not include the transport header bytes */
        rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
        rqstp->rq_arg.page_len = 0;
        /* The rq_arg.len includes the transport header bytes */
        rqstp->rq_arg.len = dr->argslen<<2;
        rqstp->rq_prot = dr->prot;
        memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
        rqstp->rq_addrlen = dr->addrlen;
        /* Save off transport header len in case we get deferred again */
        rqstp->rq_xprt_hlen = dr->xprt_hlen;
        rqstp->rq_daddr = dr->daddr;
        rqstp->rq_respages = rqstp->rq_pages;
        return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
        struct svc_deferred_req *dr = NULL;

        if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
                return NULL;
        spin_lock(&xprt->xpt_lock);
        clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
        if (!list_empty(&xprt->xpt_deferred)) {
                dr = list_entry(xprt->xpt_deferred.next,
                                struct svc_deferred_req,
                                handle.recent);
                list_del_init(&dr->handle.recent);
                set_bit(XPT_DEFERRED, &xprt->xpt_flags);
        }
        spin_unlock(&xprt->xpt_lock);
        return dr;
}

/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
                               const sa_family_t af, const unsigned short port)
{
        struct svc_xprt *xprt;
        struct svc_xprt *found = NULL;

        /* Sanity check the args */
        if (serv == NULL || xcl_name == NULL)
                return found;

        spin_lock_bh(&serv->sv_lock);
        list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
                if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
                        continue;
                if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
                        continue;
                if (port != 0 && port != svc_xprt_local_port(xprt))
                        continue;
                found = xprt;
                svc_xprt_get(xprt);
                break;
        }
        spin_unlock_bh(&serv->sv_lock);
        return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);
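
/*
 * Usage sketch (editorial addition, not part of the original file): a
 * caller that wants the service's TCP listener, on any port and address
 * family, might do:
 *
 *      xprt = svc_find_xprt(serv, "tcp", 0, 0);
 *      if (xprt) {
 *              port = svc_xprt_local_port(xprt);
 *              svc_xprt_put(xprt);     // svc_find_xprt took a reference
 *      }
 *
 * The reference taken by svc_find_xprt() must always be dropped with
 * svc_xprt_put() when the caller is done with the transport.
 */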

static int svc_one_xprt_name(const struct svc_xprt *xprt,
                             char *pos, int remaining)
{
        int len;

        len = snprintf(pos, remaining, "%s %u\n",
                       xprt->xpt_class->xcl_name,
                       svc_xprt_local_port(xprt));
        if (len >= remaining)
                return -ENAMETOOLONG;
        return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
        struct svc_xprt *xprt;
        int len, totlen;
        char *pos;

        /* Sanity check args */
        if (!serv)
                return 0;

        spin_lock_bh(&serv->sv_lock);

        pos = buf;
        totlen = 0;
        list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
                len = svc_one_xprt_name(xprt, pos, buflen - totlen);
                if (len < 0) {
                        *buf = '\0';
                        totlen = len;
                }
                if (len <= 0)
                        break;

                pos += len;
                totlen += len;
        }

        spin_unlock_bh(&serv->sv_lock);
        return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
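
/*
 * Example (editorial note, not part of the original file): for a service
 * with UDP and TCP listeners on port 2049, svc_xprt_names() fills @buf
 * with something like
 *
 *      "udp 2049\ntcp 2049\n"
 *
 * and returns the string length; a buffer too small for the whole list
 * yields -ENAMETOOLONG and an empty buffer.
 */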

/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
        unsigned int pidx = (unsigned int)*pos;
        struct svc_serv *serv = m->private;

        dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

        if (!pidx)
                return SEQ_START_TOKEN;
        return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
        struct svc_pool *pool = p;
        struct svc_serv *serv = m->private;

        dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

        if (p == SEQ_START_TOKEN) {
                pool = &serv->sv_pools[0];
        } else {
                unsigned int pidx = (pool - &serv->sv_pools[0]);
                if (pidx < serv->sv_nrpools-1)
                        pool = &serv->sv_pools[pidx+1];
                else
                        pool = NULL;
        }
        ++*pos;
        return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
        struct svc_pool *pool = p;

        if (p == SEQ_START_TOKEN) {
                seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
                return 0;
        }

        seq_printf(m, "%u %lu %lu %lu %lu\n",
                   pool->sp_id,
                   pool->sp_stats.packets,
                   pool->sp_stats.sockets_queued,
                   pool->sp_stats.threads_woken,
                   pool->sp_stats.threads_timedout);

        return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
        .start  = svc_pool_stats_start,
        .next   = svc_pool_stats_next,
        .stop   = svc_pool_stats_stop,
        .show   = svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
        int err;

        err = seq_open(file, &svc_pool_stats_seq_ops);
        if (!err)
                ((struct seq_file *) file->private_data)->private = serv;
        return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
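
/*
 * Consumer sketch (editorial addition, not in the original file): a service
 * exposes these statistics by calling svc_pool_stats_open() from its own
 * procfs open method and reusing the generic seq_file helpers, roughly
 * (names are hypothetical):
 *
 *      static int example_pool_stats_open(struct inode *inode, struct file *file)
 *      {
 *              return svc_pool_stats_open(example_serv, file);  // example_serv: the service's svc_serv
 *      }
 *
 *      static const struct file_operations example_pool_stats_fops = {
 *              .owner          = THIS_MODULE,
 *              .open           = example_pool_stats_open,
 *              .read           = seq_read,
 *              .llseek         = seq_lseek,
 *              .release        = seq_release,
 *      };
 *
 * nfsd's pool_stats file under /proc/fs/nfsd follows this pattern.
 */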