xprt.c revision c9f6cde6e26ef98ee9c4b6288b126ac9c580d88b
/*
 *  linux/net/sunrpc/xprt.c
 *
 *  This is a generic RPC call interface supporting congestion avoidance
 *  and asynchronous calls.
 *
 *  The interface works like this:
 *
 *  -	When a process places a call, it allocates a request slot if
 *	one is available. Otherwise, it sleeps on the backlog queue
 *	(xprt_reserve).
 *  -	Next, the caller puts together the RPC message, stuffs it into
 *	the request struct, and calls xprt_transmit().
 *  -	xprt_transmit sends the message and installs the caller on the
 *	transport's wait list. At the same time, it installs a timer that
 *	fires when the request's timeout expires.
 *  -	When a packet arrives, the data_ready handler walks the list of
 *	pending requests for that transport. If a matching XID is found, the
 *	caller is woken up and the timer is removed.
 *  -	When no reply arrives within the timeout interval, the timer is
 *	fired by the kernel and runs xprt_timer(). It either adjusts the
 *	timeout values (minor timeout) or wakes up the caller with a status
 *	of -ETIMEDOUT.
 *  -	When the caller receives a notification from RPC that a reply arrived,
 *	it should release the RPC slot and process the reply.
 *	If the call timed out, it may choose to retry the operation by
 *	adjusting the initial timeout value, and simply calling rpc_call
 *	again.
 *
 *  Support for async RPC is done through a set of RPC-specific scheduling
 *  primitives that `transparently' work for processes as well as async
 *  tasks that rely on callbacks.
 *
 *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);
/*
 * The transport code maintains an estimate of the maximum number of
 * outstanding RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 */
#define RPC_CWNDSHIFT		(8U)
#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
#define RPC_INITCWND		RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
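
/*
 * Reader's note (illustrative): with RPC_CWNDSHIFT == 8 the window
 * bookkeeping is done in fixed point, in units of 1/256 of a request
 * slot.  A transport with 16 slots has RPC_MAXCWND == 16 << 8 == 4096,
 * i.e. at most 16 requests in flight, while RPC_INITCWND == 256 admits
 * exactly one.  RPCXPRT_CONGESTED() compares the scaled count of
 * requests holding a congestion slot (cong) against the scaled window
 * (cwnd).
 */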

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	transport already registered
 * -EINVAL:	transport module being unloaded
 */
int xprt_register_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = -EEXIST;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		/* don't register the same transport class twice */
		if (t->ident == transport->ident)
			goto out;
	}

	list_add_tail(&transport->list, &xprt_list);
	printk(KERN_INFO "RPC: Registered %s transport module.\n",
	       transport->name);
	result = 0;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_register_transport);
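
/*
 * Illustrative sketch only (not part of this revision): a transport
 * module would typically register its xprt_class from module init and
 * unregister it on exit.  All "example_*" names are hypothetical, and
 * the ident value must be unique among registered transports:
 *
 *	static struct xprt_class example_transport = {
 *		.list	= LIST_HEAD_INIT(example_transport.list),
 *		.name	= "example",
 *		.owner	= THIS_MODULE,
 *		.ident	= 99,
 *		.setup	= example_xprt_setup,
 *	};
 *
 *	static int __init example_init(void)
 *	{
 *		return xprt_register_transport(&example_transport);
 *	}
 *
 *	static void __exit example_exit(void)
 *	{
 *		xprt_unregister_transport(&example_transport);
 *	}
 */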

/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
int xprt_unregister_transport(struct xprt_class *transport)
{
	struct xprt_class *t;
	int result;

	result = 0;
	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t == transport) {
			printk(KERN_INFO
				"RPC: Unregistered %s transport module.\n",
				transport->name);
			list_del_init(&transport->list);
			goto out;
		}
	}
	result = -ENOENT;

out:
	spin_unlock(&xprt_list_lock);
	return result;
}
EXPORT_SYMBOL_GPL(xprt_unregister_transport);

/**
 * xprt_reserve_xprt - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
int xprt_reserve_xprt(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		if (task == NULL)
			return 0;
		goto out_sleep;
	}
	xprt->snd_task = task;
	if (req) {
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
	}
	return 1;

out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n",
			task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt);

static void xprt_clear_locked(struct rpc_xprt *xprt)
{
	xprt->snd_task = NULL;
	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->state);
		smp_mb__after_clear_bit();
	} else
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
}

/**
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
int xprt_reserve_xprt_cong(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
		if (task == xprt->snd_task)
			return 1;
		goto out_sleep;
	}
	if (__xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	xprt_clear_locked(xprt);
out_sleep:
	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL);
	return 0;
}
EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);

static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->transport_lock);
	retval = xprt->ops->reserve_xprt(task);
	spin_unlock_bh(&xprt->transport_lock);
	return retval;
}

static void __xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;
	struct rpc_rqst *req;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;

	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}

	req = task->tk_rqstp;
	xprt->snd_task = task;
	if (req) {
		req->rq_bytes_sent = 0;
		req->rq_ntrans++;
	}
	return;

out_unlock:
	xprt_clear_locked(xprt);
}

static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		return;
	if (RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (__xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	xprt_clear_locked(xprt);
}

/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt);

/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt_clear_locked(xprt);
		__xprt_lock_write_next_cong(xprt);
	}
}
EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);

static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	spin_unlock_bh(&xprt->transport_lock);
}

/*
 * Van Jacobson congestion avoidance. Check whether the congestion window
 * is full; if so, return 0 and let the caller put the task to sleep.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next_cong(xprt);
}

/**
 * xprt_release_rqst_cong - housekeeping when request is complete
 * @task: RPC request that recently completed
 *
 * Useful for transports that require congestion control.
 */
void xprt_release_rqst_cong(struct rpc_task *task)
{
	__xprt_put_cong(task->tk_xprt, task->tk_rqstp);
}
EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);

/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
void xprt_adjust_cwnd(struct rpc_task *task, int result)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	unsigned long cwnd = xprt->cwnd;

	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next_cong(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
	__xprt_put_cong(xprt, req);
}
EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
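
/*
 * Worked example (illustrative): on success the window grows by
 * (RPC_CWNDSCALE^2 + cwnd/2) / cwnd, so a cwnd of 512 (two requests)
 * gains (65536 + 256) / 512 == 128, i.e. half a request slot; on
 * -ETIMEDOUT it is halved, but never drops below RPC_CWNDSCALE (one
 * request).  This is additive-increase/multiplicative-decrease in
 * fixed-point form.
 */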

/**
 * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
 * @xprt: transport with waiting tasks
 * @status: result code to plant in each task before waking it
 *
 */
void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
{
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
}
EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);

/**
 * xprt_wait_for_buffer_space - wait for transport output buffer to clear
 * @task: task to be put to sleep
 * @action: function pointer to be executed after wait
 */
void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	task->tk_timeout = req->rq_timeout;
	rpc_sleep_on(&xprt->pending, task, action);
}
EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);

/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
void xprt_write_space(struct rpc_xprt *xprt)
{
	if (unlikely(xprt->shutdown))
		return;

	spin_lock_bh(&xprt->transport_lock);
	if (xprt->snd_task) {
		dprintk("RPC:       write space: waking waiting task on "
				"xprt %p\n", xprt);
		rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
	}
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_write_space);

/**
 * xprt_set_retrans_timeout_def - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout based on the transport's
 * default timeout parameters.  Used by transports that don't adjust
 * the retransmit timeout based on round-trip time estimation.
 */
void xprt_set_retrans_timeout_def(struct rpc_task *task)
{
	task->tk_timeout = task->tk_rqstp->rq_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);

/**
 * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout
 * @task: task whose timeout is to be set
 *
 * Set a request's retransmit timeout using the RTT estimator.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rtt *rtt = clnt->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = clnt->cl_timeout->to_maxval;

	task->tk_timeout = rpc_calc_rto(rtt, timer);
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
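
/*
 * Note (illustrative): the left shift above backs the timeout off
 * geometrically per recorded timeout and retransmission.  If
 * rpc_calc_rto() yields an estimate of 200ms and rq_retries == 2 with
 * no ntimeo history, the result is 200ms << 2 == 800ms, clamped to
 * to_maxval.
 */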

static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->transport_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->transport_lock);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
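
/*
 * Worked example (illustrative): with to_initval = 1s, to_increment = 1s,
 * to_retries = 3 and to_exponential = 0, rq_majortimeo is set
 * 1s + 1s * 3 == 4s past the current time, and each minor timeout bumps
 * rq_timeout 1s -> 2s -> 3s (capped at to_maxval).  Once the major
 * timeout passes, this function resets to to_initval and returns
 * -ETIMEDOUT so the caller can see the hard expiry.
 */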

static void xprt_autoclose(struct work_struct *work)
{
	struct rpc_xprt *xprt =
		container_of(work, struct rpc_xprt, task_cleanup);

	xprt->ops->close(xprt);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	xprt_release_write(xprt, NULL);
}

/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 *
 */
void xprt_disconnect_done(struct rpc_xprt *xprt)
{
	dprintk("RPC:       disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->transport_lock);
	xprt_clear_connected(xprt);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock_bh(&xprt->transport_lock);
}
EXPORT_SYMBOL_GPL(xprt_disconnect_done);

/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 *
 */
void xprt_force_disconnect(struct rpc_xprt *xprt)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
	spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 *
 */
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{
	/* Don't race with the test_bit() in xprt_clear_locked() */
	spin_lock_bh(&xprt->transport_lock);
	if (cookie != xprt->connect_cookie)
		goto out;
	if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
		goto out;
	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
	/* Try to schedule an autoclose RPC call */
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	xprt_wake_pending_tasks(xprt, -ENOTCONN);
out:
	spin_unlock_bh(&xprt->transport_lock);
}

static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->transport_lock);
	if (!list_empty(&xprt->recv) || xprt->shutdown)
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
		goto out_abort;
	spin_unlock(&xprt->transport_lock);
	if (xprt_connecting(xprt))
		xprt_release_write(xprt, NULL);
	else
		queue_work(rpciod_workqueue, &xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->transport_lock);
}

/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting the connect
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (!xprt_bound(xprt)) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		xprt_release_write(xprt, task);
	else {
		if (task->tk_rqstp)
			task->tk_rqstp->rq_bytes_sent = 0;

		task->tk_timeout = xprt->connect_timeout;
		rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
		xprt->stat.connect_start = jiffies;
		xprt->ops->connect(task);
	}
	return;
}

static void xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (task->tk_status == 0) {
		xprt->stat.connect_count++;
		xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
		dprintk("RPC: %5u xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	switch (task->tk_status) {
	case -ENOTCONN:
		dprintk("RPC: %5u xprt_connect_status: connection broken\n",
				task->tk_pid);
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
				"out\n", task->tk_pid);
		break;
	default:
		dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
				"server %s\n", task->tk_pid, -task->tk_status,
				task->tk_client->cl_server);
		xprt_release_write(xprt, task);
		task->tk_status = -EIO;
	}
}

/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 */
struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
{
	struct list_head *pos;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid)
			return entry;
	}

	dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
			ntohl(xid));
	xprt->stat.bad_xids++;
	return NULL;
}
EXPORT_SYMBOL_GPL(xprt_lookup_rqst);

/**
 * xprt_update_rtt - update an RPC client's RTT state after receiving a reply
 * @task: RPC request that recently completed
 *
 */
void xprt_update_rtt(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	unsigned timer = task->tk_msg.rpc_proc->p_timer;

	if (timer) {
		if (req->rq_ntrans == 1)
			rpc_update_rtt(rtt, timer,
					(long)jiffies - req->rq_xtime);
		rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
	}
}
EXPORT_SYMBOL_GPL(xprt_update_rtt);

/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds transport lock.
 */
void xprt_complete_rqst(struct rpc_task *task, int copied)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
			task->tk_pid, ntohl(req->rq_xid), copied);

	xprt->stat.recvs++;
	task->tk_rtt = (long)jiffies - req->rq_xtime;

	list_del_init(&req->rq_list);
	req->rq_private_buf.len = copied;
	/* Ensure all writes are done before we update req->rq_received */
	smp_wmb();
	req->rq_received = copied;
	rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);

static void xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;

	if (task->tk_status != -ETIMEDOUT)
		return;
	dprintk("RPC: %5u xprt_timer\n", task->tk_pid);

	spin_lock_bh(&xprt->transport_lock);
	if (!req->rq_received) {
		if (xprt->ops->timer)
			xprt->ops->timer(task);
	} else
		task->tk_status = 0;
	spin_unlock_bh(&xprt->transport_lock);
}

/**
 * xprt_prepare_transmit - reserve the transport before sending a request
 * @task: RPC task about to send a request
 *
 */
int xprt_prepare_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int err = 0;

	dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);

	spin_lock_bh(&xprt->transport_lock);
	if (req->rq_received && !req->rq_bytes_sent) {
		err = req->rq_received;
		goto out_unlock;
	}
	if (!xprt->ops->reserve_xprt(task)) {
		err = -EAGAIN;
		goto out_unlock;
	}

	if (!xprt_connected(xprt)) {
		err = -ENOTCONN;
		goto out_unlock;
	}
out_unlock:
	spin_unlock_bh(&xprt->transport_lock);
	return err;
}

void xprt_end_transmit(struct rpc_task *task)
{
	xprt_release_write(task->tk_xprt, task);
}

/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * We have to copy the iovec because sendmsg fiddles with its contents.
 */
void xprt_transmit(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct rpc_xprt	*xprt = req->rq_xprt;
	int status;

	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);

	if (!req->rq_received) {
		if (list_empty(&req->rq_list)) {
			spin_lock_bh(&xprt->transport_lock);
			/* Update the softirq receive buffer */
			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
					sizeof(req->rq_private_buf));
			/* Add request to the receive list */
			list_add_tail(&req->rq_list, &xprt->recv);
			spin_unlock_bh(&xprt->transport_lock);
			xprt_reset_majortimeo(req);
			/* Turn off autodisconnect */
			del_singleshot_timer_sync(&xprt->timer);
		}
	} else if (!req->rq_bytes_sent)
		return;

	req->rq_connect_cookie = xprt->connect_cookie;
	req->rq_xtime = jiffies;
	status = xprt->ops->send_request(task);
	if (status == 0) {
		dprintk("RPC: %5u xmit complete\n", task->tk_pid);
		spin_lock_bh(&xprt->transport_lock);

		xprt->ops->set_retrans_timeout(task);

		xprt->stat.sends++;
		xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
		xprt->stat.bklog_u += xprt->backlog.qlen;

		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (!req->rq_received)
			rpc_sleep_on(&xprt->pending, task, xprt_timer);
		spin_unlock_bh(&xprt->transport_lock);
		return;
	}

	/* Note: at this point, task->tk_sleeping has not yet been set,
	 *	 hence there is no danger of the waking up task being put on
	 *	 schedq, and being picked up by a parallel run of rpciod().
	 */
	task->tk_status = status;
	if (status == -ECONNREFUSED)
		rpc_sleep_on(&xprt->sending, task, NULL);
}

static inline void do_xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = 0;
	if (task->tk_rqstp)
		return;
	if (!list_empty(&xprt->free)) {
		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
		list_del_init(&req->rq_list);
		task->tk_rqstp = req;
		xprt_request_init(task, xprt);
		return;
	}
	dprintk("RPC:       waiting for request slot\n");
	task->tk_status = -EAGAIN;
	task->tk_timeout = 0;
	rpc_sleep_on(&xprt->backlog, task, NULL);
}

/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 */
void xprt_reserve(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	task->tk_status = -EIO;
	spin_lock(&xprt->reserve_lock);
	do_xprt_reserve(task);
	spin_unlock(&xprt->reserve_lock);
}

static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
{
	return xprt->xid++;
}

static inline void xprt_init_xid(struct rpc_xprt *xprt)
{
	xprt->xid = net_random();
}
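
/*
 * Note on XIDs (reader's note): the two helpers above generate XIDs as
 * consecutive values starting from a random seed, one fresh XID per
 * request initialization; xprt_lookup_rqst() matches replies to pending
 * requests by this value alone.
 */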

static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	req->rq_timeout = task->tk_client->cl_timeout->to_initval;
	req->rq_task	= task;
	req->rq_xprt    = xprt;
	req->rq_buffer  = NULL;
	req->rq_xid     = xprt_alloc_xid(xprt);
	req->rq_release_snd_buf = NULL;
	xprt_reset_majortimeo(req);
	dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
			req, ntohl(req->rq_xid));
}

/**
 * xprt_release - release an RPC request slot
 * @task: task which is finished with the slot
 *
 */
void xprt_release(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct rpc_rqst	*req;

	if (!(req = task->tk_rqstp))
		return;
	rpc_count_iostats(task);
	spin_lock_bh(&xprt->transport_lock);
	xprt->ops->release_xprt(xprt, task);
	if (xprt->ops->release_request)
		xprt->ops->release_request(task);
	if (!list_empty(&req->rq_list))
		list_del(&req->rq_list);
	xprt->last_used = jiffies;
	if (list_empty(&xprt->recv))
		mod_timer(&xprt->timer,
				xprt->last_used + xprt->idle_timeout);
	spin_unlock_bh(&xprt->transport_lock);
	xprt->ops->buf_free(req->rq_buffer);
	task->tk_rqstp = NULL;
	if (req->rq_release_snd_buf)
		req->rq_release_snd_buf(req);
	memset(req, 0, sizeof(*req));	/* mark unused */

	dprintk("RPC: %5u release request %p\n", task->tk_pid, req);

	spin_lock(&xprt->reserve_lock);
	list_add(&req->rq_list, &xprt->free);
	rpc_wake_up_next(&xprt->backlog);
	spin_unlock(&xprt->reserve_lock);
}

/**
 * xprt_create_transport - create an RPC transport
 * @args: rpc transport creation arguments
 *
 */
struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
{
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*req;
	struct xprt_class *t;

	spin_lock(&xprt_list_lock);
	list_for_each_entry(t, &xprt_list, list) {
		if (t->ident == args->ident) {
			spin_unlock(&xprt_list_lock);
			goto found;
		}
	}
	spin_unlock(&xprt_list_lock);
	printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
	return ERR_PTR(-EIO);

found:
	xprt = t->setup(args);
	if (IS_ERR(xprt)) {
		dprintk("RPC:       xprt_create_transport: failed, %ld\n",
				-PTR_ERR(xprt));
		return xprt;
	}

	kref_init(&xprt->kref);
	spin_lock_init(&xprt->transport_lock);
	spin_lock_init(&xprt->reserve_lock);

	INIT_LIST_HEAD(&xprt->free);
	INIT_LIST_HEAD(&xprt->recv);
	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
	setup_timer(&xprt->timer, xprt_init_autodisconnect,
			(unsigned long)xprt);
	xprt->last_used = jiffies;
	xprt->cwnd = RPC_INITCWND;
	xprt->bind_index = 0;

	rpc_init_wait_queue(&xprt->binding, "xprt_binding");
	rpc_init_wait_queue(&xprt->pending, "xprt_pending");
	rpc_init_wait_queue(&xprt->sending, "xprt_sending");
	rpc_init_wait_queue(&xprt->resend, "xprt_resend");
	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");

	/* initialize free list */
	for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
		list_add(&req->rq_list, &xprt->free);

	xprt_init_xid(xprt);

	dprintk("RPC:       created transport %p with %u slots\n", xprt,
			xprt->max_reqs);

	return xprt;
}
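
/*
 * Illustrative sketch only: the normal caller is rpc_create() in
 * clnt.c, which fills a struct xprt_create and lets the matching
 * registered class build the transport.  The exact field set of
 * struct xprt_create is assumed here (ident, dstaddr, addrlen), and
 * server_addr is a hypothetical sockaddr:
 *
 *	struct xprt_create args = {
 *		.ident		= XPRT_TRANSPORT_TCP,
 *		.dstaddr	= (struct sockaddr *)&server_addr,
 *		.addrlen	= sizeof(server_addr),
 *	};
 *	struct rpc_xprt *xprt = xprt_create_transport(&args);
 *
 *	if (IS_ERR(xprt))
 *		return PTR_ERR(xprt);
 */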

/**
 * xprt_destroy - destroy an RPC transport, killing off all requests.
 * @kref: kref for the transport to destroy
 *
 */
static void xprt_destroy(struct kref *kref)
{
	struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);

	dprintk("RPC:       destroying transport %p\n", xprt);
	xprt->shutdown = 1;
	del_timer_sync(&xprt->timer);

	rpc_destroy_wait_queue(&xprt->binding);
	rpc_destroy_wait_queue(&xprt->pending);
	rpc_destroy_wait_queue(&xprt->sending);
	rpc_destroy_wait_queue(&xprt->resend);
	rpc_destroy_wait_queue(&xprt->backlog);
	/*
	 * Tear down transport state and free the rpc_xprt
	 */
	xprt->ops->destroy(xprt);
}

/**
 * xprt_put - release a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
void xprt_put(struct rpc_xprt *xprt)
{
	kref_put(&xprt->kref, xprt_destroy);
}

/**
 * xprt_get - return a reference to an RPC transport.
 * @xprt: pointer to the transport
 *
 */
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
	kref_get(&xprt->kref);
	return xprt;
}