clnt.c revision 41b6b4d0b88f80d04729a5286e838e972733db1e
1/*
2 *  linux/net/sunrpc/clnt.c
3 *
4 *  This file contains the high-level RPC interface.
5 *  It is modeled as a finite state machine to support both synchronous
6 *  and asynchronous requests.
7 *
8 *  -	RPC header generation and argument serialization.
9 *  -	Credential refresh.
10 *  -	TCP connect handling.
11 *  -	Retry of operation when it is suspected the operation failed because
12 *	of uid squashing on the server, or when the credentials were stale
13 *	and need to be refreshed, or when a packet was damaged in transit.
14 *	This may have to be moved to the VFS layer.
15 *
16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18 */
19
20
21#include <linux/module.h>
22#include <linux/types.h>
23#include <linux/kallsyms.h>
24#include <linux/mm.h>
25#include <linux/namei.h>
26#include <linux/mount.h>
27#include <linux/slab.h>
28#include <linux/utsname.h>
29#include <linux/workqueue.h>
30#include <linux/in.h>
31#include <linux/in6.h>
32#include <linux/un.h>
33#include <linux/rcupdate.h>
34
35#include <linux/sunrpc/clnt.h>
36#include <linux/sunrpc/addr.h>
37#include <linux/sunrpc/rpc_pipe_fs.h>
38#include <linux/sunrpc/metrics.h>
39#include <linux/sunrpc/bc_xprt.h>
40#include <trace/events/sunrpc.h>
41
42#include "sunrpc.h"
43#include "netns.h"
44
45#ifdef RPC_DEBUG
46# define RPCDBG_FACILITY	RPCDBG_CALL
47#endif
48
49#define dprint_status(t)					\
50	dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,		\
51			__func__, t->tk_status)
52
53/*
54 * destroy_wait: lets rpc_shutdown_client() wait until a dying client's tasks have exited
55 */
56
57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60static void	call_start(struct rpc_task *task);
61static void	call_reserve(struct rpc_task *task);
62static void	call_reserveresult(struct rpc_task *task);
63static void	call_allocate(struct rpc_task *task);
64static void	call_decode(struct rpc_task *task);
65static void	call_bind(struct rpc_task *task);
66static void	call_bind_status(struct rpc_task *task);
67static void	call_transmit(struct rpc_task *task);
68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
69static void	call_bc_transmit(struct rpc_task *task);
70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
71static void	call_status(struct rpc_task *task);
72static void	call_transmit_status(struct rpc_task *task);
73static void	call_refresh(struct rpc_task *task);
74static void	call_refreshresult(struct rpc_task *task);
75static void	call_timeout(struct rpc_task *task);
76static void	call_connect(struct rpc_task *task);
77static void	call_connect_status(struct rpc_task *task);
78
79static __be32	*rpc_encode_header(struct rpc_task *task);
80static __be32	*rpc_verify_header(struct rpc_task *task);
81static int	rpc_ping(struct rpc_clnt *clnt);
82
83static void rpc_register_client(struct rpc_clnt *clnt)
84{
85	struct net *net = rpc_net_ns(clnt);
86	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88	spin_lock(&sn->rpc_client_lock);
89	list_add(&clnt->cl_clients, &sn->all_clients);
90	spin_unlock(&sn->rpc_client_lock);
91}
92
93static void rpc_unregister_client(struct rpc_clnt *clnt)
94{
95	struct net *net = rpc_net_ns(clnt);
96	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98	spin_lock(&sn->rpc_client_lock);
99	list_del(&clnt->cl_clients);
100	spin_unlock(&sn->rpc_client_lock);
101}
102
103static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104{
105	if (clnt->cl_dentry) {
106		if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107			clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108		rpc_remove_client_dir(clnt->cl_dentry);
109	}
110	clnt->cl_dentry = NULL;
111}
112
113static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114{
115	struct net *net = rpc_net_ns(clnt);
116	struct super_block *pipefs_sb;
117
118	pipefs_sb = rpc_get_sb_net(net);
119	if (pipefs_sb) {
120		__rpc_clnt_remove_pipedir(clnt);
121		rpc_put_sb_net(net);
122	}
123}
124
125static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126				    struct rpc_clnt *clnt)
127{
128	static uint32_t clntid;
129	const char *dir_name = clnt->cl_program->pipe_dir_name;
130	char name[15];
131	struct dentry *dir, *dentry;
132
133	dir = rpc_d_lookup_sb(sb, dir_name);
134	if (dir == NULL) {
135		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
136		return dir;
137	}
138	for (;;) {
139		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
140		name[sizeof(name) - 1] = '\0';
141		dentry = rpc_create_client_dir(dir, name, clnt);
142		if (!IS_ERR(dentry))
143			break;
144		if (dentry == ERR_PTR(-EEXIST))
145			continue;
146		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
147				" %s/%s, error %ld\n",
148				dir_name, name, PTR_ERR(dentry));
149		break;
150	}
151	dput(dir);
152	return dentry;
153}
154
155static int
156rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
157{
158	struct dentry *dentry;
159
160	if (clnt->cl_program->pipe_dir_name == NULL)
161		goto out;
162	clnt->cl_dentry = NULL;
163	dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
164	if (IS_ERR(dentry))
165		return PTR_ERR(dentry);
166	clnt->cl_dentry = dentry;
167out:
168	return 0;
169}
170
171static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
172{
173	if (clnt->cl_program->pipe_dir_name == NULL)
174		return 1;
175
176	if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
177	    ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
178		return 1;
179	if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
180		return 1;
181	return 0;
182}
183
184static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
185				   struct super_block *sb)
186{
187	struct dentry *dentry;
188	int err = 0;
189
190	switch (event) {
191	case RPC_PIPEFS_MOUNT:
192		dentry = rpc_setup_pipedir_sb(sb, clnt);
193		if (!dentry)
194			return -ENOENT;
195		if (IS_ERR(dentry))
196			return PTR_ERR(dentry);
197		clnt->cl_dentry = dentry;
198		if (clnt->cl_auth->au_ops->pipes_create) {
199			err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
200			if (err)
201				__rpc_clnt_remove_pipedir(clnt);
202		}
203		break;
204	case RPC_PIPEFS_UMOUNT:
205		__rpc_clnt_remove_pipedir(clnt);
206		break;
207	default:
208		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
209		return -ENOTSUPP;
210	}
211	return err;
212}
213
214static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
215				struct super_block *sb)
216{
217	int error = 0;
218
219	for (;; clnt = clnt->cl_parent) {
220		if (!rpc_clnt_skip_event(clnt, event))
221			error = __rpc_clnt_handle_event(clnt, event, sb);
222		if (error || clnt == clnt->cl_parent)
223			break;
224	}
225	return error;
226}
227
228static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
229{
230	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
231	struct rpc_clnt *clnt;
232
233	spin_lock(&sn->rpc_client_lock);
234	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
235		if (rpc_clnt_skip_event(clnt, event))
236			continue;
237		spin_unlock(&sn->rpc_client_lock);
238		return clnt;
239	}
240	spin_unlock(&sn->rpc_client_lock);
241	return NULL;
242}
243
244static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
245			    void *ptr)
246{
247	struct super_block *sb = ptr;
248	struct rpc_clnt *clnt;
249	int error = 0;
250
251	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
252		error = __rpc_pipefs_event(clnt, event, sb);
253		if (error)
254			break;
255	}
256	return error;
257}
258
259static struct notifier_block rpc_clients_block = {
260	.notifier_call	= rpc_pipefs_event,
261	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
262};
263
264int rpc_clients_notifier_register(void)
265{
266	return rpc_pipefs_notifier_register(&rpc_clients_block);
267}
268
269void rpc_clients_notifier_unregister(void)
270{
271	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
272}
273
274static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
275{
276	clnt->cl_nodelen = strlen(nodename);
277	if (clnt->cl_nodelen > UNX_MAXNODENAME)
278		clnt->cl_nodelen = UNX_MAXNODENAME;
279	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
280}
281
282static int rpc_client_register(const struct rpc_create_args *args,
283			       struct rpc_clnt *clnt)
284{
285	struct rpc_auth *auth;
286	struct net *net = rpc_net_ns(clnt);
287	struct super_block *pipefs_sb;
288	int err;
289
290	pipefs_sb = rpc_get_sb_net(net);
291	if (pipefs_sb) {
292		err = rpc_setup_pipedir(pipefs_sb, clnt);
293		if (err)
294			goto out;
295	}
296
297	rpc_register_client(clnt);
298	if (pipefs_sb)
299		rpc_put_sb_net(net);
300
301	auth = rpcauth_create(args->authflavor, clnt);
302	if (IS_ERR(auth)) {
303		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
304				args->authflavor);
305		err = PTR_ERR(auth);
306		goto err_auth;
307	}
308	return 0;
309err_auth:
310	pipefs_sb = rpc_get_sb_net(net);
311	rpc_unregister_client(clnt);
312	__rpc_clnt_remove_pipedir(clnt);
313out:
314	if (pipefs_sb)
315		rpc_put_sb_net(net);
316	return err;
317}
318
319static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
320{
321	const struct rpc_program *program = args->program;
322	const struct rpc_version *version;
323	struct rpc_clnt		*clnt = NULL;
324	int err;
325
326	/* sanity check the name before trying to print it */
327	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
328			program->name, args->servername, xprt);
329
330	err = rpciod_up();
331	if (err)
332		goto out_no_rpciod;
333
334	err = -EINVAL;
335	if (args->version >= program->nrvers)
336		goto out_err;
337	version = program->version[args->version];
338	if (version == NULL)
339		goto out_err;
340
341	err = -ENOMEM;
342	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
343	if (!clnt)
344		goto out_err;
345	clnt->cl_parent = clnt;
346
347	rcu_assign_pointer(clnt->cl_xprt, xprt);
348	clnt->cl_procinfo = version->procs;
349	clnt->cl_maxproc  = version->nrprocs;
350	clnt->cl_prog     = args->prognumber ? : program->number;
351	clnt->cl_vers     = version->number;
352	clnt->cl_stats    = program->stats;
353	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
354	err = -ENOMEM;
355	if (clnt->cl_metrics == NULL)
356		goto out_no_stats;
357	clnt->cl_program  = program;
358	INIT_LIST_HEAD(&clnt->cl_tasks);
359	spin_lock_init(&clnt->cl_lock);
360
361	if (!xprt_bound(xprt))
362		clnt->cl_autobind = 1;
363
364	clnt->cl_timeout = xprt->timeout;
365	if (args->timeout != NULL) {
366		memcpy(&clnt->cl_timeout_default, args->timeout,
367				sizeof(clnt->cl_timeout_default));
368		clnt->cl_timeout = &clnt->cl_timeout_default;
369	}
370
371	clnt->cl_rtt = &clnt->cl_rtt_default;
372	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
373	clnt->cl_principal = NULL;
374	if (args->client_name) {
375		clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
376		if (!clnt->cl_principal)
377			goto out_no_principal;
378	}
379
380	atomic_set(&clnt->cl_count, 1);
381
382	/* save the nodename */
383	rpc_clnt_set_nodename(clnt, utsname()->nodename);
384
385	err = rpc_client_register(args, clnt);
386	if (err)
387		goto out_no_path;
388	return clnt;
389
390out_no_path:
391	kfree(clnt->cl_principal);
392out_no_principal:
393	rpc_free_iostats(clnt->cl_metrics);
394out_no_stats:
395	kfree(clnt);
396out_err:
397	rpciod_down();
398out_no_rpciod:
399	xprt_put(xprt);
400	return ERR_PTR(err);
401}
402
403/**
404 * rpc_create - create an RPC client and transport with one call
405 * @args: rpc_clnt create argument structure
406 *
407 * Creates and initializes an RPC transport and an RPC client.
408 *
409 * It can ping the server in order to determine if it is up, and to see if
410 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
411 * this behavior so asynchronous tasks can also use rpc_create.
412 */
413struct rpc_clnt *rpc_create(struct rpc_create_args *args)
414{
415	struct rpc_xprt *xprt;
416	struct rpc_clnt *clnt;
417	struct xprt_create xprtargs = {
418		.net = args->net,
419		.ident = args->protocol,
420		.srcaddr = args->saddress,
421		.dstaddr = args->address,
422		.addrlen = args->addrsize,
423		.servername = args->servername,
424		.bc_xprt = args->bc_xprt,
425	};
426	char servername[48];
427
428	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
429		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
430	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
431		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
432	/*
433	 * If the caller chooses not to specify a hostname, whip
434	 * up a string representation of the passed-in address.
435	 */
436	if (xprtargs.servername == NULL) {
437		struct sockaddr_un *sun =
438				(struct sockaddr_un *)args->address;
439		struct sockaddr_in *sin =
440				(struct sockaddr_in *)args->address;
441		struct sockaddr_in6 *sin6 =
442				(struct sockaddr_in6 *)args->address;
443
444		servername[0] = '\0';
445		switch (args->address->sa_family) {
446		case AF_LOCAL:
447			snprintf(servername, sizeof(servername), "%s",
448				 sun->sun_path);
449			break;
450		case AF_INET:
451			snprintf(servername, sizeof(servername), "%pI4",
452				 &sin->sin_addr.s_addr);
453			break;
454		case AF_INET6:
455			snprintf(servername, sizeof(servername), "%pI6",
456				 &sin6->sin6_addr);
457			break;
458		default:
459			/* caller wants default server name, but
460			 * address family isn't recognized. */
461			return ERR_PTR(-EINVAL);
462		}
463		xprtargs.servername = servername;
464	}
465
466	xprt = xprt_create_transport(&xprtargs);
467	if (IS_ERR(xprt))
468		return (struct rpc_clnt *)xprt;
469
470	/*
471	 * By default, kernel RPC client connects from a reserved port.
472	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
473	 * but it is always enabled for rpciod, which handles the connect
474	 * operation.
475	 */
476	xprt->resvport = 1;
477	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
478		xprt->resvport = 0;
479
480	clnt = rpc_new_client(args, xprt);
481	if (IS_ERR(clnt))
482		return clnt;
483
484	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
485		int err = rpc_ping(clnt);
486		if (err != 0) {
487			rpc_shutdown_client(clnt);
488			return ERR_PTR(err);
489		}
490	}
491
492	clnt->cl_softrtry = 1;
493	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
494		clnt->cl_softrtry = 0;
495
496	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
497		clnt->cl_autobind = 1;
498	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
499		clnt->cl_discrtry = 1;
500	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
501		clnt->cl_chatty = 1;
502
503	return clnt;
504}
505EXPORT_SYMBOL_GPL(rpc_create);
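
/*
 * A minimal usage sketch (caller side).  The address, port, program
 * structure ("foo_program") and flags below are illustrative assumptions,
 * not part of this file:
 *
 *	struct sockaddr_in sin = {
 *		.sin_family		= AF_INET,
 *		.sin_addr.s_addr	= htonl(INADDR_LOOPBACK),
 *		.sin_port		= htons(2049),
 *	};
 *	struct rpc_create_args args = {
 *		.net		= &init_net,
 *		.protocol	= XPRT_TRANSPORT_TCP,
 *		.address	= (struct sockaddr *)&sin,
 *		.addrsize	= sizeof(sin),
 *		.servername	= "server.example.com",
 *		.program	= &foo_program,
 *		.version	= 3,
 *		.authflavor	= RPC_AUTH_UNIX,
 *		.flags		= RPC_CLNT_CREATE_NOPING,
 *	};
 *	struct rpc_clnt *clnt = rpc_create(&args);
 *
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 *	...
 *	rpc_shutdown_client(clnt);
 */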
506
507/*
508 * This function clones the RPC client structure. It allows us to share the
509 * same transport while varying parameters such as the authentication
510 * flavour.
511 */
512static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
513					   struct rpc_clnt *clnt)
514{
515	struct rpc_xprt *xprt;
516	struct rpc_clnt *new;
517	int err;
518
519	err = -ENOMEM;
520	rcu_read_lock();
521	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
522	rcu_read_unlock();
523	if (xprt == NULL)
524		goto out_err;
525	args->servername = xprt->servername;
526
527	new = rpc_new_client(args, xprt);
528	if (IS_ERR(new)) {
529		err = PTR_ERR(new);
530		goto out_err;
531	}
532
533	atomic_inc(&clnt->cl_count);
534	new->cl_parent = clnt;
535
536	/* Turn off autobind on clones */
537	new->cl_autobind = 0;
538	new->cl_softrtry = clnt->cl_softrtry;
539	new->cl_discrtry = clnt->cl_discrtry;
540	new->cl_chatty = clnt->cl_chatty;
541	return new;
542
543out_err:
544	dprintk("RPC:       %s: returned error %d\n", __func__, err);
545	return ERR_PTR(err);
546}
547
548/**
549 * rpc_clone_client - Clone an RPC client structure
550 *
551 * @clnt: RPC client whose parameters are copied
552 *
553 * Returns a fresh RPC client or an ERR_PTR.
554 */
555struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
556{
557	struct rpc_create_args args = {
558		.program	= clnt->cl_program,
559		.prognumber	= clnt->cl_prog,
560		.version	= clnt->cl_vers,
561		.authflavor	= clnt->cl_auth->au_flavor,
562		.client_name	= clnt->cl_principal,
563	};
564	return __rpc_clone_client(&args, clnt);
565}
566EXPORT_SYMBOL_GPL(rpc_clone_client);
567
568/**
569 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
570 *
571 * @clnt: RPC client whose parameters are copied
572 * @flavor: security flavor for new client
573 *
574 * Returns a fresh RPC client or an ERR_PTR.
575 */
576struct rpc_clnt *
577rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
578{
579	struct rpc_create_args args = {
580		.program	= clnt->cl_program,
581		.prognumber	= clnt->cl_prog,
582		.version	= clnt->cl_vers,
583		.authflavor	= flavor,
584		.client_name	= clnt->cl_principal,
585	};
586	return __rpc_clone_client(&args, clnt);
587}
588EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
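
/*
 * A clone shares the parent's transport, so switching security flavors is
 * cheap.  Sketch (caller side; error handling minimal):
 *
 *	struct rpc_clnt *gss_clnt;
 *
 *	gss_clnt = rpc_clone_client_set_auth(clnt, RPC_AUTH_GSS_KRB5);
 *	if (IS_ERR(gss_clnt))
 *		return PTR_ERR(gss_clnt);
 */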
589
590/*
591 * Kill all tasks for the given client.
592 * XXX: kill their descendants as well?
593 */
594void rpc_killall_tasks(struct rpc_clnt *clnt)
595{
596	struct rpc_task	*rovr;
597
598
599	if (list_empty(&clnt->cl_tasks))
600		return;
601	dprintk("RPC:       killing all tasks for client %p\n", clnt);
602	/*
603	 * Spin lock the client's task list to prevent changes...
604	 */
605	spin_lock(&clnt->cl_lock);
606	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
607		if (!RPC_IS_ACTIVATED(rovr))
608			continue;
609		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
610			rovr->tk_flags |= RPC_TASK_KILLED;
611			rpc_exit(rovr, -EIO);
612			if (RPC_IS_QUEUED(rovr))
613				rpc_wake_up_queued_task(rovr->tk_waitqueue,
614							rovr);
615		}
616	}
617	spin_unlock(&clnt->cl_lock);
618}
619EXPORT_SYMBOL_GPL(rpc_killall_tasks);
620
621/*
622 * Properly shut down an RPC client, terminating all outstanding
623 * requests.
624 */
625void rpc_shutdown_client(struct rpc_clnt *clnt)
626{
627	might_sleep();
628
629	dprintk_rcu("RPC:       shutting down %s client for %s\n",
630			clnt->cl_program->name,
631			rcu_dereference(clnt->cl_xprt)->servername);
632
633	while (!list_empty(&clnt->cl_tasks)) {
634		rpc_killall_tasks(clnt);
635		wait_event_timeout(destroy_wait,
636			list_empty(&clnt->cl_tasks), 1*HZ);
637	}
638
639	rpc_release_client(clnt);
640}
641EXPORT_SYMBOL_GPL(rpc_shutdown_client);
642
643/*
644 * Free an RPC client
645 */
646static void
647rpc_free_client(struct rpc_clnt *clnt)
648{
649	dprintk_rcu("RPC:       destroying %s client for %s\n",
650			clnt->cl_program->name,
651			rcu_dereference(clnt->cl_xprt)->servername);
652	if (clnt->cl_parent != clnt)
653		rpc_release_client(clnt->cl_parent);
654	rpc_clnt_remove_pipedir(clnt);
655	rpc_unregister_client(clnt);
656	rpc_free_iostats(clnt->cl_metrics);
657	kfree(clnt->cl_principal);
658	clnt->cl_metrics = NULL;
659	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
660	rpciod_down();
661	kfree(clnt);
662}
663
664/*
665 * Release the client's authentication handle, then free the client
666 */
667static void
668rpc_free_auth(struct rpc_clnt *clnt)
669{
670	if (clnt->cl_auth == NULL) {
671		rpc_free_client(clnt);
672		return;
673	}
674
675	/*
676	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
677	 *       release remaining GSS contexts. This mechanism ensures
678	 *       that it can do so safely.
679	 */
680	atomic_inc(&clnt->cl_count);
681	rpcauth_release(clnt->cl_auth);
682	clnt->cl_auth = NULL;
683	if (atomic_dec_and_test(&clnt->cl_count))
684		rpc_free_client(clnt);
685}
686
687/*
688 * Release reference to the RPC client
689 */
690void
691rpc_release_client(struct rpc_clnt *clnt)
692{
693	dprintk("RPC:       rpc_release_client(%p)\n", clnt);
694
695	if (list_empty(&clnt->cl_tasks))
696		wake_up(&destroy_wait);
697	if (atomic_dec_and_test(&clnt->cl_count))
698		rpc_free_auth(clnt);
699}
700EXPORT_SYMBOL_GPL(rpc_release_client);
701
702/**
703 * rpc_bind_new_program - bind a new RPC program to an existing client
704 * @old: old rpc_client
705 * @program: rpc program to set
706 * @vers: rpc program version
707 *
708 * Clones the rpc client and sets up a new RPC program. This is mainly
709 * of use for enabling different RPC programs to share the same transport.
710 * The Sun NFSv2/v3 ACL protocol can do this.
711 */
712struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
713				      const struct rpc_program *program,
714				      u32 vers)
715{
716	struct rpc_create_args args = {
717		.program	= program,
718		.prognumber	= program->number,
719		.version	= vers,
720		.authflavor	= old->cl_auth->au_flavor,
721		.client_name	= old->cl_principal,
722	};
723	struct rpc_clnt *clnt;
724	int err;
725
726	clnt = __rpc_clone_client(&args, old);
727	if (IS_ERR(clnt))
728		goto out;
729	err = rpc_ping(clnt);
730	if (err != 0) {
731		rpc_shutdown_client(clnt);
732		clnt = ERR_PTR(err);
733	}
734out:
735	return clnt;
736}
737EXPORT_SYMBOL_GPL(rpc_bind_new_program);
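
/*
 * Sketch of letting a side protocol reuse an existing client's transport,
 * in the spirit of the NFS ACL case above ("acl_program" is a hypothetical
 * rpc_program the caller would define):
 *
 *	struct rpc_clnt *acl_clnt;
 *
 *	acl_clnt = rpc_bind_new_program(clnt, &acl_program, 3);
 *	if (IS_ERR(acl_clnt))
 *		return PTR_ERR(acl_clnt);
 */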
738
739void rpc_task_release_client(struct rpc_task *task)
740{
741	struct rpc_clnt *clnt = task->tk_client;
742
743	if (clnt != NULL) {
744		/* Remove from client task list */
745		spin_lock(&clnt->cl_lock);
746		list_del(&task->tk_task);
747		spin_unlock(&clnt->cl_lock);
748		task->tk_client = NULL;
749
750		rpc_release_client(clnt);
751	}
752}
753
754static
755void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
756{
757	if (clnt != NULL) {
758		rpc_task_release_client(task);
759		task->tk_client = clnt;
760		atomic_inc(&clnt->cl_count);
761		if (clnt->cl_softrtry)
762			task->tk_flags |= RPC_TASK_SOFT;
763		if (sk_memalloc_socks()) {
764			struct rpc_xprt *xprt;
765
766			rcu_read_lock();
767			xprt = rcu_dereference(clnt->cl_xprt);
768			if (xprt->swapper)
769				task->tk_flags |= RPC_TASK_SWAPPER;
770			rcu_read_unlock();
771		}
772		/* Add to the client's list of all tasks */
773		spin_lock(&clnt->cl_lock);
774		list_add_tail(&task->tk_task, &clnt->cl_tasks);
775		spin_unlock(&clnt->cl_lock);
776	}
777}
778
779void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
780{
781	rpc_task_release_client(task);
782	rpc_task_set_client(task, clnt);
783}
784EXPORT_SYMBOL_GPL(rpc_task_reset_client);
785
786
787static void
788rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
789{
790	if (msg != NULL) {
791		task->tk_msg.rpc_proc = msg->rpc_proc;
792		task->tk_msg.rpc_argp = msg->rpc_argp;
793		task->tk_msg.rpc_resp = msg->rpc_resp;
794		if (msg->rpc_cred != NULL)
795			task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
796	}
797}
798
799/*
800 * Default callback for async RPC calls
801 */
802static void
803rpc_default_callback(struct rpc_task *task, void *data)
804{
805}
806
807static const struct rpc_call_ops rpc_default_ops = {
808	.rpc_call_done = rpc_default_callback,
809};
810
811/**
812 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
813 * @task_setup_data: pointer to task initialisation data
814 */
815struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
816{
817	struct rpc_task *task;
818
819	task = rpc_new_task(task_setup_data);
820	if (IS_ERR(task))
821		goto out;
822
823	rpc_task_set_client(task, task_setup_data->rpc_client);
824	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
825
826	if (task->tk_action == NULL)
827		rpc_call_start(task);
828
829	atomic_inc(&task->tk_count);
830	rpc_execute(task);
831out:
832	return task;
833}
834EXPORT_SYMBOL_GPL(rpc_run_task);
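
/*
 * Typical caller-side pattern (sketch; "msg", "foo_ops" and "calldata" are
 * assumed to be set up by the caller).  rpc_run_task() returns the task
 * with an extra reference held, so the caller must drop it:
 *
 *	struct rpc_task_setup setup = {
 *		.rpc_client	= clnt,
 *		.rpc_message	= &msg,
 *		.callback_ops	= &foo_ops,
 *		.callback_data	= calldata,
 *		.flags		= RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task = rpc_run_task(&setup);
 *
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	rpc_put_task(task);
 */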
835
836/**
837 * rpc_call_sync - Perform a synchronous RPC call
838 * @clnt: pointer to RPC client
839 * @msg: RPC call parameters
840 * @flags: RPC call flags
841 */
842int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
843{
844	struct rpc_task	*task;
845	struct rpc_task_setup task_setup_data = {
846		.rpc_client = clnt,
847		.rpc_message = msg,
848		.callback_ops = &rpc_default_ops,
849		.flags = flags,
850	};
851	int status;
852
853	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
854	if (flags & RPC_TASK_ASYNC) {
855		rpc_release_calldata(task_setup_data.callback_ops,
856			task_setup_data.callback_data);
857		return -EINVAL;
858	}
859
860	task = rpc_run_task(&task_setup_data);
861	if (IS_ERR(task))
862		return PTR_ERR(task);
863	status = task->tk_status;
864	rpc_put_task(task);
865	return status;
866}
867EXPORT_SYMBOL_GPL(rpc_call_sync);
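
/*
 * Sketch of a synchronous call.  The procedure table entry and the
 * argument/result structures are caller-defined assumptions:
 *
 *	struct rpc_message msg = {
 *		.rpc_proc	= &foo_procedures[FOOPROC_GETATTR],
 *		.rpc_argp	= &args,
 *		.rpc_resp	= &res,
 *	};
 *	int status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
 *
 *	if (status < 0)
 *		return status;
 */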
868
869/**
870 * rpc_call_async - Perform an asynchronous RPC call
871 * @clnt: pointer to RPC client
872 * @msg: RPC call parameters
873 * @flags: RPC call flags
874 * @tk_ops: RPC call ops
875 * @data: user call data
876 */
877int
878rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
879	       const struct rpc_call_ops *tk_ops, void *data)
880{
881	struct rpc_task	*task;
882	struct rpc_task_setup task_setup_data = {
883		.rpc_client = clnt,
884		.rpc_message = msg,
885		.callback_ops = tk_ops,
886		.callback_data = data,
887		.flags = flags|RPC_TASK_ASYNC,
888	};
889
890	task = rpc_run_task(&task_setup_data);
891	if (IS_ERR(task))
892		return PTR_ERR(task);
893	rpc_put_task(task);
894	return 0;
895}
896EXPORT_SYMBOL_GPL(rpc_call_async);
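
/*
 * Sketch of an asynchronous call.  The completion callback runs in rpciod
 * context; "foo_done", "foo_result" and "msg" are caller-side assumptions:
 *
 *	static void foo_done(struct rpc_task *task, void *calldata)
 *	{
 *		struct foo_result *res = calldata;
 *
 *		res->status = task->tk_status;
 *	}
 *
 *	static const struct rpc_call_ops foo_ops = {
 *		.rpc_call_done	= foo_done,
 *	};
 *
 *	err = rpc_call_async(clnt, &msg, 0, &foo_ops, res);
 */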
897
898#if defined(CONFIG_SUNRPC_BACKCHANNEL)
899/**
900 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
901 * rpc_execute against it
902 * @req: RPC request
903 * @tk_ops: RPC call ops
904 */
905struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
906				const struct rpc_call_ops *tk_ops)
907{
908	struct rpc_task *task;
909	struct xdr_buf *xbufp = &req->rq_snd_buf;
910	struct rpc_task_setup task_setup_data = {
911		.callback_ops = tk_ops,
912	};
913
914	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
915	/*
916	 * Create an rpc_task to send the data
917	 */
918	task = rpc_new_task(&task_setup_data);
919	if (IS_ERR(task)) {
920		xprt_free_bc_request(req);
921		goto out;
922	}
923	task->tk_rqstp = req;
924
925	/*
926	 * Set up the xdr_buf length.
927	 * This also indicates that the buffer is XDR encoded already.
928	 */
929	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
930			xbufp->tail[0].iov_len;
931
932	task->tk_action = call_bc_transmit;
933	atomic_inc(&task->tk_count);
934	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
935	rpc_execute(task);
936
937out:
938	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
939	return task;
940}
941#endif /* CONFIG_SUNRPC_BACKCHANNEL */
942
943void
944rpc_call_start(struct rpc_task *task)
945{
946	task->tk_action = call_start;
947}
948EXPORT_SYMBOL_GPL(rpc_call_start);
949
950/**
951 * rpc_peeraddr - extract remote peer address from clnt's xprt
952 * @clnt: RPC client structure
953 * @buf: target buffer
954 * @bufsize: length of target buffer
955 *
956 * Returns the number of bytes that are actually in the stored address.
957 */
958size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
959{
960	size_t bytes;
961	struct rpc_xprt *xprt;
962
963	rcu_read_lock();
964	xprt = rcu_dereference(clnt->cl_xprt);
965
966	bytes = xprt->addrlen;
967	if (bytes > bufsize)
968		bytes = bufsize;
969	memcpy(buf, &xprt->addr, bytes);
970	rcu_read_unlock();
971
972	return bytes;
973}
974EXPORT_SYMBOL_GPL(rpc_peeraddr);
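
/*
 * Sketch: copy the peer's address into caller-owned storage.  A
 * sockaddr_storage is large enough for any supported address family:
 *
 *	struct sockaddr_storage peer;
 *	size_t len;
 *
 *	len = rpc_peeraddr(clnt, (struct sockaddr *)&peer, sizeof(peer));
 */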
975
976/**
977 * rpc_peeraddr2str - return remote peer address in printable format
978 * @clnt: RPC client structure
979 * @format: address format
980 *
981 * NB: the lifetime of the memory referenced by the returned pointer is
982 * the same as the rpc_xprt itself.  As long as the caller uses this
983 * pointer, it must hold the RCU read lock.
984 */
985const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
986			     enum rpc_display_format_t format)
987{
988	struct rpc_xprt *xprt;
989
990	xprt = rcu_dereference(clnt->cl_xprt);
991
992	if (xprt->address_strings[format] != NULL)
993		return xprt->address_strings[format];
994	else
995		return "unprintable";
996}
997EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
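
/*
 * Because the returned string lives in the rpc_xprt, callers bracket its
 * use with the RCU read lock, e.g. (sketch):
 *
 *	rcu_read_lock();
 *	dprintk("RPC: peer is %s\n",
 *		rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 *	rcu_read_unlock();
 */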
998
999static const struct sockaddr_in rpc_inaddr_loopback = {
1000	.sin_family		= AF_INET,
1001	.sin_addr.s_addr	= htonl(INADDR_ANY),
1002};
1003
1004static const struct sockaddr_in6 rpc_in6addr_loopback = {
1005	.sin6_family		= AF_INET6,
1006	.sin6_addr		= IN6ADDR_ANY_INIT,
1007};
1008
1009/*
1010 * Try a getsockname() on a connected datagram socket.  Using a
1011 * connected datagram socket prevents leaving a socket in TIME_WAIT.
1012 * This conserves the ephemeral port number space.
1013 *
1014 * Returns zero and fills in "buf" if successful; otherwise, a
1015 * negative errno is returned.
1016 */
1017static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1018			struct sockaddr *buf, int buflen)
1019{
1020	struct socket *sock;
1021	int err;
1022
1023	err = __sock_create(net, sap->sa_family,
1024				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1025	if (err < 0) {
1026		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1027		goto out;
1028	}
1029
1030	switch (sap->sa_family) {
1031	case AF_INET:
1032		err = kernel_bind(sock,
1033				(struct sockaddr *)&rpc_inaddr_loopback,
1034				sizeof(rpc_inaddr_loopback));
1035		break;
1036	case AF_INET6:
1037		err = kernel_bind(sock,
1038				(struct sockaddr *)&rpc_in6addr_loopback,
1039				sizeof(rpc_in6addr_loopback));
1040		break;
1041	default:
1042		err = -EAFNOSUPPORT;
1043		goto out;
1044	}
1045	if (err < 0) {
1046		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1047		goto out_release;
1048	}
1049
1050	err = kernel_connect(sock, sap, salen, 0);
1051	if (err < 0) {
1052		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1053		goto out_release;
1054	}
1055
1056	err = kernel_getsockname(sock, buf, &buflen);
1057	if (err < 0) {
1058		dprintk("RPC:       getsockname failed (%d)\n", err);
1059		goto out_release;
1060	}
1061
1062	err = 0;
1063	if (buf->sa_family == AF_INET6) {
1064		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1065		sin6->sin6_scope_id = 0;
1066	}
1067	dprintk("RPC:       %s succeeded\n", __func__);
1068
1069out_release:
1070	sock_release(sock);
1071out:
1072	return err;
1073}
1074
1075/*
1076 * Scraping a connected socket failed, so we don't have a usable
1077 * local address.  Fallback: generate an address that will prevent
1078 * the server from calling us back.
1079 *
1080 * Returns zero and fills in "buf" if successful; otherwise, a
1081 * negative errno is returned.
1082 */
1083static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1084{
1085	switch (family) {
1086	case AF_INET:
1087		if (buflen < sizeof(rpc_inaddr_loopback))
1088			return -EINVAL;
1089		memcpy(buf, &rpc_inaddr_loopback,
1090				sizeof(rpc_inaddr_loopback));
1091		break;
1092	case AF_INET6:
1093		if (buflen < sizeof(rpc_in6addr_loopback))
1094			return -EINVAL;
1095		memcpy(buf, &rpc_in6addr_loopback,
1096				sizeof(rpc_in6addr_loopback));
		break;
1097	default:
1098		dprintk("RPC:       %s: address family not supported\n",
1099			__func__);
1100		return -EAFNOSUPPORT;
1101	}
1102	dprintk("RPC:       %s: succeeded\n", __func__);
1103	return 0;
1104}
1105
1106/**
1107 * rpc_localaddr - discover local endpoint address for an RPC client
1108 * @clnt: RPC client structure
1109 * @buf: target buffer
1110 * @buflen: size of target buffer, in bytes
1111 *
1112 * Returns zero and fills in "buf" if successful;
1113 * otherwise, a negative errno is returned.
1114 *
1115 * This works even if the underlying transport is not currently connected,
1116 * or if the upper layer never previously provided a source address.
1117 *
1118 * The result of this function call is transient: multiple calls in
1119 * succession may give different results, depending on how local
1120 * networking configuration changes over time.
1121 */
1122int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1123{
1124	struct sockaddr_storage address;
1125	struct sockaddr *sap = (struct sockaddr *)&address;
1126	struct rpc_xprt *xprt;
1127	struct net *net;
1128	size_t salen;
1129	int err;
1130
1131	rcu_read_lock();
1132	xprt = rcu_dereference(clnt->cl_xprt);
1133	salen = xprt->addrlen;
1134	memcpy(sap, &xprt->addr, salen);
1135	net = get_net(xprt->xprt_net);
1136	rcu_read_unlock();
1137
1138	rpc_set_port(sap, 0);
1139	err = rpc_sockname(net, sap, salen, buf, buflen);
1140	put_net(net);
1141	if (err != 0)
1142		/* Couldn't discover local address, return ANYADDR */
1143		return rpc_anyaddr(sap->sa_family, buf, buflen);
1144	return 0;
1145}
1146EXPORT_SYMBOL_GPL(rpc_localaddr);
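
/*
 * Sketch: discover the local endpoint used to reach this client's server
 * (best-effort; the result may change between calls):
 *
 *	struct sockaddr_storage local;
 *	int err;
 *
 *	err = rpc_localaddr(clnt, (struct sockaddr *)&local, sizeof(local));
 *	if (err)
 *		return err;
 */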
1147
1148void
1149rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1150{
1151	struct rpc_xprt *xprt;
1152
1153	rcu_read_lock();
1154	xprt = rcu_dereference(clnt->cl_xprt);
1155	if (xprt->ops->set_buffer_size)
1156		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1157	rcu_read_unlock();
1158}
1159EXPORT_SYMBOL_GPL(rpc_setbufsize);
1160
1161/**
1162 * rpc_protocol - Get transport protocol number for an RPC client
1163 * @clnt: RPC client to query
1164 *
1165 */
1166int rpc_protocol(struct rpc_clnt *clnt)
1167{
1168	int protocol;
1169
1170	rcu_read_lock();
1171	protocol = rcu_dereference(clnt->cl_xprt)->prot;
1172	rcu_read_unlock();
1173	return protocol;
1174}
1175EXPORT_SYMBOL_GPL(rpc_protocol);
1176
1177/**
1178 * rpc_net_ns - Get the network namespace for this RPC client
1179 * @clnt: RPC client to query
1180 *
1181 */
1182struct net *rpc_net_ns(struct rpc_clnt *clnt)
1183{
1184	struct net *ret;
1185
1186	rcu_read_lock();
1187	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1188	rcu_read_unlock();
1189	return ret;
1190}
1191EXPORT_SYMBOL_GPL(rpc_net_ns);
1192
1193/**
1194 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1195 * @clnt: RPC client to query
1196 *
1197 * For stream transports, this is one RPC record fragment (see RFC
1198 * 1831), as we don't support multi-record requests yet.  For datagram
1199 * transports, this is the size of an IP packet minus the IP, UDP, and
1200 * RPC header sizes.
1201 */
1202size_t rpc_max_payload(struct rpc_clnt *clnt)
1203{
1204	size_t ret;
1205
1206	rcu_read_lock();
1207	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1208	rcu_read_unlock();
1209	return ret;
1210}
1211EXPORT_SYMBOL_GPL(rpc_max_payload);
1212
1213/**
1214 * rpc_get_timeout - Get timeout for transport in units of HZ
1215 * @clnt: RPC client to query
1216 */
1217unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1218{
1219	unsigned long ret;
1220
1221	rcu_read_lock();
1222	ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1223	rcu_read_unlock();
1224	return ret;
1225}
1226EXPORT_SYMBOL_GPL(rpc_get_timeout);
1227
1228/**
1229 * rpc_force_rebind - force transport to check that remote port is unchanged
1230 * @clnt: client to rebind
1231 *
1232 */
1233void rpc_force_rebind(struct rpc_clnt *clnt)
1234{
1235	if (clnt->cl_autobind) {
1236		rcu_read_lock();
1237		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1238		rcu_read_unlock();
1239	}
1240}
1241EXPORT_SYMBOL_GPL(rpc_force_rebind);
1242
1243/*
1244 * Restart an (async) RPC call from the call_prepare state.
1245 * Usually called from within the exit handler.
1246 */
1247int
1248rpc_restart_call_prepare(struct rpc_task *task)
1249{
1250	if (RPC_ASSASSINATED(task))
1251		return 0;
1252	task->tk_action = call_start;
1253	if (task->tk_ops->rpc_call_prepare != NULL)
1254		task->tk_action = rpc_prepare_task;
1255	return 1;
1256}
1257EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1258
1259/*
1260 * Restart an (async) RPC call. Usually called from within the
1261 * exit handler.
1262 */
1263int
1264rpc_restart_call(struct rpc_task *task)
1265{
1266	if (RPC_ASSASSINATED(task))
1267		return 0;
1268	task->tk_action = call_start;
1269	return 1;
1270}
1271EXPORT_SYMBOL_GPL(rpc_restart_call);
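
/*
 * Sketch of restarting from an rpc_call_done callback, in the style of the
 * NFSv3 "jukebox" retry; the error value and delay are illustrative:
 *
 *	static void foo_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status == -EJUKEBOX) {
 *			task->tk_status = 0;
 *			rpc_restart_call(task);
 *			rpc_delay(task, HZ);
 *			return;
 *		}
 *		...
 *	}
 */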
1272
1273#ifdef RPC_DEBUG
1274static const char *rpc_proc_name(const struct rpc_task *task)
1275{
1276	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1277
1278	if (proc) {
1279		if (proc->p_name)
1280			return proc->p_name;
1281		else
1282			return "NULL";
1283	} else
1284		return "no proc";
1285}
1286#endif
1287
1288/*
1289 * 0.  Initial state
1290 *
1291 *     Other FSM states can be visited zero or more times, but
1292 *     this state is visited exactly once for each RPC.
1293 */
1294static void
1295call_start(struct rpc_task *task)
1296{
1297	struct rpc_clnt	*clnt = task->tk_client;
1298
1299	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1300			clnt->cl_program->name, clnt->cl_vers,
1301			rpc_proc_name(task),
1302			(RPC_IS_ASYNC(task) ? "async" : "sync"));
1303
1304	/* Increment call count */
1305	task->tk_msg.rpc_proc->p_count++;
1306	clnt->cl_stats->rpccnt++;
1307	task->tk_action = call_reserve;
1308}
1309
1310/*
1311 * 1.	Reserve an RPC call slot
1312 */
1313static void
1314call_reserve(struct rpc_task *task)
1315{
1316	dprint_status(task);
1317
1318	task->tk_status  = 0;
1319	task->tk_action  = call_reserveresult;
1320	xprt_reserve(task);
1321}
1322
1323static void call_retry_reserve(struct rpc_task *task);
1324
1325/*
1326 * 1b.	Grok the result of xprt_reserve()
1327 */
1328static void
1329call_reserveresult(struct rpc_task *task)
1330{
1331	int status = task->tk_status;
1332
1333	dprint_status(task);
1334
1335	/*
1336	 * After a call to xprt_reserve(), we must have either
1337	 * a request slot or else an error status.
1338	 */
1339	task->tk_status = 0;
1340	if (status >= 0) {
1341		if (task->tk_rqstp) {
1342			task->tk_action = call_refresh;
1343			return;
1344		}
1345
1346		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1347				__func__, status);
1348		rpc_exit(task, -EIO);
1349		return;
1350	}
1351
1352	/*
1353	 * Even though there was an error, we may have acquired
1354	 * a request slot somehow.  Make sure not to leak it.
1355	 */
1356	if (task->tk_rqstp) {
1357		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1358				__func__, status);
1359		xprt_release(task);
1360	}
1361
1362	switch (status) {
1363	case -ENOMEM:
1364		rpc_delay(task, HZ >> 2);
1365	case -EAGAIN:	/* woken up; retry */
1366		task->tk_action = call_retry_reserve;
1367		return;
1368	case -EIO:	/* probably a shutdown */
1369		break;
1370	default:
1371		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1372				__func__, status);
1373		break;
1374	}
1375	rpc_exit(task, status);
1376}
1377
1378/*
1379 * 1c.	Retry reserving an RPC call slot
1380 */
1381static void
1382call_retry_reserve(struct rpc_task *task)
1383{
1384	dprint_status(task);
1385
1386	task->tk_status  = 0;
1387	task->tk_action  = call_reserveresult;
1388	xprt_retry_reserve(task);
1389}
1390
1391/*
1392 * 2.	Bind and/or refresh the credentials
1393 */
1394static void
1395call_refresh(struct rpc_task *task)
1396{
1397	dprint_status(task);
1398
1399	task->tk_action = call_refreshresult;
1400	task->tk_status = 0;
1401	task->tk_client->cl_stats->rpcauthrefresh++;
1402	rpcauth_refreshcred(task);
1403}
1404
1405/*
1406 * 2a.	Process the results of a credential refresh
1407 */
1408static void
1409call_refreshresult(struct rpc_task *task)
1410{
1411	int status = task->tk_status;
1412
1413	dprint_status(task);
1414
1415	task->tk_status = 0;
1416	task->tk_action = call_refresh;
1417	switch (status) {
1418	case 0:
1419		if (rpcauth_uptodatecred(task))
1420			task->tk_action = call_allocate;
1421		return;
1422	case -ETIMEDOUT:
1423		rpc_delay(task, 3*HZ);
1424	case -EKEYEXPIRED:
1425	case -EAGAIN:
1426		status = -EACCES;
1427		if (!task->tk_cred_retry)
1428			break;
1429		task->tk_cred_retry--;
1430		dprintk("RPC: %5u %s: retry refresh creds\n",
1431				task->tk_pid, __func__);
1432		return;
1433	}
1434	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1435				task->tk_pid, __func__, status);
1436	rpc_exit(task, status);
1437}
1438
1439/*
1440 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1441 *	(Note: buffer memory is freed in xprt_release).
1442 */
1443static void
1444call_allocate(struct rpc_task *task)
1445{
1446	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1447	struct rpc_rqst *req = task->tk_rqstp;
1448	struct rpc_xprt *xprt = req->rq_xprt;
1449	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1450
1451	dprint_status(task);
1452
1453	task->tk_status = 0;
1454	task->tk_action = call_bind;
1455
1456	if (req->rq_buffer)
1457		return;
1458
1459	if (proc->p_proc != 0) {
1460		BUG_ON(proc->p_arglen == 0);
1461		if (proc->p_decode != NULL)
1462			BUG_ON(proc->p_replen == 0);
1463	}
1464
1465	/*
1466	 * Calculate the size (in quads) of the RPC call
1467	 * and reply headers, and convert both values
1468	 * to byte sizes.
1469	 */
1470	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1471	req->rq_callsize <<= 2;
1472	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1473	req->rq_rcvsize <<= 2;
1474
1475	req->rq_buffer = xprt->ops->buf_alloc(task,
1476					req->rq_callsize + req->rq_rcvsize);
1477	if (req->rq_buffer != NULL)
1478		return;
1479
1480	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1481
1482	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1483		task->tk_action = call_allocate;
1484		rpc_delay(task, HZ>>4);
1485		return;
1486	}
1487
1488	rpc_exit(task, -ERESTARTSYS);
1489}
1490
1491static inline int
1492rpc_task_need_encode(struct rpc_task *task)
1493{
1494	return task->tk_rqstp->rq_snd_buf.len == 0;
1495}
1496
1497static inline void
1498rpc_task_force_reencode(struct rpc_task *task)
1499{
1500	task->tk_rqstp->rq_snd_buf.len = 0;
1501	task->tk_rqstp->rq_bytes_sent = 0;
1502}
1503
1504static inline void
1505rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1506{
1507	buf->head[0].iov_base = start;
1508	buf->head[0].iov_len = len;
1509	buf->tail[0].iov_len = 0;
1510	buf->page_len = 0;
1511	buf->flags = 0;
1512	buf->len = 0;
1513	buf->buflen = len;
1514}
1515
1516/*
1517 * 3.	Encode arguments of an RPC call
1518 */
1519static void
1520rpc_xdr_encode(struct rpc_task *task)
1521{
1522	struct rpc_rqst	*req = task->tk_rqstp;
1523	kxdreproc_t	encode;
1524	__be32		*p;
1525
1526	dprint_status(task);
1527
1528	rpc_xdr_buf_init(&req->rq_snd_buf,
1529			 req->rq_buffer,
1530			 req->rq_callsize);
1531	rpc_xdr_buf_init(&req->rq_rcv_buf,
1532			 (char *)req->rq_buffer + req->rq_callsize,
1533			 req->rq_rcvsize);
1534
1535	p = rpc_encode_header(task);
1536	if (p == NULL) {
1537		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1538		rpc_exit(task, -EIO);
1539		return;
1540	}
1541
1542	encode = task->tk_msg.rpc_proc->p_encode;
1543	if (encode == NULL)
1544		return;
1545
1546	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1547			task->tk_msg.rpc_argp);
1548}
1549
1550/*
1551 * 4.	Get the server port number if not yet set
1552 */
1553static void
1554call_bind(struct rpc_task *task)
1555{
1556	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1557
1558	dprint_status(task);
1559
1560	task->tk_action = call_connect;
1561	if (!xprt_bound(xprt)) {
1562		task->tk_action = call_bind_status;
1563		task->tk_timeout = xprt->bind_timeout;
1564		xprt->ops->rpcbind(task);
1565	}
1566}
1567
1568/*
1569 * 4a.	Sort out bind result
1570 */
1571static void
1572call_bind_status(struct rpc_task *task)
1573{
1574	int status = -EIO;
1575
1576	if (task->tk_status >= 0) {
1577		dprint_status(task);
1578		task->tk_status = 0;
1579		task->tk_action = call_connect;
1580		return;
1581	}
1582
1583	trace_rpc_bind_status(task);
1584	switch (task->tk_status) {
1585	case -ENOMEM:
1586		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1587		rpc_delay(task, HZ >> 2);
1588		goto retry_timeout;
1589	case -EACCES:
1590		dprintk("RPC: %5u remote rpcbind: RPC program/version "
1591				"unavailable\n", task->tk_pid);
1592		/* fail immediately if this is an RPC ping */
1593		if (task->tk_msg.rpc_proc->p_proc == 0) {
1594			status = -EOPNOTSUPP;
1595			break;
1596		}
1597		if (task->tk_rebind_retry == 0)
1598			break;
1599		task->tk_rebind_retry--;
1600		rpc_delay(task, 3*HZ);
1601		goto retry_timeout;
1602	case -ETIMEDOUT:
1603		dprintk("RPC: %5u rpcbind request timed out\n",
1604				task->tk_pid);
1605		goto retry_timeout;
1606	case -EPFNOSUPPORT:
1607		/* server doesn't support any rpcbind version we know of */
1608		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1609				task->tk_pid);
1610		break;
1611	case -EPROTONOSUPPORT:
1612		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1613				task->tk_pid);
1614		task->tk_status = 0;
1615		task->tk_action = call_bind;
1616		return;
1617	case -ECONNREFUSED:		/* connection problems */
1618	case -ECONNRESET:
1619	case -ENOTCONN:
1620	case -EHOSTDOWN:
1621	case -EHOSTUNREACH:
1622	case -ENETUNREACH:
1623	case -EPIPE:
1624		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1625				task->tk_pid, task->tk_status);
1626		if (!RPC_IS_SOFTCONN(task)) {
1627			rpc_delay(task, 5*HZ);
1628			goto retry_timeout;
1629		}
1630		status = task->tk_status;
1631		break;
1632	default:
1633		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1634				task->tk_pid, -task->tk_status);
1635	}
1636
1637	rpc_exit(task, status);
1638	return;
1639
1640retry_timeout:
1641	task->tk_action = call_timeout;
1642}
1643
1644/*
1645 * 4b.	Connect to the RPC server
1646 */
1647static void
1648call_connect(struct rpc_task *task)
1649{
1650	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1651
1652	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1653			task->tk_pid, xprt,
1654			(xprt_connected(xprt) ? "is" : "is not"));
1655
1656	task->tk_action = call_transmit;
1657	if (!xprt_connected(xprt)) {
1658		task->tk_action = call_connect_status;
1659		if (task->tk_status < 0)
1660			return;
1661		xprt_connect(task);
1662	}
1663}
1664
1665/*
1666 * 4c.	Sort out connect result
1667 */
1668static void
1669call_connect_status(struct rpc_task *task)
1670{
1671	struct rpc_clnt *clnt = task->tk_client;
1672	int status = task->tk_status;
1673
1674	dprint_status(task);
1675
1676	trace_rpc_connect_status(task, status);
1677	switch (status) {
1678		/* if soft mounted, test if we've timed out */
1679	case -ETIMEDOUT:
1680		task->tk_action = call_timeout;
1681		return;
1682	case -ECONNREFUSED:
1683	case -ECONNRESET:
1684	case -ENETUNREACH:
1685		if (RPC_IS_SOFTCONN(task))
1686			break;
1687		/* retry with existing socket, after a delay */
1688	case 0:
1689	case -EAGAIN:
1690		task->tk_status = 0;
1691		clnt->cl_stats->netreconn++;
1692		task->tk_action = call_transmit;
1693		return;
1694	}
1695	rpc_exit(task, status);
1696}
1697
1698/*
1699 * 5.	Transmit the RPC request, and wait for reply
1700 */
1701static void
1702call_transmit(struct rpc_task *task)
1703{
1704	dprint_status(task);
1705
1706	task->tk_action = call_status;
1707	if (task->tk_status < 0)
1708		return;
1709	task->tk_status = xprt_prepare_transmit(task);
1710	if (task->tk_status != 0)
1711		return;
1712	task->tk_action = call_transmit_status;
1713	/* Encode here so that rpcsec_gss can use the correct sequence number. */
1714	if (rpc_task_need_encode(task)) {
1715		rpc_xdr_encode(task);
1716		/* Did the encode result in an error condition? */
1717		if (task->tk_status != 0) {
1718			/* Was the error nonfatal? */
1719			if (task->tk_status == -EAGAIN)
1720				rpc_delay(task, HZ >> 4);
1721			else
1722				rpc_exit(task, task->tk_status);
1723			return;
1724		}
1725	}
1726	xprt_transmit(task);
1727	if (task->tk_status < 0)
1728		return;
1729	/*
1730	 * On success, ensure that we call xprt_end_transmit() before sleeping
1731	 * in order to allow other RPC requests access to the socket.
1732	 */
1733	call_transmit_status(task);
1734	if (rpc_reply_expected(task))
1735		return;
1736	task->tk_action = rpc_exit_task;
1737	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1738}
1739
1740/*
1741 * 5a.	Handle cleanup after a transmission
1742 */
1743static void
1744call_transmit_status(struct rpc_task *task)
1745{
1746	task->tk_action = call_status;
1747
1748	/*
1749	 * Common case: success.  Force the compiler to put this
1750	 * test first.
1751	 */
1752	if (task->tk_status == 0) {
1753		xprt_end_transmit(task);
1754		rpc_task_force_reencode(task);
1755		return;
1756	}
1757
1758	switch (task->tk_status) {
1759	case -EAGAIN:
1760		break;
1761	default:
1762		dprint_status(task);
1763		xprt_end_transmit(task);
1764		rpc_task_force_reencode(task);
1765		break;
1766		/*
1767		 * Special cases: if we've been waiting on the
1768		 * socket's write_space() callback, or if the
1769		 * socket just returned a connection error,
1770		 * then hold onto the transport lock.
1771		 */
1772	case -ECONNREFUSED:
1773	case -EHOSTDOWN:
1774	case -EHOSTUNREACH:
1775	case -ENETUNREACH:
1776		if (RPC_IS_SOFTCONN(task)) {
1777			xprt_end_transmit(task);
1778			rpc_exit(task, task->tk_status);
1779			break;
1780		}
1781	case -ECONNRESET:
1782	case -ENOTCONN:
1783	case -EPIPE:
1784		rpc_task_force_reencode(task);
1785	}
1786}
1787
1788#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1789/*
1790 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
1791 * addition, disconnect on connectivity errors.
1792 */
1793static void
1794call_bc_transmit(struct rpc_task *task)
1795{
1796	struct rpc_rqst *req = task->tk_rqstp;
1797
1798	task->tk_status = xprt_prepare_transmit(task);
1799	if (task->tk_status == -EAGAIN) {
1800		/*
1801		 * Could not reserve the transport. Try again after the
1802		 * transport is released.
1803		 */
1804		task->tk_status = 0;
1805		task->tk_action = call_bc_transmit;
1806		return;
1807	}
1808
1809	task->tk_action = rpc_exit_task;
1810	if (task->tk_status < 0) {
1811		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1812			"error: %d\n", task->tk_status);
1813		return;
1814	}
1815
1816	xprt_transmit(task);
1817	xprt_end_transmit(task);
1818	dprint_status(task);
1819	switch (task->tk_status) {
1820	case 0:
1821		/* Success */
1822		break;
1823	case -EHOSTDOWN:
1824	case -EHOSTUNREACH:
1825	case -ENETUNREACH:
1826	case -ETIMEDOUT:
1827		/*
1828		 * Problem reaching the server.  Disconnect and let the
1829		 * forechannel reestablish the connection.  The server will
1830		 * have to retransmit the backchannel request and we'll
1831		 * reprocess it.  Since these ops are idempotent, there's no
1832		 * need to cache our reply at this time.
1833		 */
1834		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1835			"error: %d\n", task->tk_status);
1836		xprt_conditional_disconnect(req->rq_xprt,
1837			req->rq_connect_cookie);
1838		break;
1839	default:
1840		/*
1841		 * We were unable to reply and will have to drop the
1842		 * request.  The server should reconnect and retransmit.
1843		 */
1844		WARN_ON_ONCE(task->tk_status == -EAGAIN);
1845		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1846			"error: %d\n", task->tk_status);
1847		break;
1848	}
1849	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1850}
1851#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1852
1853/*
1854 * 6.	Sort out the RPC call status
1855 */
1856static void
1857call_status(struct rpc_task *task)
1858{
1859	struct rpc_clnt	*clnt = task->tk_client;
1860	struct rpc_rqst	*req = task->tk_rqstp;
1861	int		status;
1862
1863	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1864		task->tk_status = req->rq_reply_bytes_recvd;
1865
1866	dprint_status(task);
1867
1868	status = task->tk_status;
1869	if (status >= 0) {
1870		task->tk_action = call_decode;
1871		return;
1872	}
1873
1874	trace_rpc_call_status(task);
1875	task->tk_status = 0;
1876	switch(status) {
1877	case -EHOSTDOWN:
1878	case -EHOSTUNREACH:
1879	case -ENETUNREACH:
1880		/*
1881		 * Delay any retries for 3 seconds, then handle as if it
1882		 * were a timeout.
1883		 */
1884		rpc_delay(task, 3*HZ);
1885	case -ETIMEDOUT:
1886		task->tk_action = call_timeout;
1887		if (task->tk_client->cl_discrtry)
1888			xprt_conditional_disconnect(req->rq_xprt,
1889					req->rq_connect_cookie);
1890		break;
1891	case -ECONNRESET:
1892	case -ECONNREFUSED:
1893		rpc_force_rebind(clnt);
1894		rpc_delay(task, 3*HZ);
1895	case -EPIPE:
1896	case -ENOTCONN:
1897		task->tk_action = call_bind;
1898		break;
1899	case -EAGAIN:
1900		task->tk_action = call_transmit;
1901		break;
1902	case -EIO:
1903		/* shutdown or soft timeout */
1904		rpc_exit(task, status);
1905		break;
1906	default:
1907		if (clnt->cl_chatty)
1908			printk("%s: RPC call returned error %d\n",
1909			       clnt->cl_program->name, -status);
1910		rpc_exit(task, status);
1911	}
1912}
1913
1914/*
1915 * 6a.	Handle RPC timeout
1916 * 	We do not release the request slot, so we keep using the
1917 *	same XID for all retransmits.
1918 */
1919static void
1920call_timeout(struct rpc_task *task)
1921{
1922	struct rpc_clnt	*clnt = task->tk_client;
1923
1924	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1925		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1926		goto retry;
1927	}
1928
1929	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1930	task->tk_timeouts++;
1931
1932	if (RPC_IS_SOFTCONN(task)) {
1933		rpc_exit(task, -ETIMEDOUT);
1934		return;
1935	}
1936	if (RPC_IS_SOFT(task)) {
1937		if (clnt->cl_chatty) {
1938			rcu_read_lock();
1939			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1940				clnt->cl_program->name,
1941				rcu_dereference(clnt->cl_xprt)->servername);
1942			rcu_read_unlock();
1943		}
1944		if (task->tk_flags & RPC_TASK_TIMEOUT)
1945			rpc_exit(task, -ETIMEDOUT);
1946		else
1947			rpc_exit(task, -EIO);
1948		return;
1949	}
1950
1951	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1952		task->tk_flags |= RPC_CALL_MAJORSEEN;
1953		if (clnt->cl_chatty) {
1954			rcu_read_lock();
1955			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1956			clnt->cl_program->name,
1957			rcu_dereference(clnt->cl_xprt)->servername);
1958			rcu_read_unlock();
1959		}
1960	}
1961	rpc_force_rebind(clnt);
1962	/*
1963	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
1964	 * event? RFC2203 requires the server to drop all such requests.
1965	 */
1966	rpcauth_invalcred(task);
1967
1968retry:
1969	clnt->cl_stats->rpcretrans++;
1970	task->tk_action = call_bind;
1971	task->tk_status = 0;
1972}
1973
1974/*
1975 * 7.	Decode the RPC reply
1976 */
1977static void
1978call_decode(struct rpc_task *task)
1979{
1980	struct rpc_clnt	*clnt = task->tk_client;
1981	struct rpc_rqst	*req = task->tk_rqstp;
1982	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
1983	__be32		*p;
1984
1985	dprint_status(task);
1986
1987	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1988		if (clnt->cl_chatty) {
1989			rcu_read_lock();
1990			printk(KERN_NOTICE "%s: server %s OK\n",
1991				clnt->cl_program->name,
1992				rcu_dereference(clnt->cl_xprt)->servername);
1993			rcu_read_unlock();
1994		}
1995		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1996	}
1997
1998	/*
1999	 * Ensure that we see all writes made by xprt_complete_rqst()
2000	 * before it changed req->rq_reply_bytes_recvd.
2001	 */
2002	smp_rmb();
2003	req->rq_rcv_buf.len = req->rq_private_buf.len;
2004
2005	/* Check that the softirq receive buffer is valid */
2006	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2007				sizeof(req->rq_rcv_buf)) != 0);
2008
2009	if (req->rq_rcv_buf.len < 12) {
2010		if (!RPC_IS_SOFT(task)) {
2011			task->tk_action = call_bind;
2012			clnt->cl_stats->rpcretrans++;
2013			goto out_retry;
2014		}
2015		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2016				clnt->cl_program->name, task->tk_status);
2017		task->tk_action = call_timeout;
2018		goto out_retry;
2019	}
2020
2021	p = rpc_verify_header(task);
2022	if (IS_ERR(p)) {
2023		if (p == ERR_PTR(-EAGAIN))
2024			goto out_retry;
2025		return;
2026	}
2027
2028	task->tk_action = rpc_exit_task;
2029
2030	if (decode) {
2031		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2032						      task->tk_msg.rpc_resp);
2033	}
2034	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2035			task->tk_status);
2036	return;
2037out_retry:
2038	task->tk_status = 0;
2039	/* Note: rpc_verify_header() may have freed the RPC slot */
2040	if (task->tk_rqstp == req) {
2041		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2042		if (task->tk_client->cl_discrtry)
2043			xprt_conditional_disconnect(req->rq_xprt,
2044					req->rq_connect_cookie);
2045	}
2046}
2047
2048static __be32 *
2049rpc_encode_header(struct rpc_task *task)
2050{
2051	struct rpc_clnt *clnt = task->tk_client;
2052	struct rpc_rqst	*req = task->tk_rqstp;
2053	__be32		*p = req->rq_svec[0].iov_base;
2054
2055	/* FIXME: check buffer size? */
2056
2057	p = xprt_skip_transport_header(req->rq_xprt, p);
2058	*p++ = req->rq_xid;		/* XID */
2059	*p++ = htonl(RPC_CALL);		/* CALL */
2060	*p++ = htonl(RPC_VERSION);	/* RPC version */
2061	*p++ = htonl(clnt->cl_prog);	/* program number */
2062	*p++ = htonl(clnt->cl_vers);	/* program version */
2063	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
2064	p = rpcauth_marshcred(task, p);
2065	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2066	return p;
2067}
2068
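/*
 * Sanity check and parse the RPC reply header.  On RPC_SUCCESS a pointer
 * to the start of the procedure-specific results is returned.  Recoverable
 * problems (garbled data, rejected or stale credentials with retries left)
 * return ERR_PTR(-EAGAIN) so call_decode() can retry; fatal errors
 * terminate the task via rpc_exit().
 */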
2069static __be32 *
2070rpc_verify_header(struct rpc_task *task)
2071{
2072	struct rpc_clnt *clnt = task->tk_client;
2073	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2074	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2075	__be32	*p = iov->iov_base;
2076	u32 n;
2077	int error = -EACCES;
2078
2079	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2080		/* RFC-1014 says that the representation of XDR data must be a
2081		/* RFC 1014 says that the representation of XDR data must be a
2082		 * multiple of four bytes
2083		 * - if it isn't, pointer subtraction in the NFS client may give
2084		 */
2085		dprintk("RPC: %5u %s: XDR representation not a multiple of"
2086		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2087		       task->tk_rqstp->rq_rcv_buf.len);
2088		goto out_eio;
2089	}
2090	if ((len -= 3) < 0)
2091		goto out_overflow;
2092
2093	p += 1; /* skip XID */
2094	if ((n = ntohl(*p++)) != RPC_REPLY) {
2095		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2096			task->tk_pid, __func__, n);
2097		goto out_garbage;
2098	}
2099
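	/*
	 * A reply status other than MSG_ACCEPTED means the call was
	 * denied, carrying either an RPC version mismatch or an
	 * authentication error; the latter is retried with fresh
	 * credentials where that makes sense.
	 */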
2100	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2101		if (--len < 0)
2102			goto out_overflow;
2103		switch ((n = ntohl(*p++))) {
2104		case RPC_AUTH_ERROR:
2105			break;
2106		case RPC_MISMATCH:
2107			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2108				task->tk_pid, __func__);
2109			error = -EPROTONOSUPPORT;
2110			goto out_err;
2111		default:
2112			dprintk("RPC: %5u %s: RPC call rejected, "
2113				"unknown error: %x\n",
2114				task->tk_pid, __func__, n);
2115			goto out_eio;
2116		}
2117		if (--len < 0)
2118			goto out_overflow;
2119		switch ((n = ntohl(*p++))) {
2120		case RPC_AUTH_REJECTEDCRED:
2121		case RPC_AUTH_REJECTEDVERF:
2122		case RPCSEC_GSS_CREDPROBLEM:
2123		case RPCSEC_GSS_CTXPROBLEM:
2124			if (!task->tk_cred_retry)
2125				break;
2126			task->tk_cred_retry--;
2127			dprintk("RPC: %5u %s: retry stale creds\n",
2128					task->tk_pid, __func__);
2129			rpcauth_invalcred(task);
2130			/* Ensure we obtain a new XID! */
2131			xprt_release(task);
2132			task->tk_action = call_reserve;
2133			goto out_retry;
2134		case RPC_AUTH_BADCRED:
2135		case RPC_AUTH_BADVERF:
2136			/* possibly garbled cred/verf? */
2137			if (!task->tk_garb_retry)
2138				break;
2139			task->tk_garb_retry--;
2140			dprintk("RPC: %5u %s: retry garbled creds\n",
2141					task->tk_pid, __func__);
2142			task->tk_action = call_bind;
2143			goto out_retry;
2144		case RPC_AUTH_TOOWEAK:
2145			rcu_read_lock();
2146			printk(KERN_NOTICE "RPC: server %s requires stronger "
2147			       "authentication.\n",
2148			       rcu_dereference(clnt->cl_xprt)->servername);
2149			rcu_read_unlock();
2150			break;
2151		default:
2152			dprintk("RPC: %5u %s: unknown auth error: %x\n",
2153					task->tk_pid, __func__, n);
2154			error = -EIO;
2155		}
2156		dprintk("RPC: %5u %s: call rejected %d\n",
2157				task->tk_pid, __func__, n);
2158		goto out_err;
2159	}
2160	if (!(p = rpcauth_checkverf(task, p))) {
2161		dprintk("RPC: %5u %s: auth check failed\n",
2162				task->tk_pid, __func__);
2163		goto out_garbage;		/* bad verifier, retry */
2164	}
2165	len = p - (__be32 *)iov->iov_base - 1;
2166	if (len < 0)
2167		goto out_overflow;
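	/*
	 * The verifier checked out; what remains is the accept status,
	 * followed (for RPC_SUCCESS) by the procedure-specific results.
	 */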
2168	switch ((n = ntohl(*p++))) {
2169	case RPC_SUCCESS:
2170		return p;
2171	case RPC_PROG_UNAVAIL:
2172		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2173				"by server %s\n", task->tk_pid, __func__,
2174				(unsigned int)clnt->cl_prog,
2175				rcu_dereference(clnt->cl_xprt)->servername);
2176		error = -EPFNOSUPPORT;
2177		goto out_err;
2178	case RPC_PROG_MISMATCH:
2179		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2180				"by server %s\n", task->tk_pid, __func__,
2181				(unsigned int)clnt->cl_prog,
2182				(unsigned int)clnt->cl_vers,
2183				rcu_dereference(clnt->cl_xprt)->servername);
2184		error = -EPROTONOSUPPORT;
2185		goto out_err;
2186	case RPC_PROC_UNAVAIL:
2187		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2188				"version %u on server %s\n",
2189				task->tk_pid, __func__,
2190				rpc_proc_name(task),
2191				clnt->cl_prog, clnt->cl_vers,
2192				rcu_dereference(clnt->cl_xprt)->servername);
2193		error = -EOPNOTSUPP;
2194		goto out_err;
2195	case RPC_GARBAGE_ARGS:
2196		dprintk("RPC: %5u %s: server saw garbage\n",
2197				task->tk_pid, __func__);
2198		break;			/* retry */
2199	default:
2200		dprintk("RPC: %5u %s: server accept status: %x\n",
2201				task->tk_pid, __func__, n);
2202		/* Also retry */
2203	}
2204
2205out_garbage:
2206	clnt->cl_stats->rpcgarbage++;
2207	if (task->tk_garb_retry) {
2208		task->tk_garb_retry--;
2209		dprintk("RPC: %5u %s: retrying\n",
2210				task->tk_pid, __func__);
2211		task->tk_action = call_bind;
2212out_retry:
2213		return ERR_PTR(-EAGAIN);
2214	}
2215out_eio:
2216	error = -EIO;
2217out_err:
2218	rpc_exit(task, error);
2219	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2220			__func__, error);
2221	return ERR_PTR(error);
2222out_overflow:
2223	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2224			__func__);
2225	goto out_garbage;
2226}
2227
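/*
 * The NULL procedure (procedure number 0) takes no arguments and returns
 * no results; it is used below by rpc_ping() and rpc_call_null().
 */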
2228static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2229{
2230}
2231
2232static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2233{
2234	return 0;
2235}
2236
2237static struct rpc_procinfo rpcproc_null = {
2238	.p_encode = rpcproc_encode_null,
2239	.p_decode = rpcproc_decode_null,
2240};
2241
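/*
 * Send a synchronous NULL call with AUTH_NULL credentials to check that
 * the server answers for this program and version.  RPC_TASK_SOFT and
 * RPC_TASK_SOFTCONN make the ping fail with an error instead of retrying
 * indefinitely when the server cannot be reached.
 */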
2242static int rpc_ping(struct rpc_clnt *clnt)
2243{
2244	struct rpc_message msg = {
2245		.rpc_proc = &rpcproc_null,
2246	};
2247	int err;
2248	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2249	err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2250	put_rpccred(msg.rpc_cred);
2251	return err;
2252}
2253
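/**
 * rpc_call_null - send a NULL request to the server
 * @clnt: RPC client handle
 * @cred: credential to use for the call (may be NULL)
 * @flags: RPC task flags, e.g. RPC_TASK_ASYNC
 *
 * Returns the rpc_task, or an ERR_PTR on failure.  On success the caller
 * holds a reference to the task and should drop it with rpc_put_task()
 * when the result is no longer needed.
 */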
2254struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2255{
2256	struct rpc_message msg = {
2257		.rpc_proc = &rpcproc_null,
2258		.rpc_cred = cred,
2259	};
2260	struct rpc_task_setup task_setup_data = {
2261		.rpc_client = clnt,
2262		.rpc_message = &msg,
2263		.callback_ops = &rpc_default_ops,
2264		.flags = flags,
2265	};
2266	return rpc_run_task(&task_setup_data);
2267}
2268EXPORT_SYMBOL_GPL(rpc_call_null);
2269
2270#ifdef RPC_DEBUG
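/*
 * Debug helpers: dump one line per active RPC task, for every client in
 * the given network namespace, in the format announced by rpc_show_header().
 */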
2271static void rpc_show_header(void)
2272{
2273	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2274		"-timeout ---ops--\n");
2275}
2276
2277static void rpc_show_task(const struct rpc_clnt *clnt,
2278			  const struct rpc_task *task)
2279{
2280	const char *rpc_waitq = "none";
2281
2282	if (RPC_IS_QUEUED(task))
2283		rpc_waitq = rpc_qname(task->tk_waitqueue);
2284
2285	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2286		task->tk_pid, task->tk_flags, task->tk_status,
2287		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2288		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2289		task->tk_action, rpc_waitq);
2290}
2291
2292void rpc_show_tasks(struct net *net)
2293{
2294	struct rpc_clnt *clnt;
2295	struct rpc_task *task;
2296	int header = 0;
2297	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2298
2299	spin_lock(&sn->rpc_client_lock);
2300	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2301		spin_lock(&clnt->cl_lock);
2302		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2303			if (!header) {
2304				rpc_show_header();
2305				header++;
2306			}
2307			rpc_show_task(clnt, task);
2308		}
2309		spin_unlock(&clnt->cl_lock);
2310	}
2311	spin_unlock(&sn->rpc_client_lock);
2312}
2313#endif
2314