conrpc.c revision d7e09d0397e84eefbabfd9cb353221f3c6448d83
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lnet/selftest/conrpc.c
37 *
38 * Console framework rpcs
39 *
40 * Author: Liang Zhen <liang@whamcloud.com>
41 */
42
43
44#include <linux/libcfs/libcfs.h>
45#include <linux/lnet/lib-lnet.h>
46#include "timer.h"
47#include "conrpc.h"
48#include "console.h"
49
50void lstcon_rpc_stat_reply(lstcon_rpc_trans_t *, srpc_msg_t *,
51			   lstcon_node_t *, lstcon_trans_stat_t *);
52
53static void
54lstcon_rpc_done(srpc_client_rpc_t *rpc)
55{
56	lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;
57
58	LASSERT(crpc != NULL && rpc == crpc->crp_rpc);
59	LASSERT(crpc->crp_posted && !crpc->crp_finished);
60
61	spin_lock(&rpc->crpc_lock);
62
63	if (crpc->crp_trans == NULL) {
64		/* Orphan RPC is not in any transaction,
65		 * I'm just a poor body and nobody loves me */
66		spin_unlock(&rpc->crpc_lock);
67
68		/* release it */
69		lstcon_rpc_put(crpc);
70		return;
71	}
72
73	/* not an orphan RPC */
74	crpc->crp_finished = 1;
75
76	if (crpc->crp_stamp == 0) {
77		/* not aborted */
78		LASSERT (crpc->crp_status == 0);
79
80		crpc->crp_stamp  = cfs_time_current();
81		crpc->crp_status = rpc->crpc_status;
82	}
83
84	/* wakeup (transaction)thread if I'm the last RPC in the transaction */
85	if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
86		wake_up(&crpc->crp_trans->tas_waitq);
87
88	spin_unlock(&rpc->crpc_lock);
89}
90
91int
92lstcon_rpc_init(lstcon_node_t *nd, int service, unsigned feats,
93		int bulk_npg, int bulk_len, int embedded, lstcon_rpc_t *crpc)
94{
95	crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
96				       feats, bulk_npg, bulk_len,
97				       lstcon_rpc_done, (void *)crpc);
98	if (crpc->crp_rpc == NULL)
99		return -ENOMEM;
100
101	crpc->crp_trans    = NULL;
102	crpc->crp_node     = nd;
103	crpc->crp_posted   = 0;
104	crpc->crp_finished = 0;
105	crpc->crp_unpacked = 0;
106	crpc->crp_status   = 0;
107	crpc->crp_stamp    = 0;
108	crpc->crp_embedded = embedded;
109	INIT_LIST_HEAD(&crpc->crp_link);
110
111	atomic_inc(&console_session.ses_rpc_counter);
112
113	return 0;
114}
115
116int
117lstcon_rpc_prep(lstcon_node_t *nd, int service, unsigned feats,
118		int bulk_npg, int bulk_len, lstcon_rpc_t **crpcpp)
119{
120	lstcon_rpc_t  *crpc = NULL;
121	int	    rc;
122
123	spin_lock(&console_session.ses_rpc_lock);
124
125	if (!list_empty(&console_session.ses_rpc_freelist)) {
126		crpc = list_entry(console_session.ses_rpc_freelist.next,
127				      lstcon_rpc_t, crp_link);
128		list_del_init(&crpc->crp_link);
129	}
130
131	spin_unlock(&console_session.ses_rpc_lock);
132
133	if (crpc == NULL) {
134		LIBCFS_ALLOC(crpc, sizeof(*crpc));
135		if (crpc == NULL)
136			return -ENOMEM;
137	}
138
139	rc = lstcon_rpc_init(nd, service, feats, bulk_npg, bulk_len, 0, crpc);
140	if (rc == 0) {
141		*crpcpp = crpc;
142		return 0;
143	}
144
145	LIBCFS_FREE(crpc, sizeof(*crpc));
146
147	return rc;
148}
149
150void
151lstcon_rpc_put(lstcon_rpc_t *crpc)
152{
153	srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
154	int	  i;
155
156	LASSERT (list_empty(&crpc->crp_link));
157
158	for (i = 0; i < bulk->bk_niov; i++) {
159		if (bulk->bk_iovs[i].kiov_page == NULL)
160			continue;
161
162		__free_page(bulk->bk_iovs[i].kiov_page);
163	}
164
165	srpc_client_rpc_decref(crpc->crp_rpc);
166
167	if (crpc->crp_embedded) {
168		/* embedded RPC, don't recycle it */
169		memset(crpc, 0, sizeof(*crpc));
170		crpc->crp_embedded = 1;
171
172	} else {
173		spin_lock(&console_session.ses_rpc_lock);
174
175		list_add(&crpc->crp_link,
176			     &console_session.ses_rpc_freelist);
177
178		spin_unlock(&console_session.ses_rpc_lock);
179	}
180
181	/* RPC is not alive now */
182	atomic_dec(&console_session.ses_rpc_counter);
183}
184
185void
186lstcon_rpc_post(lstcon_rpc_t *crpc)
187{
188	lstcon_rpc_trans_t *trans = crpc->crp_trans;
189
190	LASSERT (trans != NULL);
191
192	atomic_inc(&trans->tas_remaining);
193	crpc->crp_posted = 1;
194
195	sfw_post_rpc(crpc->crp_rpc);
196}
197
198static char *
199lstcon_rpc_trans_name(int transop)
200{
201	if (transop == LST_TRANS_SESNEW)
202		return "SESNEW";
203
204	if (transop == LST_TRANS_SESEND)
205		return "SESEND";
206
207	if (transop == LST_TRANS_SESQRY)
208		return "SESQRY";
209
210	if (transop == LST_TRANS_SESPING)
211		return "SESPING";
212
213	if (transop == LST_TRANS_TSBCLIADD)
214		return "TSBCLIADD";
215
216	if (transop == LST_TRANS_TSBSRVADD)
217		return "TSBSRVADD";
218
219	if (transop == LST_TRANS_TSBRUN)
220		return "TSBRUN";
221
222	if (transop == LST_TRANS_TSBSTOP)
223		return "TSBSTOP";
224
225	if (transop == LST_TRANS_TSBCLIQRY)
226		return "TSBCLIQRY";
227
228	if (transop == LST_TRANS_TSBSRVQRY)
229		return "TSBSRVQRY";
230
231	if (transop == LST_TRANS_STATQRY)
232		return "STATQRY";
233
234	return "Unknown";
235}
236
237int
238lstcon_rpc_trans_prep(struct list_head *translist,
239		      int transop, lstcon_rpc_trans_t **transpp)
240{
241	lstcon_rpc_trans_t *trans;
242
243	if (translist != NULL) {
244		list_for_each_entry(trans, translist, tas_link) {
245			/* Can't enqueue two private transaction on
246			 * the same object */
247			if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
248				return -EPERM;
249		}
250	}
251
252	/* create a trans group */
253	LIBCFS_ALLOC(trans, sizeof(*trans));
254	if (trans == NULL)
255		return -ENOMEM;
256
257	trans->tas_opc = transop;
258
259	if (translist == NULL)
260		INIT_LIST_HEAD(&trans->tas_olink);
261	else
262		list_add_tail(&trans->tas_olink, translist);
263
264	list_add_tail(&trans->tas_link, &console_session.ses_trans_list);
265
266	INIT_LIST_HEAD(&trans->tas_rpcs_list);
267	atomic_set(&trans->tas_remaining, 0);
268	init_waitqueue_head(&trans->tas_waitq);
269
270	spin_lock(&console_session.ses_rpc_lock);
271	trans->tas_features = console_session.ses_features;
272	spin_unlock(&console_session.ses_rpc_lock);
273
274	*transpp = trans;
275	return 0;
276}
277
278void
279lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
280{
281	list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
282	crpc->crp_trans = trans;
283}
284
285void
286lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
287{
288	srpc_client_rpc_t *rpc;
289	lstcon_rpc_t      *crpc;
290	lstcon_node_t     *nd;
291
292	list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
293		rpc = crpc->crp_rpc;
294
295		spin_lock(&rpc->crpc_lock);
296
297		if (!crpc->crp_posted || /* not posted */
298		    crpc->crp_stamp != 0) { /* rpc done or aborted already */
299			if (crpc->crp_stamp == 0) {
300				crpc->crp_stamp = cfs_time_current();
301				crpc->crp_status = -EINTR;
302			}
303			spin_unlock(&rpc->crpc_lock);
304			continue;
305		}
306
307		crpc->crp_stamp  = cfs_time_current();
308		crpc->crp_status = error;
309
310		spin_unlock(&rpc->crpc_lock);
311
312		sfw_abort_rpc(rpc);
313
314		if  (error != ETIMEDOUT)
315			continue;
316
317		nd = crpc->crp_node;
318		if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
319			continue;
320
321		nd->nd_stamp = crpc->crp_stamp;
322		nd->nd_state = LST_NODE_DOWN;
323	}
324}
325
326static int
327lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
328{
329	if (console_session.ses_shutdown &&
330	    !list_empty(&trans->tas_olink)) /* Not an end session RPC */
331		return 1;
332
333	return (atomic_read(&trans->tas_remaining) == 0) ? 1: 0;
334}
335
336int
337lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
338{
339	lstcon_rpc_t  *crpc;
340	int	    rc;
341
342	if (list_empty(&trans->tas_rpcs_list))
343		return 0;
344
345	if (timeout < LST_TRANS_MIN_TIMEOUT)
346		timeout = LST_TRANS_MIN_TIMEOUT;
347
348	CDEBUG(D_NET, "Transaction %s started\n",
349	       lstcon_rpc_trans_name(trans->tas_opc));
350
351	/* post all requests */
352	list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
353		LASSERT (!crpc->crp_posted);
354
355		lstcon_rpc_post(crpc);
356	}
357
358	mutex_unlock(&console_session.ses_mutex);
359
360	rc = wait_event_interruptible_timeout(trans->tas_waitq,
361					      lstcon_rpc_trans_check(trans),
362					      cfs_time_seconds(timeout));
363	rc = (rc > 0) ? 0 : ((rc < 0) ? -EINTR : -ETIMEDOUT);
364
365	mutex_lock(&console_session.ses_mutex);
366
367	if (console_session.ses_shutdown)
368		rc = -ESHUTDOWN;
369
370	if (rc != 0 || atomic_read(&trans->tas_remaining) != 0) {
371		/* treat short timeout as canceled */
372		if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
373			rc = -EINTR;
374
375		lstcon_rpc_trans_abort(trans, rc);
376	}
377
378	CDEBUG(D_NET, "Transaction %s stopped: %d\n",
379	       lstcon_rpc_trans_name(trans->tas_opc), rc);
380
381	lstcon_rpc_trans_stat(trans, lstcon_trans_stat());
382
383	return rc;
384}
385
386int
387lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
388{
389	lstcon_node_t	*nd  = crpc->crp_node;
390	srpc_client_rpc_t    *rpc = crpc->crp_rpc;
391	srpc_generic_reply_t *rep;
392
393	LASSERT (nd != NULL && rpc != NULL);
394	LASSERT (crpc->crp_stamp != 0);
395
396	if (crpc->crp_status != 0) {
397		*msgpp = NULL;
398		return crpc->crp_status;
399	}
400
401	*msgpp = &rpc->crpc_replymsg;
402	if (!crpc->crp_unpacked) {
403		sfw_unpack_message(*msgpp);
404		crpc->crp_unpacked = 1;
405	}
406
407	if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
408		return 0;
409
410	nd->nd_stamp = crpc->crp_stamp;
411	rep = &(*msgpp)->msg_body.reply;
412
413	if (rep->sid.ses_nid == LNET_NID_ANY)
414		nd->nd_state = LST_NODE_UNKNOWN;
415	else if (lstcon_session_match(rep->sid))
416		nd->nd_state = LST_NODE_ACTIVE;
417	else
418		nd->nd_state = LST_NODE_BUSY;
419
420	return 0;
421}
422
423void
424lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
425{
426	lstcon_rpc_t      *crpc;
427	srpc_msg_t	*rep;
428	int		error;
429
430	LASSERT (stat != NULL);
431
432	memset(stat, 0, sizeof(*stat));
433
434	list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
435		lstcon_rpc_stat_total(stat, 1);
436
437		LASSERT (crpc->crp_stamp != 0);
438
439		error = lstcon_rpc_get_reply(crpc, &rep);
440		if (error != 0) {
441			lstcon_rpc_stat_failure(stat, 1);
442			if (stat->trs_rpc_errno == 0)
443				stat->trs_rpc_errno = -error;
444
445			continue;
446		}
447
448		lstcon_rpc_stat_success(stat, 1);
449
450		lstcon_rpc_stat_reply(trans, rep, crpc->crp_node, stat);
451	}
452
453	if (trans->tas_opc == LST_TRANS_SESNEW && stat->trs_fwk_errno == 0) {
454		stat->trs_fwk_errno =
455		      lstcon_session_feats_check(trans->tas_features);
456	}
457
458	CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
459		      "RPC error(%d), Framework error(%d)\n",
460	       lstcon_rpc_trans_name(trans->tas_opc),
461	       lstcon_rpc_stat_success(stat, 0),
462	       lstcon_rpc_stat_failure(stat, 0),
463	       lstcon_rpc_stat_total(stat, 0),
464	       stat->trs_rpc_errno, stat->trs_fwk_errno);
465
466	return;
467}
468
469int
470lstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans,
471			     struct list_head *head_up,
472			     lstcon_rpc_readent_func_t readent)
473{
474	struct list_head	    tmp;
475	struct list_head	   *next;
476	lstcon_rpc_ent_t     *ent;
477	srpc_generic_reply_t *rep;
478	lstcon_rpc_t	 *crpc;
479	srpc_msg_t	   *msg;
480	lstcon_node_t	*nd;
481	cfs_duration_t	dur;
482	struct timeval	tv;
483	int		   error;
484
485	LASSERT (head_up != NULL);
486
487	next = head_up;
488
489	list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
490		if (copy_from_user(&tmp, next,
491				       sizeof(struct list_head)))
492			return -EFAULT;
493
494		if (tmp.next == head_up)
495			return 0;
496
497		next = tmp.next;
498
499		ent = list_entry(next, lstcon_rpc_ent_t, rpe_link);
500
501		LASSERT (crpc->crp_stamp != 0);
502
503		error = lstcon_rpc_get_reply(crpc, &msg);
504
505		nd = crpc->crp_node;
506
507		dur = (cfs_duration_t)cfs_time_sub(crpc->crp_stamp,
508		      (cfs_time_t)console_session.ses_id.ses_stamp);
509		cfs_duration_usec(dur, &tv);
510
511		if (copy_to_user(&ent->rpe_peer,
512				     &nd->nd_id, sizeof(lnet_process_id_t)) ||
513		    copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) ||
514		    copy_to_user(&ent->rpe_state,
515				     &nd->nd_state, sizeof(nd->nd_state)) ||
516		    copy_to_user(&ent->rpe_rpc_errno, &error,
517				     sizeof(error)))
518			return -EFAULT;
519
520		if (error != 0)
521			continue;
522
523		/* RPC is done */
524		rep = (srpc_generic_reply_t *)&msg->msg_body.reply;
525
526		if (copy_to_user(&ent->rpe_sid,
527				     &rep->sid, sizeof(lst_sid_t)) ||
528		    copy_to_user(&ent->rpe_fwk_errno,
529				     &rep->status, sizeof(rep->status)))
530			return -EFAULT;
531
532		if (readent == NULL)
533			continue;
534
535		if ((error = readent(trans->tas_opc, msg, ent)) != 0)
536			return error;
537	}
538
539	return 0;
540}
541
542void
543lstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans)
544{
545	srpc_client_rpc_t *rpc;
546	lstcon_rpc_t      *crpc;
547	lstcon_rpc_t      *tmp;
548	int		count = 0;
549
550	list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list,
551				 crp_link) {
552		rpc = crpc->crp_rpc;
553
554		spin_lock(&rpc->crpc_lock);
555
556		/* free it if not posted or finished already */
557		if (!crpc->crp_posted || crpc->crp_finished) {
558			spin_unlock(&rpc->crpc_lock);
559
560			list_del_init(&crpc->crp_link);
561			lstcon_rpc_put(crpc);
562
563			continue;
564		}
565
566		/* rpcs can be still not callbacked (even LNetMDUnlink is called)
567		 * because huge timeout for inaccessible network, don't make
568		 * user wait for them, just abandon them, they will be recycled
569		 * in callback */
570
571		LASSERT (crpc->crp_status != 0);
572
573		crpc->crp_node  = NULL;
574		crpc->crp_trans = NULL;
575		list_del_init(&crpc->crp_link);
576		count ++;
577
578		spin_unlock(&rpc->crpc_lock);
579
580		atomic_dec(&trans->tas_remaining);
581	}
582
583	LASSERT (atomic_read(&trans->tas_remaining) == 0);
584
585	list_del(&trans->tas_link);
586	if (!list_empty(&trans->tas_olink))
587		list_del(&trans->tas_olink);
588
589	CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n",
590	       lstcon_rpc_trans_name(trans->tas_opc), count);
591
592	LIBCFS_FREE(trans, sizeof(*trans));
593
594	return;
595}
596
597int
598lstcon_sesrpc_prep(lstcon_node_t *nd, int transop,
599		   unsigned feats, lstcon_rpc_t **crpc)
600{
601	srpc_mksn_reqst_t *msrq;
602	srpc_rmsn_reqst_t *rsrq;
603	int		rc;
604
605	switch (transop) {
606	case LST_TRANS_SESNEW:
607		rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION,
608				     feats, 0, 0, crpc);
609		if (rc != 0)
610			return rc;
611
612		msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst;
613		msrq->mksn_sid     = console_session.ses_id;
614		msrq->mksn_force   = console_session.ses_force;
615		strncpy(msrq->mksn_name, console_session.ses_name,
616			strlen(console_session.ses_name));
617		break;
618
619	case LST_TRANS_SESEND:
620		rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION,
621				     feats, 0, 0, crpc);
622		if (rc != 0)
623			return rc;
624
625		rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst;
626		rsrq->rmsn_sid = console_session.ses_id;
627		break;
628
629	default:
630		LBUG();
631	}
632
633	return 0;
634}
635
636int
637lstcon_dbgrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc)
638{
639	srpc_debug_reqst_t *drq;
640	int		    rc;
641
642	rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, feats, 0, 0, crpc);
643	if (rc != 0)
644		return rc;
645
646	drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
647
648	drq->dbg_sid   = console_session.ses_id;
649	drq->dbg_flags = 0;
650
651	return rc;
652}
653
654int
655lstcon_batrpc_prep(lstcon_node_t *nd, int transop, unsigned feats,
656		   lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc)
657{
658	lstcon_batch_t	   *batch;
659	srpc_batch_reqst_t *brq;
660	int		    rc;
661
662	rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, feats, 0, 0, crpc);
663	if (rc != 0)
664		return rc;
665
666	brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst;
667
668	brq->bar_sid     = console_session.ses_id;
669	brq->bar_bid     = tsb->tsb_id;
670	brq->bar_testidx = tsb->tsb_index;
671	brq->bar_opc     = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN :
672			   (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP:
673			    SRPC_BATCH_OPC_QUERY);
674
675	if (transop != LST_TRANS_TSBRUN &&
676	    transop != LST_TRANS_TSBSTOP)
677		return 0;
678
679	LASSERT (tsb->tsb_index == 0);
680
681	batch = (lstcon_batch_t *)tsb;
682	brq->bar_arg = batch->bat_arg;
683
684	return 0;
685}
686
687int
688lstcon_statrpc_prep(lstcon_node_t *nd, unsigned feats, lstcon_rpc_t **crpc)
689{
690	srpc_stat_reqst_t *srq;
691	int		   rc;
692
693	rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, feats, 0, 0, crpc);
694	if (rc != 0)
695		return rc;
696
697	srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst;
698
699	srq->str_sid  = console_session.ses_id;
700	srq->str_type = 0; /* XXX remove it */
701
702	return 0;
703}
704
705lnet_process_id_packed_t *
706lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov)
707{
708	lnet_process_id_packed_t *pid;
709	int		       i;
710
711	i = idx / SFW_ID_PER_PAGE;
712
713	LASSERT (i < nkiov);
714
715	pid = (lnet_process_id_packed_t *)page_address(kiov[i].kiov_page);
716
717	return &pid[idx % SFW_ID_PER_PAGE];
718}
719
720int
721lstcon_dstnodes_prep(lstcon_group_t *grp, int idx,
722		     int dist, int span, int nkiov, lnet_kiov_t *kiov)
723{
724	lnet_process_id_packed_t *pid;
725	lstcon_ndlink_t	  *ndl;
726	lstcon_node_t	    *nd;
727	int		       start;
728	int		       end;
729	int		       i = 0;
730
731	LASSERT (dist >= 1);
732	LASSERT (span >= 1);
733	LASSERT (grp->grp_nnode >= 1);
734
735	if (span > grp->grp_nnode)
736		return -EINVAL;
737
738	start = ((idx / dist) * span) % grp->grp_nnode;
739	end   = ((idx / dist) * span + span - 1) % grp->grp_nnode;
740
741	list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
742		nd = ndl->ndl_node;
743		if (i < start) {
744			i ++;
745			continue;
746		}
747
748		if (i > (end >= start ? end: grp->grp_nnode))
749			break;
750
751		pid = lstcon_next_id((i - start), nkiov, kiov);
752		pid->nid = nd->nd_id.nid;
753		pid->pid = nd->nd_id.pid;
754		i++;
755	}
756
757	if (start <= end) /* done */
758		return 0;
759
760	list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) {
761		if (i > grp->grp_nnode + end)
762			break;
763
764		nd = ndl->ndl_node;
765		pid = lstcon_next_id((i - start), nkiov, kiov);
766		pid->nid = nd->nd_id.nid;
767		pid->pid = nd->nd_id.pid;
768		i++;
769	}
770
771	return 0;
772}
773
774int
775lstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req)
776{
777	test_ping_req_t *prq = &req->tsr_u.ping;
778
779	prq->png_size   = param->png_size;
780	prq->png_flags  = param->png_flags;
781	/* TODO dest */
782	return 0;
783}
784
785int
786lstcon_bulkrpc_v0_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
787{
788	test_bulk_req_t *brq = &req->tsr_u.bulk_v0;
789
790	brq->blk_opc    = param->blk_opc;
791	brq->blk_npg    = (param->blk_size + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE;
792	brq->blk_flags  = param->blk_flags;
793
794	return 0;
795}
796
797int
798lstcon_bulkrpc_v1_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req)
799{
800	test_bulk_req_v1_t *brq = &req->tsr_u.bulk_v1;
801
802	brq->blk_opc	= param->blk_opc;
803	brq->blk_flags	= param->blk_flags;
804	brq->blk_len	= param->blk_size;
805	brq->blk_offset	= 0; /* reserved */
806
807	return 0;
808}
809
810int
811lstcon_testrpc_prep(lstcon_node_t *nd, int transop, unsigned feats,
812		    lstcon_test_t *test, lstcon_rpc_t **crpc)
813{
814	lstcon_group_t    *sgrp = test->tes_src_grp;
815	lstcon_group_t    *dgrp = test->tes_dst_grp;
816	srpc_test_reqst_t *trq;
817	srpc_bulk_t       *bulk;
818	int		i;
819	int		   npg = 0;
820	int		   nob = 0;
821	int		   rc  = 0;
822
823	if (transop == LST_TRANS_TSBCLIADD) {
824		npg = sfw_id_pages(test->tes_span);
825		nob = (feats & LST_FEAT_BULK_LEN) == 0 ?
826		      npg * PAGE_CACHE_SIZE :
827		      sizeof(lnet_process_id_packed_t) * test->tes_span;
828	}
829
830	rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, feats, npg, nob, crpc);
831	if (rc != 0)
832		return rc;
833
834	trq  = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst;
835
836	if (transop == LST_TRANS_TSBSRVADD) {
837		int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist;
838		int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span;
839		int nmax = (ndist + nspan - 1) / nspan;
840
841		trq->tsr_ndest = 0;
842		trq->tsr_loop  = nmax * test->tes_dist * test->tes_concur;
843
844	} else {
845		bulk = &(*crpc)->crp_rpc->crpc_bulk;
846
847		for (i = 0; i < npg; i++) {
848			int	len;
849
850			LASSERT(nob > 0);
851
852			len = (feats & LST_FEAT_BULK_LEN) == 0 ?
853			      PAGE_CACHE_SIZE : min_t(int, nob, PAGE_CACHE_SIZE);
854			nob -= len;
855
856			bulk->bk_iovs[i].kiov_offset = 0;
857			bulk->bk_iovs[i].kiov_len    = len;
858			bulk->bk_iovs[i].kiov_page   =
859				alloc_page(GFP_IOFS);
860
861			if (bulk->bk_iovs[i].kiov_page == NULL) {
862				lstcon_rpc_put(*crpc);
863				return -ENOMEM;
864			}
865		}
866
867		bulk->bk_sink = 0;
868
869		LASSERT (transop == LST_TRANS_TSBCLIADD);
870
871		rc = lstcon_dstnodes_prep(test->tes_dst_grp,
872					  test->tes_cliidx++,
873					  test->tes_dist,
874					  test->tes_span,
875					  npg, &bulk->bk_iovs[0]);
876		if (rc != 0) {
877			lstcon_rpc_put(*crpc);
878			return rc;
879		}
880
881		trq->tsr_ndest = test->tes_span;
882		trq->tsr_loop  = test->tes_loop;
883	}
884
885	trq->tsr_sid	= console_session.ses_id;
886	trq->tsr_bid	= test->tes_hdr.tsb_id;
887	trq->tsr_concur     = test->tes_concur;
888	trq->tsr_is_client  = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0;
889	trq->tsr_stop_onerr = !!test->tes_stop_onerr;
890
891	switch (test->tes_type) {
892	case LST_TEST_PING:
893		trq->tsr_service = SRPC_SERVICE_PING;
894		rc = lstcon_pingrpc_prep((lst_test_ping_param_t *)
895					 &test->tes_param[0], trq);
896		break;
897
898	case LST_TEST_BULK:
899		trq->tsr_service = SRPC_SERVICE_BRW;
900		if ((feats & LST_FEAT_BULK_LEN) == 0) {
901			rc = lstcon_bulkrpc_v0_prep((lst_test_bulk_param_t *)
902						    &test->tes_param[0], trq);
903		} else {
904			rc = lstcon_bulkrpc_v1_prep((lst_test_bulk_param_t *)
905						    &test->tes_param[0], trq);
906		}
907
908		break;
909	default:
910		LBUG();
911		break;
912	}
913
914	return rc;
915}
916
917int
918lstcon_sesnew_stat_reply(lstcon_rpc_trans_t *trans,
919			 lstcon_node_t *nd, srpc_msg_t *reply)
920{
921	srpc_mksn_reply_t *mksn_rep = &reply->msg_body.mksn_reply;
922	int		   status   = mksn_rep->mksn_status;
923
924	if (status == 0 &&
925	    (reply->msg_ses_feats & ~LST_FEATS_MASK) != 0) {
926		mksn_rep->mksn_status = EPROTO;
927		status = EPROTO;
928	}
929
930	if (status == EPROTO) {
931		CNETERR("session protocol error from %s: %u\n",
932			libcfs_nid2str(nd->nd_id.nid),
933			reply->msg_ses_feats);
934	}
935
936	if (status != 0)
937		return status;
938
939	if (!trans->tas_feats_updated) {
940		trans->tas_feats_updated = 1;
941		trans->tas_features = reply->msg_ses_feats;
942	}
943
944	if (reply->msg_ses_feats != trans->tas_features) {
945		CNETERR("Framework features %x from %s is different with "
946			"features on this transaction: %x\n",
947			 reply->msg_ses_feats, libcfs_nid2str(nd->nd_id.nid),
948			 trans->tas_features);
949		status = mksn_rep->mksn_status = EPROTO;
950	}
951
952	if (status == 0) {
953		/* session timeout on remote node */
954		nd->nd_timeout = mksn_rep->mksn_timeout;
955	}
956
957	return status;
958}
959
960void
961lstcon_rpc_stat_reply(lstcon_rpc_trans_t *trans, srpc_msg_t *msg,
962		      lstcon_node_t *nd, lstcon_trans_stat_t *stat)
963{
964	srpc_rmsn_reply_t  *rmsn_rep;
965	srpc_debug_reply_t *dbg_rep;
966	srpc_batch_reply_t *bat_rep;
967	srpc_test_reply_t  *test_rep;
968	srpc_stat_reply_t  *stat_rep;
969	int		 rc = 0;
970
971	switch (trans->tas_opc) {
972	case LST_TRANS_SESNEW:
973		rc = lstcon_sesnew_stat_reply(trans, nd, msg);
974		if (rc == 0) {
975			lstcon_sesop_stat_success(stat, 1);
976			return;
977		}
978
979		lstcon_sesop_stat_failure(stat, 1);
980		break;
981
982	case LST_TRANS_SESEND:
983		rmsn_rep = &msg->msg_body.rmsn_reply;
984		/* ESRCH is not an error for end session */
985		if (rmsn_rep->rmsn_status == 0 ||
986		    rmsn_rep->rmsn_status == ESRCH) {
987			lstcon_sesop_stat_success(stat, 1);
988			return;
989		}
990
991		lstcon_sesop_stat_failure(stat, 1);
992		rc = rmsn_rep->rmsn_status;
993		break;
994
995	case LST_TRANS_SESQRY:
996	case LST_TRANS_SESPING:
997		dbg_rep = &msg->msg_body.dbg_reply;
998
999		if (dbg_rep->dbg_status == ESRCH) {
1000			lstcon_sesqry_stat_unknown(stat, 1);
1001			return;
1002		}
1003
1004		if (lstcon_session_match(dbg_rep->dbg_sid))
1005			lstcon_sesqry_stat_active(stat, 1);
1006		else
1007			lstcon_sesqry_stat_busy(stat, 1);
1008		return;
1009
1010	case LST_TRANS_TSBRUN:
1011	case LST_TRANS_TSBSTOP:
1012		bat_rep = &msg->msg_body.bat_reply;
1013
1014		if (bat_rep->bar_status == 0) {
1015			lstcon_tsbop_stat_success(stat, 1);
1016			return;
1017		}
1018
1019		if (bat_rep->bar_status == EPERM &&
1020		    trans->tas_opc == LST_TRANS_TSBSTOP) {
1021			lstcon_tsbop_stat_success(stat, 1);
1022			return;
1023		}
1024
1025		lstcon_tsbop_stat_failure(stat, 1);
1026		rc = bat_rep->bar_status;
1027		break;
1028
1029	case LST_TRANS_TSBCLIQRY:
1030	case LST_TRANS_TSBSRVQRY:
1031		bat_rep = &msg->msg_body.bat_reply;
1032
1033		if (bat_rep->bar_active != 0)
1034			lstcon_tsbqry_stat_run(stat, 1);
1035		else
1036			lstcon_tsbqry_stat_idle(stat, 1);
1037
1038		if (bat_rep->bar_status == 0)
1039			return;
1040
1041		lstcon_tsbqry_stat_failure(stat, 1);
1042		rc = bat_rep->bar_status;
1043		break;
1044
1045	case LST_TRANS_TSBCLIADD:
1046	case LST_TRANS_TSBSRVADD:
1047		test_rep = &msg->msg_body.tes_reply;
1048
1049		if (test_rep->tsr_status == 0) {
1050			lstcon_tsbop_stat_success(stat, 1);
1051			return;
1052		}
1053
1054		lstcon_tsbop_stat_failure(stat, 1);
1055		rc = test_rep->tsr_status;
1056		break;
1057
1058	case LST_TRANS_STATQRY:
1059		stat_rep = &msg->msg_body.stat_reply;
1060
1061		if (stat_rep->str_status == 0) {
1062			lstcon_statqry_stat_success(stat, 1);
1063			return;
1064		}
1065
1066		lstcon_statqry_stat_failure(stat, 1);
1067		rc = stat_rep->str_status;
1068		break;
1069
1070	default:
1071		LBUG();
1072	}
1073
1074	if (stat->trs_fwk_errno == 0)
1075		stat->trs_fwk_errno = rc;
1076
1077	return;
1078}
1079
1080int
1081lstcon_rpc_trans_ndlist(struct list_head *ndlist,
1082			struct list_head *translist, int transop,
1083			void *arg, lstcon_rpc_cond_func_t condition,
1084			lstcon_rpc_trans_t **transpp)
1085{
1086	lstcon_rpc_trans_t *trans;
1087	lstcon_ndlink_t    *ndl;
1088	lstcon_node_t      *nd;
1089	lstcon_rpc_t       *rpc;
1090	unsigned	    feats;
1091	int		 rc;
1092
1093	/* Creating session RPG for list of nodes */
1094
1095	rc = lstcon_rpc_trans_prep(translist, transop, &trans);
1096	if (rc != 0) {
1097		CERROR("Can't create transaction %d: %d\n", transop, rc);
1098		return rc;
1099	}
1100
1101	feats = trans->tas_features;
1102	list_for_each_entry(ndl, ndlist, ndl_link) {
1103		rc = condition == NULL ? 1 :
1104		     condition(transop, ndl->ndl_node, arg);
1105
1106		if (rc == 0)
1107			continue;
1108
1109		if (rc < 0) {
1110			CDEBUG(D_NET, "Condition error while creating RPC "
1111				      " for transaction %d: %d\n", transop, rc);
1112			break;
1113		}
1114
1115		nd = ndl->ndl_node;
1116
1117		switch (transop) {
1118		case LST_TRANS_SESNEW:
1119		case LST_TRANS_SESEND:
1120			rc = lstcon_sesrpc_prep(nd, transop, feats, &rpc);
1121			break;
1122		case LST_TRANS_SESQRY:
1123		case LST_TRANS_SESPING:
1124			rc = lstcon_dbgrpc_prep(nd, feats, &rpc);
1125			break;
1126		case LST_TRANS_TSBCLIADD:
1127		case LST_TRANS_TSBSRVADD:
1128			rc = lstcon_testrpc_prep(nd, transop, feats,
1129						 (lstcon_test_t *)arg, &rpc);
1130			break;
1131		case LST_TRANS_TSBRUN:
1132		case LST_TRANS_TSBSTOP:
1133		case LST_TRANS_TSBCLIQRY:
1134		case LST_TRANS_TSBSRVQRY:
1135			rc = lstcon_batrpc_prep(nd, transop, feats,
1136						(lstcon_tsb_hdr_t *)arg, &rpc);
1137			break;
1138		case LST_TRANS_STATQRY:
1139			rc = lstcon_statrpc_prep(nd, feats, &rpc);
1140			break;
1141		default:
1142			rc = -EINVAL;
1143			break;
1144		}
1145
1146		if (rc != 0) {
1147			CERROR("Failed to create RPC for transaction %s: %d\n",
1148			       lstcon_rpc_trans_name(transop), rc);
1149			break;
1150		}
1151
1152		lstcon_rpc_trans_addreq(trans, rpc);
1153	}
1154
1155	if (rc == 0) {
1156		*transpp = trans;
1157		return 0;
1158	}
1159
1160	lstcon_rpc_trans_destroy(trans);
1161
1162	return rc;
1163}
1164
1165void
1166lstcon_rpc_pinger(void *arg)
1167{
1168	stt_timer_t	*ptimer = (stt_timer_t *)arg;
1169	lstcon_rpc_trans_t *trans;
1170	lstcon_rpc_t       *crpc;
1171	srpc_msg_t	 *rep;
1172	srpc_debug_reqst_t *drq;
1173	lstcon_ndlink_t    *ndl;
1174	lstcon_node_t      *nd;
1175	time_t	      intv;
1176	int		 count = 0;
1177	int		 rc;
1178
1179	/* RPC pinger is a special case of transaction,
1180	 * it's called by timer at 8 seconds interval.
1181	 */
1182	mutex_lock(&console_session.ses_mutex);
1183
1184	if (console_session.ses_shutdown || console_session.ses_expired) {
1185		mutex_unlock(&console_session.ses_mutex);
1186		return;
1187	}
1188
1189	if (!console_session.ses_expired &&
1190	    cfs_time_current_sec() - console_session.ses_laststamp >
1191	    (time_t)console_session.ses_timeout)
1192		console_session.ses_expired = 1;
1193
1194	trans = console_session.ses_ping;
1195
1196	LASSERT (trans != NULL);
1197
1198	list_for_each_entry(ndl, &console_session.ses_ndl_list, ndl_link) {
1199		nd = ndl->ndl_node;
1200
1201		if (console_session.ses_expired) {
1202			/* idle console, end session on all nodes */
1203			if (nd->nd_state != LST_NODE_ACTIVE)
1204				continue;
1205
1206			rc = lstcon_sesrpc_prep(nd, LST_TRANS_SESEND,
1207						trans->tas_features, &crpc);
1208			if (rc != 0) {
1209				CERROR("Out of memory\n");
1210				break;
1211			}
1212
1213			lstcon_rpc_trans_addreq(trans, crpc);
1214			lstcon_rpc_post(crpc);
1215
1216			continue;
1217		}
1218
1219		crpc = &nd->nd_ping;
1220
1221		if (crpc->crp_rpc != NULL) {
1222			LASSERT (crpc->crp_trans == trans);
1223			LASSERT (!list_empty(&crpc->crp_link));
1224
1225			spin_lock(&crpc->crp_rpc->crpc_lock);
1226
1227			LASSERT(crpc->crp_posted);
1228
1229			if (!crpc->crp_finished) {
1230				/* in flight */
1231				spin_unlock(&crpc->crp_rpc->crpc_lock);
1232				continue;
1233			}
1234
1235			spin_unlock(&crpc->crp_rpc->crpc_lock);
1236
1237			lstcon_rpc_get_reply(crpc, &rep);
1238
1239			list_del_init(&crpc->crp_link);
1240
1241			lstcon_rpc_put(crpc);
1242		}
1243
1244		if (nd->nd_state != LST_NODE_ACTIVE)
1245			continue;
1246
1247		intv = cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1248						     nd->nd_stamp));
1249		if (intv < (time_t)nd->nd_timeout / 2)
1250			continue;
1251
1252		rc = lstcon_rpc_init(nd, SRPC_SERVICE_DEBUG,
1253				     trans->tas_features, 0, 0, 1, crpc);
1254		if (rc != 0) {
1255			CERROR("Out of memory\n");
1256			break;
1257		}
1258
1259		drq = &crpc->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst;
1260
1261		drq->dbg_sid   = console_session.ses_id;
1262		drq->dbg_flags = 0;
1263
1264		lstcon_rpc_trans_addreq(trans, crpc);
1265		lstcon_rpc_post(crpc);
1266
1267		count ++;
1268	}
1269
1270	if (console_session.ses_expired) {
1271		mutex_unlock(&console_session.ses_mutex);
1272		return;
1273	}
1274
1275	CDEBUG(D_NET, "Ping %d nodes in session\n", count);
1276
1277	ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL);
1278	stt_add_timer(ptimer);
1279
1280	mutex_unlock(&console_session.ses_mutex);
1281}
1282
1283int
1284lstcon_rpc_pinger_start(void)
1285{
1286	stt_timer_t    *ptimer;
1287	int	     rc;
1288
1289	LASSERT (list_empty(&console_session.ses_rpc_freelist));
1290	LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
1291
1292	rc = lstcon_rpc_trans_prep(NULL, LST_TRANS_SESPING,
1293				   &console_session.ses_ping);
1294	if (rc != 0) {
1295		CERROR("Failed to create console pinger\n");
1296		return rc;
1297	}
1298
1299	ptimer = &console_session.ses_ping_timer;
1300	ptimer->stt_expires = (cfs_time_t)(cfs_time_current_sec() + LST_PING_INTERVAL);
1301
1302	stt_add_timer(ptimer);
1303
1304	return 0;
1305}
1306
1307void
1308lstcon_rpc_pinger_stop(void)
1309{
1310	LASSERT (console_session.ses_shutdown);
1311
1312	stt_del_timer(&console_session.ses_ping_timer);
1313
1314	lstcon_rpc_trans_abort(console_session.ses_ping, -ESHUTDOWN);
1315	lstcon_rpc_trans_stat(console_session.ses_ping, lstcon_trans_stat());
1316	lstcon_rpc_trans_destroy(console_session.ses_ping);
1317
1318	memset(lstcon_trans_stat(), 0, sizeof(lstcon_trans_stat_t));
1319
1320	console_session.ses_ping = NULL;
1321}
1322
1323void
1324lstcon_rpc_cleanup_wait(void)
1325{
1326	lstcon_rpc_trans_t *trans;
1327	lstcon_rpc_t       *crpc;
1328	struct list_head	 *pacer;
1329	struct list_head	  zlist;
1330
1331	/* Called with hold of global mutex */
1332
1333	LASSERT (console_session.ses_shutdown);
1334
1335	while (!list_empty(&console_session.ses_trans_list)) {
1336		list_for_each(pacer, &console_session.ses_trans_list) {
1337			trans = list_entry(pacer, lstcon_rpc_trans_t,
1338					       tas_link);
1339
1340			CDEBUG(D_NET, "Session closed, wakeup transaction %s\n",
1341			       lstcon_rpc_trans_name(trans->tas_opc));
1342
1343			wake_up(&trans->tas_waitq);
1344		}
1345
1346		mutex_unlock(&console_session.ses_mutex);
1347
1348		CWARN("Session is shutting down, "
1349		      "waiting for termination of transactions\n");
1350		cfs_pause(cfs_time_seconds(1));
1351
1352		mutex_lock(&console_session.ses_mutex);
1353	}
1354
1355	spin_lock(&console_session.ses_rpc_lock);
1356
1357	lst_wait_until((atomic_read(&console_session.ses_rpc_counter) == 0),
1358		       console_session.ses_rpc_lock,
1359		       "Network is not accessable or target is down, "
1360		       "waiting for %d console RPCs to being recycled\n",
1361		       atomic_read(&console_session.ses_rpc_counter));
1362
1363	list_add(&zlist, &console_session.ses_rpc_freelist);
1364	list_del_init(&console_session.ses_rpc_freelist);
1365
1366	spin_unlock(&console_session.ses_rpc_lock);
1367
1368	while (!list_empty(&zlist)) {
1369		crpc = list_entry(zlist.next, lstcon_rpc_t, crp_link);
1370
1371		list_del(&crpc->crp_link);
1372		LIBCFS_FREE(crpc, sizeof(lstcon_rpc_t));
1373	}
1374}
1375
1376int
1377lstcon_rpc_module_init(void)
1378{
1379	INIT_LIST_HEAD(&console_session.ses_ping_timer.stt_list);
1380	console_session.ses_ping_timer.stt_func = lstcon_rpc_pinger;
1381	console_session.ses_ping_timer.stt_data = &console_session.ses_ping_timer;
1382
1383	console_session.ses_ping = NULL;
1384
1385	spin_lock_init(&console_session.ses_rpc_lock);
1386	atomic_set(&console_session.ses_rpc_counter, 0);
1387	INIT_LIST_HEAD(&console_session.ses_rpc_freelist);
1388
1389	return 0;
1390}
1391
1392void
1393lstcon_rpc_module_fini(void)
1394{
1395	LASSERT (list_empty(&console_session.ses_rpc_freelist));
1396	LASSERT (atomic_read(&console_session.ses_rpc_counter) == 0);
1397}
1398