clnt.c revision b454ae906085cf7774fb4756746680c9b03b6f84
1/*
2 *  linux/net/sunrpc/clnt.c
3 *
4 *  This file contains the high-level RPC interface.
5 *  It is modeled as a finite state machine to support both synchronous
6 *  and asynchronous requests.
7 *
8 *  -	RPC header generation and argument serialization.
9 *  -	Credential refresh.
10 *  -	TCP connect handling.
11 *  -	Retry of operation when it is suspected the operation failed because
12 *	of uid squashing on the server, or when the credentials were stale
13 *	and need to be refreshed, or when a packet was damaged in transit.
14 *	This may have to be moved to the VFS layer.
15 *
16 *  NB: BSD uses a more intelligent approach to guessing when a request
17 *  or reply has been lost by keeping the RTO estimate for each procedure.
18 *  We currently make do with a constant timeout value.
19 *
20 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22 */
23
24#include <asm/system.h>
25
26#include <linux/module.h>
27#include <linux/types.h>
28#include <linux/mm.h>
29#include <linux/slab.h>
30#include <linux/smp_lock.h>
31#include <linux/utsname.h>
32#include <linux/workqueue.h>
33#include <linux/in6.h>
34
35#include <linux/sunrpc/clnt.h>
36#include <linux/sunrpc/rpc_pipe_fs.h>
37#include <linux/sunrpc/metrics.h>
38
39
40#ifdef RPC_DEBUG
41# define RPCDBG_FACILITY	RPCDBG_CALL
42#endif
43
/*
 * Debug helper: log the pid and current status of RPC task @t, tagged
 * with the name of the calling function.
 *
 * @t is parenthesised at each use so that any pointer-valued expression
 * (e.g. "tasks + 1") can be passed.  Note that @t is evaluated twice.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__FUNCTION__, (t)->tk_status)
47
/*
 * Global bookkeeping: every RPC client is linked onto all_clients through
 * its cl_clients member; rpc_client_lock is held around updates to the list.
 */
static DEFINE_SPINLOCK(rpc_client_lock);
static LIST_HEAD(all_clients);

/* Woken by rpc_release_client(); rpc_shutdown_client() sleeps on it. */
static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
55
56
57static void	call_start(struct rpc_task *task);
58static void	call_reserve(struct rpc_task *task);
59static void	call_reserveresult(struct rpc_task *task);
60static void	call_allocate(struct rpc_task *task);
61static void	call_encode(struct rpc_task *task);
62static void	call_decode(struct rpc_task *task);
63static void	call_bind(struct rpc_task *task);
64static void	call_bind_status(struct rpc_task *task);
65static void	call_transmit(struct rpc_task *task);
66static void	call_status(struct rpc_task *task);
67static void	call_transmit_status(struct rpc_task *task);
68static void	call_refresh(struct rpc_task *task);
69static void	call_refreshresult(struct rpc_task *task);
70static void	call_timeout(struct rpc_task *task);
71static void	call_connect(struct rpc_task *task);
72static void	call_connect_status(struct rpc_task *task);
73static __be32 *	call_header(struct rpc_task *task);
74static __be32 *	call_verify(struct rpc_task *task);
75
76static int	rpc_ping(struct rpc_clnt *clnt, int flags);
77
78static void rpc_register_client(struct rpc_clnt *clnt)
79{
80	spin_lock(&rpc_client_lock);
81	list_add(&clnt->cl_clients, &all_clients);
82	spin_unlock(&rpc_client_lock);
83}
84
85static void rpc_unregister_client(struct rpc_clnt *clnt)
86{
87	spin_lock(&rpc_client_lock);
88	list_del(&clnt->cl_clients);
89	spin_unlock(&rpc_client_lock);
90}
91
92static int
93rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
94{
95	static uint32_t clntid;
96	int error;
97
98	clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
99	clnt->cl_dentry = ERR_PTR(-ENOENT);
100	if (dir_name == NULL)
101		return 0;
102
103	clnt->cl_vfsmnt = rpc_get_mount();
104	if (IS_ERR(clnt->cl_vfsmnt))
105		return PTR_ERR(clnt->cl_vfsmnt);
106
107	for (;;) {
108		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
109				"%s/clnt%x", dir_name,
110				(unsigned int)clntid++);
111		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
112		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
113		if (!IS_ERR(clnt->cl_dentry))
114			return 0;
115		error = PTR_ERR(clnt->cl_dentry);
116		if (error != -EEXIST) {
117			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
118					clnt->cl_pathname, error);
119			rpc_put_mount();
120			return error;
121		}
122	}
123}
124
125static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
126{
127	struct rpc_program	*program = args->program;
128	struct rpc_version	*version;
129	struct rpc_clnt		*clnt = NULL;
130	struct rpc_auth		*auth;
131	int err;
132	size_t len;
133
134	/* sanity check the name before trying to print it */
135	err = -EINVAL;
136	len = strlen(args->servername);
137	if (len > RPC_MAXNETNAMELEN)
138		goto out_no_rpciod;
139	len++;
140
141	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
142			program->name, args->servername, xprt);
143
144	err = rpciod_up();
145	if (err)
146		goto out_no_rpciod;
147	err = -EINVAL;
148	if (!xprt)
149		goto out_no_xprt;
150
151	if (args->version >= program->nrvers)
152		goto out_err;
153	version = program->version[args->version];
154	if (version == NULL)
155		goto out_err;
156
157	err = -ENOMEM;
158	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
159	if (!clnt)
160		goto out_err;
161	clnt->cl_parent = clnt;
162
163	clnt->cl_server = clnt->cl_inline_name;
164	if (len > sizeof(clnt->cl_inline_name)) {
165		char *buf = kmalloc(len, GFP_KERNEL);
166		if (buf != NULL)
167			clnt->cl_server = buf;
168		else
169			len = sizeof(clnt->cl_inline_name);
170	}
171	strlcpy(clnt->cl_server, args->servername, len);
172
173	clnt->cl_xprt     = xprt;
174	clnt->cl_procinfo = version->procs;
175	clnt->cl_maxproc  = version->nrprocs;
176	clnt->cl_protname = program->name;
177	clnt->cl_prog     = program->number;
178	clnt->cl_vers     = version->number;
179	clnt->cl_stats    = program->stats;
180	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
181	err = -ENOMEM;
182	if (clnt->cl_metrics == NULL)
183		goto out_no_stats;
184	clnt->cl_program  = program;
185	INIT_LIST_HEAD(&clnt->cl_tasks);
186	spin_lock_init(&clnt->cl_lock);
187
188	if (!xprt_bound(clnt->cl_xprt))
189		clnt->cl_autobind = 1;
190
191	clnt->cl_timeout = xprt->timeout;
192	if (args->timeout != NULL) {
193		memcpy(&clnt->cl_timeout_default, args->timeout,
194				sizeof(clnt->cl_timeout_default));
195		clnt->cl_timeout = &clnt->cl_timeout_default;
196	}
197
198	clnt->cl_rtt = &clnt->cl_rtt_default;
199	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
200
201	kref_init(&clnt->cl_kref);
202
203	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
204	if (err < 0)
205		goto out_no_path;
206
207	auth = rpcauth_create(args->authflavor, clnt);
208	if (IS_ERR(auth)) {
209		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
210				args->authflavor);
211		err = PTR_ERR(auth);
212		goto out_no_auth;
213	}
214
215	/* save the nodename */
216	clnt->cl_nodelen = strlen(utsname()->nodename);
217	if (clnt->cl_nodelen > UNX_MAXNODENAME)
218		clnt->cl_nodelen = UNX_MAXNODENAME;
219	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
220	rpc_register_client(clnt);
221	return clnt;
222
223out_no_auth:
224	if (!IS_ERR(clnt->cl_dentry)) {
225		rpc_rmdir(clnt->cl_dentry);
226		rpc_put_mount();
227	}
228out_no_path:
229	rpc_free_iostats(clnt->cl_metrics);
230out_no_stats:
231	if (clnt->cl_server != clnt->cl_inline_name)
232		kfree(clnt->cl_server);
233	kfree(clnt);
234out_err:
235	xprt_put(xprt);
236out_no_xprt:
237	rpciod_down();
238out_no_rpciod:
239	return ERR_PTR(err);
240}
241
242/*
243 * rpc_create - create an RPC client and transport with one call
244 * @args: rpc_clnt create argument structure
245 *
246 * Creates and initializes an RPC transport and an RPC client.
247 *
248 * It can ping the server in order to determine if it is up, and to see if
249 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
250 * this behavior so asynchronous tasks can also use rpc_create.
251 */
252struct rpc_clnt *rpc_create(struct rpc_create_args *args)
253{
254	struct rpc_xprt *xprt;
255	struct rpc_clnt *clnt;
256	struct xprt_create xprtargs = {
257		.ident = args->protocol,
258		.srcaddr = args->saddress,
259		.dstaddr = args->address,
260		.addrlen = args->addrsize,
261	};
262	char servername[48];
263
264	xprt = xprt_create_transport(&xprtargs);
265	if (IS_ERR(xprt))
266		return (struct rpc_clnt *)xprt;
267
268	/*
269	 * If the caller chooses not to specify a hostname, whip
270	 * up a string representation of the passed-in address.
271	 */
272	if (args->servername == NULL) {
273		servername[0] = '\0';
274		switch (args->address->sa_family) {
275		case AF_INET: {
276			struct sockaddr_in *sin =
277					(struct sockaddr_in *)args->address;
278			snprintf(servername, sizeof(servername), NIPQUAD_FMT,
279				 NIPQUAD(sin->sin_addr.s_addr));
280			break;
281		}
282		case AF_INET6: {
283			struct sockaddr_in6 *sin =
284					(struct sockaddr_in6 *)args->address;
285			snprintf(servername, sizeof(servername), NIP6_FMT,
286				 NIP6(sin->sin6_addr));
287			break;
288		}
289		default:
290			/* caller wants default server name, but
291			 * address family isn't recognized. */
292			return ERR_PTR(-EINVAL);
293		}
294		args->servername = servername;
295	}
296
297	xprt = xprt_create_transport(&xprtargs);
298	if (IS_ERR(xprt))
299		return (struct rpc_clnt *)xprt;
300
301	/*
302	 * By default, kernel RPC client connects from a reserved port.
303	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
304	 * but it is always enabled for rpciod, which handles the connect
305	 * operation.
306	 */
307	xprt->resvport = 1;
308	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
309		xprt->resvport = 0;
310
311	clnt = rpc_new_client(args, xprt);
312	if (IS_ERR(clnt))
313		return clnt;
314
315	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
316		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
317		if (err != 0) {
318			rpc_shutdown_client(clnt);
319			return ERR_PTR(err);
320		}
321	}
322
323	clnt->cl_softrtry = 1;
324	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
325		clnt->cl_softrtry = 0;
326
327	if (args->flags & RPC_CLNT_CREATE_INTR)
328		clnt->cl_intr = 1;
329	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
330		clnt->cl_autobind = 1;
331	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
332		clnt->cl_discrtry = 1;
333
334	return clnt;
335}
336EXPORT_SYMBOL_GPL(rpc_create);
337
338/*
339 * This function clones the RPC client structure. It allows us to share the
340 * same transport while varying parameters such as the authentication
341 * flavour.
342 */
343struct rpc_clnt *
344rpc_clone_client(struct rpc_clnt *clnt)
345{
346	struct rpc_clnt *new;
347	int err = -ENOMEM;
348
349	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
350	if (!new)
351		goto out_no_clnt;
352	new->cl_parent = clnt;
353	/* Turn off autobind on clones */
354	new->cl_autobind = 0;
355	INIT_LIST_HEAD(&new->cl_tasks);
356	spin_lock_init(&new->cl_lock);
357	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
358	new->cl_metrics = rpc_alloc_iostats(clnt);
359	if (new->cl_metrics == NULL)
360		goto out_no_stats;
361	kref_init(&new->cl_kref);
362	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
363	if (err != 0)
364		goto out_no_path;
365	if (new->cl_auth)
366		atomic_inc(&new->cl_auth->au_count);
367	xprt_get(clnt->cl_xprt);
368	kref_get(&clnt->cl_kref);
369	rpc_register_client(new);
370	rpciod_up();
371	return new;
372out_no_path:
373	rpc_free_iostats(new->cl_metrics);
374out_no_stats:
375	kfree(new);
376out_no_clnt:
377	dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
378	return ERR_PTR(err);
379}
380EXPORT_SYMBOL_GPL(rpc_clone_client);
381
382/*
383 * Properly shut down an RPC client, terminating all outstanding
384 * requests.
385 */
386void rpc_shutdown_client(struct rpc_clnt *clnt)
387{
388	dprintk("RPC:       shutting down %s client for %s\n",
389			clnt->cl_protname, clnt->cl_server);
390
391	while (!list_empty(&clnt->cl_tasks)) {
392		rpc_killall_tasks(clnt);
393		wait_event_timeout(destroy_wait,
394			list_empty(&clnt->cl_tasks), 1*HZ);
395	}
396
397	rpc_release_client(clnt);
398}
399EXPORT_SYMBOL_GPL(rpc_shutdown_client);
400
401/*
402 * Free an RPC client
403 */
404static void
405rpc_free_client(struct kref *kref)
406{
407	struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
408
409	dprintk("RPC:       destroying %s client for %s\n",
410			clnt->cl_protname, clnt->cl_server);
411	if (!IS_ERR(clnt->cl_dentry)) {
412		rpc_rmdir(clnt->cl_dentry);
413		rpc_put_mount();
414	}
415	if (clnt->cl_parent != clnt) {
416		rpc_release_client(clnt->cl_parent);
417		goto out_free;
418	}
419	if (clnt->cl_server != clnt->cl_inline_name)
420		kfree(clnt->cl_server);
421out_free:
422	rpc_unregister_client(clnt);
423	rpc_free_iostats(clnt->cl_metrics);
424	clnt->cl_metrics = NULL;
425	xprt_put(clnt->cl_xprt);
426	rpciod_down();
427	kfree(clnt);
428}
429
430/*
431 * Free an RPC client
432 */
433static void
434rpc_free_auth(struct kref *kref)
435{
436	struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);
437
438	if (clnt->cl_auth == NULL) {
439		rpc_free_client(kref);
440		return;
441	}
442
443	/*
444	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
445	 *       release remaining GSS contexts. This mechanism ensures
446	 *       that it can do so safely.
447	 */
448	kref_init(kref);
449	rpcauth_release(clnt->cl_auth);
450	clnt->cl_auth = NULL;
451	kref_put(kref, rpc_free_client);
452}
453
454/*
455 * Release reference to the RPC client
456 */
457void
458rpc_release_client(struct rpc_clnt *clnt)
459{
460	dprintk("RPC:       rpc_release_client(%p)\n", clnt);
461
462	if (list_empty(&clnt->cl_tasks))
463		wake_up(&destroy_wait);
464	kref_put(&clnt->cl_kref, rpc_free_auth);
465}
466
467/**
468 * rpc_bind_new_program - bind a new RPC program to an existing client
469 * @old - old rpc_client
470 * @program - rpc program to set
471 * @vers - rpc program version
472 *
473 * Clones the rpc client and sets up a new RPC program. This is mainly
474 * of use for enabling different RPC programs to share the same transport.
475 * The Sun NFSv2/v3 ACL protocol can do this.
476 */
477struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
478				      struct rpc_program *program,
479				      u32 vers)
480{
481	struct rpc_clnt *clnt;
482	struct rpc_version *version;
483	int err;
484
485	BUG_ON(vers >= program->nrvers || !program->version[vers]);
486	version = program->version[vers];
487	clnt = rpc_clone_client(old);
488	if (IS_ERR(clnt))
489		goto out;
490	clnt->cl_procinfo = version->procs;
491	clnt->cl_maxproc  = version->nrprocs;
492	clnt->cl_protname = program->name;
493	clnt->cl_prog     = program->number;
494	clnt->cl_vers     = version->number;
495	clnt->cl_stats    = program->stats;
496	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
497	if (err != 0) {
498		rpc_shutdown_client(clnt);
499		clnt = ERR_PTR(err);
500	}
501out:
502	return clnt;
503}
504EXPORT_SYMBOL_GPL(rpc_bind_new_program);
505
506/*
507 * Default callback for async RPC calls
508 */
509static void
510rpc_default_callback(struct rpc_task *task, void *data)
511{
512}
513
514static const struct rpc_call_ops rpc_default_ops = {
515	.rpc_call_done = rpc_default_callback,
516};
517
518/*
519 *	Export the signal mask handling for synchronous code that
520 *	sleeps on RPC calls
521 */
522#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
523
524static void rpc_save_sigmask(sigset_t *oldset, int intr)
525{
526	unsigned long	sigallow = sigmask(SIGKILL);
527	sigset_t sigmask;
528
529	/* Block all signals except those listed in sigallow */
530	if (intr)
531		sigallow |= RPC_INTR_SIGNALS;
532	siginitsetinv(&sigmask, sigallow);
533	sigprocmask(SIG_BLOCK, &sigmask, oldset);
534}
535
536static void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
537{
538	rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
539}
540
541static void rpc_restore_sigmask(sigset_t *oldset)
542{
543	sigprocmask(SIG_SETMASK, oldset, NULL);
544}
545
546void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
547{
548	rpc_save_sigmask(oldset, clnt->cl_intr);
549}
550EXPORT_SYMBOL_GPL(rpc_clnt_sigmask);
551
552void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
553{
554	rpc_restore_sigmask(oldset);
555}
556EXPORT_SYMBOL_GPL(rpc_clnt_sigunmask);
557
558/**
559 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
560 * @task_setup_data: pointer to task initialisation data
561 */
562struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
563{
564	struct rpc_task *task, *ret;
565	sigset_t oldset;
566
567	task = rpc_new_task(task_setup_data);
568	if (task == NULL) {
569		rpc_release_calldata(task_setup_data->callback_ops,
570				task_setup_data->callback_data);
571		ret = ERR_PTR(-ENOMEM);
572		goto out;
573	}
574
575	if (task->tk_status != 0) {
576		ret = ERR_PTR(task->tk_status);
577		rpc_put_task(task);
578		goto out;
579	}
580	atomic_inc(&task->tk_count);
581	/* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */
582	rpc_task_sigmask(task, &oldset);
583	rpc_execute(task);
584	rpc_restore_sigmask(&oldset);
585	ret = task;
586out:
587	return ret;
588}
589EXPORT_SYMBOL_GPL(rpc_run_task);
590
591/**
592 * rpc_call_sync - Perform a synchronous RPC call
593 * @clnt: pointer to RPC client
594 * @msg: RPC call parameters
595 * @flags: RPC call flags
596 */
597int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
598{
599	struct rpc_task	*task;
600	struct rpc_task_setup task_setup_data = {
601		.rpc_client = clnt,
602		.rpc_message = msg,
603		.callback_ops = &rpc_default_ops,
604		.flags = flags,
605	};
606	int status;
607
608	BUG_ON(flags & RPC_TASK_ASYNC);
609
610	task = rpc_run_task(&task_setup_data);
611	if (IS_ERR(task))
612		return PTR_ERR(task);
613	status = task->tk_status;
614	rpc_put_task(task);
615	return status;
616}
617EXPORT_SYMBOL_GPL(rpc_call_sync);
618
619/**
620 * rpc_call_async - Perform an asynchronous RPC call
621 * @clnt: pointer to RPC client
622 * @msg: RPC call parameters
623 * @flags: RPC call flags
624 * @ops: RPC call ops
625 * @data: user call data
626 */
627int
628rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
629	       const struct rpc_call_ops *tk_ops, void *data)
630{
631	struct rpc_task	*task;
632	struct rpc_task_setup task_setup_data = {
633		.rpc_client = clnt,
634		.rpc_message = msg,
635		.callback_ops = tk_ops,
636		.callback_data = data,
637		.flags = flags|RPC_TASK_ASYNC,
638	};
639
640	task = rpc_run_task(&task_setup_data);
641	if (IS_ERR(task))
642		return PTR_ERR(task);
643	rpc_put_task(task);
644	return 0;
645}
646EXPORT_SYMBOL_GPL(rpc_call_async);
647
648void
649rpc_call_start(struct rpc_task *task)
650{
651	task->tk_action = call_start;
652}
653EXPORT_SYMBOL_GPL(rpc_call_start);
654
655/**
656 * rpc_peeraddr - extract remote peer address from clnt's xprt
657 * @clnt: RPC client structure
658 * @buf: target buffer
659 * @size: length of target buffer
660 *
661 * Returns the number of bytes that are actually in the stored address.
662 */
663size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
664{
665	size_t bytes;
666	struct rpc_xprt *xprt = clnt->cl_xprt;
667
668	bytes = sizeof(xprt->addr);
669	if (bytes > bufsize)
670		bytes = bufsize;
671	memcpy(buf, &clnt->cl_xprt->addr, bytes);
672	return xprt->addrlen;
673}
674EXPORT_SYMBOL_GPL(rpc_peeraddr);
675
676/**
677 * rpc_peeraddr2str - return remote peer address in printable format
678 * @clnt: RPC client structure
679 * @format: address format
680 *
681 */
682const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
683			     enum rpc_display_format_t format)
684{
685	struct rpc_xprt *xprt = clnt->cl_xprt;
686
687	if (xprt->address_strings[format] != NULL)
688		return xprt->address_strings[format];
689	else
690		return "unprintable";
691}
692EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
693
694void
695rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
696{
697	struct rpc_xprt *xprt = clnt->cl_xprt;
698	if (xprt->ops->set_buffer_size)
699		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
700}
701EXPORT_SYMBOL_GPL(rpc_setbufsize);
702
703/*
704 * Return size of largest payload RPC client can support, in bytes
705 *
706 * For stream transports, this is one RPC record fragment (see RFC
707 * 1831), as we don't support multi-record requests yet.  For datagram
708 * transports, this is the size of an IP packet minus the IP, UDP, and
709 * RPC header sizes.
710 */
711size_t rpc_max_payload(struct rpc_clnt *clnt)
712{
713	return clnt->cl_xprt->max_payload;
714}
715EXPORT_SYMBOL_GPL(rpc_max_payload);
716
717/**
718 * rpc_force_rebind - force transport to check that remote port is unchanged
719 * @clnt: client to rebind
720 *
721 */
722void rpc_force_rebind(struct rpc_clnt *clnt)
723{
724	if (clnt->cl_autobind)
725		xprt_clear_bound(clnt->cl_xprt);
726}
727EXPORT_SYMBOL_GPL(rpc_force_rebind);
728
729/*
730 * Restart an (async) RPC call. Usually called from within the
731 * exit handler.
732 */
733void
734rpc_restart_call(struct rpc_task *task)
735{
736	if (RPC_ASSASSINATED(task))
737		return;
738
739	task->tk_action = call_start;
740}
741EXPORT_SYMBOL_GPL(rpc_restart_call);
742
743/*
744 * 0.  Initial state
745 *
746 *     Other FSM states can be visited zero or more times, but
747 *     this state is visited exactly once for each RPC.
748 */
749static void
750call_start(struct rpc_task *task)
751{
752	struct rpc_clnt	*clnt = task->tk_client;
753
754	dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
755			clnt->cl_protname, clnt->cl_vers,
756			task->tk_msg.rpc_proc->p_proc,
757			(RPC_IS_ASYNC(task) ? "async" : "sync"));
758
759	/* Increment call count */
760	task->tk_msg.rpc_proc->p_count++;
761	clnt->cl_stats->rpccnt++;
762	task->tk_action = call_reserve;
763}
764
765/*
766 * 1.	Reserve an RPC call slot
767 */
768static void
769call_reserve(struct rpc_task *task)
770{
771	dprint_status(task);
772
773	if (!rpcauth_uptodatecred(task)) {
774		task->tk_action = call_refresh;
775		return;
776	}
777
778	task->tk_status  = 0;
779	task->tk_action  = call_reserveresult;
780	xprt_reserve(task);
781}
782
783/*
784 * 1b.	Grok the result of xprt_reserve()
785 */
786static void
787call_reserveresult(struct rpc_task *task)
788{
789	int status = task->tk_status;
790
791	dprint_status(task);
792
793	/*
794	 * After a call to xprt_reserve(), we must have either
795	 * a request slot or else an error status.
796	 */
797	task->tk_status = 0;
798	if (status >= 0) {
799		if (task->tk_rqstp) {
800			task->tk_action = call_allocate;
801			return;
802		}
803
804		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
805				__FUNCTION__, status);
806		rpc_exit(task, -EIO);
807		return;
808	}
809
810	/*
811	 * Even though there was an error, we may have acquired
812	 * a request slot somehow.  Make sure not to leak it.
813	 */
814	if (task->tk_rqstp) {
815		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
816				__FUNCTION__, status);
817		xprt_release(task);
818	}
819
820	switch (status) {
821	case -EAGAIN:	/* woken up; retry */
822		task->tk_action = call_reserve;
823		return;
824	case -EIO:	/* probably a shutdown */
825		break;
826	default:
827		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
828				__FUNCTION__, status);
829		break;
830	}
831	rpc_exit(task, status);
832}
833
834/*
835 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
836 *	(Note: buffer memory is freed in xprt_release).
837 */
838static void
839call_allocate(struct rpc_task *task)
840{
841	unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack;
842	struct rpc_rqst *req = task->tk_rqstp;
843	struct rpc_xprt *xprt = task->tk_xprt;
844	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
845
846	dprint_status(task);
847
848	task->tk_status = 0;
849	task->tk_action = call_bind;
850
851	if (req->rq_buffer)
852		return;
853
854	if (proc->p_proc != 0) {
855		BUG_ON(proc->p_arglen == 0);
856		if (proc->p_decode != NULL)
857			BUG_ON(proc->p_replen == 0);
858	}
859
860	/*
861	 * Calculate the size (in quads) of the RPC call
862	 * and reply headers, and convert both values
863	 * to byte sizes.
864	 */
865	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
866	req->rq_callsize <<= 2;
867	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
868	req->rq_rcvsize <<= 2;
869
870	req->rq_buffer = xprt->ops->buf_alloc(task,
871					req->rq_callsize + req->rq_rcvsize);
872	if (req->rq_buffer != NULL)
873		return;
874
875	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
876
877	if (RPC_IS_ASYNC(task) || !signalled()) {
878		task->tk_action = call_allocate;
879		rpc_delay(task, HZ>>4);
880		return;
881	}
882
883	rpc_exit(task, -ERESTARTSYS);
884}
885
886static inline int
887rpc_task_need_encode(struct rpc_task *task)
888{
889	return task->tk_rqstp->rq_snd_buf.len == 0;
890}
891
892static inline void
893rpc_task_force_reencode(struct rpc_task *task)
894{
895	task->tk_rqstp->rq_snd_buf.len = 0;
896}
897
898static inline void
899rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
900{
901	buf->head[0].iov_base = start;
902	buf->head[0].iov_len = len;
903	buf->tail[0].iov_len = 0;
904	buf->page_len = 0;
905	buf->flags = 0;
906	buf->len = 0;
907	buf->buflen = len;
908}
909
910/*
911 * 3.	Encode arguments of an RPC call
912 */
913static void
914call_encode(struct rpc_task *task)
915{
916	struct rpc_rqst	*req = task->tk_rqstp;
917	kxdrproc_t	encode;
918	__be32		*p;
919
920	dprint_status(task);
921
922	rpc_xdr_buf_init(&req->rq_snd_buf,
923			 req->rq_buffer,
924			 req->rq_callsize);
925	rpc_xdr_buf_init(&req->rq_rcv_buf,
926			 (char *)req->rq_buffer + req->rq_callsize,
927			 req->rq_rcvsize);
928
929	/* Encode header and provided arguments */
930	encode = task->tk_msg.rpc_proc->p_encode;
931	if (!(p = call_header(task))) {
932		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
933		rpc_exit(task, -EIO);
934		return;
935	}
936	if (encode == NULL)
937		return;
938
939	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
940			task->tk_msg.rpc_argp);
941	if (task->tk_status == -ENOMEM) {
942		/* XXX: Is this sane? */
943		rpc_delay(task, 3*HZ);
944		task->tk_status = -EAGAIN;
945	}
946}
947
948/*
949 * 4.	Get the server port number if not yet set
950 */
951static void
952call_bind(struct rpc_task *task)
953{
954	struct rpc_xprt *xprt = task->tk_xprt;
955
956	dprint_status(task);
957
958	task->tk_action = call_connect;
959	if (!xprt_bound(xprt)) {
960		task->tk_action = call_bind_status;
961		task->tk_timeout = xprt->bind_timeout;
962		xprt->ops->rpcbind(task);
963	}
964}
965
966/*
967 * 4a.	Sort out bind result
968 */
969static void
970call_bind_status(struct rpc_task *task)
971{
972	int status = -EIO;
973
974	if (task->tk_status >= 0) {
975		dprint_status(task);
976		task->tk_status = 0;
977		task->tk_action = call_connect;
978		return;
979	}
980
981	switch (task->tk_status) {
982	case -EAGAIN:
983		dprintk("RPC: %5u rpcbind waiting for another request "
984				"to finish\n", task->tk_pid);
985		/* avoid busy-waiting here -- could be a network outage. */
986		rpc_delay(task, 5*HZ);
987		goto retry_timeout;
988	case -EACCES:
989		dprintk("RPC: %5u remote rpcbind: RPC program/version "
990				"unavailable\n", task->tk_pid);
991		/* fail immediately if this is an RPC ping */
992		if (task->tk_msg.rpc_proc->p_proc == 0) {
993			status = -EOPNOTSUPP;
994			break;
995		}
996		rpc_delay(task, 3*HZ);
997		goto retry_timeout;
998	case -ETIMEDOUT:
999		dprintk("RPC: %5u rpcbind request timed out\n",
1000				task->tk_pid);
1001		goto retry_timeout;
1002	case -EPFNOSUPPORT:
1003		/* server doesn't support any rpcbind version we know of */
1004		dprintk("RPC: %5u remote rpcbind service unavailable\n",
1005				task->tk_pid);
1006		break;
1007	case -EPROTONOSUPPORT:
1008		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1009				task->tk_pid);
1010		task->tk_status = 0;
1011		task->tk_action = call_bind;
1012		return;
1013	default:
1014		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1015				task->tk_pid, -task->tk_status);
1016	}
1017
1018	rpc_exit(task, status);
1019	return;
1020
1021retry_timeout:
1022	task->tk_action = call_timeout;
1023}
1024
1025/*
1026 * 4b.	Connect to the RPC server
1027 */
1028static void
1029call_connect(struct rpc_task *task)
1030{
1031	struct rpc_xprt *xprt = task->tk_xprt;
1032
1033	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1034			task->tk_pid, xprt,
1035			(xprt_connected(xprt) ? "is" : "is not"));
1036
1037	task->tk_action = call_transmit;
1038	if (!xprt_connected(xprt)) {
1039		task->tk_action = call_connect_status;
1040		if (task->tk_status < 0)
1041			return;
1042		xprt_connect(task);
1043	}
1044}
1045
1046/*
1047 * 4c.	Sort out connect result
1048 */
1049static void
1050call_connect_status(struct rpc_task *task)
1051{
1052	struct rpc_clnt *clnt = task->tk_client;
1053	int status = task->tk_status;
1054
1055	dprint_status(task);
1056
1057	task->tk_status = 0;
1058	if (status >= 0) {
1059		clnt->cl_stats->netreconn++;
1060		task->tk_action = call_transmit;
1061		return;
1062	}
1063
1064	/* Something failed: remote service port may have changed */
1065	rpc_force_rebind(clnt);
1066
1067	switch (status) {
1068	case -ENOTCONN:
1069	case -EAGAIN:
1070		task->tk_action = call_bind;
1071		if (!RPC_IS_SOFT(task))
1072			return;
1073		/* if soft mounted, test if we've timed out */
1074	case -ETIMEDOUT:
1075		task->tk_action = call_timeout;
1076		return;
1077	}
1078	rpc_exit(task, -EIO);
1079}
1080
1081/*
1082 * 5.	Transmit the RPC request, and wait for reply
1083 */
1084static void
1085call_transmit(struct rpc_task *task)
1086{
1087	dprint_status(task);
1088
1089	task->tk_action = call_status;
1090	if (task->tk_status < 0)
1091		return;
1092	task->tk_status = xprt_prepare_transmit(task);
1093	if (task->tk_status != 0)
1094		return;
1095	task->tk_action = call_transmit_status;
1096	/* Encode here so that rpcsec_gss can use correct sequence number. */
1097	if (rpc_task_need_encode(task)) {
1098		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
1099		call_encode(task);
1100		/* Did the encode result in an error condition? */
1101		if (task->tk_status != 0)
1102			return;
1103	}
1104	xprt_transmit(task);
1105	if (task->tk_status < 0)
1106		return;
1107	/*
1108	 * On success, ensure that we call xprt_end_transmit() before sleeping
1109	 * in order to allow access to the socket to other RPC requests.
1110	 */
1111	call_transmit_status(task);
1112	if (task->tk_msg.rpc_proc->p_decode != NULL)
1113		return;
1114	task->tk_action = rpc_exit_task;
1115	rpc_wake_up_task(task);
1116}
1117
1118/*
1119 * 5a.	Handle cleanup after a transmission
1120 */
1121static void
1122call_transmit_status(struct rpc_task *task)
1123{
1124	task->tk_action = call_status;
1125	/*
1126	 * Special case: if we've been waiting on the socket's write_space()
1127	 * callback, then don't call xprt_end_transmit().
1128	 */
1129	if (task->tk_status == -EAGAIN)
1130		return;
1131	xprt_end_transmit(task);
1132	rpc_task_force_reencode(task);
1133}
1134
/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status (including the length of a received reply)
 *	moves on to call_decode.  Errors are mapped to the state that
 *	should handle them: timeout, rebind, retransmit, or exit.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* A reply has arrived and nothing is left partially sent */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch (status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Delay any retries for 3 seconds when the host or network
		 * is unreachable, then handle as if it were a timeout.
		 */
		if (status != -ETIMEDOUT)
			rpc_delay(task, 3*HZ);
		task->tk_action = call_timeout;
		if (task->tk_client->cl_discrtry)
			xprt_force_disconnect(task->tk_xprt);
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	default:
		/* -EIO means shutdown or soft timeout; the rest is logged */
		if (status != -EIO)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1189
1190/*
1191 * 6a.	Handle RPC timeout
1192 * 	We do not release the request slot, so we keep using the
1193 *	same XID for all retransmits.
1194 */
1195static void
1196call_timeout(struct rpc_task *task)
1197{
1198	struct rpc_clnt	*clnt = task->tk_client;
1199
1200	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1201		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1202		goto retry;
1203	}
1204
1205	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1206	task->tk_timeouts++;
1207
1208	if (RPC_IS_SOFT(task)) {
1209		printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1210				clnt->cl_protname, clnt->cl_server);
1211		rpc_exit(task, -EIO);
1212		return;
1213	}
1214
1215	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1216		task->tk_flags |= RPC_CALL_MAJORSEEN;
1217		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1218			clnt->cl_protname, clnt->cl_server);
1219	}
1220	rpc_force_rebind(clnt);
1221
1222retry:
1223	clnt->cl_stats->rpcretrans++;
1224	task->tk_action = call_bind;
1225	task->tk_status = 0;
1226}
1227
1228/*
1229 * 7.	Decode the RPC reply
1230 */
1231static void
1232call_decode(struct rpc_task *task)
1233{
1234	struct rpc_clnt	*clnt = task->tk_client;
1235	struct rpc_rqst	*req = task->tk_rqstp;
1236	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
1237	__be32		*p;
1238
1239	dprintk("RPC: %5u call_decode (status %d)\n",
1240			task->tk_pid, task->tk_status);
1241
1242	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1243		printk(KERN_NOTICE "%s: server %s OK\n",
1244			clnt->cl_protname, clnt->cl_server);
1245		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1246	}
1247
1248	if (task->tk_status < 12) {
1249		if (!RPC_IS_SOFT(task)) {
1250			task->tk_action = call_bind;
1251			clnt->cl_stats->rpcretrans++;
1252			goto out_retry;
1253		}
1254		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1255				clnt->cl_protname, task->tk_status);
1256		task->tk_action = call_timeout;
1257		goto out_retry;
1258	}
1259
1260	/*
1261	 * Ensure that we see all writes made by xprt_complete_rqst()
1262	 * before it changed req->rq_received.
1263	 */
1264	smp_rmb();
1265	req->rq_rcv_buf.len = req->rq_private_buf.len;
1266
1267	/* Check that the softirq receive buffer is valid */
1268	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1269				sizeof(req->rq_rcv_buf)) != 0);
1270
1271	/* Verify the RPC header */
1272	p = call_verify(task);
1273	if (IS_ERR(p)) {
1274		if (p == ERR_PTR(-EAGAIN))
1275			goto out_retry;
1276		return;
1277	}
1278
1279	task->tk_action = rpc_exit_task;
1280
1281	if (decode) {
1282		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1283						      task->tk_msg.rpc_resp);
1284	}
1285	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1286			task->tk_status);
1287	return;
1288out_retry:
1289	req->rq_received = req->rq_private_buf.len = 0;
1290	task->tk_status = 0;
1291	if (task->tk_client->cl_discrtry)
1292		xprt_force_disconnect(task->tk_xprt);
1293}
1294
1295/*
1296 * 8.	Refresh the credentials if rejected by the server
1297 */
1298static void
1299call_refresh(struct rpc_task *task)
1300{
1301	dprint_status(task);
1302
1303	task->tk_action = call_refreshresult;
1304	task->tk_status = 0;
1305	task->tk_client->cl_stats->rpcauthrefresh++;
1306	rpcauth_refreshcred(task);
1307}
1308
1309/*
1310 * 8a.	Process the results of a credential refresh
1311 */
1312static void
1313call_refreshresult(struct rpc_task *task)
1314{
1315	int status = task->tk_status;
1316
1317	dprint_status(task);
1318
1319	task->tk_status = 0;
1320	task->tk_action = call_reserve;
1321	if (status >= 0 && rpcauth_uptodatecred(task))
1322		return;
1323	if (status == -EACCES) {
1324		rpc_exit(task, -EACCES);
1325		return;
1326	}
1327	task->tk_action = call_refresh;
1328	if (status != -ETIMEDOUT)
1329		rpc_delay(task, 3*HZ);
1330	return;
1331}
1332
1333/*
1334 * Call header serialization
1335 */
1336static __be32 *
1337call_header(struct rpc_task *task)
1338{
1339	struct rpc_clnt *clnt = task->tk_client;
1340	struct rpc_rqst	*req = task->tk_rqstp;
1341	__be32		*p = req->rq_svec[0].iov_base;
1342
1343	/* FIXME: check buffer size? */
1344
1345	p = xprt_skip_transport_header(task->tk_xprt, p);
1346	*p++ = req->rq_xid;		/* XID */
1347	*p++ = htonl(RPC_CALL);		/* CALL */
1348	*p++ = htonl(RPC_VERSION);	/* RPC version */
1349	*p++ = htonl(clnt->cl_prog);	/* program number */
1350	*p++ = htonl(clnt->cl_vers);	/* program version */
1351	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
1352	p = rpcauth_marshcred(task, p);
1353	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1354	return p;
1355}
1356
1357/*
1358 * Reply header verification
1359 */
1360static __be32 *
1361call_verify(struct rpc_task *task)
1362{
1363	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1364	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1365	__be32	*p = iov->iov_base;
1366	u32 n;
1367	int error = -EACCES;
1368
1369	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1370		/* RFC-1014 says that the representation of XDR data must be a
1371		 * multiple of four bytes
1372		 * - if it isn't pointer subtraction in the NFS client may give
1373		 *   undefined results
1374		 */
1375		dprintk("RPC: %5u %s: XDR representation not a multiple of"
1376		       " 4 bytes: 0x%x\n", task->tk_pid, __FUNCTION__,
1377		       task->tk_rqstp->rq_rcv_buf.len);
1378		goto out_eio;
1379	}
1380	if ((len -= 3) < 0)
1381		goto out_overflow;
1382	p += 1;	/* skip XID */
1383
1384	if ((n = ntohl(*p++)) != RPC_REPLY) {
1385		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
1386				task->tk_pid, __FUNCTION__, n);
1387		goto out_garbage;
1388	}
1389	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1390		if (--len < 0)
1391			goto out_overflow;
1392		switch ((n = ntohl(*p++))) {
1393			case RPC_AUTH_ERROR:
1394				break;
1395			case RPC_MISMATCH:
1396				dprintk("RPC: %5u %s: RPC call version "
1397						"mismatch!\n",
1398						task->tk_pid, __FUNCTION__);
1399				error = -EPROTONOSUPPORT;
1400				goto out_err;
1401			default:
1402				dprintk("RPC: %5u %s: RPC call rejected, "
1403						"unknown error: %x\n",
1404						task->tk_pid, __FUNCTION__, n);
1405				goto out_eio;
1406		}
1407		if (--len < 0)
1408			goto out_overflow;
1409		switch ((n = ntohl(*p++))) {
1410		case RPC_AUTH_REJECTEDCRED:
1411		case RPC_AUTH_REJECTEDVERF:
1412		case RPCSEC_GSS_CREDPROBLEM:
1413		case RPCSEC_GSS_CTXPROBLEM:
1414			if (!task->tk_cred_retry)
1415				break;
1416			task->tk_cred_retry--;
1417			dprintk("RPC: %5u %s: retry stale creds\n",
1418					task->tk_pid, __FUNCTION__);
1419			rpcauth_invalcred(task);
1420			/* Ensure we obtain a new XID! */
1421			xprt_release(task);
1422			task->tk_action = call_refresh;
1423			goto out_retry;
1424		case RPC_AUTH_BADCRED:
1425		case RPC_AUTH_BADVERF:
1426			/* possibly garbled cred/verf? */
1427			if (!task->tk_garb_retry)
1428				break;
1429			task->tk_garb_retry--;
1430			dprintk("RPC: %5u %s: retry garbled creds\n",
1431					task->tk_pid, __FUNCTION__);
1432			task->tk_action = call_bind;
1433			goto out_retry;
1434		case RPC_AUTH_TOOWEAK:
1435			printk(KERN_NOTICE "call_verify: server %s requires stronger "
1436			       "authentication.\n", task->tk_client->cl_server);
1437			break;
1438		default:
1439			dprintk("RPC: %5u %s: unknown auth error: %x\n",
1440					task->tk_pid, __FUNCTION__, n);
1441			error = -EIO;
1442		}
1443		dprintk("RPC: %5u %s: call rejected %d\n",
1444				task->tk_pid, __FUNCTION__, n);
1445		goto out_err;
1446	}
1447	if (!(p = rpcauth_checkverf(task, p))) {
1448		dprintk("RPC: %5u %s: auth check failed\n",
1449				task->tk_pid, __FUNCTION__);
1450		goto out_garbage;		/* bad verifier, retry */
1451	}
1452	len = p - (__be32 *)iov->iov_base - 1;
1453	if (len < 0)
1454		goto out_overflow;
1455	switch ((n = ntohl(*p++))) {
1456	case RPC_SUCCESS:
1457		return p;
1458	case RPC_PROG_UNAVAIL:
1459		dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1460				task->tk_pid, __FUNCTION__,
1461				(unsigned int)task->tk_client->cl_prog,
1462				task->tk_client->cl_server);
1463		error = -EPFNOSUPPORT;
1464		goto out_err;
1465	case RPC_PROG_MISMATCH:
1466		dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1467				"server %s\n", task->tk_pid, __FUNCTION__,
1468				(unsigned int)task->tk_client->cl_prog,
1469				(unsigned int)task->tk_client->cl_vers,
1470				task->tk_client->cl_server);
1471		error = -EPROTONOSUPPORT;
1472		goto out_err;
1473	case RPC_PROC_UNAVAIL:
1474		dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
1475				"version %u on server %s\n",
1476				task->tk_pid, __FUNCTION__,
1477				task->tk_msg.rpc_proc,
1478				task->tk_client->cl_prog,
1479				task->tk_client->cl_vers,
1480				task->tk_client->cl_server);
1481		error = -EOPNOTSUPP;
1482		goto out_err;
1483	case RPC_GARBAGE_ARGS:
1484		dprintk("RPC: %5u %s: server saw garbage\n",
1485				task->tk_pid, __FUNCTION__);
1486		break;			/* retry */
1487	default:
1488		dprintk("RPC: %5u %s: server accept status: %x\n",
1489				task->tk_pid, __FUNCTION__, n);
1490		/* Also retry */
1491	}
1492
1493out_garbage:
1494	task->tk_client->cl_stats->rpcgarbage++;
1495	if (task->tk_garb_retry) {
1496		task->tk_garb_retry--;
1497		dprintk("RPC: %5u %s: retrying\n",
1498				task->tk_pid, __FUNCTION__);
1499		task->tk_action = call_bind;
1500out_retry:
1501		return ERR_PTR(-EAGAIN);
1502	}
1503out_eio:
1504	error = -EIO;
1505out_err:
1506	rpc_exit(task, error);
1507	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
1508			__FUNCTION__, error);
1509	return ERR_PTR(error);
1510out_overflow:
1511	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
1512			__FUNCTION__);
1513	goto out_garbage;
1514}
1515
1516static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
1517{
1518	return 0;
1519}
1520
1521static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
1522{
1523	return 0;
1524}
1525
1526static struct rpc_procinfo rpcproc_null = {
1527	.p_encode = rpcproc_encode_null,
1528	.p_decode = rpcproc_decode_null,
1529};
1530
1531static int rpc_ping(struct rpc_clnt *clnt, int flags)
1532{
1533	struct rpc_message msg = {
1534		.rpc_proc = &rpcproc_null,
1535	};
1536	int err;
1537	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1538	err = rpc_call_sync(clnt, &msg, flags);
1539	put_rpccred(msg.rpc_cred);
1540	return err;
1541}
1542
1543struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
1544{
1545	struct rpc_message msg = {
1546		.rpc_proc = &rpcproc_null,
1547		.rpc_cred = cred,
1548	};
1549	struct rpc_task_setup task_setup_data = {
1550		.rpc_client = clnt,
1551		.rpc_message = &msg,
1552		.callback_ops = &rpc_default_ops,
1553		.flags = flags,
1554	};
1555	return rpc_run_task(&task_setup_data);
1556}
1557EXPORT_SYMBOL_GPL(rpc_call_null);
1558
1559#ifdef RPC_DEBUG
1560void rpc_show_tasks(void)
1561{
1562	struct rpc_clnt *clnt;
1563	struct rpc_task *t;
1564
1565	spin_lock(&rpc_client_lock);
1566	if (list_empty(&all_clients))
1567		goto out;
1568	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
1569		"-rpcwait -action- ---ops--\n");
1570	list_for_each_entry(clnt, &all_clients, cl_clients) {
1571		if (list_empty(&clnt->cl_tasks))
1572			continue;
1573		spin_lock(&clnt->cl_lock);
1574		list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
1575			const char *rpc_waitq = "none";
1576			int proc;
1577
1578			if (t->tk_msg.rpc_proc)
1579				proc = t->tk_msg.rpc_proc->p_proc;
1580			else
1581				proc = -1;
1582
1583			if (RPC_IS_QUEUED(t))
1584				rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
1585
1586			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
1587				t->tk_pid, proc,
1588				t->tk_flags, t->tk_status,
1589				t->tk_client,
1590				(t->tk_client ? t->tk_client->cl_prog : 0),
1591				t->tk_rqstp, t->tk_timeout,
1592				rpc_waitq,
1593				t->tk_action, t->tk_ops);
1594		}
1595		spin_unlock(&clnt->cl_lock);
1596	}
1597out:
1598	spin_unlock(&rpc_client_lock);
1599}
1600#endif
1601