clnt.c revision 2e738fdce22f9a7edf20281fd2d768ef9785922e
1/*
2 *  linux/net/sunrpc/clnt.c
3 *
4 *  This file contains the high-level RPC interface.
5 *  It is modeled as a finite state machine to support both synchronous
6 *  and asynchronous requests.
7 *
8 *  -	RPC header generation and argument serialization.
9 *  -	Credential refresh.
10 *  -	TCP connect handling.
11 *  -	Retry of operation when it is suspected the operation failed because
12 *	of uid squashing on the server, or when the credentials were stale
13 *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
15 *
16 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18 */
19
20#include <asm/system.h>
21
22#include <linux/module.h>
23#include <linux/types.h>
24#include <linux/kallsyms.h>
25#include <linux/mm.h>
26#include <linux/namei.h>
27#include <linux/mount.h>
28#include <linux/slab.h>
29#include <linux/utsname.h>
30#include <linux/workqueue.h>
31#include <linux/in.h>
32#include <linux/in6.h>
33#include <linux/un.h>
34#include <linux/rcupdate.h>
35
36#include <linux/sunrpc/clnt.h>
37#include <linux/sunrpc/rpc_pipe_fs.h>
38#include <linux/sunrpc/metrics.h>
39#include <linux/sunrpc/bc_xprt.h>
40#include <trace/events/sunrpc.h>
41
42#include "sunrpc.h"
43#include "netns.h"
44
45#ifdef RPC_DEBUG
46# define RPCDBG_FACILITY	RPCDBG_CALL
47#endif
48
/*
 * Debug helper: print the task's pid, the name of the calling function
 * and the task's current tk_status.  The argument is parenthesized so
 * that any pointer expression can be passed safely.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
/*
 * Wait queue on which rpc_shutdown_client() sleeps until the client's
 * task list is empty; rpc_release_client() wakes it.
 */
56
57static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60static void	call_start(struct rpc_task *task);
61static void	call_reserve(struct rpc_task *task);
62static void	call_reserveresult(struct rpc_task *task);
63static void	call_allocate(struct rpc_task *task);
64static void	call_decode(struct rpc_task *task);
65static void	call_bind(struct rpc_task *task);
66static void	call_bind_status(struct rpc_task *task);
67static void	call_transmit(struct rpc_task *task);
68#if defined(CONFIG_SUNRPC_BACKCHANNEL)
69static void	call_bc_transmit(struct rpc_task *task);
70#endif /* CONFIG_SUNRPC_BACKCHANNEL */
71static void	call_status(struct rpc_task *task);
72static void	call_transmit_status(struct rpc_task *task);
73static void	call_refresh(struct rpc_task *task);
74static void	call_refreshresult(struct rpc_task *task);
75static void	call_timeout(struct rpc_task *task);
76static void	call_connect(struct rpc_task *task);
77static void	call_connect_status(struct rpc_task *task);
78
79static __be32	*rpc_encode_header(struct rpc_task *task);
80static __be32	*rpc_verify_header(struct rpc_task *task);
81static int	rpc_ping(struct rpc_clnt *clnt);
82
83static void rpc_register_client(struct rpc_clnt *clnt)
84{
85	struct net *net = rpc_net_ns(clnt);
86	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88	spin_lock(&sn->rpc_client_lock);
89	list_add(&clnt->cl_clients, &sn->all_clients);
90	spin_unlock(&sn->rpc_client_lock);
91}
92
93static void rpc_unregister_client(struct rpc_clnt *clnt)
94{
95	struct net *net = rpc_net_ns(clnt);
96	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98	spin_lock(&sn->rpc_client_lock);
99	list_del(&clnt->cl_clients);
100	spin_unlock(&sn->rpc_client_lock);
101}
102
103static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104{
105	if (clnt->cl_dentry) {
106		if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107			clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108		rpc_remove_client_dir(clnt->cl_dentry);
109	}
110	clnt->cl_dentry = NULL;
111}
112
113static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114{
115	struct net *net = rpc_net_ns(clnt);
116	struct super_block *pipefs_sb;
117
118	pipefs_sb = rpc_get_sb_net(net);
119	if (pipefs_sb) {
120		__rpc_clnt_remove_pipedir(clnt);
121		rpc_put_sb_net(net);
122	}
123}
124
125static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126				    struct rpc_clnt *clnt,
127				    const char *dir_name)
128{
129	static uint32_t clntid;
130	char name[15];
131	struct qstr q = {
132		.name = name,
133	};
134	struct dentry *dir, *dentry;
135	int error;
136
137	dir = rpc_d_lookup_sb(sb, dir_name);
138	if (dir == NULL)
139		return dir;
140	for (;;) {
141		q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
142		name[sizeof(name) - 1] = '\0';
143		q.hash = full_name_hash(q.name, q.len);
144		dentry = rpc_create_client_dir(dir, &q, clnt);
145		if (!IS_ERR(dentry))
146			break;
147		error = PTR_ERR(dentry);
148		if (error != -EEXIST) {
149			printk(KERN_INFO "RPC: Couldn't create pipefs entry"
150					" %s/%s, error %d\n",
151					dir_name, name, error);
152			break;
153		}
154	}
155	dput(dir);
156	return dentry;
157}
158
159static int
160rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name)
161{
162	struct net *net = rpc_net_ns(clnt);
163	struct super_block *pipefs_sb;
164	struct dentry *dentry;
165
166	clnt->cl_dentry = NULL;
167	if (dir_name == NULL)
168		return 0;
169	pipefs_sb = rpc_get_sb_net(net);
170	if (!pipefs_sb)
171		return 0;
172	dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
173	rpc_put_sb_net(net);
174	if (IS_ERR(dentry))
175		return PTR_ERR(dentry);
176	clnt->cl_dentry = dentry;
177	return 0;
178}
179
180static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
181				struct super_block *sb)
182{
183	struct dentry *dentry;
184	int err = 0;
185
186	switch (event) {
187	case RPC_PIPEFS_MOUNT:
188		if (clnt->cl_program->pipe_dir_name == NULL)
189			break;
190		dentry = rpc_setup_pipedir_sb(sb, clnt,
191					      clnt->cl_program->pipe_dir_name);
192		BUG_ON(dentry == NULL);
193		if (IS_ERR(dentry))
194			return PTR_ERR(dentry);
195		clnt->cl_dentry = dentry;
196		if (clnt->cl_auth->au_ops->pipes_create) {
197			err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
198			if (err)
199				__rpc_clnt_remove_pipedir(clnt);
200		}
201		break;
202	case RPC_PIPEFS_UMOUNT:
203		__rpc_clnt_remove_pipedir(clnt);
204		break;
205	default:
206		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
207		return -ENOTSUPP;
208	}
209	return err;
210}
211
212static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
213{
214	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
215	struct rpc_clnt *clnt;
216
217	spin_lock(&sn->rpc_client_lock);
218	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
219		if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
220		    ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
221			continue;
222		atomic_inc(&clnt->cl_count);
223		spin_unlock(&sn->rpc_client_lock);
224		return clnt;
225	}
226	spin_unlock(&sn->rpc_client_lock);
227	return NULL;
228}
229
230static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
231			    void *ptr)
232{
233	struct super_block *sb = ptr;
234	struct rpc_clnt *clnt;
235	int error = 0;
236
237	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
238		error = __rpc_pipefs_event(clnt, event, sb);
239		rpc_release_client(clnt);
240		if (error)
241			break;
242	}
243	return error;
244}
245
246static struct notifier_block rpc_clients_block = {
247	.notifier_call	= rpc_pipefs_event,
248	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
249};
250
251int rpc_clients_notifier_register(void)
252{
253	return rpc_pipefs_notifier_register(&rpc_clients_block);
254}
255
256void rpc_clients_notifier_unregister(void)
257{
258	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
259}
260
261static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
262{
263	const struct rpc_program *program = args->program;
264	const struct rpc_version *version;
265	struct rpc_clnt		*clnt = NULL;
266	struct rpc_auth		*auth;
267	int err;
268
269	/* sanity check the name before trying to print it */
270	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
271			program->name, args->servername, xprt);
272
273	err = rpciod_up();
274	if (err)
275		goto out_no_rpciod;
276	err = -EINVAL;
277	if (!xprt)
278		goto out_no_xprt;
279
280	if (args->version >= program->nrvers)
281		goto out_err;
282	version = program->version[args->version];
283	if (version == NULL)
284		goto out_err;
285
286	err = -ENOMEM;
287	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
288	if (!clnt)
289		goto out_err;
290	clnt->cl_parent = clnt;
291
292	rcu_assign_pointer(clnt->cl_xprt, xprt);
293	clnt->cl_procinfo = version->procs;
294	clnt->cl_maxproc  = version->nrprocs;
295	clnt->cl_protname = program->name;
296	clnt->cl_prog     = args->prognumber ? : program->number;
297	clnt->cl_vers     = version->number;
298	clnt->cl_stats    = program->stats;
299	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
300	err = -ENOMEM;
301	if (clnt->cl_metrics == NULL)
302		goto out_no_stats;
303	clnt->cl_program  = program;
304	INIT_LIST_HEAD(&clnt->cl_tasks);
305	spin_lock_init(&clnt->cl_lock);
306
307	if (!xprt_bound(xprt))
308		clnt->cl_autobind = 1;
309
310	clnt->cl_timeout = xprt->timeout;
311	if (args->timeout != NULL) {
312		memcpy(&clnt->cl_timeout_default, args->timeout,
313				sizeof(clnt->cl_timeout_default));
314		clnt->cl_timeout = &clnt->cl_timeout_default;
315	}
316
317	clnt->cl_rtt = &clnt->cl_rtt_default;
318	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
319	clnt->cl_principal = NULL;
320	if (args->client_name) {
321		clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
322		if (!clnt->cl_principal)
323			goto out_no_principal;
324	}
325
326	atomic_set(&clnt->cl_count, 1);
327
328	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
329	if (err < 0)
330		goto out_no_path;
331
332	auth = rpcauth_create(args->authflavor, clnt);
333	if (IS_ERR(auth)) {
334		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
335				args->authflavor);
336		err = PTR_ERR(auth);
337		goto out_no_auth;
338	}
339
340	/* save the nodename */
341	clnt->cl_nodelen = strlen(init_utsname()->nodename);
342	if (clnt->cl_nodelen > UNX_MAXNODENAME)
343		clnt->cl_nodelen = UNX_MAXNODENAME;
344	memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
345	rpc_register_client(clnt);
346	return clnt;
347
348out_no_auth:
349	rpc_clnt_remove_pipedir(clnt);
350out_no_path:
351	kfree(clnt->cl_principal);
352out_no_principal:
353	rpc_free_iostats(clnt->cl_metrics);
354out_no_stats:
355	kfree(clnt);
356out_err:
357	xprt_put(xprt);
358out_no_xprt:
359	rpciod_down();
360out_no_rpciod:
361	return ERR_PTR(err);
362}
363
364/*
365 * rpc_create - create an RPC client and transport with one call
366 * @args: rpc_clnt create argument structure
367 *
368 * Creates and initializes an RPC transport and an RPC client.
369 *
370 * It can ping the server in order to determine if it is up, and to see if
371 * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
372 * this behavior so asynchronous tasks can also use rpc_create.
373 */
374struct rpc_clnt *rpc_create(struct rpc_create_args *args)
375{
376	struct rpc_xprt *xprt;
377	struct rpc_clnt *clnt;
378	struct xprt_create xprtargs = {
379		.net = args->net,
380		.ident = args->protocol,
381		.srcaddr = args->saddress,
382		.dstaddr = args->address,
383		.addrlen = args->addrsize,
384		.servername = args->servername,
385		.bc_xprt = args->bc_xprt,
386	};
387	char servername[48];
388
389	/*
390	 * If the caller chooses not to specify a hostname, whip
391	 * up a string representation of the passed-in address.
392	 */
393	if (xprtargs.servername == NULL) {
394		struct sockaddr_un *sun =
395				(struct sockaddr_un *)args->address;
396		struct sockaddr_in *sin =
397				(struct sockaddr_in *)args->address;
398		struct sockaddr_in6 *sin6 =
399				(struct sockaddr_in6 *)args->address;
400
401		servername[0] = '\0';
402		switch (args->address->sa_family) {
403		case AF_LOCAL:
404			snprintf(servername, sizeof(servername), "%s",
405				 sun->sun_path);
406			break;
407		case AF_INET:
408			snprintf(servername, sizeof(servername), "%pI4",
409				 &sin->sin_addr.s_addr);
410			break;
411		case AF_INET6:
412			snprintf(servername, sizeof(servername), "%pI6",
413				 &sin6->sin6_addr);
414			break;
415		default:
416			/* caller wants default server name, but
417			 * address family isn't recognized. */
418			return ERR_PTR(-EINVAL);
419		}
420		xprtargs.servername = servername;
421	}
422
423	xprt = xprt_create_transport(&xprtargs);
424	if (IS_ERR(xprt))
425		return (struct rpc_clnt *)xprt;
426
427	/*
428	 * By default, kernel RPC client connects from a reserved port.
429	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
430	 * but it is always enabled for rpciod, which handles the connect
431	 * operation.
432	 */
433	xprt->resvport = 1;
434	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
435		xprt->resvport = 0;
436
437	clnt = rpc_new_client(args, xprt);
438	if (IS_ERR(clnt))
439		return clnt;
440
441	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
442		int err = rpc_ping(clnt);
443		if (err != 0) {
444			rpc_shutdown_client(clnt);
445			return ERR_PTR(err);
446		}
447	}
448
449	clnt->cl_softrtry = 1;
450	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
451		clnt->cl_softrtry = 0;
452
453	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
454		clnt->cl_autobind = 1;
455	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
456		clnt->cl_discrtry = 1;
457	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
458		clnt->cl_chatty = 1;
459
460	return clnt;
461}
462EXPORT_SYMBOL_GPL(rpc_create);
463
464/*
465 * This function clones the RPC client structure. It allows us to share the
466 * same transport while varying parameters such as the authentication
467 * flavour.
468 */
469struct rpc_clnt *
470rpc_clone_client(struct rpc_clnt *clnt)
471{
472	struct rpc_clnt *new;
473	struct rpc_xprt *xprt;
474	int err = -ENOMEM;
475
476	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
477	if (!new)
478		goto out_no_clnt;
479	new->cl_parent = clnt;
480	/* Turn off autobind on clones */
481	new->cl_autobind = 0;
482	INIT_LIST_HEAD(&new->cl_tasks);
483	spin_lock_init(&new->cl_lock);
484	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
485	new->cl_metrics = rpc_alloc_iostats(clnt);
486	if (new->cl_metrics == NULL)
487		goto out_no_stats;
488	if (clnt->cl_principal) {
489		new->cl_principal = kstrdup(clnt->cl_principal, GFP_KERNEL);
490		if (new->cl_principal == NULL)
491			goto out_no_principal;
492	}
493	rcu_read_lock();
494	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
495	rcu_read_unlock();
496	if (xprt == NULL)
497		goto out_no_transport;
498	rcu_assign_pointer(new->cl_xprt, xprt);
499	atomic_set(&new->cl_count, 1);
500	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
501	if (err != 0)
502		goto out_no_path;
503	if (new->cl_auth)
504		atomic_inc(&new->cl_auth->au_count);
505	atomic_inc(&clnt->cl_count);
506	rpc_register_client(new);
507	rpciod_up();
508	return new;
509out_no_path:
510	xprt_put(xprt);
511out_no_transport:
512	kfree(new->cl_principal);
513out_no_principal:
514	rpc_free_iostats(new->cl_metrics);
515out_no_stats:
516	kfree(new);
517out_no_clnt:
518	dprintk("RPC:       %s: returned error %d\n", __func__, err);
519	return ERR_PTR(err);
520}
521EXPORT_SYMBOL_GPL(rpc_clone_client);
522
523/*
524 * Kill all tasks for the given client.
525 * XXX: kill their descendants as well?
526 */
527void rpc_killall_tasks(struct rpc_clnt *clnt)
528{
529	struct rpc_task	*rovr;
530
531
532	if (list_empty(&clnt->cl_tasks))
533		return;
534	dprintk("RPC:       killing all tasks for client %p\n", clnt);
535	/*
536	 * Spin lock all_tasks to prevent changes...
537	 */
538	spin_lock(&clnt->cl_lock);
539	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
540		if (!RPC_IS_ACTIVATED(rovr))
541			continue;
542		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
543			rovr->tk_flags |= RPC_TASK_KILLED;
544			rpc_exit(rovr, -EIO);
545			if (RPC_IS_QUEUED(rovr))
546				rpc_wake_up_queued_task(rovr->tk_waitqueue,
547							rovr);
548		}
549	}
550	spin_unlock(&clnt->cl_lock);
551}
552EXPORT_SYMBOL_GPL(rpc_killall_tasks);
553
554/*
555 * Properly shut down an RPC client, terminating all outstanding
556 * requests.
557 */
558void rpc_shutdown_client(struct rpc_clnt *clnt)
559{
560	dprintk_rcu("RPC:       shutting down %s client for %s\n",
561			clnt->cl_protname,
562			rcu_dereference(clnt->cl_xprt)->servername);
563
564	while (!list_empty(&clnt->cl_tasks)) {
565		rpc_killall_tasks(clnt);
566		wait_event_timeout(destroy_wait,
567			list_empty(&clnt->cl_tasks), 1*HZ);
568	}
569
570	rpc_release_client(clnt);
571}
572EXPORT_SYMBOL_GPL(rpc_shutdown_client);
573
574/*
575 * Free an RPC client
576 */
577static void
578rpc_free_client(struct rpc_clnt *clnt)
579{
580	dprintk_rcu("RPC:       destroying %s client for %s\n",
581			clnt->cl_protname,
582			rcu_dereference(clnt->cl_xprt)->servername);
583	if (clnt->cl_parent != clnt)
584		rpc_release_client(clnt->cl_parent);
585	rpc_unregister_client(clnt);
586	rpc_clnt_remove_pipedir(clnt);
587	rpc_free_iostats(clnt->cl_metrics);
588	kfree(clnt->cl_principal);
589	clnt->cl_metrics = NULL;
590	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
591	rpciod_down();
592	kfree(clnt);
593}
594
595/*
596 * Free an RPC client
597 */
598static void
599rpc_free_auth(struct rpc_clnt *clnt)
600{
601	if (clnt->cl_auth == NULL) {
602		rpc_free_client(clnt);
603		return;
604	}
605
606	/*
607	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
608	 *       release remaining GSS contexts. This mechanism ensures
609	 *       that it can do so safely.
610	 */
611	atomic_inc(&clnt->cl_count);
612	rpcauth_release(clnt->cl_auth);
613	clnt->cl_auth = NULL;
614	if (atomic_dec_and_test(&clnt->cl_count))
615		rpc_free_client(clnt);
616}
617
618/*
619 * Release reference to the RPC client
620 */
621void
622rpc_release_client(struct rpc_clnt *clnt)
623{
624	dprintk("RPC:       rpc_release_client(%p)\n", clnt);
625
626	if (list_empty(&clnt->cl_tasks))
627		wake_up(&destroy_wait);
628	if (atomic_dec_and_test(&clnt->cl_count))
629		rpc_free_auth(clnt);
630}
631
632/**
633 * rpc_bind_new_program - bind a new RPC program to an existing client
634 * @old: old rpc_client
635 * @program: rpc program to set
636 * @vers: rpc program version
637 *
638 * Clones the rpc client and sets up a new RPC program. This is mainly
639 * of use for enabling different RPC programs to share the same transport.
640 * The Sun NFSv2/v3 ACL protocol can do this.
641 */
642struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
643				      const struct rpc_program *program,
644				      u32 vers)
645{
646	struct rpc_clnt *clnt;
647	const struct rpc_version *version;
648	int err;
649
650	BUG_ON(vers >= program->nrvers || !program->version[vers]);
651	version = program->version[vers];
652	clnt = rpc_clone_client(old);
653	if (IS_ERR(clnt))
654		goto out;
655	clnt->cl_procinfo = version->procs;
656	clnt->cl_maxproc  = version->nrprocs;
657	clnt->cl_protname = program->name;
658	clnt->cl_prog     = program->number;
659	clnt->cl_vers     = version->number;
660	clnt->cl_stats    = program->stats;
661	err = rpc_ping(clnt);
662	if (err != 0) {
663		rpc_shutdown_client(clnt);
664		clnt = ERR_PTR(err);
665	}
666out:
667	return clnt;
668}
669EXPORT_SYMBOL_GPL(rpc_bind_new_program);
670
671void rpc_task_release_client(struct rpc_task *task)
672{
673	struct rpc_clnt *clnt = task->tk_client;
674
675	if (clnt != NULL) {
676		/* Remove from client task list */
677		spin_lock(&clnt->cl_lock);
678		list_del(&task->tk_task);
679		spin_unlock(&clnt->cl_lock);
680		task->tk_client = NULL;
681
682		rpc_release_client(clnt);
683	}
684}
685
686static
687void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
688{
689	if (clnt != NULL) {
690		rpc_task_release_client(task);
691		task->tk_client = clnt;
692		atomic_inc(&clnt->cl_count);
693		if (clnt->cl_softrtry)
694			task->tk_flags |= RPC_TASK_SOFT;
695		/* Add to the client's list of all tasks */
696		spin_lock(&clnt->cl_lock);
697		list_add_tail(&task->tk_task, &clnt->cl_tasks);
698		spin_unlock(&clnt->cl_lock);
699	}
700}
701
/*
 * Move @task over to @clnt.  The current client is always released;
 * the task is then attached to @clnt unless @clnt is NULL.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
708
709
710static void
711rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
712{
713	if (msg != NULL) {
714		task->tk_msg.rpc_proc = msg->rpc_proc;
715		task->tk_msg.rpc_argp = msg->rpc_argp;
716		task->tk_msg.rpc_resp = msg->rpc_resp;
717		if (msg->rpc_cred != NULL)
718			task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
719	}
720}
721
/*
 * Default callback for async RPC calls: intentionally does nothing.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* nothing to do */
}
729
730static const struct rpc_call_ops rpc_default_ops = {
731	.rpc_call_done = rpc_default_callback,
732};
733
734/**
735 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
736 * @task_setup_data: pointer to task initialisation data
737 */
738struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
739{
740	struct rpc_task *task;
741
742	task = rpc_new_task(task_setup_data);
743	if (IS_ERR(task))
744		goto out;
745
746	rpc_task_set_client(task, task_setup_data->rpc_client);
747	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
748
749	if (task->tk_action == NULL)
750		rpc_call_start(task);
751
752	atomic_inc(&task->tk_count);
753	rpc_execute(task);
754out:
755	return task;
756}
757EXPORT_SYMBOL_GPL(rpc_run_task);
758
759/**
760 * rpc_call_sync - Perform a synchronous RPC call
761 * @clnt: pointer to RPC client
762 * @msg: RPC call parameters
763 * @flags: RPC call flags
764 */
765int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
766{
767	struct rpc_task	*task;
768	struct rpc_task_setup task_setup_data = {
769		.rpc_client = clnt,
770		.rpc_message = msg,
771		.callback_ops = &rpc_default_ops,
772		.flags = flags,
773	};
774	int status;
775
776	BUG_ON(flags & RPC_TASK_ASYNC);
777
778	task = rpc_run_task(&task_setup_data);
779	if (IS_ERR(task))
780		return PTR_ERR(task);
781	status = task->tk_status;
782	rpc_put_task(task);
783	return status;
784}
785EXPORT_SYMBOL_GPL(rpc_call_sync);
786
787/**
788 * rpc_call_async - Perform an asynchronous RPC call
789 * @clnt: pointer to RPC client
790 * @msg: RPC call parameters
791 * @flags: RPC call flags
792 * @tk_ops: RPC call ops
793 * @data: user call data
794 */
795int
796rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
797	       const struct rpc_call_ops *tk_ops, void *data)
798{
799	struct rpc_task	*task;
800	struct rpc_task_setup task_setup_data = {
801		.rpc_client = clnt,
802		.rpc_message = msg,
803		.callback_ops = tk_ops,
804		.callback_data = data,
805		.flags = flags|RPC_TASK_ASYNC,
806	};
807
808	task = rpc_run_task(&task_setup_data);
809	if (IS_ERR(task))
810		return PTR_ERR(task);
811	rpc_put_task(task);
812	return 0;
813}
814EXPORT_SYMBOL_GPL(rpc_call_async);
815
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 * @tk_ops: RPC call ops
 *
 * If no task can be allocated, @req is freed with
 * xprt_free_bc_request() and the ERR_PTR is returned.  Otherwise the
 * task is returned with an extra tk_count reference.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
				const struct rpc_call_ops *tk_ops)
{
	struct rpc_task_setup task_setup_data = {
		.callback_ops = tk_ops,
	};
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Create an rpc_task to send the data */
	task = rpc_new_task(&task_setup_data);
	if (!IS_ERR(task)) {
		task->tk_rqstp = req;

		/*
		 * Set up the xdr_buf length.
		 * This also indicates that the buffer is XDR encoded already.
		 */
		sndbuf->len = sndbuf->head[0].iov_len + sndbuf->page_len +
				sndbuf->tail[0].iov_len;

		task->tk_action = call_bc_transmit;
		atomic_inc(&task->tk_count);
		BUG_ON(atomic_read(&task->tk_count) != 2);
		rpc_execute(task);
	} else {
		xprt_free_bc_request(req);
	}

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
860
861void
862rpc_call_start(struct rpc_task *task)
863{
864	task->tk_action = call_start;
865}
866EXPORT_SYMBOL_GPL(rpc_call_start);
867
868/**
869 * rpc_peeraddr - extract remote peer address from clnt's xprt
870 * @clnt: RPC client structure
871 * @buf: target buffer
872 * @bufsize: length of target buffer
873 *
874 * Returns the number of bytes that are actually in the stored address.
875 */
876size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
877{
878	size_t bytes;
879	struct rpc_xprt *xprt;
880
881	rcu_read_lock();
882	xprt = rcu_dereference(clnt->cl_xprt);
883
884	bytes = xprt->addrlen;
885	if (bytes > bufsize)
886		bytes = bufsize;
887	memcpy(buf, &xprt->addr, bytes);
888	rcu_read_unlock();
889
890	return bytes;
891}
892EXPORT_SYMBOL_GPL(rpc_peeraddr);
893
894/**
895 * rpc_peeraddr2str - return remote peer address in printable format
896 * @clnt: RPC client structure
897 * @format: address format
898 *
899 * NB: the lifetime of the memory referenced by the returned pointer is
900 * the same as the rpc_xprt itself.  As long as the caller uses this
901 * pointer, it must hold the RCU read lock.
902 */
903const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
904			     enum rpc_display_format_t format)
905{
906	struct rpc_xprt *xprt;
907
908	xprt = rcu_dereference(clnt->cl_xprt);
909
910	if (xprt->address_strings[format] != NULL)
911		return xprt->address_strings[format];
912	else
913		return "unprintable";
914}
915EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
916
917static const struct sockaddr_in rpc_inaddr_loopback = {
918	.sin_family		= AF_INET,
919	.sin_addr.s_addr	= htonl(INADDR_ANY),
920};
921
922static const struct sockaddr_in6 rpc_in6addr_loopback = {
923	.sin6_family		= AF_INET6,
924	.sin6_addr		= IN6ADDR_ANY_INIT,
925};
926
927/*
928 * Try a getsockname() on a connected datagram socket.  Using a
929 * connected datagram socket prevents leaving a socket in TIME_WAIT.
930 * This conserves the ephemeral port number space.
931 *
932 * Returns zero and fills in "buf" if successful; otherwise, a
933 * negative errno is returned.
934 */
935static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
936			struct sockaddr *buf, int buflen)
937{
938	struct socket *sock;
939	int err;
940
941	err = __sock_create(net, sap->sa_family,
942				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
943	if (err < 0) {
944		dprintk("RPC:       can't create UDP socket (%d)\n", err);
945		goto out;
946	}
947
948	switch (sap->sa_family) {
949	case AF_INET:
950		err = kernel_bind(sock,
951				(struct sockaddr *)&rpc_inaddr_loopback,
952				sizeof(rpc_inaddr_loopback));
953		break;
954	case AF_INET6:
955		err = kernel_bind(sock,
956				(struct sockaddr *)&rpc_in6addr_loopback,
957				sizeof(rpc_in6addr_loopback));
958		break;
959	default:
960		err = -EAFNOSUPPORT;
961		goto out;
962	}
963	if (err < 0) {
964		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
965		goto out_release;
966	}
967
968	err = kernel_connect(sock, sap, salen, 0);
969	if (err < 0) {
970		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
971		goto out_release;
972	}
973
974	err = kernel_getsockname(sock, buf, &buflen);
975	if (err < 0) {
976		dprintk("RPC:       getsockname failed (%d)\n", err);
977		goto out_release;
978	}
979
980	err = 0;
981	if (buf->sa_family == AF_INET6) {
982		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
983		sin6->sin6_scope_id = 0;
984	}
985	dprintk("RPC:       %s succeeded\n", __func__);
986
987out_release:
988	sock_release(sock);
989out:
990	return err;
991}
992
993/*
994 * Scraping a connected socket failed, so we don't have a useable
995 * local address.  Fallback: generate an address that will prevent
996 * the server from calling us back.
997 *
998 * Returns zero and fills in "buf" if successful; otherwise, a
999 * negative errno is returned.
1000 */
1001static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1002{
1003	switch (family) {
1004	case AF_INET:
1005		if (buflen < sizeof(rpc_inaddr_loopback))
1006			return -EINVAL;
1007		memcpy(buf, &rpc_inaddr_loopback,
1008				sizeof(rpc_inaddr_loopback));
1009		break;
1010	case AF_INET6:
1011		if (buflen < sizeof(rpc_in6addr_loopback))
1012			return -EINVAL;
1013		memcpy(buf, &rpc_in6addr_loopback,
1014				sizeof(rpc_in6addr_loopback));
1015	default:
1016		dprintk("RPC:       %s: address family not supported\n",
1017			__func__);
1018		return -EAFNOSUPPORT;
1019	}
1020	dprintk("RPC:       %s: succeeded\n", __func__);
1021	return 0;
1022}
1023
1024/**
1025 * rpc_localaddr - discover local endpoint address for an RPC client
1026 * @clnt: RPC client structure
1027 * @buf: target buffer
1028 * @buflen: size of target buffer, in bytes
1029 *
1030 * Returns zero and fills in "buf" and "buflen" if successful;
1031 * otherwise, a negative errno is returned.
1032 *
1033 * This works even if the underlying transport is not currently connected,
1034 * or if the upper layer never previously provided a source address.
1035 *
1036 * The result of this function call is transient: multiple calls in
1037 * succession may give different results, depending on how local
1038 * networking configuration changes over time.
1039 */
1040int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1041{
1042	struct sockaddr_storage address;
1043	struct sockaddr *sap = (struct sockaddr *)&address;
1044	struct rpc_xprt *xprt;
1045	struct net *net;
1046	size_t salen;
1047	int err;
1048
1049	rcu_read_lock();
1050	xprt = rcu_dereference(clnt->cl_xprt);
1051	salen = xprt->addrlen;
1052	memcpy(sap, &xprt->addr, salen);
1053	net = get_net(xprt->xprt_net);
1054	rcu_read_unlock();
1055
1056	rpc_set_port(sap, 0);
1057	err = rpc_sockname(net, sap, salen, buf, buflen);
1058	put_net(net);
1059	if (err != 0)
1060		/* Couldn't discover local address, return ANYADDR */
1061		return rpc_anyaddr(sap->sa_family, buf, buflen);
1062	return 0;
1063}
1064EXPORT_SYMBOL_GPL(rpc_localaddr);
1065
1066void
1067rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1068{
1069	struct rpc_xprt *xprt;
1070
1071	rcu_read_lock();
1072	xprt = rcu_dereference(clnt->cl_xprt);
1073	if (xprt->ops->set_buffer_size)
1074		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1075	rcu_read_unlock();
1076}
1077EXPORT_SYMBOL_GPL(rpc_setbufsize);
1078
1079/**
1080 * rpc_protocol - Get transport protocol number for an RPC client
1081 * @clnt: RPC client to query
1082 *
1083 */
1084int rpc_protocol(struct rpc_clnt *clnt)
1085{
1086	int protocol;
1087
1088	rcu_read_lock();
1089	protocol = rcu_dereference(clnt->cl_xprt)->prot;
1090	rcu_read_unlock();
1091	return protocol;
1092}
1093EXPORT_SYMBOL_GPL(rpc_protocol);
1094
1095/**
1096 * rpc_net_ns - Get the network namespace for this RPC client
1097 * @clnt: RPC client to query
1098 *
1099 */
1100struct net *rpc_net_ns(struct rpc_clnt *clnt)
1101{
1102	struct net *ret;
1103
1104	rcu_read_lock();
1105	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1106	rcu_read_unlock();
1107	return ret;
1108}
1109EXPORT_SYMBOL_GPL(rpc_net_ns);
1110
1111/**
1112 * rpc_max_payload - Get maximum payload size for a transport, in bytes
1113 * @clnt: RPC client to query
1114 *
1115 * For stream transports, this is one RPC record fragment (see RFC
1116 * 1831), as we don't support multi-record requests yet.  For datagram
1117 * transports, this is the size of an IP packet minus the IP, UDP, and
1118 * RPC header sizes.
1119 */
1120size_t rpc_max_payload(struct rpc_clnt *clnt)
1121{
1122	size_t ret;
1123
1124	rcu_read_lock();
1125	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1126	rcu_read_unlock();
1127	return ret;
1128}
1129EXPORT_SYMBOL_GPL(rpc_max_payload);
1130
1131/**
1132 * rpc_force_rebind - force transport to check that remote port is unchanged
1133 * @clnt: client to rebind
1134 *
1135 */
1136void rpc_force_rebind(struct rpc_clnt *clnt)
1137{
1138	if (clnt->cl_autobind) {
1139		rcu_read_lock();
1140		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1141		rcu_read_unlock();
1142	}
1143}
1144EXPORT_SYMBOL_GPL(rpc_force_rebind);
1145
1146/*
1147 * Restart an (async) RPC call from the call_prepare state.
1148 * Usually called from within the exit handler.
1149 */
1150int
1151rpc_restart_call_prepare(struct rpc_task *task)
1152{
1153	if (RPC_ASSASSINATED(task))
1154		return 0;
1155	task->tk_action = call_start;
1156	if (task->tk_ops->rpc_call_prepare != NULL)
1157		task->tk_action = rpc_prepare_task;
1158	return 1;
1159}
1160EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1161
1162/*
1163 * Restart an (async) RPC call. Usually called from within the
1164 * exit handler.
1165 */
1166int
1167rpc_restart_call(struct rpc_task *task)
1168{
1169	if (RPC_ASSASSINATED(task))
1170		return 0;
1171	task->tk_action = call_start;
1172	return 1;
1173}
1174EXPORT_SYMBOL_GPL(rpc_restart_call);
1175
1176#ifdef RPC_DEBUG
1177static const char *rpc_proc_name(const struct rpc_task *task)
1178{
1179	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1180
1181	if (proc) {
1182		if (proc->p_name)
1183			return proc->p_name;
1184		else
1185			return "NULL";
1186	} else
1187		return "no proc";
1188}
1189#endif
1190
1191/*
1192 * 0.  Initial state
1193 *
1194 *     Other FSM states can be visited zero or more times, but
1195 *     this state is visited exactly once for each RPC.
1196 */
1197static void
1198call_start(struct rpc_task *task)
1199{
1200	struct rpc_clnt	*clnt = task->tk_client;
1201
1202	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1203			clnt->cl_protname, clnt->cl_vers,
1204			rpc_proc_name(task),
1205			(RPC_IS_ASYNC(task) ? "async" : "sync"));
1206
1207	/* Increment call count */
1208	task->tk_msg.rpc_proc->p_count++;
1209	clnt->cl_stats->rpccnt++;
1210	task->tk_action = call_reserve;
1211}
1212
1213/*
1214 * 1.	Reserve an RPC call slot
1215 */
1216static void
1217call_reserve(struct rpc_task *task)
1218{
1219	dprint_status(task);
1220
1221	task->tk_status  = 0;
1222	task->tk_action  = call_reserveresult;
1223	xprt_reserve(task);
1224}
1225
1226/*
1227 * 1b.	Grok the result of xprt_reserve()
1228 */
1229static void
1230call_reserveresult(struct rpc_task *task)
1231{
1232	int status = task->tk_status;
1233
1234	dprint_status(task);
1235
1236	/*
1237	 * After a call to xprt_reserve(), we must have either
1238	 * a request slot or else an error status.
1239	 */
1240	task->tk_status = 0;
1241	if (status >= 0) {
1242		if (task->tk_rqstp) {
1243			task->tk_action = call_refresh;
1244			return;
1245		}
1246
1247		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1248				__func__, status);
1249		rpc_exit(task, -EIO);
1250		return;
1251	}
1252
1253	/*
1254	 * Even though there was an error, we may have acquired
1255	 * a request slot somehow.  Make sure not to leak it.
1256	 */
1257	if (task->tk_rqstp) {
1258		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1259				__func__, status);
1260		xprt_release(task);
1261	}
1262
1263	switch (status) {
1264	case -EAGAIN:	/* woken up; retry */
1265		task->tk_action = call_reserve;
1266		return;
1267	case -EIO:	/* probably a shutdown */
1268		break;
1269	default:
1270		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1271				__func__, status);
1272		break;
1273	}
1274	rpc_exit(task, status);
1275}
1276
1277/*
1278 * 2.	Bind and/or refresh the credentials
1279 */
1280static void
1281call_refresh(struct rpc_task *task)
1282{
1283	dprint_status(task);
1284
1285	task->tk_action = call_refreshresult;
1286	task->tk_status = 0;
1287	task->tk_client->cl_stats->rpcauthrefresh++;
1288	rpcauth_refreshcred(task);
1289}
1290
1291/*
1292 * 2a.	Process the results of a credential refresh
1293 */
1294static void
1295call_refreshresult(struct rpc_task *task)
1296{
1297	int status = task->tk_status;
1298
1299	dprint_status(task);
1300
1301	task->tk_status = 0;
1302	task->tk_action = call_refresh;
1303	switch (status) {
1304	case 0:
1305		if (rpcauth_uptodatecred(task))
1306			task->tk_action = call_allocate;
1307		return;
1308	case -ETIMEDOUT:
1309		rpc_delay(task, 3*HZ);
1310	case -EAGAIN:
1311		status = -EACCES;
1312		if (!task->tk_cred_retry)
1313			break;
1314		task->tk_cred_retry--;
1315		dprintk("RPC: %5u %s: retry refresh creds\n",
1316				task->tk_pid, __func__);
1317		return;
1318	}
1319	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1320				task->tk_pid, __func__, status);
1321	rpc_exit(task, status);
1322}
1323
1324/*
1325 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1326 *	(Note: buffer memory is freed in xprt_release).
1327 */
1328static void
1329call_allocate(struct rpc_task *task)
1330{
1331	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1332	struct rpc_rqst *req = task->tk_rqstp;
1333	struct rpc_xprt *xprt = task->tk_xprt;
1334	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1335
1336	dprint_status(task);
1337
1338	task->tk_status = 0;
1339	task->tk_action = call_bind;
1340
1341	if (req->rq_buffer)
1342		return;
1343
1344	if (proc->p_proc != 0) {
1345		BUG_ON(proc->p_arglen == 0);
1346		if (proc->p_decode != NULL)
1347			BUG_ON(proc->p_replen == 0);
1348	}
1349
1350	/*
1351	 * Calculate the size (in quads) of the RPC call
1352	 * and reply headers, and convert both values
1353	 * to byte sizes.
1354	 */
1355	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1356	req->rq_callsize <<= 2;
1357	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1358	req->rq_rcvsize <<= 2;
1359
1360	req->rq_buffer = xprt->ops->buf_alloc(task,
1361					req->rq_callsize + req->rq_rcvsize);
1362	if (req->rq_buffer != NULL)
1363		return;
1364
1365	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1366
1367	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1368		task->tk_action = call_allocate;
1369		rpc_delay(task, HZ>>4);
1370		return;
1371	}
1372
1373	rpc_exit(task, -ERESTARTSYS);
1374}
1375
1376static inline int
1377rpc_task_need_encode(struct rpc_task *task)
1378{
1379	return task->tk_rqstp->rq_snd_buf.len == 0;
1380}
1381
1382static inline void
1383rpc_task_force_reencode(struct rpc_task *task)
1384{
1385	task->tk_rqstp->rq_snd_buf.len = 0;
1386	task->tk_rqstp->rq_bytes_sent = 0;
1387}
1388
1389static inline void
1390rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1391{
1392	buf->head[0].iov_base = start;
1393	buf->head[0].iov_len = len;
1394	buf->tail[0].iov_len = 0;
1395	buf->page_len = 0;
1396	buf->flags = 0;
1397	buf->len = 0;
1398	buf->buflen = len;
1399}
1400
1401/*
1402 * 3.	Encode arguments of an RPC call
1403 */
1404static void
1405rpc_xdr_encode(struct rpc_task *task)
1406{
1407	struct rpc_rqst	*req = task->tk_rqstp;
1408	kxdreproc_t	encode;
1409	__be32		*p;
1410
1411	dprint_status(task);
1412
1413	rpc_xdr_buf_init(&req->rq_snd_buf,
1414			 req->rq_buffer,
1415			 req->rq_callsize);
1416	rpc_xdr_buf_init(&req->rq_rcv_buf,
1417			 (char *)req->rq_buffer + req->rq_callsize,
1418			 req->rq_rcvsize);
1419
1420	p = rpc_encode_header(task);
1421	if (p == NULL) {
1422		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1423		rpc_exit(task, -EIO);
1424		return;
1425	}
1426
1427	encode = task->tk_msg.rpc_proc->p_encode;
1428	if (encode == NULL)
1429		return;
1430
1431	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1432			task->tk_msg.rpc_argp);
1433}
1434
1435/*
1436 * 4.	Get the server port number if not yet set
1437 */
1438static void
1439call_bind(struct rpc_task *task)
1440{
1441	struct rpc_xprt *xprt = task->tk_xprt;
1442
1443	dprint_status(task);
1444
1445	task->tk_action = call_connect;
1446	if (!xprt_bound(xprt)) {
1447		task->tk_action = call_bind_status;
1448		task->tk_timeout = xprt->bind_timeout;
1449		xprt->ops->rpcbind(task);
1450	}
1451}
1452
/*
 * 4a.	Sort out bind result
 *
 *	A non-negative status proceeds to call_connect.  Errors either
 *	retry through call_timeout, restart call_bind, or terminate the
 *	task; the exit code is -EIO unless a case below overrides it.
 */
static void
call_bind_status(struct rpc_task *task)
{
	int exit_code = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto out_timeout;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			exit_code = -EOPNOTSUPP;
			break;
		}
		/* give up once the rebind retry budget is used up */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto out_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto out_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto out_timeout;
		}
		/* soft-connect tasks get the connection error itself */
		exit_code = task->tk_status;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		break;
	}

	rpc_exit(task, exit_code);
	return;

out_timeout:
	task->tk_action = call_timeout;
}
1528
1529/*
1530 * 4b.	Connect to the RPC server
1531 */
1532static void
1533call_connect(struct rpc_task *task)
1534{
1535	struct rpc_xprt *xprt = task->tk_xprt;
1536
1537	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1538			task->tk_pid, xprt,
1539			(xprt_connected(xprt) ? "is" : "is not"));
1540
1541	task->tk_action = call_transmit;
1542	if (!xprt_connected(xprt)) {
1543		task->tk_action = call_connect_status;
1544		if (task->tk_status < 0)
1545			return;
1546		xprt_connect(task);
1547	}
1548}
1549
1550/*
1551 * 4c.	Sort out connect result
1552 */
1553static void
1554call_connect_status(struct rpc_task *task)
1555{
1556	struct rpc_clnt *clnt = task->tk_client;
1557	int status = task->tk_status;
1558
1559	dprint_status(task);
1560
1561	task->tk_status = 0;
1562	if (status >= 0 || status == -EAGAIN) {
1563		clnt->cl_stats->netreconn++;
1564		task->tk_action = call_transmit;
1565		return;
1566	}
1567
1568	trace_rpc_connect_status(task, status);
1569	switch (status) {
1570		/* if soft mounted, test if we've timed out */
1571	case -ETIMEDOUT:
1572		task->tk_action = call_timeout;
1573		break;
1574	default:
1575		rpc_exit(task, -EIO);
1576	}
1577}
1578
1579/*
1580 * 5.	Transmit the RPC request, and wait for reply
1581 */
1582static void
1583call_transmit(struct rpc_task *task)
1584{
1585	dprint_status(task);
1586
1587	task->tk_action = call_status;
1588	if (task->tk_status < 0)
1589		return;
1590	task->tk_status = xprt_prepare_transmit(task);
1591	if (task->tk_status != 0)
1592		return;
1593	task->tk_action = call_transmit_status;
1594	/* Encode here so that rpcsec_gss can use correct sequence number. */
1595	if (rpc_task_need_encode(task)) {
1596		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
1597		rpc_xdr_encode(task);
1598		/* Did the encode result in an error condition? */
1599		if (task->tk_status != 0) {
1600			/* Was the error nonfatal? */
1601			if (task->tk_status == -EAGAIN)
1602				rpc_delay(task, HZ >> 4);
1603			else
1604				rpc_exit(task, task->tk_status);
1605			return;
1606		}
1607	}
1608	xprt_transmit(task);
1609	if (task->tk_status < 0)
1610		return;
1611	/*
1612	 * On success, ensure that we call xprt_end_transmit() before sleeping
1613	 * in order to allow access to the socket to other RPC requests.
1614	 */
1615	call_transmit_status(task);
1616	if (rpc_reply_expected(task))
1617		return;
1618	task->tk_action = rpc_exit_task;
1619	rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1620}
1621
/*
 * 5a.	Handle cleanup after a transmission
 *
 *	The next state is always call_status; what differs per outcome is
 *	whether the transmit is ended and whether a re-encode is forced.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -EAGAIN:
		break;
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
	}
}
1669
1670#if defined(CONFIG_SUNRPC_BACKCHANNEL)
1671/*
1672 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
1673 * addition, disconnect on connectivity errors.
1674 */
1675static void
1676call_bc_transmit(struct rpc_task *task)
1677{
1678	struct rpc_rqst *req = task->tk_rqstp;
1679
1680	BUG_ON(task->tk_status != 0);
1681	task->tk_status = xprt_prepare_transmit(task);
1682	if (task->tk_status == -EAGAIN) {
1683		/*
1684		 * Could not reserve the transport. Try again after the
1685		 * transport is released.
1686		 */
1687		task->tk_status = 0;
1688		task->tk_action = call_bc_transmit;
1689		return;
1690	}
1691
1692	task->tk_action = rpc_exit_task;
1693	if (task->tk_status < 0) {
1694		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1695			"error: %d\n", task->tk_status);
1696		return;
1697	}
1698
1699	xprt_transmit(task);
1700	xprt_end_transmit(task);
1701	dprint_status(task);
1702	switch (task->tk_status) {
1703	case 0:
1704		/* Success */
1705		break;
1706	case -EHOSTDOWN:
1707	case -EHOSTUNREACH:
1708	case -ENETUNREACH:
1709	case -ETIMEDOUT:
1710		/*
1711		 * Problem reaching the server.  Disconnect and let the
1712		 * forechannel reestablish the connection.  The server will
1713		 * have to retransmit the backchannel request and we'll
1714		 * reprocess it.  Since these ops are idempotent, there's no
1715		 * need to cache our reply at this time.
1716		 */
1717		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1718			"error: %d\n", task->tk_status);
1719		xprt_conditional_disconnect(task->tk_xprt,
1720			req->rq_connect_cookie);
1721		break;
1722	default:
1723		/*
1724		 * We were unable to reply and will have to drop the
1725		 * request.  The server should reconnect and retransmit.
1726		 */
1727		BUG_ON(task->tk_status == -EAGAIN);
1728		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1729			"error: %d\n", task->tk_status);
1730		break;
1731	}
1732	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1733}
1734#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1735
/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status (including a reply that has already been
 *	received) proceeds to call_decode.  Errors are mapped onto a
 *	retry state (call_timeout, call_bind, call_transmit) or end the
 *	task.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*rqst = task->tk_rqstp;
	int		err;

	/* A fully sent request with reply bytes on hand counts as success */
	if (rqst->rq_reply_bytes_recvd > 0 && !rqst->rq_bytes_sent)
		task->tk_status = rqst->rq_reply_bytes_recvd;

	dprint_status(task);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch (err) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (clnt->cl_discrtry)
			xprt_conditional_disconnect(task->tk_xprt,
					rqst->rq_connect_cookie);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
		rpc_force_rebind(clnt);
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, err);
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -err);
		rpc_exit(task, err);
		break;
	}
}
1796
1797/*
1798 * 6a.	Handle RPC timeout
1799 * 	We do not release the request slot, so we keep using the
1800 *	same XID for all retransmits.
1801 */
1802static void
1803call_timeout(struct rpc_task *task)
1804{
1805	struct rpc_clnt	*clnt = task->tk_client;
1806
1807	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1808		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1809		goto retry;
1810	}
1811
1812	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1813	task->tk_timeouts++;
1814
1815	if (RPC_IS_SOFTCONN(task)) {
1816		rpc_exit(task, -ETIMEDOUT);
1817		return;
1818	}
1819	if (RPC_IS_SOFT(task)) {
1820		if (clnt->cl_chatty)
1821			rcu_read_lock();
1822			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1823				clnt->cl_protname,
1824				rcu_dereference(clnt->cl_xprt)->servername);
1825			rcu_read_unlock();
1826		if (task->tk_flags & RPC_TASK_TIMEOUT)
1827			rpc_exit(task, -ETIMEDOUT);
1828		else
1829			rpc_exit(task, -EIO);
1830		return;
1831	}
1832
1833	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1834		task->tk_flags |= RPC_CALL_MAJORSEEN;
1835		if (clnt->cl_chatty) {
1836			rcu_read_lock();
1837			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1838			clnt->cl_protname,
1839			rcu_dereference(clnt->cl_xprt)->servername);
1840			rcu_read_unlock();
1841		}
1842	}
1843	rpc_force_rebind(clnt);
1844	/*
1845	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
1846	 * event? RFC2203 requires the server to drop all such requests.
1847	 */
1848	rpcauth_invalcred(task);
1849
1850retry:
1851	clnt->cl_stats->rpcretrans++;
1852	task->tk_action = call_bind;
1853	task->tk_status = 0;
1854}
1855
1856/*
1857 * 7.	Decode the RPC reply
1858 */
1859static void
1860call_decode(struct rpc_task *task)
1861{
1862	struct rpc_clnt	*clnt = task->tk_client;
1863	struct rpc_rqst	*req = task->tk_rqstp;
1864	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
1865	__be32		*p;
1866
1867	dprint_status(task);
1868
1869	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1870		if (clnt->cl_chatty) {
1871			rcu_read_lock();
1872			printk(KERN_NOTICE "%s: server %s OK\n",
1873				clnt->cl_protname,
1874				rcu_dereference(clnt->cl_xprt)->servername);
1875			rcu_read_unlock();
1876		}
1877		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1878	}
1879
1880	/*
1881	 * Ensure that we see all writes made by xprt_complete_rqst()
1882	 * before it changed req->rq_reply_bytes_recvd.
1883	 */
1884	smp_rmb();
1885	req->rq_rcv_buf.len = req->rq_private_buf.len;
1886
1887	/* Check that the softirq receive buffer is valid */
1888	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1889				sizeof(req->rq_rcv_buf)) != 0);
1890
1891	if (req->rq_rcv_buf.len < 12) {
1892		if (!RPC_IS_SOFT(task)) {
1893			task->tk_action = call_bind;
1894			clnt->cl_stats->rpcretrans++;
1895			goto out_retry;
1896		}
1897		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1898				clnt->cl_protname, task->tk_status);
1899		task->tk_action = call_timeout;
1900		goto out_retry;
1901	}
1902
1903	p = rpc_verify_header(task);
1904	if (IS_ERR(p)) {
1905		if (p == ERR_PTR(-EAGAIN))
1906			goto out_retry;
1907		return;
1908	}
1909
1910	task->tk_action = rpc_exit_task;
1911
1912	if (decode) {
1913		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1914						      task->tk_msg.rpc_resp);
1915	}
1916	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1917			task->tk_status);
1918	return;
1919out_retry:
1920	task->tk_status = 0;
1921	/* Note: rpc_verify_header() may have freed the RPC slot */
1922	if (task->tk_rqstp == req) {
1923		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1924		if (task->tk_client->cl_discrtry)
1925			xprt_conditional_disconnect(task->tk_xprt,
1926					req->rq_connect_cookie);
1927	}
1928}
1929
1930static __be32 *
1931rpc_encode_header(struct rpc_task *task)
1932{
1933	struct rpc_clnt *clnt = task->tk_client;
1934	struct rpc_rqst	*req = task->tk_rqstp;
1935	__be32		*p = req->rq_svec[0].iov_base;
1936
1937	/* FIXME: check buffer size? */
1938
1939	p = xprt_skip_transport_header(task->tk_xprt, p);
1940	*p++ = req->rq_xid;		/* XID */
1941	*p++ = htonl(RPC_CALL);		/* CALL */
1942	*p++ = htonl(RPC_VERSION);	/* RPC version */
1943	*p++ = htonl(clnt->cl_prog);	/* program number */
1944	*p++ = htonl(clnt->cl_vers);	/* program version */
1945	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
1946	p = rpcauth_marshcred(task, p);
1947	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1948	return p;
1949}
1950
/*
 * Check the header of a received RPC reply.
 *
 * Returns a pointer just past the accept status word when the server
 * reported RPC_SUCCESS.  Returns ERR_PTR(-EAGAIN) when the call should
 * be retried; in that case tk_action has already been pointed at the
 * state to resume from.  In all other cases the task is terminated
 * with rpc_exit() and ERR_PTR(error) is returned.
 *
 * Note that the "retry stale creds" path calls xprt_release(), so the
 * request slot may be gone when this function returns.
 */
static __be32 *
rpc_verify_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	/* reply length in 32-bit words */
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	__be32	*p = iov->iov_base;
	u32 n;
	/* default exit code for rejected calls */
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		dprintk("RPC: %5u %s: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
		       task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* account for the three words read next: XID, type, reply status */
	if ((len -= 3) < 0)
		goto out_overflow;

	p += 1; /* skip XID */
	if ((n = ntohl(*p++)) != RPC_REPLY) {
		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
			task->tk_pid, __func__, n);
		goto out_garbage;
	}

	/* Anything other than "accepted" is handled as a rejected call */
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		if (--len < 0)
			goto out_overflow;
		/* reject status */
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_ERROR:
			break;
		case RPC_MISMATCH:
			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
				task->tk_pid, __func__);
			error = -EPROTONOSUPPORT;
			goto out_err;
		default:
			dprintk("RPC: %5u %s: RPC call rejected, "
				"unknown error: %x\n",
				task->tk_pid, __func__, n);
			goto out_eio;
		}
		if (--len < 0)
			goto out_overflow;
		/* authentication failure reason */
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __func__);
			rpcauth_invalcred(task);
			/* Ensure we obtain a new XID! */
			xprt_release(task);
			task->tk_action = call_reserve;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __func__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			rcu_read_lock();
			printk(KERN_NOTICE "RPC: server %s requires stronger "
			       "authentication.\n",
			       rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
			break;
		default:
			dprintk("RPC: %5u %s: unknown auth error: %x\n",
					task->tk_pid, __func__, n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __func__, n);
		goto out_err;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		dprintk("RPC: %5u %s: auth check failed\n",
				task->tk_pid, __func__);
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* accept status */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				(unsigned int)clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __func__,
				rpc_proc_name(task),
				clnt->cl_prog, clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __func__);
		break;			/* retry */
	default:
		dprintk("RPC: %5u %s: server accept status: %x\n",
				task->tk_pid, __func__, n);
		/* Also retry */
	}

	/* Garbled reply: retry via call_bind while tk_garb_retry lasts */
out_garbage:
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __func__);
		task->tk_action = call_bind;
		/* also the target of the credential retry paths above */
out_retry:
		return ERR_PTR(-EAGAIN);
	}
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
			__func__, error);
	return ERR_PTR(error);
out_overflow:
	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
			__func__);
	goto out_garbage;
}
2109
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void
rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	/* intentionally empty */
}
2113
/* Reply decoder for the NULL procedure: nothing to decode, always 0. */
static int
rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	const int success = 0;

	return success;
}
2118
/*
 * Procedure descriptor shared by rpc_ping() and rpc_call_null().  Only
 * the XDR hooks are set (to the no-op routines above); every other
 * field, including p_proc, keeps its zero initial value.
 */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
2123
2124static int rpc_ping(struct rpc_clnt *clnt)
2125{
2126	struct rpc_message msg = {
2127		.rpc_proc = &rpcproc_null,
2128	};
2129	int err;
2130	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2131	err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2132	put_rpccred(msg.rpc_cred);
2133	return err;
2134}
2135
2136struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2137{
2138	struct rpc_message msg = {
2139		.rpc_proc = &rpcproc_null,
2140		.rpc_cred = cred,
2141	};
2142	struct rpc_task_setup task_setup_data = {
2143		.rpc_client = clnt,
2144		.rpc_message = &msg,
2145		.callback_ops = &rpc_default_ops,
2146		.flags = flags,
2147	};
2148	return rpc_run_task(&task_setup_data);
2149}
2150EXPORT_SYMBOL_GPL(rpc_call_null);
2151
2152#ifdef RPC_DEBUG
2153static void rpc_show_header(void)
2154{
2155	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2156		"-timeout ---ops--\n");
2157}
2158
2159static void rpc_show_task(const struct rpc_clnt *clnt,
2160			  const struct rpc_task *task)
2161{
2162	const char *rpc_waitq = "none";
2163
2164	if (RPC_IS_QUEUED(task))
2165		rpc_waitq = rpc_qname(task->tk_waitqueue);
2166
2167	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2168		task->tk_pid, task->tk_flags, task->tk_status,
2169		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2170		clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2171		task->tk_action, rpc_waitq);
2172}
2173
2174void rpc_show_tasks(struct net *net)
2175{
2176	struct rpc_clnt *clnt;
2177	struct rpc_task *task;
2178	int header = 0;
2179	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2180
2181	spin_lock(&sn->rpc_client_lock);
2182	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2183		spin_lock(&clnt->cl_lock);
2184		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2185			if (!header) {
2186				rpc_show_header();
2187				header++;
2188			}
2189			rpc_show_task(clnt, task);
2190		}
2191		spin_unlock(&clnt->cl_lock);
2192	}
2193	spin_unlock(&sn->rpc_client_lock);
2194}
2195#endif
2196