lib-msg.c revision 2b2843264260d4d6c4afd0fecf6082736ff86b78
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/lib-msg.c
 *
 * Message decoding, parsing and finalizing routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/lnet/lib-lnet.h>

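/*
 * Fill in @ev as an UNLINK event for @md, e.g. for LNetMDUnlink() when
 * no pending operation will deliver the unlink notification.
 */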
void
lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
{
	memset(ev, 0, sizeof(*ev));

	ev->status   = 0;
	ev->unlinked = 1;
	ev->type     = LNET_EVENT_UNLINK;
	lnet_md_deconstruct(md, &ev->md);
	lnet_md2handle(&ev->md_handle, md);
}

/*
 * No lock needed; must be called after lnet_msg_commit()
 */
void
lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
{
	lnet_hdr_t	*hdr = &msg->msg_hdr;
	lnet_event_t	*ev  = &msg->msg_ev;

	LASSERT(!msg->msg_routing);

	ev->type = ev_type;

	if (ev_type == LNET_EVENT_SEND) {
		/* event for active message */
		ev->target.nid    = le64_to_cpu(hdr->dest_nid);
		ev->target.pid    = le32_to_cpu(hdr->dest_pid);
		ev->initiator.nid = LNET_NID_ANY;
		ev->initiator.pid = the_lnet.ln_pid;
		ev->sender	  = LNET_NID_ANY;

	} else {
		/* event for passive message */
		ev->target.pid    = hdr->dest_pid;
		ev->target.nid    = hdr->dest_nid;
		ev->initiator.pid = hdr->src_pid;
		ev->initiator.nid = hdr->src_nid;
		ev->rlength       = hdr->payload_length;
		ev->sender	  = msg->msg_from;
		ev->mlength	  = msg->msg_wanted;
		ev->offset	  = msg->msg_offset;
	}

	switch (ev_type) {
	default:
		LBUG();

	case LNET_EVENT_PUT: /* passive PUT */
		ev->pt_index   = hdr->msg.put.ptl_index;
		ev->match_bits = hdr->msg.put.match_bits;
		ev->hdr_data   = hdr->msg.put.hdr_data;
		return;

	case LNET_EVENT_GET: /* passive GET */
		ev->pt_index   = hdr->msg.get.ptl_index;
		ev->match_bits = hdr->msg.get.match_bits;
		ev->hdr_data   = 0;
		return;

	case LNET_EVENT_ACK: /* ACK */
		ev->match_bits = hdr->msg.ack.match_bits;
		ev->mlength    = hdr->msg.ack.mlength;
		return;

	case LNET_EVENT_REPLY: /* REPLY */
		return;

	case LNET_EVENT_SEND: /* active message */
		if (msg->msg_type == LNET_MSG_PUT) {
			ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
			ev->offset     = le32_to_cpu(hdr->msg.put.offset);
			ev->mlength    =
			ev->rlength    = le32_to_cpu(hdr->payload_length);
			ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);

		} else {
			LASSERT(msg->msg_type == LNET_MSG_GET);
			ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
			ev->mlength    =
			ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
			ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
			ev->hdr_data   = 0;
		}
		return;
	}
}

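/*
 * Commit @msg on CPT @cpt: mark it committed for sending or receiving,
 * add it to the container's active list and update the message counters.
 * Caller is expected to hold the net lock for @cpt.
 */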
void
lnet_msg_commit(lnet_msg_t *msg, int cpt)
{
	struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
	lnet_counters_t		  *counters  = the_lnet.ln_counters[cpt];

	/* routed message can be committed for both receiving and sending */
	LASSERT(!msg->msg_tx_committed);

	if (msg->msg_sending) {
		LASSERT(!msg->msg_receiving);

		msg->msg_tx_cpt = cpt;
		msg->msg_tx_committed = 1;
		if (msg->msg_rx_committed) { /* routed message REPLY */
			LASSERT(msg->msg_onactivelist);
			return;
		}
	} else {
		LASSERT(!msg->msg_sending);
		msg->msg_rx_cpt = cpt;
		msg->msg_rx_committed = 1;
	}

	LASSERT(!msg->msg_onactivelist);
	msg->msg_onactivelist = 1;
	list_add(&msg->msg_activelist, &container->msc_active);

	counters->msgs_alloc++;
	if (counters->msgs_alloc > counters->msgs_max)
		counters->msgs_max = counters->msgs_alloc;
}

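/*
 * Undo the sending-side commit of @msg: on success update the send or
 * route counters, then return its TX credits and clear msg_tx_committed.
 */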
static void
lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
{
	lnet_counters_t	*counters;
	lnet_event_t	*ev = &msg->msg_ev;

	LASSERT(msg->msg_tx_committed);
	if (status != 0)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
	switch (ev->type) {
	default: /* routed message */
		LASSERT(msg->msg_routing);
		LASSERT(msg->msg_rx_committed);
		LASSERT(ev->type == 0);

		counters->route_length += msg->msg_len;
		counters->route_count++;
		goto out;

	case LNET_EVENT_PUT:
		/* should have been decommitted */
		LASSERT(!msg->msg_rx_committed);
		/* overwritten while sending ACK */
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		msg->msg_type = LNET_MSG_PUT; /* fix type */
		break;

	case LNET_EVENT_SEND:
		LASSERT(!msg->msg_rx_committed);
		if (msg->msg_type == LNET_MSG_PUT)
			counters->send_length += msg->msg_len;
		break;

	case LNET_EVENT_GET:
		LASSERT(msg->msg_rx_committed);
		/* overwritten while sending the reply; we should never be
		 * here for an optimized GET */
		LASSERT(msg->msg_type == LNET_MSG_REPLY);
		msg->msg_type = LNET_MSG_GET; /* fix type */
		break;
	}

	counters->send_count++;
 out:
	lnet_return_tx_credits_locked(msg);
	msg->msg_tx_committed = 0;
}

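/*
 * Undo the receiving-side commit of @msg: on success update the receive
 * counters, then return its RX credits and clear msg_rx_committed.
 */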
static void
lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
{
	lnet_counters_t	*counters;
	lnet_event_t	*ev = &msg->msg_ev;

	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
	LASSERT(msg->msg_rx_committed);

	if (status != 0)
		goto out;

	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
	switch (ev->type) {
	default:
		LASSERT(ev->type == 0);
		LASSERT(msg->msg_routing);
		goto out;

	case LNET_EVENT_ACK:
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		break;

	case LNET_EVENT_GET:
		/* type is "REPLY" for an optimized GET on the passive side:
		 * an optimized GET is never committed for sending, so the
		 * message type isn't changed back to "GET" by
		 * lnet_msg_decommit_tx(); see lnet_parse_get() for details */
		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
			msg->msg_type == LNET_MSG_GET);
		counters->send_length += msg->msg_wanted;
		break;

	case LNET_EVENT_PUT:
		LASSERT(msg->msg_type == LNET_MSG_PUT);
		break;

	case LNET_EVENT_REPLY:
		/* type is "GET" for an optimized GET on the active side;
		 * see lnet_create_reply_msg() for details */
		LASSERT(msg->msg_type == LNET_MSG_GET ||
			msg->msg_type == LNET_MSG_REPLY);
		break;
	}

	counters->recv_count++;
	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
		counters->recv_length += msg->msg_wanted;

 out:
	lnet_return_rx_credits_locked(msg);
	msg->msg_rx_committed = 0;
}

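/*
 * Fully decommit @msg. A routed message may be committed on two CPTs,
 * so this may swap the net lock from @cpt to msg_rx_cpt and back.
 */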
void
lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
{
	int	cpt2 = cpt;

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
	LASSERT(msg->msg_onactivelist);

	if (msg->msg_tx_committed) { /* always decommit for sending first */
		LASSERT(cpt == msg->msg_tx_cpt);
		lnet_msg_decommit_tx(msg, status);
	}

	if (msg->msg_rx_committed) {
		/* forwarding msg committed for both receiving and sending */
		if (cpt != msg->msg_rx_cpt) {
			lnet_net_unlock(cpt);
			cpt2 = msg->msg_rx_cpt;
			lnet_net_lock(cpt2);
		}
		lnet_msg_decommit_rx(msg, status);
	}

	list_del(&msg->msg_activelist);
	msg->msg_onactivelist = 0;

	the_lnet.ln_counters[cpt2]->msgs_alloc--;

	if (cpt2 != cpt) {
		lnet_net_unlock(cpt2);
		lnet_net_lock(cpt);
	}
}

void
lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
		   unsigned int offset, unsigned int mlen)
{
	/* NB: @offset and @mlen are only useful for receiving */
	/* Attach the MD to the lnet_msg, take a reference on it and
	 * decrement its threshold. Come what may, the lnet_msg "owns" the
	 * MD until lnet_msg_detach_md() or lnet_finalize() signals
	 * completion. */
	LASSERT(!msg->msg_routing);

	msg->msg_md = md;
	if (msg->msg_receiving) { /* committed for receiving */
		msg->msg_offset = offset;
		msg->msg_wanted = mlen;
	}

	md->md_refcount++;
	if (md->md_threshold != LNET_MD_THRESH_INF) {
		LASSERT(md->md_threshold > 0);
		md->md_threshold--;
	}

	/* build umd in event */
	lnet_md2handle(&msg->msg_ev.md_handle, md);
	lnet_md_deconstruct(md, &msg->msg_ev.md);
}

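/*
 * Drop the MD reference taken by lnet_msg_attach_md(), enqueue the
 * completion event on the MD's EQ (if it has one), and unlink the MD
 * if it has become unlinkable.
 */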
void
lnet_msg_detach_md(lnet_msg_t *msg, int status)
{
	lnet_libmd_t	*md = msg->msg_md;
	int		unlink;

	/* Now it's safe to drop my caller's ref */
	md->md_refcount--;
	LASSERT(md->md_refcount >= 0);

	unlink = lnet_md_unlinkable(md);
	if (md->md_eq != NULL) {
		msg->msg_ev.status   = status;
		msg->msg_ev.unlinked = unlink;
		lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
	}

	if (unlink)
		lnet_md_unlink(md);

	msg->msg_md = NULL;
}

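/*
 * Complete @msg while holding the net lock for @cpt: send the ACK for a
 * successful PUT, forward a routed message that hasn't been sent yet, or
 * decommit and free it. A non-zero return tells lnet_finalize() to retry
 * under the lock of the CPT the message is now committed on.
 */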
static int
lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
{
	lnet_handle_wire_t ack_wmd;
	int		rc;
	int		status = msg->msg_ev.status;

	LASSERT(msg->msg_onactivelist);

	if (status == 0 && msg->msg_ack) {
		/* Only send an ACK if the PUT completed successfully */

		lnet_msg_decommit(msg, cpt, 0);

		msg->msg_ack = 0;
		lnet_net_unlock(cpt);

		LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
		LASSERT(!msg->msg_routing);

		ack_wmd = msg->msg_hdr.msg.put.ack_wmd;

		lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);

		msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);

		/* NB: we probably want to use the NID of msg::msg_from as the
		 * 3rd parameter (router NID) if it's a routed message */
		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: the message is committed for sending; on success we
		 * must return because the LND will finalize it later.
		 *
		 * It's also possible that the message was committed for
		 * sending but failed before reaching the LND (e.g. ENOMEM);
		 * we can't fall through in that case either, because the
		 * CPT for sending can differ from the CPT for receiving,
		 * so we return to lnet_finalize() to make sure we lock the
		 * correct partition.
		 */
		return rc;

	} else if (status == 0 &&	/* OK so far */
		   (msg->msg_routing && !msg->msg_sending)) {
		/* not forwarded */
		LASSERT(!msg->msg_receiving);	/* called back recv already */
		lnet_net_unlock(cpt);

		rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);

		lnet_net_lock(cpt);
		/*
		 * NB: the message is committed for sending; on success we
		 * must return because the LND will finalize it later.
		 *
		 * It's also possible that the message was committed for
		 * sending but failed before reaching the LND (e.g. ENOMEM);
		 * we can't fall through in that case either:
		 * - a message committed for both sending and receiving must
		 *   be decommitted for sending first
		 * - the CPT for sending can differ from the CPT for
		 *   receiving, so we return to lnet_finalize() to make sure
		 *   we lock the correct partition.
		 */
		return rc;
	}

	lnet_msg_decommit(msg, cpt, status);
	lnet_msg_free_locked(msg);
	return 0;
}

void
lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
	struct lnet_msg_container	*container;
	int				my_slot;
	int				cpt;
	int				rc;
	int				i;

	LASSERT(!in_interrupt());

	if (msg == NULL)
		return;
#if 0
	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
	       msg->msg_target_is_router ? "t" : "",
	       msg->msg_routing ? "X" : "",
	       msg->msg_ack ? "A" : "",
	       msg->msg_sending ? "S" : "",
	       msg->msg_receiving ? "R" : "",
	       msg->msg_delayed ? "d" : "",
	       msg->msg_txcredit ? "C" : "",
	       msg->msg_peertxcredit ? "c" : "",
	       msg->msg_rtrcredit ? "F" : "",
	       msg->msg_peerrtrcredit ? "f" : "",
	       msg->msg_onactivelist ? "!" : "",
	       msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
	       msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
#endif
	msg->msg_ev.status = status;

	if (msg->msg_md != NULL) {
		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);

		lnet_res_lock(cpt);
		lnet_msg_detach_md(msg, status);
		lnet_res_unlock(cpt);
	}

 again:
	rc = 0;
	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
		/* not committed to network yet */
		LASSERT(!msg->msg_onactivelist);
		lnet_msg_free(msg);
		return;
	}

	/*
	 * NB: a routed message can be committed for both receiving and
	 * sending; finalize in LIFO order (sending first, then receiving)
	 * to keep the counters correct.
	 */
	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	container = the_lnet.ln_msg_containers[cpt];
	list_add_tail(&msg->msg_list, &container->msc_finalizing);

	/* Recursion breaker.  Don't complete the message here if I am (or
	 * enough other threads are) already completing messages */

	my_slot = -1;
	for (i = 0; i < container->msc_nfinalizers; i++) {
		if (container->msc_finalizers[i] == current)
			break;

		if (my_slot < 0 && container->msc_finalizers[i] == NULL)
			my_slot = i;
	}

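	/* I'm already finalizing on this CPT, or all finalizer slots are
	 * taken: leave the message queued for an active finalizer */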
	if (i < container->msc_nfinalizers || my_slot < 0) {
		lnet_net_unlock(cpt);
		return;
	}

	container->msc_finalizers[my_slot] = current;

	while (!list_empty(&container->msc_finalizing)) {
		msg = list_entry(container->msc_finalizing.next,
				 lnet_msg_t, msg_list);

		list_del(&msg->msg_list);

		/* NB drops and regains the lnet lock if it actually does
		 * anything, so my finalizing friends can chomp along too */
		rc = lnet_complete_msg_locked(msg, cpt);
		if (rc != 0)
			break;
	}

	container->msc_finalizers[my_slot] = NULL;
	lnet_net_unlock(cpt);

	if (rc != 0)
		goto again;
}
EXPORT_SYMBOL(lnet_finalize);

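/*
 * Tear down a message container: free any messages left on the active
 * list (complaining if there were any), then release the finalizer
 * array and, if configured, the freelist.
 */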
void
lnet_msg_container_cleanup(struct lnet_msg_container *container)
{
	int	count = 0;

	if (container->msc_init == 0)
		return;

	while (!list_empty(&container->msc_active)) {
		lnet_msg_t *msg = list_entry(container->msc_active.next,
					     lnet_msg_t, msg_activelist);

		LASSERT(msg->msg_onactivelist);
		msg->msg_onactivelist = 0;
		list_del(&msg->msg_activelist);
		lnet_msg_free(msg);
		count++;
	}

	if (count > 0)
		CERROR("%d active msg on exit\n", count);

	if (container->msc_finalizers != NULL) {
		LIBCFS_FREE(container->msc_finalizers,
			    container->msc_nfinalizers *
			    sizeof(*container->msc_finalizers));
		container->msc_finalizers = NULL;
	}
#ifdef LNET_USE_LIB_FREELIST
	lnet_freelist_fini(&container->msc_freelist);
#endif
	container->msc_init = 0;
}

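/*
 * Initialize the message container for CPT @cpt: set up the active and
 * finalizing lists, the optional freelist, and one finalizer slot per
 * CPU in the partition.
 */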
int
lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
{
	int	rc;

	container->msc_init = 1;

	INIT_LIST_HEAD(&container->msc_active);
	INIT_LIST_HEAD(&container->msc_finalizing);

#ifdef LNET_USE_LIB_FREELIST
	memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));

	rc = lnet_freelist_init(&container->msc_freelist,
				LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
	if (rc != 0) {
		CERROR("Failed to init freelist for message container\n");
		lnet_msg_container_cleanup(container);
		return rc;
	}
#else
	rc = 0;
#endif
	/* one finalizer slot per CPU in this partition */
	container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);

	LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
			 container->msc_nfinalizers *
			 sizeof(*container->msc_finalizers));

	if (container->msc_finalizers == NULL) {
		CERROR("Failed to allocate message finalizers\n");
		lnet_msg_container_cleanup(container);
		return -ENOMEM;
	}

	return rc;
}

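/*
 * Clean up and free the per-CPT array of message containers.
 */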
void
lnet_msg_containers_destroy(void)
{
	struct lnet_msg_container *container;
	int	i;

	if (the_lnet.ln_msg_containers == NULL)
		return;

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
		lnet_msg_container_cleanup(container);

	cfs_percpt_free(the_lnet.ln_msg_containers);
	the_lnet.ln_msg_containers = NULL;
}

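/*
 * Allocate and set up one message container per CPT; on any failure
 * everything is torn down again.
 */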
int
lnet_msg_containers_create(void)
{
	struct lnet_msg_container *container;
	int	rc;
	int	i;

	the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
						      sizeof(*container));

	if (the_lnet.ln_msg_containers == NULL) {
		CERROR("Failed to allocate cpu-partition data for network\n");
		return -ENOMEM;
	}

	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
		rc = lnet_msg_container_setup(container, i);
		if (rc != 0) {
			lnet_msg_containers_destroy();
			return rc;
		}
	}

	return 0;
}
