clnt.c revision 698b6d088e8a5d907596c689d5ae9109611c5b59
1/*
2 *  linux/net/sunrpc/clnt.c
3 *
4 *  This file contains the high-level RPC interface.
5 *  It is modeled as a finite state machine to support both synchronous
6 *  and asynchronous requests.
7 *
8 *  -	RPC header generation and argument serialization.
9 *  -	Credential refresh.
10 *  -	TCP connect handling.
11 *  -	Retry of operation when it is suspected the operation failed because
12 *	of uid squashing on the server, or when the credentials were stale
13 *	and need to be refreshed, or when a packet was damaged in transit.
14 *	This may be have to be moved to the VFS layer.
15 *
16 *  NB: BSD uses a more intelligent approach to guessing when a request
17 *  or reply has been lost by keeping the RTO estimate for each procedure.
18 *  We currently make do with a constant timeout value.
19 *
20 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22 */
23
24#include <asm/system.h>
25
26#include <linux/module.h>
27#include <linux/types.h>
28#include <linux/mm.h>
29#include <linux/slab.h>
30#include <linux/smp_lock.h>
31#include <linux/utsname.h>
32#include <linux/workqueue.h>
33#include <linux/in6.h>
34
35#include <linux/sunrpc/clnt.h>
36#include <linux/sunrpc/rpc_pipe_fs.h>
37#include <linux/sunrpc/metrics.h>
38
39
40#ifdef RPC_DEBUG
41# define RPCDBG_FACILITY	RPCDBG_CALL
42#endif
43
44#define dprint_status(t)					\
45	dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,		\
46			__FUNCTION__, t->tk_status)
47
48/*
49 * All RPC clients are linked into this list
50 */
51static LIST_HEAD(all_clients);
52static DEFINE_SPINLOCK(rpc_client_lock);
53
54static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
55
56
57static void	call_start(struct rpc_task *task);
58static void	call_reserve(struct rpc_task *task);
59static void	call_reserveresult(struct rpc_task *task);
60static void	call_allocate(struct rpc_task *task);
61static void	call_encode(struct rpc_task *task);
62static void	call_decode(struct rpc_task *task);
63static void	call_bind(struct rpc_task *task);
64static void	call_bind_status(struct rpc_task *task);
65static void	call_transmit(struct rpc_task *task);
66static void	call_status(struct rpc_task *task);
67static void	call_transmit_status(struct rpc_task *task);
68static void	call_refresh(struct rpc_task *task);
69static void	call_refreshresult(struct rpc_task *task);
70static void	call_timeout(struct rpc_task *task);
71static void	call_connect(struct rpc_task *task);
72static void	call_connect_status(struct rpc_task *task);
73static __be32 *	call_header(struct rpc_task *task);
74static __be32 *	call_verify(struct rpc_task *task);
75
76static int	rpc_ping(struct rpc_clnt *clnt, int flags);
77
/* Add @clnt to the global all_clients list under rpc_client_lock. */
static void rpc_register_client(struct rpc_clnt *clnt)
{
	spin_lock(&rpc_client_lock);
	list_add(&clnt->cl_clients, &all_clients);
	spin_unlock(&rpc_client_lock);
}
84
/* Remove @clnt from the global all_clients list under rpc_client_lock. */
static void rpc_unregister_client(struct rpc_clnt *clnt)
{
	spin_lock(&rpc_client_lock);
	list_del(&clnt->cl_clients);
	spin_unlock(&rpc_client_lock);
}
91
/*
 * Create this client's directory in the rpc_pipefs under @dir_name.
 * A NULL @dir_name means the program has no pipefs presence; success.
 * On success cl_vfsmnt holds a mount reference and cl_dentry the new
 * directory; both are ERR_PTR(-ENOENT) when unused.
 */
static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
	static uint32_t clntid;		/* global counter to make names unique */
	int error;

	clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
	clnt->cl_dentry = ERR_PTR(-ENOENT);
	if (dir_name == NULL)
		return 0;

	clnt->cl_vfsmnt = rpc_get_mount();
	if (IS_ERR(clnt->cl_vfsmnt))
		return PTR_ERR(clnt->cl_vfsmnt);

	/* Loop only on -EEXIST: retry with the next clntid value. */
	for (;;) {
		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
				"%s/clnt%x", dir_name,
				(unsigned int)clntid++);
		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
		if (!IS_ERR(clnt->cl_dentry))
			return 0;
		error = PTR_ERR(clnt->cl_dentry);
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
					clnt->cl_pathname, error);
			rpc_put_mount();	/* drop the mount ref taken above */
			return error;
		}
	}
}
124
/*
 * Allocate and initialise a new RPC client for @args, bound to @xprt.
 * On failure the transport reference is consumed (xprt_put) and an
 * ERR_PTR is returned; on success the new client owns the transport.
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
{
	struct rpc_program	*program = args->program;
	struct rpc_version	*version;
	struct rpc_clnt		*clnt = NULL;
	struct rpc_auth		*auth;
	int err;
	size_t len;

	/* sanity check the name before trying to print it */
	err = -EINVAL;
	len = strlen(args->servername);
	if (len > RPC_MAXNETNAMELEN)
		goto out_no_rpciod;
	len++;		/* include room for the trailing NUL */

	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
			program->name, args->servername, xprt);

	err = rpciod_up();
	if (err)
		goto out_no_rpciod;
	err = -EINVAL;
	if (!xprt)
		goto out_no_xprt;

	/* The requested program version must exist in the version table. */
	if (args->version >= program->nrvers)
		goto out_err;
	version = program->version[args->version];
	if (version == NULL)
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	clnt->cl_parent = clnt;		/* self-parented: not a clone */

	/*
	 * Short server names live in the inline buffer; longer ones get a
	 * separate allocation.  If that allocation fails, fall back to a
	 * truncated copy in the inline buffer rather than failing outright.
	 */
	clnt->cl_server = clnt->cl_inline_name;
	if (len > sizeof(clnt->cl_inline_name)) {
		char *buf = kmalloc(len, GFP_KERNEL);
		if (buf != NULL)
			clnt->cl_server = buf;
		else
			len = sizeof(clnt->cl_inline_name);
	}
	strlcpy(clnt->cl_server, args->servername, len);

	clnt->cl_xprt     = xprt;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;
	INIT_LIST_HEAD(&clnt->cl_tasks);
	spin_lock_init(&clnt->cl_lock);

	/* An unbound transport means we must rebind before each use. */
	if (!xprt_bound(clnt->cl_xprt))
		clnt->cl_autobind = 1;

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

	kref_init(&clnt->cl_kref);

	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
	if (err < 0)
		goto out_no_path;

	auth = rpcauth_create(args->authflavor, clnt);
	if (IS_ERR(auth)) {
		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
				args->authflavor);
		err = PTR_ERR(auth);
		goto out_no_auth;
	}

	/* save the nodename */
	clnt->cl_nodelen = strlen(utsname()->nodename);
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
	rpc_register_client(clnt);
	return clnt;

	/* Error unwind: each label undoes the steps completed before it. */
out_no_auth:
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
out_no_path:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
	kfree(clnt);
out_err:
	xprt_put(xprt);
out_no_xprt:
	rpciod_down();
out_no_rpciod:
	return ERR_PTR(err);
}
234
/**
 * rpc_create - create an RPC client and transport with one call
237 * @args: rpc_clnt create argument structure
238 *
239 * Creates and initializes an RPC transport and an RPC client.
240 *
241 * It can ping the server in order to determine if it is up, and to see if
242 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
243 * this behavior so asynchronous tasks can also use rpc_create.
244 */
245struct rpc_clnt *rpc_create(struct rpc_create_args *args)
246{
247	struct rpc_xprt *xprt;
248	struct rpc_clnt *clnt;
249	struct xprt_create xprtargs = {
250		.ident = args->protocol,
251		.srcaddr = args->saddress,
252		.dstaddr = args->address,
253		.addrlen = args->addrsize,
254		.timeout = args->timeout
255	};
256	char servername[48];
257
258	xprt = xprt_create_transport(&xprtargs);
259	if (IS_ERR(xprt))
260		return (struct rpc_clnt *)xprt;
261
262	/*
263	 * If the caller chooses not to specify a hostname, whip
264	 * up a string representation of the passed-in address.
265	 */
266	if (args->servername == NULL) {
267		servername[0] = '\0';
268		switch (args->address->sa_family) {
269		case AF_INET: {
270			struct sockaddr_in *sin =
271					(struct sockaddr_in *)args->address;
272			snprintf(servername, sizeof(servername), NIPQUAD_FMT,
273				 NIPQUAD(sin->sin_addr.s_addr));
274			break;
275		}
276		case AF_INET6: {
277			struct sockaddr_in6 *sin =
278					(struct sockaddr_in6 *)args->address;
279			snprintf(servername, sizeof(servername), NIP6_FMT,
280				 NIP6(sin->sin6_addr));
281			break;
282		}
283		default:
284			/* caller wants default server name, but
285			 * address family isn't recognized. */
286			return ERR_PTR(-EINVAL);
287		}
288		args->servername = servername;
289	}
290
291	xprt = xprt_create_transport(&xprtargs);
292	if (IS_ERR(xprt))
293		return (struct rpc_clnt *)xprt;
294
295	/*
296	 * By default, kernel RPC client connects from a reserved port.
297	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
298	 * but it is always enabled for rpciod, which handles the connect
299	 * operation.
300	 */
301	xprt->resvport = 1;
302	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
303		xprt->resvport = 0;
304
305	clnt = rpc_new_client(args, xprt);
306	if (IS_ERR(clnt))
307		return clnt;
308
309	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
310		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
311		if (err != 0) {
312			rpc_shutdown_client(clnt);
313			return ERR_PTR(err);
314		}
315	}
316
317	clnt->cl_softrtry = 1;
318	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
319		clnt->cl_softrtry = 0;
320
321	if (args->flags & RPC_CLNT_CREATE_INTR)
322		clnt->cl_intr = 1;
323	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
324		clnt->cl_autobind = 1;
325	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
326		clnt->cl_discrtry = 1;
327
328	return clnt;
329}
330EXPORT_SYMBOL_GPL(rpc_create);
331
332/*
333 * This function clones the RPC client structure. It allows us to share the
334 * same transport while varying parameters such as the authentication
335 * flavour.
336 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;
	int err = -ENOMEM;

	/* Start from a byte-wise copy, then fix up per-instance state. */
	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	new->cl_parent = clnt;
	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	INIT_LIST_HEAD(&new->cl_tasks);
	spin_lock_init(&new->cl_lock);
	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
	new->cl_metrics = rpc_alloc_iostats(clnt);
	if (new->cl_metrics == NULL)
		goto out_no_stats;
	kref_init(&new->cl_kref);
	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
	if (err != 0)
		goto out_no_path;
	/* The clone shares the parent's auth, transport and refcount. */
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	xprt_get(clnt->cl_xprt);
	kref_get(&clnt->cl_kref);
	rpc_register_client(new);
	rpciod_up();
	return new;
out_no_path:
	rpc_free_iostats(new->cl_metrics);
out_no_stats:
	kfree(new);
out_no_clnt:
	dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
	return ERR_PTR(err);
}
EXPORT_SYMBOL_GPL(rpc_clone_client);
375
376/*
377 * Properly shut down an RPC client, terminating all outstanding
378 * requests.
379 */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       shutting down %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);

	/*
	 * Keep killing outstanding tasks until they have all drained;
	 * the 1s timeout guards against a missed wakeup, not correctness.
	 */
	while (!list_empty(&clnt->cl_tasks)) {
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			list_empty(&clnt->cl_tasks), 1*HZ);
	}

	rpc_release_client(clnt);
}
394
395/*
396 * Free an RPC client
397 */
/* kref release callback: tear down a client whose refcount hit zero. */
static void
rpc_free_client(struct kref *kref)
{
	struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);

	dprintk("RPC:       destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
	/* A clone's server-name string belongs to its parent: don't free it. */
	if (clnt->cl_parent != clnt) {
		rpc_release_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	rpc_unregister_client(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_put(clnt->cl_xprt);
	rpciod_down();
	kfree(clnt);
}
423
/*
 * kref release callback: drop the client's auth, then free the client.
 * (The previous comment here said "Free an RPC client", a copy-paste of
 * the one above rpc_free_client.)
 */
static void
rpc_free_auth(struct kref *kref)
{
	struct rpc_clnt *clnt = container_of(kref, struct rpc_clnt, cl_kref);

	if (clnt->cl_auth == NULL) {
		rpc_free_client(kref);
		return;
	}

	/*
	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
	 *       release remaining GSS contexts. This mechanism ensures
	 *       that it can do so safely.
	 */
	kref_init(kref);	/* resurrect the refcount for the auth teardown */
	rpcauth_release(clnt->cl_auth);
	clnt->cl_auth = NULL;
	kref_put(kref, rpc_free_client);
}
447
448/*
449 * Release reference to the RPC client
450 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       rpc_release_client(%p)\n", clnt);

	/* Wake anyone in rpc_shutdown_client() waiting for tasks to drain. */
	if (list_empty(&clnt->cl_tasks))
		wake_up(&destroy_wait);
	kref_put(&clnt->cl_kref, rpc_free_auth);
}
460
461/**
462 * rpc_bind_new_program - bind a new RPC program to an existing client
 * @old: old rpc_client
 * @program: rpc program to set
 * @vers: rpc program version
466 *
467 * Clones the rpc client and sets up a new RPC program. This is mainly
468 * of use for enabling different RPC programs to share the same transport.
469 * The Sun NFSv2/v3 ACL protocol can do this.
470 */
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
				      struct rpc_program *program,
				      u32 vers)
{
	struct rpc_clnt *clnt;
	struct rpc_version *version;
	int err;

	BUG_ON(vers >= program->nrvers || !program->version[vers]);
	version = program->version[vers];
	clnt = rpc_clone_client(old);
	if (IS_ERR(clnt))
		goto out;
	/* Rebind the clone to the new program/version tables. */
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	/* Verify the server actually speaks this program/version. */
	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
	if (err != 0) {
		rpc_shutdown_client(clnt);
		clnt = ERR_PTR(err);
	}
out:
	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_bind_new_program);
499
500/*
501 * Default callback for async RPC calls
502 */
/* No-op completion callback used when the caller supplies none. */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}
507
/* Default call ops: only a no-op completion handler. */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
511
512/*
513 *	Export the signal mask handling for synchronous code that
514 *	sleeps on RPC calls
515 */
/* Signals that may interrupt an "intr"-mounted RPC wait. */
#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))

/*
 * Block every signal except SIGKILL (and, when @intr, the usual
 * interactive signals) for the duration of a synchronous RPC call;
 * the previous mask is saved in @oldset.
 */
static void rpc_save_sigmask(sigset_t *oldset, int intr)
{
	unsigned long	sigallow = sigmask(SIGKILL);
	sigset_t sigmask;

	/* Block all signals except those listed in sigallow */
	if (intr)
		sigallow |= RPC_INTR_SIGNALS;
	siginitsetinv(&sigmask, sigallow);
	sigprocmask(SIG_BLOCK, &sigmask, oldset);
}
529
/* Apply the per-task signal policy: interruptible unless the task says not. */
static void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
}
534
/* Restore the signal mask saved by rpc_save_sigmask(). */
static void rpc_restore_sigmask(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
539
/* Exported helper: mask signals per the client's cl_intr setting. */
void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, clnt->cl_intr);
}
EXPORT_SYMBOL_GPL(rpc_clnt_sigmask);
545
/* Exported helper: undo rpc_clnt_sigmask(). @clnt is unused. */
void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_restore_sigmask(oldset);
}
EXPORT_SYMBOL_GPL(rpc_clnt_sigunmask);
551
552/**
553 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
554 * @task_setup_data: pointer to task initialisation data
555 */
struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
{
	struct rpc_task *task, *ret;
	sigset_t oldset;

	task = rpc_new_task(task_setup_data);
	if (task == NULL) {
		/* Task creation failed: release the caller's calldata here. */
		rpc_release_calldata(task_setup_data->callback_ops,
				task_setup_data->callback_data);
		ret = ERR_PTR(-ENOMEM);
		goto out;
	}

	if (task->tk_status != 0) {
		ret = ERR_PTR(task->tk_status);
		rpc_put_task(task);
		goto out;
	}
	/* Extra reference: the caller gets the task back after execution. */
	atomic_inc(&task->tk_count);
	/* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */
	rpc_task_sigmask(task, &oldset);
	rpc_execute(task);
	rpc_restore_sigmask(&oldset);
	ret = task;
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rpc_run_task);
584
585/**
586 * rpc_call_sync - Perform a synchronous RPC call
587 * @clnt: pointer to RPC client
588 * @msg: RPC call parameters
589 * @flags: RPC call flags
590 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = msg,
		.callback_ops = &rpc_default_ops,
		.flags = flags,
	};
	int status;

	/* A synchronous call must not carry the async flag. */
	BUG_ON(flags & RPC_TASK_ASYNC);

	task = rpc_run_task(&task_setup_data);
	if (IS_ERR(task))
		return PTR_ERR(task);
	status = task->tk_status;
	rpc_put_task(task);
	return status;
}
EXPORT_SYMBOL_GPL(rpc_call_sync);
612
613/**
614 * rpc_call_async - Perform an asynchronous RPC call
615 * @clnt: pointer to RPC client
616 * @msg: RPC call parameters
617 * @flags: RPC call flags
 * @tk_ops: RPC call ops
 * @data: user call data
620 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       const struct rpc_call_ops *tk_ops, void *data)
{
	struct rpc_task	*task;
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = msg,
		.callback_ops = tk_ops,
		.callback_data = data,
		.flags = flags|RPC_TASK_ASYNC,
	};

	task = rpc_run_task(&task_setup_data);
	if (IS_ERR(task))
		return PTR_ERR(task);
	/* Drop our reference; completion is reported via tk_ops. */
	rpc_put_task(task);
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_call_async);
641
/* (Re)start the FSM for @task at its initial state. */
void
rpc_call_start(struct rpc_task *task)
{
	task->tk_action = call_start;
}
EXPORT_SYMBOL_GPL(rpc_call_start);
648
649/**
650 * rpc_peeraddr - extract remote peer address from clnt's xprt
651 * @clnt: RPC client structure
652 * @buf: target buffer
 * @bufsize: length of target buffer
654 *
655 * Returns the number of bytes that are actually in the stored address.
656 */
657size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
658{
659	size_t bytes;
660	struct rpc_xprt *xprt = clnt->cl_xprt;
661
662	bytes = sizeof(xprt->addr);
663	if (bytes > bufsize)
664		bytes = bufsize;
665	memcpy(buf, &clnt->cl_xprt->addr, bytes);
666	return xprt->addrlen;
667}
668EXPORT_SYMBOL_GPL(rpc_peeraddr);
669
670/**
671 * rpc_peeraddr2str - return remote peer address in printable format
672 * @clnt: RPC client structure
673 * @format: address format
674 *
675 */
676char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
677{
678	struct rpc_xprt *xprt = clnt->cl_xprt;
679
680	if (xprt->address_strings[format] != NULL)
681		return xprt->address_strings[format];
682	else
683		return "unprintable";
684}
685EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
686
/* Ask the transport to adjust its socket send/receive buffer sizes. */
void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;
	if (xprt->ops->set_buffer_size)
		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
}
EXPORT_SYMBOL_GPL(rpc_setbufsize);
695
696/*
697 * Return size of largest payload RPC client can support, in bytes
698 *
699 * For stream transports, this is one RPC record fragment (see RFC
700 * 1831), as we don't support multi-record requests yet.  For datagram
701 * transports, this is the size of an IP packet minus the IP, UDP, and
702 * RPC header sizes.
703 */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
	return clnt->cl_xprt->max_payload;
}
EXPORT_SYMBOL_GPL(rpc_max_payload);
709
710/**
711 * rpc_force_rebind - force transport to check that remote port is unchanged
712 * @clnt: client to rebind
713 *
714 */
void rpc_force_rebind(struct rpc_clnt *clnt)
{
	/* Only autobinding clients can usefully re-query the port. */
	if (clnt->cl_autobind)
		xprt_clear_bound(clnt->cl_xprt);
}
EXPORT_SYMBOL_GPL(rpc_force_rebind);
721
722/*
723 * Restart an (async) RPC call. Usually called from within the
724 * exit handler.
725 */
void
rpc_restart_call(struct rpc_task *task)
{
	/* A killed task must not be resurrected. */
	if (RPC_ASSASSINATED(task))
		return;

	task->tk_action = call_start;
}
EXPORT_SYMBOL_GPL(rpc_restart_call);
735
736/*
737 * 0.  Initial state
738 *
739 *     Other FSM states can be visited zero or more times, but
740 *     this state is visited exactly once for each RPC.
741 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
			clnt->cl_protname, clnt->cl_vers,
			task->tk_msg.rpc_proc->p_proc,
			(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
}
757
758/*
759 * 1.	Reserve an RPC call slot
760 */
static void
call_reserve(struct rpc_task *task)
{
	dprint_status(task);

	/* Stale credentials must be refreshed before reserving a slot. */
	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
775
776/*
777 * 1b.	Grok the result of xprt_reserve()
778 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		/* Success status but no slot: internal inconsistency. */
		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}
826
827/*
828 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
829 *	(Note: buffer memory is freed in xprt_release).
830 */
static void
call_allocate(struct rpc_task *task)
{
	/* Slack quads the auth flavour may add around the payload. */
	unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

	dprint_status(task);

	task->tk_status = 0;
	task->tk_action = call_bind;

	/* Buffer already allocated on a previous pass (retransmit). */
	if (req->rq_buffer)
		return;

	if (proc->p_proc != 0) {
		BUG_ON(proc->p_arglen == 0);
		if (proc->p_decode != NULL)
			BUG_ON(proc->p_replen == 0);
	}

	/*
	 * Calculate the size (in quads) of the RPC call
	 * and reply headers, and convert both values
	 * to byte sizes.
	 */
	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
	req->rq_callsize <<= 2;
	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
	req->rq_rcvsize <<= 2;

	/* One allocation covers both the send and receive buffers. */
	req->rq_buffer = xprt->ops->buf_alloc(task,
					req->rq_callsize + req->rq_rcvsize);
	if (req->rq_buffer != NULL)
		return;

	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

	/* Out of memory: back off briefly and retry unless signalled. */
	if (RPC_IS_ASYNC(task) || !signalled()) {
		task->tk_action = call_allocate;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}
878
/* True when the request has not yet been XDR-encoded (send buf empty). */
static inline int
rpc_task_need_encode(struct rpc_task *task)
{
	return task->tk_rqstp->rq_snd_buf.len == 0;
}
884
/* Force a fresh XDR encode on the next transmit attempt. */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
}
890
891static inline void
892rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
893{
894	buf->head[0].iov_base = start;
895	buf->head[0].iov_len = len;
896	buf->tail[0].iov_len = 0;
897	buf->page_len = 0;
898	buf->flags = 0;
899	buf->len = 0;
900	buf->buflen = len;
901}
902
903/*
904 * 3.	Encode arguments of an RPC call
905 */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	encode;
	__be32		*p;

	dprint_status(task);

	/* Split the single rq_buffer into send and receive xdr_bufs. */
	rpc_xdr_buf_init(&req->rq_snd_buf,
			 req->rq_buffer,
			 req->rq_callsize);
	rpc_xdr_buf_init(&req->rq_rcv_buf,
			 (char *)req->rq_buffer + req->rq_callsize,
			 req->rq_rcvsize);

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	if (encode == NULL)
		return;

	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
	if (task->tk_status == -ENOMEM) {
		/* XXX: Is this sane? */
		rpc_delay(task, 3*HZ);
		task->tk_status = -EAGAIN;
	}
}
940
941/*
942 * 4.	Get the server port number if not yet set
943 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprint_status(task);

	task->tk_action = call_connect;
	/* Unknown remote port: ask rpcbind before connecting. */
	if (!xprt_bound(xprt)) {
		task->tk_action = call_bind_status;
		task->tk_timeout = xprt->bind_timeout;
		xprt->ops->rpcbind(task);
	}
}
958
959/*
960 * 4a.	Sort out bind result
961 */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
		dprintk("RPC: %5u rpcbind waiting for another request "
				"to finish\n", task->tk_pid);
		/* avoid busy-waiting here -- could be a network outage. */
		rpc_delay(task, 5*HZ);
		goto retry_timeout;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u remote rpcbind service unavailable\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	/* Fatal rpcbind failure: terminate the task with @status. */
	rpc_exit(task, status);
	return;

retry_timeout:
	/* Let call_timeout decide whether (and how) to retry. */
	task->tk_action = call_timeout;
}
1017
1018/*
1019 * 4b.	Connect to the RPC server
1020 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
			task->tk_pid, xprt,
			(xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = call_transmit;
	if (!xprt_connected(xprt)) {
		task->tk_action = call_connect_status;
		/* A prior error is handled by call_connect_status directly. */
		if (task->tk_status < 0)
			return;
		xprt_connect(task);
	}
}
1038
1039/*
1040 * 4c.	Sort out connect result
1041 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: remote service port may have changed */
	rpc_force_rebind(clnt);

	switch (status) {
	case -ENOTCONN:
	case -EAGAIN:
		task->tk_action = call_bind;
		if (!RPC_IS_SOFT(task))
			return;
		/* if soft mounted, test if we've timed out */
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	}
	rpc_exit(task, -EIO);
}
1073
1074/*
1075 * 5.	Transmit the RPC request, and wait for reply
1076 */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
		call_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0)
			return;
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	/* Procedures with no reply decoder are finished once sent. */
	if (task->tk_msg.rpc_proc->p_decode != NULL)
		return;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_task(task);
}
1110
1111/*
1112 * 5a.	Handle cleanup after a transmission
1113 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;
	/*
	 * Special case: if we've been waiting on the socket's write_space()
	 * callback, then don't call xprt_end_transmit().
	 */
	if (task->tk_status == -EAGAIN)
		return;
	xprt_end_transmit(task);
	/* Next transmit attempt must re-encode (fresh GSS sequence no.). */
	rpc_task_force_reencode(task);
}
1127
1128/*
1129 * 6.	Sort out the RPC call status
1130 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* A fully-sent request with a reply supersedes any transmit status. */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (task->tk_client->cl_discrtry)
			xprt_force_disconnect(task->tk_xprt);
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1182
1183/*
1184 * 6a.	Handle RPC timeout
1185 * 	We do not release the request slot, so we keep using the
1186 *	same XID for all retransmits.
1187 */
1188static void
1189call_timeout(struct rpc_task *task)
1190{
1191	struct rpc_clnt	*clnt = task->tk_client;
1192
1193	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1194		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1195		goto retry;
1196	}
1197
1198	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1199	task->tk_timeouts++;
1200
1201	if (RPC_IS_SOFT(task)) {
1202		printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1203				clnt->cl_protname, clnt->cl_server);
1204		rpc_exit(task, -EIO);
1205		return;
1206	}
1207
1208	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1209		task->tk_flags |= RPC_CALL_MAJORSEEN;
1210		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1211			clnt->cl_protname, clnt->cl_server);
1212	}
1213	rpc_force_rebind(clnt);
1214
1215retry:
1216	clnt->cl_stats->rpcretrans++;
1217	task->tk_action = call_bind;
1218	task->tk_status = 0;
1219}
1220
1221/*
1222 * 7.	Decode the RPC reply
1223 */
1224static void
1225call_decode(struct rpc_task *task)
1226{
1227	struct rpc_clnt	*clnt = task->tk_client;
1228	struct rpc_rqst	*req = task->tk_rqstp;
1229	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
1230	__be32		*p;
1231
1232	dprintk("RPC: %5u call_decode (status %d)\n",
1233			task->tk_pid, task->tk_status);
1234
1235	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1236		printk(KERN_NOTICE "%s: server %s OK\n",
1237			clnt->cl_protname, clnt->cl_server);
1238		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1239	}
1240
1241	if (task->tk_status < 12) {
1242		if (!RPC_IS_SOFT(task)) {
1243			task->tk_action = call_bind;
1244			clnt->cl_stats->rpcretrans++;
1245			goto out_retry;
1246		}
1247		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1248				clnt->cl_protname, task->tk_status);
1249		task->tk_action = call_timeout;
1250		goto out_retry;
1251	}
1252
1253	/*
1254	 * Ensure that we see all writes made by xprt_complete_rqst()
1255	 * before it changed req->rq_received.
1256	 */
1257	smp_rmb();
1258	req->rq_rcv_buf.len = req->rq_private_buf.len;
1259
1260	/* Check that the softirq receive buffer is valid */
1261	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1262				sizeof(req->rq_rcv_buf)) != 0);
1263
1264	/* Verify the RPC header */
1265	p = call_verify(task);
1266	if (IS_ERR(p)) {
1267		if (p == ERR_PTR(-EAGAIN))
1268			goto out_retry;
1269		return;
1270	}
1271
1272	task->tk_action = rpc_exit_task;
1273
1274	if (decode) {
1275		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1276						      task->tk_msg.rpc_resp);
1277	}
1278	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1279			task->tk_status);
1280	return;
1281out_retry:
1282	req->rq_received = req->rq_private_buf.len = 0;
1283	task->tk_status = 0;
1284	if (task->tk_client->cl_discrtry)
1285		xprt_force_disconnect(task->tk_xprt);
1286}
1287
1288/*
1289 * 8.	Refresh the credentials if rejected by the server
1290 */
1291static void
1292call_refresh(struct rpc_task *task)
1293{
1294	dprint_status(task);
1295
1296	task->tk_action = call_refreshresult;
1297	task->tk_status = 0;
1298	task->tk_client->cl_stats->rpcauthrefresh++;
1299	rpcauth_refreshcred(task);
1300}
1301
1302/*
1303 * 8a.	Process the results of a credential refresh
1304 */
1305static void
1306call_refreshresult(struct rpc_task *task)
1307{
1308	int status = task->tk_status;
1309
1310	dprint_status(task);
1311
1312	task->tk_status = 0;
1313	task->tk_action = call_reserve;
1314	if (status >= 0 && rpcauth_uptodatecred(task))
1315		return;
1316	if (status == -EACCES) {
1317		rpc_exit(task, -EACCES);
1318		return;
1319	}
1320	task->tk_action = call_refresh;
1321	if (status != -ETIMEDOUT)
1322		rpc_delay(task, 3*HZ);
1323	return;
1324}
1325
1326/*
1327 * Call header serialization
1328 */
1329static __be32 *
1330call_header(struct rpc_task *task)
1331{
1332	struct rpc_clnt *clnt = task->tk_client;
1333	struct rpc_rqst	*req = task->tk_rqstp;
1334	__be32		*p = req->rq_svec[0].iov_base;
1335
1336	/* FIXME: check buffer size? */
1337
1338	p = xprt_skip_transport_header(task->tk_xprt, p);
1339	*p++ = req->rq_xid;		/* XID */
1340	*p++ = htonl(RPC_CALL);		/* CALL */
1341	*p++ = htonl(RPC_VERSION);	/* RPC version */
1342	*p++ = htonl(clnt->cl_prog);	/* program number */
1343	*p++ = htonl(clnt->cl_vers);	/* program version */
1344	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
1345	p = rpcauth_marshcred(task, p);
1346	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1347	return p;
1348}
1349
1350/*
1351 * Reply header verification
1352 */
1353static __be32 *
1354call_verify(struct rpc_task *task)
1355{
1356	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1357	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1358	__be32	*p = iov->iov_base;
1359	u32 n;
1360	int error = -EACCES;
1361
1362	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1363		/* RFC-1014 says that the representation of XDR data must be a
1364		 * multiple of four bytes
1365		 * - if it isn't pointer subtraction in the NFS client may give
1366		 *   undefined results
1367		 */
1368		dprintk("RPC: %5u %s: XDR representation not a multiple of"
1369		       " 4 bytes: 0x%x\n", task->tk_pid, __FUNCTION__,
1370		       task->tk_rqstp->rq_rcv_buf.len);
1371		goto out_eio;
1372	}
1373	if ((len -= 3) < 0)
1374		goto out_overflow;
1375	p += 1;	/* skip XID */
1376
1377	if ((n = ntohl(*p++)) != RPC_REPLY) {
1378		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
1379				task->tk_pid, __FUNCTION__, n);
1380		goto out_garbage;
1381	}
1382	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1383		if (--len < 0)
1384			goto out_overflow;
1385		switch ((n = ntohl(*p++))) {
1386			case RPC_AUTH_ERROR:
1387				break;
1388			case RPC_MISMATCH:
1389				dprintk("RPC: %5u %s: RPC call version "
1390						"mismatch!\n",
1391						task->tk_pid, __FUNCTION__);
1392				error = -EPROTONOSUPPORT;
1393				goto out_err;
1394			default:
1395				dprintk("RPC: %5u %s: RPC call rejected, "
1396						"unknown error: %x\n",
1397						task->tk_pid, __FUNCTION__, n);
1398				goto out_eio;
1399		}
1400		if (--len < 0)
1401			goto out_overflow;
1402		switch ((n = ntohl(*p++))) {
1403		case RPC_AUTH_REJECTEDCRED:
1404		case RPC_AUTH_REJECTEDVERF:
1405		case RPCSEC_GSS_CREDPROBLEM:
1406		case RPCSEC_GSS_CTXPROBLEM:
1407			if (!task->tk_cred_retry)
1408				break;
1409			task->tk_cred_retry--;
1410			dprintk("RPC: %5u %s: retry stale creds\n",
1411					task->tk_pid, __FUNCTION__);
1412			rpcauth_invalcred(task);
1413			/* Ensure we obtain a new XID! */
1414			xprt_release(task);
1415			task->tk_action = call_refresh;
1416			goto out_retry;
1417		case RPC_AUTH_BADCRED:
1418		case RPC_AUTH_BADVERF:
1419			/* possibly garbled cred/verf? */
1420			if (!task->tk_garb_retry)
1421				break;
1422			task->tk_garb_retry--;
1423			dprintk("RPC: %5u %s: retry garbled creds\n",
1424					task->tk_pid, __FUNCTION__);
1425			task->tk_action = call_bind;
1426			goto out_retry;
1427		case RPC_AUTH_TOOWEAK:
1428			printk(KERN_NOTICE "call_verify: server %s requires stronger "
1429			       "authentication.\n", task->tk_client->cl_server);
1430			break;
1431		default:
1432			dprintk("RPC: %5u %s: unknown auth error: %x\n",
1433					task->tk_pid, __FUNCTION__, n);
1434			error = -EIO;
1435		}
1436		dprintk("RPC: %5u %s: call rejected %d\n",
1437				task->tk_pid, __FUNCTION__, n);
1438		goto out_err;
1439	}
1440	if (!(p = rpcauth_checkverf(task, p))) {
1441		dprintk("RPC: %5u %s: auth check failed\n",
1442				task->tk_pid, __FUNCTION__);
1443		goto out_garbage;		/* bad verifier, retry */
1444	}
1445	len = p - (__be32 *)iov->iov_base - 1;
1446	if (len < 0)
1447		goto out_overflow;
1448	switch ((n = ntohl(*p++))) {
1449	case RPC_SUCCESS:
1450		return p;
1451	case RPC_PROG_UNAVAIL:
1452		dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
1453				task->tk_pid, __FUNCTION__,
1454				(unsigned int)task->tk_client->cl_prog,
1455				task->tk_client->cl_server);
1456		error = -EPFNOSUPPORT;
1457		goto out_err;
1458	case RPC_PROG_MISMATCH:
1459		dprintk("RPC: %5u %s: program %u, version %u unsupported by "
1460				"server %s\n", task->tk_pid, __FUNCTION__,
1461				(unsigned int)task->tk_client->cl_prog,
1462				(unsigned int)task->tk_client->cl_vers,
1463				task->tk_client->cl_server);
1464		error = -EPROTONOSUPPORT;
1465		goto out_err;
1466	case RPC_PROC_UNAVAIL:
1467		dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
1468				"version %u on server %s\n",
1469				task->tk_pid, __FUNCTION__,
1470				task->tk_msg.rpc_proc,
1471				task->tk_client->cl_prog,
1472				task->tk_client->cl_vers,
1473				task->tk_client->cl_server);
1474		error = -EOPNOTSUPP;
1475		goto out_err;
1476	case RPC_GARBAGE_ARGS:
1477		dprintk("RPC: %5u %s: server saw garbage\n",
1478				task->tk_pid, __FUNCTION__);
1479		break;			/* retry */
1480	default:
1481		dprintk("RPC: %5u %s: server accept status: %x\n",
1482				task->tk_pid, __FUNCTION__, n);
1483		/* Also retry */
1484	}
1485
1486out_garbage:
1487	task->tk_client->cl_stats->rpcgarbage++;
1488	if (task->tk_garb_retry) {
1489		task->tk_garb_retry--;
1490		dprintk("RPC: %5u %s: retrying\n",
1491				task->tk_pid, __FUNCTION__);
1492		task->tk_action = call_bind;
1493out_retry:
1494		return ERR_PTR(-EAGAIN);
1495	}
1496out_eio:
1497	error = -EIO;
1498out_err:
1499	rpc_exit(task, error);
1500	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
1501			__FUNCTION__, error);
1502	return ERR_PTR(error);
1503out_overflow:
1504	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
1505			__FUNCTION__);
1506	goto out_garbage;
1507}
1508
/* XDR encoder for the NULL procedure: there are no arguments to marshal */
static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1513
/* XDR decoder for the NULL procedure: there are no results to unmarshal */
static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1518
/*
 * Procedure 0 (NULL) descriptor: no arguments, no results.
 * Used below by rpc_ping() and rpc_call_null() to probe a server.
 */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
1523
1524static int rpc_ping(struct rpc_clnt *clnt, int flags)
1525{
1526	struct rpc_message msg = {
1527		.rpc_proc = &rpcproc_null,
1528	};
1529	int err;
1530	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1531	err = rpc_call_sync(clnt, &msg, flags);
1532	put_rpccred(msg.rpc_cred);
1533	return err;
1534}
1535
/*
 * rpc_call_null - kick off a NULL procedure call
 * @clnt: RPC client handle
 * @cred: credential to use for the call
 * @flags: RPC task flags (e.g. RPC_TASK_ASYNC)
 *
 * Starts a NULL call with the default callback ops and returns the
 * result of rpc_run_task() (a task pointer, or ERR_PTR on failure).
 */
struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
{
	struct rpc_message msg = {
		.rpc_proc = &rpcproc_null,
		.rpc_cred = cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = &msg,
		.callback_ops = &rpc_default_ops,
		.flags = flags,
	};
	return rpc_run_task(&task_setup_data);
}
EXPORT_SYMBOL_GPL(rpc_call_null);
1551
#ifdef RPC_DEBUG
/*
 * Debugging aid: print a one-line summary of every task on every
 * registered RPC client to the console.
 */
void rpc_show_tasks(void)
{
	struct rpc_clnt *clnt;
	struct rpc_task *task;

	spin_lock(&rpc_client_lock);
	if (list_empty(&all_clients))
		goto out;
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- ---ops--\n");
	list_for_each_entry(clnt, &all_clients, cl_clients) {
		if (list_empty(&clnt->cl_tasks))
			continue;
		/* cl_lock protects the per-client task list */
		spin_lock(&clnt->cl_lock);
		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
			const char *rpc_waitq = "none";
			int proc = -1;

			if (task->tk_msg.rpc_proc)
				proc = task->tk_msg.rpc_proc->p_proc;

			if (RPC_IS_QUEUED(task))
				rpc_waitq = rpc_qname(task->u.tk_wait.rpc_waitq);

			printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
				task->tk_pid, proc,
				task->tk_flags, task->tk_status,
				task->tk_client,
				(task->tk_client ? task->tk_client->cl_prog : 0),
				task->tk_rqstp, task->tk_timeout,
				rpc_waitq,
				task->tk_action, task->tk_ops);
		}
		spin_unlock(&clnt->cl_lock);
	}
out:
	spin_unlock(&rpc_client_lock);
}
#endif
1594