clnt.c revision 334ccfd545bba9690515f2c5c167d5adb161989b
/*
 *  linux/net/sunrpc/rpcclnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -	RPC header generation and argument serialization.
 *  -	Credential refresh.
 *  -	TCP connect handling.
 *  -	Retry of operation when it is suspected the operation failed because
 *	of uid squashing on the server, or when the credentials were stale
 *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */
23
24#include <asm/system.h>
25
26#include <linux/module.h>
27#include <linux/types.h>
28#include <linux/mm.h>
29#include <linux/slab.h>
30#include <linux/in.h>
31#include <linux/utsname.h>
32
33#include <linux/sunrpc/clnt.h>
34#include <linux/workqueue.h>
35#include <linux/sunrpc/rpc_pipe_fs.h>
36
37#include <linux/nfs.h>
38
39
40#define RPC_SLACK_SPACE		(1024)	/* total overkill */
41
42#ifdef RPC_DEBUG
43# define RPCDBG_FACILITY	RPCDBG_CALL
44#endif
45
46static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
47
48
49static void	call_start(struct rpc_task *task);
50static void	call_reserve(struct rpc_task *task);
51static void	call_reserveresult(struct rpc_task *task);
52static void	call_allocate(struct rpc_task *task);
53static void	call_encode(struct rpc_task *task);
54static void	call_decode(struct rpc_task *task);
55static void	call_bind(struct rpc_task *task);
56static void	call_transmit(struct rpc_task *task);
57static void	call_status(struct rpc_task *task);
58static void	call_refresh(struct rpc_task *task);
59static void	call_refreshresult(struct rpc_task *task);
60static void	call_timeout(struct rpc_task *task);
61static void	call_connect(struct rpc_task *task);
62static void	call_connect_status(struct rpc_task *task);
63static u32 *	call_header(struct rpc_task *task);
64static u32 *	call_verify(struct rpc_task *task);
65
66
67static int
68rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
69{
70	static uint32_t clntid;
71	int error;
72
73	if (dir_name == NULL)
74		return 0;
75	for (;;) {
76		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
77				"%s/clnt%x", dir_name,
78				(unsigned int)clntid++);
79		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
80		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
81		if (!IS_ERR(clnt->cl_dentry))
82			return 0;
83		error = PTR_ERR(clnt->cl_dentry);
84		if (error != -EEXIST) {
85			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
86					clnt->cl_pathname, error);
87			return error;
88		}
89	}
90}
91
92/*
93 * Create an RPC client
94 * FIXME: This should also take a flags argument (as in task->tk_flags).
95 * It's called (among others) from pmap_create_client, which may in
96 * turn be called by an async task. In this case, rpciod should not be
97 * made to sleep too long.
98 */
99struct rpc_clnt *
100rpc_create_client(struct rpc_xprt *xprt, char *servname,
101		  struct rpc_program *program, u32 vers,
102		  rpc_authflavor_t flavor)
103{
104	struct rpc_version	*version;
105	struct rpc_clnt		*clnt = NULL;
106	int err;
107	int len;
108
109	dprintk("RPC: creating %s client for %s (xprt %p)\n",
110		program->name, servname, xprt);
111
112	err = -EINVAL;
113	if (!xprt)
114		goto out_err;
115	if (vers >= program->nrvers || !(version = program->version[vers]))
116		goto out_err;
117
118	err = -ENOMEM;
119	clnt = (struct rpc_clnt *) kmalloc(sizeof(*clnt), GFP_KERNEL);
120	if (!clnt)
121		goto out_err;
122	memset(clnt, 0, sizeof(*clnt));
123	atomic_set(&clnt->cl_users, 0);
124	atomic_set(&clnt->cl_count, 1);
125	clnt->cl_parent = clnt;
126
127	clnt->cl_server = clnt->cl_inline_name;
128	len = strlen(servname) + 1;
129	if (len > sizeof(clnt->cl_inline_name)) {
130		char *buf = kmalloc(len, GFP_KERNEL);
131		if (buf != 0)
132			clnt->cl_server = buf;
133		else
134			len = sizeof(clnt->cl_inline_name);
135	}
136	strlcpy(clnt->cl_server, servname, len);
137
138	clnt->cl_xprt     = xprt;
139	clnt->cl_procinfo = version->procs;
140	clnt->cl_maxproc  = version->nrprocs;
141	clnt->cl_protname = program->name;
142	clnt->cl_pmap	  = &clnt->cl_pmap_default;
143	clnt->cl_port     = xprt->addr.sin_port;
144	clnt->cl_prog     = program->number;
145	clnt->cl_vers     = version->number;
146	clnt->cl_prot     = xprt->prot;
147	clnt->cl_stats    = program->stats;
148	rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
149
150	if (!clnt->cl_port)
151		clnt->cl_autobind = 1;
152
153	clnt->cl_rtt = &clnt->cl_rtt_default;
154	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
155
156	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
157	if (err < 0)
158		goto out_no_path;
159
160	err = -ENOMEM;
161	if (!rpcauth_create(flavor, clnt)) {
162		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
163				flavor);
164		goto out_no_auth;
165	}
166
167	/* save the nodename */
168	clnt->cl_nodelen = strlen(system_utsname.nodename);
169	if (clnt->cl_nodelen > UNX_MAXNODENAME)
170		clnt->cl_nodelen = UNX_MAXNODENAME;
171	memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
172	return clnt;
173
174out_no_auth:
175	rpc_rmdir(clnt->cl_pathname);
176out_no_path:
177	if (clnt->cl_server != clnt->cl_inline_name)
178		kfree(clnt->cl_server);
179	kfree(clnt);
180out_err:
181	return ERR_PTR(err);
182}
183
184/*
185 * This function clones the RPC client structure. It allows us to share the
186 * same transport while varying parameters such as the authentication
187 * flavour.
188 */
189struct rpc_clnt *
190rpc_clone_client(struct rpc_clnt *clnt)
191{
192	struct rpc_clnt *new;
193
194	new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL);
195	if (!new)
196		goto out_no_clnt;
197	memcpy(new, clnt, sizeof(*new));
198	atomic_set(&new->cl_count, 1);
199	atomic_set(&new->cl_users, 0);
200	new->cl_parent = clnt;
201	atomic_inc(&clnt->cl_count);
202	/* Duplicate portmapper */
203	rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
204	/* Turn off autobind on clones */
205	new->cl_autobind = 0;
206	new->cl_oneshot = 0;
207	new->cl_dead = 0;
208	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
209	if (new->cl_auth)
210		atomic_inc(&new->cl_auth->au_count);
211	return new;
212out_no_clnt:
213	printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
214	return ERR_PTR(-ENOMEM);
215}
216
217/*
218 * Properly shut down an RPC client, terminating all outstanding
219 * requests. Note that we must be certain that cl_oneshot and
220 * cl_dead are cleared, or else the client would be destroyed
221 * when the last task releases it.
222 */
223int
224rpc_shutdown_client(struct rpc_clnt *clnt)
225{
226	dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
227			clnt->cl_protname, clnt->cl_server,
228			atomic_read(&clnt->cl_users));
229
230	while (atomic_read(&clnt->cl_users) > 0) {
231		/* Don't let rpc_release_client destroy us */
232		clnt->cl_oneshot = 0;
233		clnt->cl_dead = 0;
234		rpc_killall_tasks(clnt);
235		sleep_on_timeout(&destroy_wait, 1*HZ);
236	}
237
238	if (atomic_read(&clnt->cl_users) < 0) {
239		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
240				clnt, atomic_read(&clnt->cl_users));
241#ifdef RPC_DEBUG
242		rpc_show_tasks();
243#endif
244		BUG();
245	}
246
247	return rpc_destroy_client(clnt);
248}
249
250/*
251 * Delete an RPC client
252 */
253int
254rpc_destroy_client(struct rpc_clnt *clnt)
255{
256	if (!atomic_dec_and_test(&clnt->cl_count))
257		return 1;
258	BUG_ON(atomic_read(&clnt->cl_users) != 0);
259
260	dprintk("RPC: destroying %s client for %s\n",
261			clnt->cl_protname, clnt->cl_server);
262	if (clnt->cl_auth) {
263		rpcauth_destroy(clnt->cl_auth);
264		clnt->cl_auth = NULL;
265	}
266	if (clnt->cl_parent != clnt) {
267		rpc_destroy_client(clnt->cl_parent);
268		goto out_free;
269	}
270	if (clnt->cl_pathname[0])
271		rpc_rmdir(clnt->cl_pathname);
272	if (clnt->cl_xprt) {
273		xprt_destroy(clnt->cl_xprt);
274		clnt->cl_xprt = NULL;
275	}
276	if (clnt->cl_server != clnt->cl_inline_name)
277		kfree(clnt->cl_server);
278out_free:
279	kfree(clnt);
280	return 0;
281}
282
283/*
284 * Release an RPC client
285 */
286void
287rpc_release_client(struct rpc_clnt *clnt)
288{
289	dprintk("RPC:      rpc_release_client(%p, %d)\n",
290				clnt, atomic_read(&clnt->cl_users));
291
292	if (!atomic_dec_and_test(&clnt->cl_users))
293		return;
294	wake_up(&destroy_wait);
295	if (clnt->cl_oneshot || clnt->cl_dead)
296		rpc_destroy_client(clnt);
297}
298
/*
 * Default callback for async RPC calls: used by rpc_call_async() when
 * the caller passes no callback.  Intentionally does nothing.
 */
static void
rpc_default_callback(struct rpc_task *task)
{
	/* nothing to do */
}
306
307/*
308 *	Export the signal mask handling for aysnchronous code that
309 *	sleeps on RPC calls
310 */
311
312void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
313{
314	unsigned long	sigallow = sigmask(SIGKILL);
315	unsigned long	irqflags;
316
317	/* Turn off various signals */
318	if (clnt->cl_intr) {
319		struct k_sigaction *action = current->sighand->action;
320		if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
321			sigallow |= sigmask(SIGINT);
322		if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
323			sigallow |= sigmask(SIGQUIT);
324	}
325	spin_lock_irqsave(&current->sighand->siglock, irqflags);
326	*oldset = current->blocked;
327	siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
328	recalc_sigpending();
329	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
330}
331
332void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
333{
334	unsigned long	irqflags;
335
336	spin_lock_irqsave(&current->sighand->siglock, irqflags);
337	current->blocked = *oldset;
338	recalc_sigpending();
339	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
340}
341
342/*
343 * New rpc_call implementation
344 */
345int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
346{
347	struct rpc_task	*task;
348	sigset_t	oldset;
349	int		status;
350
351	/* If this client is slain all further I/O fails */
352	if (clnt->cl_dead)
353		return -EIO;
354
355	BUG_ON(flags & RPC_TASK_ASYNC);
356
357	rpc_clnt_sigmask(clnt, &oldset);
358
359	status = -ENOMEM;
360	task = rpc_new_task(clnt, NULL, flags);
361	if (task == NULL)
362		goto out;
363
364	rpc_call_setup(task, msg, 0);
365
366	/* Set up the call info struct and execute the task */
367	if (task->tk_status == 0)
368		status = rpc_execute(task);
369	else {
370		status = task->tk_status;
371		rpc_release_task(task);
372	}
373
374out:
375	rpc_clnt_sigunmask(clnt, &oldset);
376
377	return status;
378}
379
380/*
381 * New rpc_call implementation
382 */
383int
384rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
385	       rpc_action callback, void *data)
386{
387	struct rpc_task	*task;
388	sigset_t	oldset;
389	int		status;
390
391	/* If this client is slain all further I/O fails */
392	if (clnt->cl_dead)
393		return -EIO;
394
395	flags |= RPC_TASK_ASYNC;
396
397	rpc_clnt_sigmask(clnt, &oldset);
398
399	/* Create/initialize a new RPC task */
400	if (!callback)
401		callback = rpc_default_callback;
402	status = -ENOMEM;
403	if (!(task = rpc_new_task(clnt, callback, flags)))
404		goto out;
405	task->tk_calldata = data;
406
407	rpc_call_setup(task, msg, 0);
408
409	/* Set up the call info struct and execute the task */
410	status = task->tk_status;
411	if (status == 0)
412		rpc_execute(task);
413	else
414		rpc_release_task(task);
415
416out:
417	rpc_clnt_sigunmask(clnt, &oldset);
418
419	return status;
420}
421
422
423void
424rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
425{
426	task->tk_msg   = *msg;
427	task->tk_flags |= flags;
428	/* Bind the user cred */
429	if (task->tk_msg.rpc_cred != NULL)
430		rpcauth_holdcred(task);
431	else
432		rpcauth_bindcred(task);
433
434	if (task->tk_status == 0)
435		task->tk_action = call_start;
436	else
437		task->tk_action = NULL;
438}
439
440void
441rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
442{
443	struct rpc_xprt *xprt = clnt->cl_xprt;
444
445	xprt->sndsize = 0;
446	if (sndsize)
447		xprt->sndsize = sndsize + RPC_SLACK_SPACE;
448	xprt->rcvsize = 0;
449	if (rcvsize)
450		xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
451	if (xprt_connected(xprt))
452		xprt_sock_setbufsize(xprt);
453}
454
455/*
456 * Return size of largest payload RPC client can support, in bytes
457 *
458 * For stream transports, this is one RPC record fragment (see RFC
459 * 1831), as we don't support multi-record requests yet.  For datagram
460 * transports, this is the size of an IP packet minus the IP, UDP, and
461 * RPC header sizes.
462 */
463size_t rpc_max_payload(struct rpc_clnt *clnt)
464{
465	return clnt->cl_xprt->max_payload;
466}
467EXPORT_SYMBOL(rpc_max_payload);
468
469/*
470 * Restart an (async) RPC call. Usually called from within the
471 * exit handler.
472 */
473void
474rpc_restart_call(struct rpc_task *task)
475{
476	if (RPC_ASSASSINATED(task))
477		return;
478
479	task->tk_action = call_start;
480}
481
482/*
483 * 0.  Initial state
484 *
485 *     Other FSM states can be visited zero or more times, but
486 *     this state is visited exactly once for each RPC.
487 */
488static void
489call_start(struct rpc_task *task)
490{
491	struct rpc_clnt	*clnt = task->tk_client;
492
493	dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
494		clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
495		(RPC_IS_ASYNC(task) ? "async" : "sync"));
496
497	/* Increment call count */
498	task->tk_msg.rpc_proc->p_count++;
499	clnt->cl_stats->rpccnt++;
500	task->tk_action = call_reserve;
501}
502
503/*
504 * 1.	Reserve an RPC call slot
505 */
506static void
507call_reserve(struct rpc_task *task)
508{
509	dprintk("RPC: %4d call_reserve\n", task->tk_pid);
510
511	if (!rpcauth_uptodatecred(task)) {
512		task->tk_action = call_refresh;
513		return;
514	}
515
516	task->tk_status  = 0;
517	task->tk_action  = call_reserveresult;
518	xprt_reserve(task);
519}
520
521/*
522 * 1b.	Grok the result of xprt_reserve()
523 */
524static void
525call_reserveresult(struct rpc_task *task)
526{
527	int status = task->tk_status;
528
529	dprintk("RPC: %4d call_reserveresult (status %d)\n",
530				task->tk_pid, task->tk_status);
531
532	/*
533	 * After a call to xprt_reserve(), we must have either
534	 * a request slot or else an error status.
535	 */
536	task->tk_status = 0;
537	if (status >= 0) {
538		if (task->tk_rqstp) {
539			task->tk_action = call_allocate;
540			return;
541		}
542
543		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
544				__FUNCTION__, status);
545		rpc_exit(task, -EIO);
546		return;
547	}
548
549	/*
550	 * Even though there was an error, we may have acquired
551	 * a request slot somehow.  Make sure not to leak it.
552	 */
553	if (task->tk_rqstp) {
554		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
555				__FUNCTION__, status);
556		xprt_release(task);
557	}
558
559	switch (status) {
560	case -EAGAIN:	/* woken up; retry */
561		task->tk_action = call_reserve;
562		return;
563	case -EIO:	/* probably a shutdown */
564		break;
565	default:
566		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
567				__FUNCTION__, status);
568		break;
569	}
570	rpc_exit(task, status);
571}
572
573/*
574 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
575 *	(Note: buffer memory is freed in rpc_task_release).
576 */
577static void
578call_allocate(struct rpc_task *task)
579{
580	unsigned int	bufsiz;
581
582	dprintk("RPC: %4d call_allocate (status %d)\n",
583				task->tk_pid, task->tk_status);
584	task->tk_action = call_bind;
585	if (task->tk_buffer)
586		return;
587
588	/* FIXME: compute buffer requirements more exactly using
589	 * auth->au_wslack */
590	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
591
592	if (rpc_malloc(task, bufsiz << 1) != NULL)
593		return;
594	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);
595
596	if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
597		xprt_release(task);
598		task->tk_action = call_reserve;
599		rpc_delay(task, HZ>>4);
600		return;
601	}
602
603	rpc_exit(task, -ERESTARTSYS);
604}
605
606/*
607 * 3.	Encode arguments of an RPC call
608 */
609static void
610call_encode(struct rpc_task *task)
611{
612	struct rpc_clnt	*clnt = task->tk_client;
613	struct rpc_rqst	*req = task->tk_rqstp;
614	struct xdr_buf *sndbuf = &req->rq_snd_buf;
615	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
616	unsigned int	bufsiz;
617	kxdrproc_t	encode;
618	int		status;
619	u32		*p;
620
621	dprintk("RPC: %4d call_encode (status %d)\n",
622				task->tk_pid, task->tk_status);
623
624	/* Default buffer setup */
625	bufsiz = task->tk_bufsize >> 1;
626	sndbuf->head[0].iov_base = (void *)task->tk_buffer;
627	sndbuf->head[0].iov_len  = bufsiz;
628	sndbuf->tail[0].iov_len  = 0;
629	sndbuf->page_len	 = 0;
630	sndbuf->len		 = 0;
631	sndbuf->buflen		 = bufsiz;
632	rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
633	rcvbuf->head[0].iov_len  = bufsiz;
634	rcvbuf->tail[0].iov_len  = 0;
635	rcvbuf->page_len	 = 0;
636	rcvbuf->len		 = 0;
637	rcvbuf->buflen		 = bufsiz;
638
639	/* Encode header and provided arguments */
640	encode = task->tk_msg.rpc_proc->p_encode;
641	if (!(p = call_header(task))) {
642		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
643		rpc_exit(task, -EIO);
644		return;
645	}
646	if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
647						 task->tk_msg.rpc_argp)) < 0) {
648		printk(KERN_WARNING "%s: can't encode arguments: %d\n",
649				clnt->cl_protname, -status);
650		rpc_exit(task, status);
651	}
652}
653
654/*
655 * 4.	Get the server port number if not yet set
656 */
657static void
658call_bind(struct rpc_task *task)
659{
660	struct rpc_clnt	*clnt = task->tk_client;
661	struct rpc_xprt *xprt = clnt->cl_xprt;
662
663	dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
664			xprt, (xprt_connected(xprt) ? "is" : "is not"));
665
666	task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;
667
668	if (!clnt->cl_port) {
669		task->tk_action = call_connect;
670		task->tk_timeout = RPC_CONNECT_TIMEOUT;
671		rpc_getport(task, clnt);
672	}
673}
674
675/*
676 * 4a.	Connect to the RPC server (TCP case)
677 */
678static void
679call_connect(struct rpc_task *task)
680{
681	struct rpc_clnt *clnt = task->tk_client;
682
683	dprintk("RPC: %4d call_connect status %d\n",
684				task->tk_pid, task->tk_status);
685
686	if (xprt_connected(clnt->cl_xprt)) {
687		task->tk_action = call_transmit;
688		return;
689	}
690	task->tk_action = call_connect_status;
691	if (task->tk_status < 0)
692		return;
693	xprt_connect(task);
694}
695
696/*
697 * 4b. Sort out connect result
698 */
699static void
700call_connect_status(struct rpc_task *task)
701{
702	struct rpc_clnt *clnt = task->tk_client;
703	int status = task->tk_status;
704
705	task->tk_status = 0;
706	if (status >= 0) {
707		clnt->cl_stats->netreconn++;
708		task->tk_action = call_transmit;
709		return;
710	}
711
712	/* Something failed: we may have to rebind */
713	if (clnt->cl_autobind)
714		clnt->cl_port = 0;
715	switch (status) {
716	case -ENOTCONN:
717	case -ETIMEDOUT:
718	case -EAGAIN:
719		task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
720		break;
721	default:
722		rpc_exit(task, -EIO);
723	}
724}
725
726/*
727 * 5.	Transmit the RPC request, and wait for reply
728 */
729static void
730call_transmit(struct rpc_task *task)
731{
732	dprintk("RPC: %4d call_transmit (status %d)\n",
733				task->tk_pid, task->tk_status);
734
735	task->tk_action = call_status;
736	if (task->tk_status < 0)
737		return;
738	task->tk_status = xprt_prepare_transmit(task);
739	if (task->tk_status != 0)
740		return;
741	/* Encode here so that rpcsec_gss can use correct sequence number. */
742	if (!task->tk_rqstp->rq_bytes_sent)
743		call_encode(task);
744	if (task->tk_status < 0)
745		return;
746	xprt_transmit(task);
747	if (task->tk_status < 0)
748		return;
749	if (!task->tk_msg.rpc_proc->p_decode) {
750		task->tk_action = NULL;
751		rpc_wake_up_task(task);
752	}
753}
754
/*
 * 6.	Sort out the RPC call status
 *
 *	If a reply has arrived and nothing is left half-sent, its length
 *	becomes the task status.  A non-negative status goes on to
 *	call_decode; errors are dispatched below.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		result;

	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
				task->tk_pid, task->tk_status);

	result = task->tk_status;
	if (result >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch (result) {
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		/* Resend from scratch, rebinding first if allowed */
		req->rq_bytes_sent = 0;
		if (clnt->cl_autobind)
			clnt->cl_port = 0;
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -result);
		/* fall through */
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, result);
		break;
	}
}
804
805/*
806 * 6a.	Handle RPC timeout
807 * 	We do not release the request slot, so we keep using the
808 *	same XID for all retransmits.
809 */
810static void
811call_timeout(struct rpc_task *task)
812{
813	struct rpc_clnt	*clnt = task->tk_client;
814
815	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
816		dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
817		goto retry;
818	}
819
820	dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
821	if (RPC_IS_SOFT(task)) {
822		if (clnt->cl_chatty)
823			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
824				clnt->cl_protname, clnt->cl_server);
825		rpc_exit(task, -EIO);
826		return;
827	}
828
829	if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
830		task->tk_flags |= RPC_CALL_MAJORSEEN;
831		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
832			clnt->cl_protname, clnt->cl_server);
833	}
834	if (clnt->cl_autobind)
835		clnt->cl_port = 0;
836
837retry:
838	clnt->cl_stats->rpcretrans++;
839	task->tk_action = call_bind;
840	task->tk_status = 0;
841}
842
843/*
844 * 7.	Decode the RPC reply
845 */
846static void
847call_decode(struct rpc_task *task)
848{
849	struct rpc_clnt	*clnt = task->tk_client;
850	struct rpc_rqst	*req = task->tk_rqstp;
851	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
852	u32		*p;
853
854	dprintk("RPC: %4d call_decode (status %d)\n",
855				task->tk_pid, task->tk_status);
856
857	if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
858		printk(KERN_NOTICE "%s: server %s OK\n",
859			clnt->cl_protname, clnt->cl_server);
860		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
861	}
862
863	if (task->tk_status < 12) {
864		if (!RPC_IS_SOFT(task)) {
865			task->tk_action = call_bind;
866			clnt->cl_stats->rpcretrans++;
867			goto out_retry;
868		}
869		printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
870			clnt->cl_protname, task->tk_status);
871		rpc_exit(task, -EIO);
872		return;
873	}
874
875	req->rq_rcv_buf.len = req->rq_private_buf.len;
876
877	/* Check that the softirq receive buffer is valid */
878	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
879				sizeof(req->rq_rcv_buf)) != 0);
880
881	/* Verify the RPC header */
882	if (!(p = call_verify(task))) {
883		if (task->tk_action == NULL)
884			return;
885		goto out_retry;
886	}
887
888	task->tk_action = NULL;
889
890	if (decode)
891		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
892						      task->tk_msg.rpc_resp);
893	dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
894					task->tk_status);
895	return;
896out_retry:
897	req->rq_received = req->rq_private_buf.len = 0;
898	task->tk_status = 0;
899}
900
901/*
902 * 8.	Refresh the credentials if rejected by the server
903 */
904static void
905call_refresh(struct rpc_task *task)
906{
907	dprintk("RPC: %4d call_refresh\n", task->tk_pid);
908
909	xprt_release(task);	/* Must do to obtain new XID */
910	task->tk_action = call_refreshresult;
911	task->tk_status = 0;
912	task->tk_client->cl_stats->rpcauthrefresh++;
913	rpcauth_refreshcred(task);
914}
915
916/*
917 * 8a.	Process the results of a credential refresh
918 */
919static void
920call_refreshresult(struct rpc_task *task)
921{
922	int status = task->tk_status;
923	dprintk("RPC: %4d call_refreshresult (status %d)\n",
924				task->tk_pid, task->tk_status);
925
926	task->tk_status = 0;
927	task->tk_action = call_reserve;
928	if (status >= 0 && rpcauth_uptodatecred(task))
929		return;
930	if (status == -EACCES) {
931		rpc_exit(task, -EACCES);
932		return;
933	}
934	task->tk_action = call_refresh;
935	if (status != -ETIMEDOUT)
936		rpc_delay(task, 3*HZ);
937	return;
938}
939
940/*
941 * Call header serialization
942 */
943static u32 *
944call_header(struct rpc_task *task)
945{
946	struct rpc_clnt *clnt = task->tk_client;
947	struct rpc_xprt *xprt = clnt->cl_xprt;
948	struct rpc_rqst	*req = task->tk_rqstp;
949	u32		*p = req->rq_svec[0].iov_base;
950
951	/* FIXME: check buffer size? */
952	if (xprt->stream)
953		*p++ = 0;		/* fill in later */
954	*p++ = req->rq_xid;		/* XID */
955	*p++ = htonl(RPC_CALL);		/* CALL */
956	*p++ = htonl(RPC_VERSION);	/* RPC version */
957	*p++ = htonl(clnt->cl_prog);	/* program number */
958	*p++ = htonl(clnt->cl_vers);	/* program version */
959	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
960	p = rpcauth_marshcred(task, p);
961	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
962	return p;
963}
964
965/*
966 * Reply header verification
967 */
968static u32 *
969call_verify(struct rpc_task *task)
970{
971	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
972	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
973	u32	*p = iov->iov_base, n;
974	int error = -EACCES;
975
976	if ((len -= 3) < 0)
977		goto out_overflow;
978	p += 1;	/* skip XID */
979
980	if ((n = ntohl(*p++)) != RPC_REPLY) {
981		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
982		goto out_retry;
983	}
984	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
985		if (--len < 0)
986			goto out_overflow;
987		switch ((n = ntohl(*p++))) {
988			case RPC_AUTH_ERROR:
989				break;
990			case RPC_MISMATCH:
991				printk(KERN_WARNING "%s: RPC call version mismatch!\n", __FUNCTION__);
992				goto out_eio;
993			default:
994				printk(KERN_WARNING "%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
995				goto out_eio;
996		}
997		if (--len < 0)
998			goto out_overflow;
999		switch ((n = ntohl(*p++))) {
1000		case RPC_AUTH_REJECTEDCRED:
1001		case RPC_AUTH_REJECTEDVERF:
1002		case RPCSEC_GSS_CREDPROBLEM:
1003		case RPCSEC_GSS_CTXPROBLEM:
1004			if (!task->tk_cred_retry)
1005				break;
1006			task->tk_cred_retry--;
1007			dprintk("RPC: %4d call_verify: retry stale creds\n",
1008							task->tk_pid);
1009			rpcauth_invalcred(task);
1010			task->tk_action = call_refresh;
1011			return NULL;
1012		case RPC_AUTH_BADCRED:
1013		case RPC_AUTH_BADVERF:
1014			/* possibly garbled cred/verf? */
1015			if (!task->tk_garb_retry)
1016				break;
1017			task->tk_garb_retry--;
1018			dprintk("RPC: %4d call_verify: retry garbled creds\n",
1019							task->tk_pid);
1020			task->tk_action = call_bind;
1021			return NULL;
1022		case RPC_AUTH_TOOWEAK:
1023			printk(KERN_NOTICE "call_verify: server requires stronger "
1024			       "authentication.\n");
1025			break;
1026		default:
1027			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
1028			error = -EIO;
1029		}
1030		dprintk("RPC: %4d call_verify: call rejected %d\n",
1031						task->tk_pid, n);
1032		goto out_err;
1033	}
1034	if (!(p = rpcauth_checkverf(task, p))) {
1035		printk(KERN_WARNING "call_verify: auth check failed\n");
1036		goto out_retry;		/* bad verifier, retry */
1037	}
1038	len = p - (u32 *)iov->iov_base - 1;
1039	if (len < 0)
1040		goto out_overflow;
1041	switch ((n = ntohl(*p++))) {
1042	case RPC_SUCCESS:
1043		return p;
1044	case RPC_PROG_UNAVAIL:
1045		printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
1046				(unsigned int)task->tk_client->cl_prog,
1047				task->tk_client->cl_server);
1048		goto out_eio;
1049	case RPC_PROG_MISMATCH:
1050		printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
1051				(unsigned int)task->tk_client->cl_prog,
1052				(unsigned int)task->tk_client->cl_vers,
1053				task->tk_client->cl_server);
1054		goto out_eio;
1055	case RPC_PROC_UNAVAIL:
1056		printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
1057				task->tk_msg.rpc_proc,
1058				task->tk_client->cl_prog,
1059				task->tk_client->cl_vers,
1060				task->tk_client->cl_server);
1061		goto out_eio;
1062	case RPC_GARBAGE_ARGS:
1063		dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
1064		break;			/* retry */
1065	default:
1066		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
1067		/* Also retry */
1068	}
1069
1070out_retry:
1071	task->tk_client->cl_stats->rpcgarbage++;
1072	if (task->tk_garb_retry) {
1073		task->tk_garb_retry--;
1074		dprintk(KERN_WARNING "RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
1075		task->tk_action = call_bind;
1076		return NULL;
1077	}
1078	printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
1079out_eio:
1080	error = -EIO;
1081out_err:
1082	rpc_exit(task, error);
1083	return NULL;
1084out_overflow:
1085	printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
1086	goto out_retry;
1087}
1088