link.c revision 471450f7ec24ccd9ac24e6f05cd9358d40c09d03
1/*
2 * net/tipc/link.c: TIPC link code
3 *
4 * Copyright (c) 1996-2007, Ericsson AB
5 * Copyright (c) 2004-2007, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 *    contributors may be used to endorse or promote products derived from
18 *    this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38#include "link.h"
39#include "port.h"
40#include "name_distr.h"
41#include "discover.h"
42#include "config.h"
43
44
45/*
46 * Out-of-range value for link session numbers
47 */
48
49#define INVALID_SESSION 0x10000
50
51/*
52 * Link state events:
53 */
54
55#define  STARTING_EVT    856384768	/* link processing trigger */
56#define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
57#define  TIMEOUT_EVT     560817u	/* link timer expired */
58
59/*
60 * The following two 'message types' is really just implementation
61 * data conveniently stored in the message header.
62 * They must not be considered part of the protocol
63 */
64#define OPEN_MSG   0
65#define CLOSED_MSG 1
66
67/*
68 * State value stored in 'exp_msg_count'
69 */
70
71#define START_CHANGEOVER 100000u
72
73/**
74 * struct link_name - deconstructed link name
75 * @addr_local: network address of node at this end
76 * @if_local: name of interface at this end
77 * @addr_peer: network address of node at far end
78 * @if_peer: name of interface at far end
79 */
80
81struct link_name {
82	u32 addr_local;
83	char if_local[TIPC_MAX_IF_NAME];
84	u32 addr_peer;
85	char if_peer[TIPC_MAX_IF_NAME];
86};
87
88static void link_handle_out_of_seq_msg(struct link *l_ptr,
89				       struct sk_buff *buf);
90static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
91static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93static int  link_send_sections_long(struct port *sender,
94				    struct iovec const *msg_sect,
95				    u32 num_sect, u32 destnode);
96static void link_check_defragm_bufs(struct link *l_ptr);
97static void link_state_event(struct link *l_ptr, u32 event);
98static void link_reset_statistics(struct link *l_ptr);
99static void link_print(struct link *l_ptr, struct print_buf *buf,
100		       const char *str);
101static void link_start(struct link *l_ptr);
102static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);
103
104
105/*
106 * Debugging code used by link routines only
107 *
108 * When debugging link problems on a system that has multiple links,
109 * the standard TIPC debugging routines may not be useful since they
110 * allow the output from multiple links to be intermixed.  For this reason
111 * routines of the form "dbg_link_XXX()" have been created that will capture
112 * debug info into a link's personal print buffer, which can then be dumped
113 * into the TIPC system log (TIPC_LOG) upon request.
114 *
115 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
116 * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
117 * the dbg_link_XXX() routines simply send their output to the standard
118 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
119 * when there is only a single link in the system being debugged.
120 *
121 * Notes:
122 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
123 * - "l_ptr" must be valid when using dbg_link_XXX() macros
124 */
125
126#define LINK_LOG_BUF_SIZE 0
127
128#define dbg_link(fmt, arg...) \
129	do { \
130		if (LINK_LOG_BUF_SIZE) \
131			tipc_printf(&l_ptr->print_buf, fmt, ## arg); \
132	} while (0)
133#define dbg_link_msg(msg, txt) \
134	do { \
135		if (LINK_LOG_BUF_SIZE) \
136			tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
137	} while (0)
138#define dbg_link_state(txt) \
139	do { \
140		if (LINK_LOG_BUF_SIZE) \
141			link_print(l_ptr, &l_ptr->print_buf, txt); \
142	} while (0)
143#define dbg_link_dump() do { \
144	if (LINK_LOG_BUF_SIZE) { \
145		tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
146		tipc_printbuf_move(LOG, &l_ptr->print_buf); \
147	} \
148} while (0)
149
150static void dbg_print_link(struct link *l_ptr, const char *str)
151{
152	if (DBG_OUTPUT != TIPC_NULL)
153		link_print(l_ptr, DBG_OUTPUT, str);
154}
155
156static void dbg_print_buf_chain(struct sk_buff *root_buf)
157{
158	if (DBG_OUTPUT != TIPC_NULL) {
159		struct sk_buff *buf = root_buf;
160
161		while (buf) {
162			msg_dbg(buf_msg(buf), "In chain: ");
163			buf = buf->next;
164		}
165	}
166}
167
168/*
169 *  Simple link routines
170 */
171
/* Round a value up to the next multiple of 4 (TIPC word alignment). */
static unsigned int align(unsigned int i)
{
	unsigned int rem = i & 3u;

	return rem ? i + (4u - rem) : i;
}
176
177static void link_init_max_pkt(struct link *l_ptr)
178{
179	u32 max_pkt;
180
181	max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
182	if (max_pkt > MAX_MSG_SIZE)
183		max_pkt = MAX_MSG_SIZE;
184
185	l_ptr->max_pkt_target = max_pkt;
186	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
187		l_ptr->max_pkt = l_ptr->max_pkt_target;
188	else
189		l_ptr->max_pkt = MAX_PKT_DEFAULT;
190
191	l_ptr->max_pkt_probes = 0;
192}
193
194static u32 link_next_sent(struct link *l_ptr)
195{
196	if (l_ptr->next_out)
197		return msg_seqno(buf_msg(l_ptr->next_out));
198	return mod(l_ptr->next_out_no);
199}
200
201static u32 link_last_sent(struct link *l_ptr)
202{
203	return mod(link_next_sent(l_ptr) - 1);
204}
205
206/*
207 *  Simple non-static link routines (i.e. referenced outside this file)
208 */
209
/* Return non-zero if the link exists and is in a working state. */
int tipc_link_is_up(struct link *l_ptr)
{
	return l_ptr &&
	       (link_working_working(l_ptr) || link_working_unknown(l_ptr));
}
216
217int tipc_link_is_active(struct link *l_ptr)
218{
219	return	(l_ptr->owner->active_links[0] == l_ptr) ||
220		(l_ptr->owner->active_links[1] == l_ptr);
221}
222
223/**
224 * link_name_validate - validate & (optionally) deconstruct link name
225 * @name - ptr to link name string
226 * @name_parts - ptr to area for link name components (or NULL if not needed)
227 *
228 * Returns 1 if link name is valid, otherwise 0.
229 */
230
static int link_name_validate(const char *name, struct link_name *name_parts)
{
	char name_copy[TIPC_MAX_LINK_NAME];
	char *addr_local;
	char *if_local;
	char *addr_peer;
	char *if_peer;
	char dummy;
	u32 z_local, c_local, n_local;
	u32 z_peer, c_peer, n_peer;
	u32 if_local_len;
	u32 if_peer_len;

	/* copy link name & ensure length is OK */

	name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
	/* need above in case non-Posix strncpy() doesn't pad with nulls */
	strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
	if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
		return 0;	/* sentinel clobbered => name too long */

	/* ensure all component parts of link name are present */
	/* expected form: "Z.C.N:if_local-Z.C.N:if_peer"; each separator is
	 * overwritten with NUL so the four parts become separate strings */

	addr_local = name_copy;
	if ((if_local = strchr(addr_local, ':')) == NULL)
		return 0;
	*(if_local++) = 0;
	if ((addr_peer = strchr(if_local, '-')) == NULL)
		return 0;
	*(addr_peer++) = 0;
	if_local_len = addr_peer - if_local;	/* includes terminating NUL */
	if ((if_peer = strchr(addr_peer, ':')) == NULL)
		return 0;
	*(if_peer++) = 0;
	if_peer_len = strlen(if_peer) + 1;	/* includes terminating NUL */

	/* validate component parts of link name */
	/* the "%c" + dummy trick rejects trailing junk after each address;
	 * sscanf() must match exactly 3 items for the address to be clean */

	if ((sscanf(addr_local, "%u.%u.%u%c",
		    &z_local, &c_local, &n_local, &dummy) != 3) ||
	    (sscanf(addr_peer, "%u.%u.%u%c",
		    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
	    (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
	    (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
	    (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
	    (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
	    (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
	    (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
		return 0;

	/* return link name components, if necessary */

	if (name_parts) {
		name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
		strcpy(name_parts->if_local, if_local);
		name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
		strcpy(name_parts->if_peer, if_peer);
	}
	return 1;
}
291
292/**
293 * link_timeout - handle expiration of link timer
294 * @l_ptr: pointer to link
295 *
296 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
297 * with tipc_link_delete().  (There is no risk that the node will be deleted by
298 * another thread because tipc_link_delete() always cancels the link timer before
299 * tipc_node_delete() is called.)
300 */
301
static void link_timeout(struct link *l_ptr)
{
	tipc_node_lock(l_ptr->owner);

	/* update counters used in statistical profiling of send traffic */

	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
	l_ptr->stats.queue_sz_counts++;

	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;

	if (l_ptr->first_out) {
		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
		u32 length = msg_size(msg);

		/* for a fragmented message, profile the size of the
		 * original (wrapped) message, not the fragment */
		if ((msg_user(msg) == MSG_FRAGMENTER) &&
		    (msg_type(msg) == FIRST_FRAGMENT)) {
			length = msg_size(msg_get_wrapped(msg));
		}
		if (length) {
			l_ptr->stats.msg_lengths_total += length;
			l_ptr->stats.msg_length_counts++;
			/* bucket by power-of-four-ish size classes */
			if (length <= 64)
				l_ptr->stats.msg_length_profile[0]++;
			else if (length <= 256)
				l_ptr->stats.msg_length_profile[1]++;
			else if (length <= 1024)
				l_ptr->stats.msg_length_profile[2]++;
			else if (length <= 4096)
				l_ptr->stats.msg_length_profile[3]++;
			else if (length <= 16384)
				l_ptr->stats.msg_length_profile[4]++;
			else if (length <= 32768)
				l_ptr->stats.msg_length_profile[5]++;
			else
				l_ptr->stats.msg_length_profile[6]++;
		}
	}

	/* do all other link processing performed on a periodic basis */

	link_check_defragm_bufs(l_ptr);

	link_state_event(l_ptr, TIMEOUT_EVT);

	if (l_ptr->next_out)
		tipc_link_push_queue(l_ptr);

	tipc_node_unlock(l_ptr->owner);
}
353
/* (Re)arm the link's supervision timer to fire after 'time' [ms assumed
 * from continuity_interval usage -- confirm against k_start_timer()]. */
static void link_set_timer(struct link *l_ptr, u32 time)
{
	k_start_timer(&l_ptr->timer, time);
}
358
359/**
360 * tipc_link_create - create a new link
361 * @b_ptr: pointer to associated bearer
362 * @peer: network address of node at other end of link
363 * @media_addr: media address to use when sending messages over link
364 *
365 * Returns pointer to link.
366 */
367
struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
			      const struct tipc_media_addr *media_addr)
{
	struct link *l_ptr;
	struct tipc_msg *msg;
	char *if_name;

	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
	if (!l_ptr) {
		warn("Link creation failed, no memory\n");
		return NULL;
	}

	/* optionally allocate a per-link debug print buffer */
	if (LINK_LOG_BUF_SIZE) {
		char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);

		if (!pb) {
			kfree(l_ptr);
			warn("Link creation failed, no memory for print buffer\n");
			return NULL;
		}
		tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
	}

	/* build link name from own address, bearer i/f name, and peer address */
	l_ptr->addr = peer;
	if_name = strchr(b_ptr->publ.name, ':') + 1;
	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
		tipc_node(tipc_own_addr),
		if_name,
		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
		/* note: peer i/f is appended to link name by reset/activate */
	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
	l_ptr->checkpoint = 1;
	l_ptr->b_ptr = b_ptr;
	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
	l_ptr->state = RESET_UNKNOWN;

	/* pre-build the link protocol message header used for all
	 * RESET/ACTIVATE/STATE messages sent on this link */
	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
	msg = l_ptr->pmsg;
	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_size(msg, sizeof(l_ptr->proto_msg));
	msg_set_session(msg, (tipc_random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);

	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);

	link_init_max_pkt(l_ptr);

	l_ptr->next_out_no = 1;
	INIT_LIST_HEAD(&l_ptr->waiting_ports);

	link_reset_statistics(l_ptr);

	/* attach to owning node; on failure undo all allocations */
	l_ptr->owner = tipc_node_attach_link(l_ptr);
	if (!l_ptr->owner) {
		if (LINK_LOG_BUF_SIZE)
			kfree(l_ptr->print_buf.buf);
		kfree(l_ptr);
		return NULL;
	}

	/* start supervision timer and defer FSM startup to tasklet context */
	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
	list_add_tail(&l_ptr->link_list, &b_ptr->links);
	tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);

	dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
	    l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);

	return l_ptr;
}
441
442/**
443 * tipc_link_delete - delete a link
444 * @l_ptr: pointer to link
445 *
446 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
447 * This routine must not grab the node lock until after link timer cancellation
448 * to avoid a potential deadlock situation.
449 */
450
void tipc_link_delete(struct link *l_ptr)
{
	if (!l_ptr) {
		err("Attempt to delete non-existent link\n");
		return;
	}

	dbg("tipc_link_delete()\n");

	/* cancel timer BEFORE taking the node lock -- the timer handler
	 * (link_timeout) takes the same lock, so reversing this order
	 * could deadlock (see function header comment) */
	k_cancel_timer(&l_ptr->timer);

	tipc_node_lock(l_ptr->owner);
	tipc_link_reset(l_ptr);
	tipc_node_detach_link(l_ptr->owner, l_ptr);
	tipc_link_stop(l_ptr);
	list_del_init(&l_ptr->link_list);
	if (LINK_LOG_BUF_SIZE)
		kfree(l_ptr->print_buf.buf);
	tipc_node_unlock(l_ptr->owner);
	/* wait for any concurrently-running timer handler to finish
	 * before freeing the link structure */
	k_term_timer(&l_ptr->timer);
	kfree(l_ptr);
}
473
474static void link_start(struct link *l_ptr)
475{
476	dbg("link_start %x\n", l_ptr);
477	link_state_event(l_ptr, STARTING_EVT);
478}
479
480/**
481 * link_schedule_port - schedule port for deferred sending
482 * @l_ptr: pointer to link
483 * @origport: reference to sending port
484 * @sz: amount of data to be sent
485 *
486 * Schedules port for renewed sending of messages after link congestion
487 * has abated.
488 */
489
static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
{
	struct port *p_ptr;

	spin_lock_bh(&tipc_port_list_lock);
	p_ptr = tipc_port_lock(origport);
	if (p_ptr) {
		/* ports without a wakeup routine cannot be rescheduled */
		if (!p_ptr->wakeup)
			goto exit;
		/* already queued for wakeup -- nothing to do */
		if (!list_empty(&p_ptr->wait_list))
			goto exit;
		p_ptr->publ.congested = 1;
		/* number of packets the pending send will need */
		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
		l_ptr->stats.link_congs++;
exit:
		tipc_port_unlock(p_ptr);
	}
	spin_unlock_bh(&tipc_port_list_lock);
	/* always reports congestion to the caller, even if the port
	 * could not be queued for wakeup */
	return -ELINKCONG;
}
511
/*
 * tipc_link_wakeup_ports - wake up ports waiting on link congestion
 * @l_ptr: pointer to link
 * @all: non-zero to wake all waiting ports regardless of send window
 */
void tipc_link_wakeup_ports(struct link *l_ptr, int all)
{
	struct port *p_ptr;
	struct port *temp_p_ptr;
	/* remaining send window: queue limit minus what is already queued */
	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;

	if (all)
		win = 100000;
	if (win <= 0)
		return;
	/* best-effort: if the list lock is contended, try again later */
	if (!spin_trylock_bh(&tipc_port_list_lock))
		return;
	if (link_congested(l_ptr))
		goto exit;
	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
				 wait_list) {
		if (win <= 0)
			break;
		list_del_init(&p_ptr->wait_list);
		spin_lock_bh(p_ptr->publ.lock);
		p_ptr->publ.congested = 0;
		p_ptr->wakeup(&p_ptr->publ);
		/* charge the window for the packets this port will send */
		win -= p_ptr->waiting_pkts;
		spin_unlock_bh(p_ptr->publ.lock);
	}

exit:
	spin_unlock_bh(&tipc_port_list_lock);
}
541
542/**
543 * link_release_outqueue - purge link's outbound message queue
544 * @l_ptr: pointer to link
545 */
546
547static void link_release_outqueue(struct link *l_ptr)
548{
549	struct sk_buff *buf = l_ptr->first_out;
550	struct sk_buff *next;
551
552	while (buf) {
553		next = buf->next;
554		buf_discard(buf);
555		buf = next;
556	}
557	l_ptr->first_out = NULL;
558	l_ptr->out_queue_size = 0;
559}
560
561/**
562 * tipc_link_reset_fragments - purge link's inbound message fragments queue
563 * @l_ptr: pointer to link
564 */
565
566void tipc_link_reset_fragments(struct link *l_ptr)
567{
568	struct sk_buff *buf = l_ptr->defragm_buf;
569	struct sk_buff *next;
570
571	while (buf) {
572		next = buf->next;
573		buf_discard(buf);
574		buf = next;
575	}
576	l_ptr->defragm_buf = NULL;
577}
578
579/**
580 * tipc_link_stop - purge all inbound and outbound messages associated with link
581 * @l_ptr: pointer to link
582 */
583
584void tipc_link_stop(struct link *l_ptr)
585{
586	struct sk_buff *buf;
587	struct sk_buff *next;
588
589	buf = l_ptr->oldest_deferred_in;
590	while (buf) {
591		next = buf->next;
592		buf_discard(buf);
593		buf = next;
594	}
595
596	buf = l_ptr->first_out;
597	while (buf) {
598		next = buf->next;
599		buf_discard(buf);
600		buf = next;
601	}
602
603	tipc_link_reset_fragments(l_ptr);
604
605	buf_discard(l_ptr->proto_msg_queue);
606	l_ptr->proto_msg_queue = NULL;
607}
608
609/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
610#define link_send_event(fcn, l_ptr, up) do { } while (0)
611
void tipc_link_reset(struct link *l_ptr)
{
	struct sk_buff *buf;
	u32 prev_state = l_ptr->state;
	u32 checkpoint = l_ptr->next_in_no;
	int was_active_link = tipc_link_is_active(l_ptr);

	/* bump our session so the peer can tell old messages from new */
	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

	/* Link is down, accept any session */
	l_ptr->peer_session = INVALID_SESSION;

	/* Prepare for max packet size negotiation */
	link_init_max_pkt(l_ptr);

	l_ptr->state = RESET_UNKNOWN;
	dbg_link_state("Resetting Link\n");

	/* link was already down -- no queues to clean, nothing to notify */
	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr);
	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);

	/* if another link can take over, record where reception stopped so
	 * the changeover protocol can resume without losing messages */
	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
	    l_ptr->owner->permit_changeover) {
		l_ptr->reset_checkpoint = checkpoint;
		l_ptr->exp_msg_count = START_CHANGEOVER;
	}

	/* Clean up all queues: */

	link_release_outqueue(l_ptr);
	buf_discard(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
	buf = l_ptr->oldest_deferred_in;
	while (buf) {
		struct sk_buff *next = buf->next;
		buf_discard(buf);
		buf = next;
	}
	if (!list_empty(&l_ptr->waiting_ports))
		tipc_link_wakeup_ports(l_ptr, 1);

	/* reset all per-connection sequencing and congestion state */
	l_ptr->retransm_queue_head = 0;
	l_ptr->retransm_queue_size = 0;
	l_ptr->last_out = NULL;
	l_ptr->first_out = NULL;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->deferred_inqueue_sz = 0;
	l_ptr->oldest_deferred_in = NULL;
	l_ptr->newest_deferred_in = NULL;
	l_ptr->fsm_msg_cnt = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);

	link_send_event(tipc_cfg_link_event, l_ptr, 0);
	if (!in_own_cluster(l_ptr->addr))
		link_send_event(tipc_disc_link_event, l_ptr, 0);
}
675
676
/*
 * link_activate - mark a link as up: restart inbound sequence numbering,
 * notify the owning node and bearer, and emit link-up events.
 */
static void link_activate(struct link *l_ptr)
{
	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
	tipc_node_link_up(l_ptr->owner, l_ptr);
	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
	link_send_event(tipc_cfg_link_event, l_ptr, 1);
	if (!in_own_cluster(l_ptr->addr))
		link_send_event(tipc_disc_link_event, l_ptr, 1);
}
686
687/**
688 * link_state_event - link finite state machine
689 * @l_ptr: pointer to link
690 * @event: state machine event to process
691 */
692
static void link_state_event(struct link *l_ptr, unsigned event)
{
	struct link *other;
	u32 cont_intv = l_ptr->continuity_interval;

	/* ignore everything until the deferred STARTING_EVT arrives */
	if (!l_ptr->started && (event != STARTING_EVT))
		return;		/* Not yet. */

	if (link_blocked(l_ptr)) {
		if (event == TIMEOUT_EVT) {
			link_set_timer(l_ptr, cont_intv);
		}
		return;	  /* Changeover going on */
	}
	dbg_link("STATE_EV: <%s> ", l_ptr->name);

	switch (l_ptr->state) {
	case WORKING_WORKING:
		dbg_link("WW/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			dbg_link("ACT\n");
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM ");
			/* traffic received since last checkpoint: link is
			 * healthy, stay in WORKING_WORKING */
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
					/* probe for a larger usable packet size */
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 1, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
				break;
			}
			/* no traffic seen: start probing the peer at a
			 * faster rate (cont_intv / 4) */
			dbg_link(" -> WU\n");
			l_ptr->state = WORKING_UNKNOWN;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv / 4);
			break;
		case RESET_MSG:
			dbg_link("RES -> RR\n");
			info("Resetting link <%s>, requested by peer\n",
			     l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in WW state\n", event);
		}
		break;
	case WORKING_UNKNOWN:
		dbg_link("WU/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES -> RR\n");
			info("Resetting link <%s>, requested by peer "
			     "while probing\n", l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM ");
			/* peer answered a probe: back to normal operation */
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				dbg_link("-> WW\n");
				l_ptr->state = WORKING_WORKING;
				l_ptr->fsm_msg_cnt = 0;
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
				/* still unanswered: probe again */
				dbg_link("Probing %u/%u,timer = %u ms)\n",
					 l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
					 cont_intv / 4);
				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
							 1, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv / 4);
			} else {	/* Link has failed */
				dbg_link("-> RU (%u probes unanswered)\n",
					 l_ptr->fsm_msg_cnt);
				warn("Resetting link <%s>, peer not responding\n",
				     l_ptr->name);
				tipc_link_reset(l_ptr);
				l_ptr->state = RESET_UNKNOWN;
				l_ptr->fsm_msg_cnt = 0;
				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
							 0, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv);
			}
			break;
		default:
			err("Unknown link event %u in WU state\n", event);
		}
		break;
	case RESET_UNKNOWN:
		dbg_link("RU/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-\n");
			break;
		case ACTIVATE_MSG:
			/* defer activation if the other link to this node is
			 * still being probed */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other)) {
				dbg_link("ACT\n");
				break;
			}
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES\n");
			dbg_link(" -> RR\n");
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case STARTING_EVT:
			dbg_link("START-");
			l_ptr->started = 1;
			/* fall through */
		case TIMEOUT_EVT:
			dbg_link("TIM\n");
			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in RU state\n", event);
		}
		break;
	case RESET_RESET:
		dbg_link("RR/ ");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			/* defer activation if the other link to this node is
			 * still being probed */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other)) {
				dbg_link("ACT\n");
				break;
			}
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES\n");
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM\n");
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
			break;
		default:
			err("Unknown link event %u in RR state\n", event);
		}
		break;
	default:
		err("Unknown link state %u/%u\n", l_ptr->state, event);
	}
}
899
900/*
901 * link_bundle_buf(): Append contents of a buffer to
902 * the tail of an existing one.
903 */
904
static int link_bundle_buf(struct link *l_ptr,
			   struct sk_buff *bundler,
			   struct sk_buff *buf)
{
	struct tipc_msg *bundler_msg = buf_msg(bundler);
	struct tipc_msg *msg = buf_msg(buf);
	u32 size = msg_size(msg);
	u32 bundle_size = msg_size(bundler_msg);
	u32 to_pos = align(bundle_size);	/* append position, word-aligned */
	u32 pad = to_pos - bundle_size;

	/* can only append to an open bundle that has room for the message
	 * within both the buffer and the link's current packet size */
	if (msg_user(bundler_msg) != MSG_BUNDLER)
		return 0;
	if (msg_type(bundler_msg) != OPEN_MSG)
		return 0;
	if (skb_tailroom(bundler) < (pad + size))
		return 0;
	if (l_ptr->max_pkt < (to_pos + size))
		return 0;

	/* copy message into bundle; the original buffer is consumed */
	skb_put(bundler, pad + size);
	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
	msg_set_size(bundler_msg, to_pos + size);
	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
	dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
	    msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
	msg_dbg(msg, "PACKD:");
	buf_discard(buf);
	l_ptr->stats.sent_bundled++;
	return 1;
}
936
937static void link_add_to_outqueue(struct link *l_ptr,
938				 struct sk_buff *buf,
939				 struct tipc_msg *msg)
940{
941	u32 ack = mod(l_ptr->next_in_no - 1);
942	u32 seqno = mod(l_ptr->next_out_no++);
943
944	msg_set_word(msg, 2, ((ack << 16) | seqno));
945	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
946	buf->next = NULL;
947	if (l_ptr->first_out) {
948		l_ptr->last_out->next = buf;
949		l_ptr->last_out = buf;
950	} else
951		l_ptr->first_out = l_ptr->last_out = buf;
952	l_ptr->out_queue_size++;
953}
954
955/*
956 * tipc_link_send_buf() is the 'full path' for messages, called from
957 * inside TIPC when the 'fast path' in tipc_send_buf
958 * has failed, and from link_send()
959 */
960
int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	u32 size = msg_size(msg);
	u32 dsz = msg_data_sz(msg);
	u32 queue_size = l_ptr->out_queue_size;
	u32 imp = tipc_msg_tot_importance(msg);
	u32 queue_limit = l_ptr->queue_limit[imp];
	u32 max_packet = l_ptr->max_pkt;

	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */

	/* Match msg importance against queue limits: */

	if (unlikely(queue_size >= queue_limit)) {
		/* normal-importance traffic: defer the sending port until
		 * congestion abates */
		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
			return link_schedule_port(l_ptr, msg_origport(msg),
						  size);
		}
		msg_dbg(msg, "TIPC: Congestion, throwing away\n");
		buf_discard(buf);
		/* losing a message above CONN_MANAGER importance means the
		 * link can no longer be trusted -- reset it */
		if (imp > CONN_MANAGER) {
			warn("Resetting link <%s>, send queue full", l_ptr->name);
			tipc_link_reset(l_ptr);
		}
		return dsz;
	}

	/* Fragmentation needed ? */

	if (size > max_packet)
		return link_send_long_buf(l_ptr, buf);

	/* Packet can be queued or sent: */

	if (queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = queue_size;

	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
		   !link_congested(l_ptr))) {
		link_add_to_outqueue(l_ptr, buf, msg);

		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
			l_ptr->unacked_window = 0;
		} else {
			/* bearer rejected the send: mark it congested and
			 * leave the buffer queued for later pushing */
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->next_out = buf;
		}
		return dsz;
	}
	/* Congestion: can message be bundled ?: */

	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
	    (msg_user(msg) != MSG_FRAGMENTER)) {

		/* Try adding message to an existing bundle */

		if (l_ptr->next_out &&
		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
			return dsz;
		}

		/* Try creating a new bundle */
		/* only worthwhile if the message leaves enough room in the
		 * bundle buffer for at least one more message */

		if (size <= max_packet * 2 / 3) {
			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
			struct tipc_msg bundler_hdr;

			if (bundler) {
				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
					 INT_H_SIZE, l_ptr->addr);
				skb_copy_to_linear_data(bundler, &bundler_hdr,
							INT_H_SIZE);
				skb_trim(bundler, INT_H_SIZE);
				link_bundle_buf(l_ptr, bundler, buf);
				buf = bundler;
				msg = buf_msg(buf);
				l_ptr->stats.sent_bundles++;
			}
		}
	}
	/* queue (possibly bundled) message for sending once congestion lifts */
	if (!l_ptr->next_out)
		l_ptr->next_out = buf;
	link_add_to_outqueue(l_ptr, buf, msg);
	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
	return dsz;
}
1050
1051/*
1052 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1053 * not been selected yet, and the the owner node is not locked
1054 * Called by TIPC internal users, e.g. the name distributor
1055 */
1056
1057int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1058{
1059	struct link *l_ptr;
1060	struct tipc_node *n_ptr;
1061	int res = -ELINKCONG;
1062
1063	read_lock_bh(&tipc_net_lock);
1064	n_ptr = tipc_node_select(dest, selector);
1065	if (n_ptr) {
1066		tipc_node_lock(n_ptr);
1067		l_ptr = n_ptr->active_links[selector & 1];
1068		if (l_ptr) {
1069			dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1070			res = tipc_link_send_buf(l_ptr, buf);
1071		} else {
1072			dbg("Attempt to send msg to unreachable node:\n");
1073			msg_dbg(buf_msg(buf),">>>");
1074			buf_discard(buf);
1075		}
1076		tipc_node_unlock(n_ptr);
1077	} else {
1078		dbg("Attempt to send msg to unknown node:\n");
1079		msg_dbg(buf_msg(buf),">>>");
1080		buf_discard(buf);
1081	}
1082	read_unlock_bh(&tipc_net_lock);
1083	return res;
1084}
1085
1086/*
1087 * link_send_buf_fast: Entry for data messages where the
1088 * destination link is known and the header is complete,
1089 * inclusive total message length. Very time critical.
1090 * Link is locked. Returns user data length.
1091 */
1092
1093static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1094			      u32 *used_max_pkt)
1095{
1096	struct tipc_msg *msg = buf_msg(buf);
1097	int res = msg_data_sz(msg);
1098
1099	if (likely(!link_congested(l_ptr))) {
1100		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1101			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1102				link_add_to_outqueue(l_ptr, buf, msg);
1103				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1104							    &l_ptr->media_addr))) {
1105					l_ptr->unacked_window = 0;
1106					msg_dbg(msg,"SENT_FAST:");
1107					return res;
1108				}
1109				dbg("failed sent fast...\n");
1110				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1111				l_ptr->stats.bearer_congs++;
1112				l_ptr->next_out = buf;
1113				return res;
1114			}
1115		}
1116		else
1117			*used_max_pkt = l_ptr->max_pkt;
1118	}
1119	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1120}
1121
1122/*
1123 * tipc_send_buf_fast: Entry for data messages where the
1124 * destination node is known and the header is complete,
1125 * inclusive total message length.
1126 * Returns user data length.
1127 */
1128int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1129{
1130	struct link *l_ptr;
1131	struct tipc_node *n_ptr;
1132	int res;
1133	u32 selector = msg_origport(buf_msg(buf)) & 1;
1134	u32 dummy;
1135
1136	if (destnode == tipc_own_addr)
1137		return tipc_port_recv_msg(buf);
1138
1139	read_lock_bh(&tipc_net_lock);
1140	n_ptr = tipc_node_select(destnode, selector);
1141	if (likely(n_ptr)) {
1142		tipc_node_lock(n_ptr);
1143		l_ptr = n_ptr->active_links[selector];
1144		dbg("send_fast: buf %x selected %x, destnode = %x\n",
1145		    buf, l_ptr, destnode);
1146		if (likely(l_ptr)) {
1147			res = link_send_buf_fast(l_ptr, buf, &dummy);
1148			tipc_node_unlock(n_ptr);
1149			read_unlock_bh(&tipc_net_lock);
1150			return res;
1151		}
1152		tipc_node_unlock(n_ptr);
1153	}
1154	read_unlock_bh(&tipc_net_lock);
1155	res = msg_data_sz(buf_msg(buf));
1156	tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1157	return res;
1158}
1159
1160
1161/*
1162 * tipc_link_send_sections_fast: Entry for messages where the
1163 * destination processor is known and the header is complete,
1164 * except for total message length.
1165 * Returns user data length or errno.
1166 */
1167int tipc_link_send_sections_fast(struct port *sender,
1168				 struct iovec const *msg_sect,
1169				 const u32 num_sect,
1170				 u32 destaddr)
1171{
1172	struct tipc_msg *hdr = &sender->publ.phdr;
1173	struct link *l_ptr;
1174	struct sk_buff *buf;
1175	struct tipc_node *node;
1176	int res;
1177	u32 selector = msg_origport(hdr) & 1;
1178
1179again:
1180	/*
1181	 * Try building message using port's max_pkt hint.
1182	 * (Must not hold any locks while building message.)
1183	 */
1184
1185	res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1186			!sender->user_port, &buf);
1187
1188	read_lock_bh(&tipc_net_lock);
1189	node = tipc_node_select(destaddr, selector);
1190	if (likely(node)) {
1191		tipc_node_lock(node);
1192		l_ptr = node->active_links[selector];
1193		if (likely(l_ptr)) {
1194			if (likely(buf)) {
1195				res = link_send_buf_fast(l_ptr, buf,
1196							 &sender->publ.max_pkt);
1197				if (unlikely(res < 0))
1198					buf_discard(buf);
1199exit:
1200				tipc_node_unlock(node);
1201				read_unlock_bh(&tipc_net_lock);
1202				return res;
1203			}
1204
1205			/* Exit if build request was invalid */
1206
1207			if (unlikely(res < 0))
1208				goto exit;
1209
1210			/* Exit if link (or bearer) is congested */
1211
1212			if (link_congested(l_ptr) ||
1213			    !list_empty(&l_ptr->b_ptr->cong_links)) {
1214				res = link_schedule_port(l_ptr,
1215							 sender->publ.ref, res);
1216				goto exit;
1217			}
1218
1219			/*
1220			 * Message size exceeds max_pkt hint; update hint,
1221			 * then re-try fast path or fragment the message
1222			 */
1223
1224			sender->publ.max_pkt = l_ptr->max_pkt;
1225			tipc_node_unlock(node);
1226			read_unlock_bh(&tipc_net_lock);
1227
1228
1229			if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1230				goto again;
1231
1232			return link_send_sections_long(sender, msg_sect,
1233						       num_sect, destaddr);
1234		}
1235		tipc_node_unlock(node);
1236	}
1237	read_unlock_bh(&tipc_net_lock);
1238
1239	/* Couldn't find a link to the destination node */
1240
1241	if (buf)
1242		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1243	if (res >= 0)
1244		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1245						 TIPC_ERR_NO_NODE);
1246	return res;
1247}
1248
1249/*
1250 * link_send_sections_long(): Entry for long messages where the
1251 * destination node is known and the header is complete,
1252 * inclusive total message length.
1253 * Link and bearer congestion status have been checked to be ok,
1254 * and are ignored if they change.
1255 *
1256 * Note that fragments do not use the full link MTU so that they won't have
1257 * to undergo refragmentation if link changeover causes them to be sent
1258 * over another link with an additional tunnel header added as prefix.
1259 * (Refragmentation will still occur if the other link has a smaller MTU.)
1260 *
1261 * Returns user data length or errno.
1262 */
1263static int link_send_sections_long(struct port *sender,
1264				   struct iovec const *msg_sect,
1265				   u32 num_sect,
1266				   u32 destaddr)
1267{
1268	struct link *l_ptr;
1269	struct tipc_node *node;
1270	struct tipc_msg *hdr = &sender->publ.phdr;
1271	u32 dsz = msg_data_sz(hdr);
1272	u32 max_pkt,fragm_sz,rest;
1273	struct tipc_msg fragm_hdr;
1274	struct sk_buff *buf,*buf_chain,*prev;
1275	u32 fragm_crs,fragm_rest,hsz,sect_rest;
1276	const unchar *sect_crs;
1277	int curr_sect;
1278	u32 fragm_no;
1279
1280again:
1281	fragm_no = 1;
1282	max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1283		/* leave room for tunnel header in case of link changeover */
1284	fragm_sz = max_pkt - INT_H_SIZE;
1285		/* leave room for fragmentation header in each fragment */
1286	rest = dsz;
1287	fragm_crs = 0;
1288	fragm_rest = 0;
1289	sect_rest = 0;
1290	sect_crs = NULL;
1291	curr_sect = -1;
1292
1293	/* Prepare reusable fragment header: */
1294
1295	msg_dbg(hdr, ">FRAGMENTING>");
1296	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1297		 INT_H_SIZE, msg_destnode(hdr));
1298	msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1299	msg_set_size(&fragm_hdr, max_pkt);
1300	msg_set_fragm_no(&fragm_hdr, 1);
1301
1302	/* Prepare header of first fragment: */
1303
1304	buf_chain = buf = tipc_buf_acquire(max_pkt);
1305	if (!buf)
1306		return -ENOMEM;
1307	buf->next = NULL;
1308	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1309	hsz = msg_hdr_sz(hdr);
1310	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1311	msg_dbg(buf_msg(buf), ">BUILD>");
1312
1313	/* Chop up message: */
1314
1315	fragm_crs = INT_H_SIZE + hsz;
1316	fragm_rest = fragm_sz - hsz;
1317
1318	do {		/* For all sections */
1319		u32 sz;
1320
1321		if (!sect_rest) {
1322			sect_rest = msg_sect[++curr_sect].iov_len;
1323			sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1324		}
1325
1326		if (sect_rest < fragm_rest)
1327			sz = sect_rest;
1328		else
1329			sz = fragm_rest;
1330
1331		if (likely(!sender->user_port)) {
1332			if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1333error:
1334				for (; buf_chain; buf_chain = buf) {
1335					buf = buf_chain->next;
1336					buf_discard(buf_chain);
1337				}
1338				return -EFAULT;
1339			}
1340		} else
1341			skb_copy_to_linear_data_offset(buf, fragm_crs,
1342						       sect_crs, sz);
1343		sect_crs += sz;
1344		sect_rest -= sz;
1345		fragm_crs += sz;
1346		fragm_rest -= sz;
1347		rest -= sz;
1348
1349		if (!fragm_rest && rest) {
1350
1351			/* Initiate new fragment: */
1352			if (rest <= fragm_sz) {
1353				fragm_sz = rest;
1354				msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1355			} else {
1356				msg_set_type(&fragm_hdr, FRAGMENT);
1357			}
1358			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1359			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1360			prev = buf;
1361			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1362			if (!buf)
1363				goto error;
1364
1365			buf->next = NULL;
1366			prev->next = buf;
1367			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1368			fragm_crs = INT_H_SIZE;
1369			fragm_rest = fragm_sz;
1370			msg_dbg(buf_msg(buf),"  >BUILD>");
1371		}
1372	}
1373	while (rest > 0);
1374
1375	/*
1376	 * Now we have a buffer chain. Select a link and check
1377	 * that packet size is still OK
1378	 */
1379	node = tipc_node_select(destaddr, sender->publ.ref & 1);
1380	if (likely(node)) {
1381		tipc_node_lock(node);
1382		l_ptr = node->active_links[sender->publ.ref & 1];
1383		if (!l_ptr) {
1384			tipc_node_unlock(node);
1385			goto reject;
1386		}
1387		if (l_ptr->max_pkt < max_pkt) {
1388			sender->publ.max_pkt = l_ptr->max_pkt;
1389			tipc_node_unlock(node);
1390			for (; buf_chain; buf_chain = buf) {
1391				buf = buf_chain->next;
1392				buf_discard(buf_chain);
1393			}
1394			goto again;
1395		}
1396	} else {
1397reject:
1398		for (; buf_chain; buf_chain = buf) {
1399			buf = buf_chain->next;
1400			buf_discard(buf_chain);
1401		}
1402		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1403						 TIPC_ERR_NO_NODE);
1404	}
1405
1406	/* Append whole chain to send queue: */
1407
1408	buf = buf_chain;
1409	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1410	if (!l_ptr->next_out)
1411		l_ptr->next_out = buf_chain;
1412	l_ptr->stats.sent_fragmented++;
1413	while (buf) {
1414		struct sk_buff *next = buf->next;
1415		struct tipc_msg *msg = buf_msg(buf);
1416
1417		l_ptr->stats.sent_fragments++;
1418		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1419		link_add_to_outqueue(l_ptr, buf, msg);
1420		msg_dbg(msg, ">ADD>");
1421		buf = next;
1422	}
1423
1424	/* Send it, if possible: */
1425
1426	tipc_link_push_queue(l_ptr);
1427	tipc_node_unlock(node);
1428	return dsz;
1429}
1430
1431/*
1432 * tipc_link_push_packet: Push one unsent packet to the media
1433 */
1434u32 tipc_link_push_packet(struct link *l_ptr)
1435{
1436	struct sk_buff *buf = l_ptr->first_out;
1437	u32 r_q_size = l_ptr->retransm_queue_size;
1438	u32 r_q_head = l_ptr->retransm_queue_head;
1439
1440	/* Step to position where retransmission failed, if any,    */
1441	/* consider that buffers may have been released in meantime */
1442
1443	if (r_q_size && buf) {
1444		u32 last = lesser(mod(r_q_head + r_q_size),
1445				  link_last_sent(l_ptr));
1446		u32 first = msg_seqno(buf_msg(buf));
1447
1448		while (buf && less(first, r_q_head)) {
1449			first = mod(first + 1);
1450			buf = buf->next;
1451		}
1452		l_ptr->retransm_queue_head = r_q_head = first;
1453		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1454	}
1455
1456	/* Continue retransmission now, if there is anything: */
1457
1458	if (r_q_size && buf) {
1459		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1460		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1461		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1462			msg_dbg(buf_msg(buf), ">DEF-RETR>");
1463			l_ptr->retransm_queue_head = mod(++r_q_head);
1464			l_ptr->retransm_queue_size = --r_q_size;
1465			l_ptr->stats.retransmitted++;
1466			return 0;
1467		} else {
1468			l_ptr->stats.bearer_congs++;
1469			msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1470			return PUSH_FAILED;
1471		}
1472	}
1473
1474	/* Send deferred protocol message, if any: */
1475
1476	buf = l_ptr->proto_msg_queue;
1477	if (buf) {
1478		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1479		msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1480		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1481			msg_dbg(buf_msg(buf), ">DEF-PROT>");
1482			l_ptr->unacked_window = 0;
1483			buf_discard(buf);
1484			l_ptr->proto_msg_queue = NULL;
1485			return 0;
1486		} else {
1487			msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1488			l_ptr->stats.bearer_congs++;
1489			return PUSH_FAILED;
1490		}
1491	}
1492
1493	/* Send one deferred data message, if send window not full: */
1494
1495	buf = l_ptr->next_out;
1496	if (buf) {
1497		struct tipc_msg *msg = buf_msg(buf);
1498		u32 next = msg_seqno(msg);
1499		u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1500
1501		if (mod(next - first) < l_ptr->queue_limit[0]) {
1502			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1503			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1504			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1505				if (msg_user(msg) == MSG_BUNDLER)
1506					msg_set_type(msg, CLOSED_MSG);
1507				msg_dbg(msg, ">PUSH-DATA>");
1508				l_ptr->next_out = buf->next;
1509				return 0;
1510			} else {
1511				msg_dbg(msg, "|PUSH-DATA|");
1512				l_ptr->stats.bearer_congs++;
1513				return PUSH_FAILED;
1514			}
1515		}
1516	}
1517	return PUSH_FINISHED;
1518}
1519
1520/*
1521 * push_queue(): push out the unsent messages of a link where
1522 *               congestion has abated. Node is locked
1523 */
1524void tipc_link_push_queue(struct link *l_ptr)
1525{
1526	u32 res;
1527
1528	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1529		return;
1530
1531	do {
1532		res = tipc_link_push_packet(l_ptr);
1533	} while (!res);
1534
1535	if (res == PUSH_FAILED)
1536		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1537}
1538
1539static void link_reset_all(unsigned long addr)
1540{
1541	struct tipc_node *n_ptr;
1542	char addr_string[16];
1543	u32 i;
1544
1545	read_lock_bh(&tipc_net_lock);
1546	n_ptr = tipc_node_find((u32)addr);
1547	if (!n_ptr) {
1548		read_unlock_bh(&tipc_net_lock);
1549		return;	/* node no longer exists */
1550	}
1551
1552	tipc_node_lock(n_ptr);
1553
1554	warn("Resetting all links to %s\n",
1555	     tipc_addr_string_fill(addr_string, n_ptr->addr));
1556
1557	for (i = 0; i < MAX_BEARERS; i++) {
1558		if (n_ptr->links[i]) {
1559			link_print(n_ptr->links[i], TIPC_OUTPUT,
1560				   "Resetting link\n");
1561			tipc_link_reset(n_ptr->links[i]);
1562		}
1563	}
1564
1565	tipc_node_unlock(n_ptr);
1566	read_unlock_bh(&tipc_net_lock);
1567}
1568
/* link_retransmit_failure - handle repeated retransmission failure on a link.
 * A unicast link is reset directly; for the broadcast link, diagnostic
 * state is dumped and a reset of all links to the offending node is
 * scheduled via tipc_k_signal().
 */
static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	warn("Retransmission failure on link <%s>\n", l_ptr->name);
	tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");

	if (l_ptr->addr) {

		/* Handle failure on standard link */

		link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {

		/* Handle failure on broadcast link */

		struct tipc_node *n_ptr;
		char addr_string[16];

		tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
		tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
				     (unsigned long) TIPC_SKB_CB(buf)->handle);

		/* NOTE(review): owner->next presumably points at the node that
		 * has not acknowledged this broadcast packet — confirm against
		 * bclink bookkeeping elsewhere in the file.
		 */
		n_ptr = l_ptr->owner->next;
		tipc_node_lock(n_ptr);

		tipc_addr_string_fill(addr_string, n_ptr->addr);
		tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
		tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
		tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
		tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
		tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
		tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
		tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);

		/* Defer the reset; it must not run in this context */
		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

		tipc_node_unlock(n_ptr);

		l_ptr->stale_count = 0;
	}
}
1613
/* tipc_link_retransmit - retransmit up to 'retransmits' packets starting
 * at 'buf'. On bearer congestion the remaining range is recorded in the
 * link's retransmission queue for a later push; repeated failures on an
 * uncongested bearer (same head packet > 100 times) trigger link reset.
 */
void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!buf)
		return;

	msg = buf_msg(buf);

	dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		if (l_ptr->retransm_queue_size == 0) {
			/* Bearer busy: remember the range, push queue sends it later */
			msg_dbg(msg, ">NO_RETR->BCONG>");
			dbg_print_link(l_ptr, "   ");
			l_ptr->retransm_queue_head = msg_seqno(msg);
			l_ptr->retransm_queue_size = retransmits;
		} else {
			err("Unexpected retransmit on link %s (qsize=%d)\n",
			    l_ptr->name, l_ptr->retransm_queue_size);
		}
		return;
	} else {
		/* Detect repeated retransmit failures on uncongested bearer */

		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
			if (++l_ptr->stale_count > 100) {
				link_retransmit_failure(l_ptr, buf);
				return;
			}
		} else {
			l_ptr->last_retransmitted = msg_seqno(msg);
			l_ptr->stale_count = 1;
		}
	}

	while (retransmits && (buf != l_ptr->next_out) && buf) {
		msg = buf_msg(buf);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">RETR>");
			buf = buf->next;
			retransmits--;
			l_ptr->stats.retransmitted++;
		} else {
			/* Bearer congested mid-run: park the rest for later */
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
			l_ptr->retransm_queue_size = retransmits;
			return;
		}
	}

	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}
1671
1672/**
1673 * link_insert_deferred_queue - insert deferred messages back into receive chain
1674 */
1675
1676static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1677						  struct sk_buff *buf)
1678{
1679	u32 seq_no;
1680
1681	if (l_ptr->oldest_deferred_in == NULL)
1682		return buf;
1683
1684	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1685	if (seq_no == mod(l_ptr->next_in_no)) {
1686		l_ptr->newest_deferred_in->next = buf;
1687		buf = l_ptr->oldest_deferred_in;
1688		l_ptr->oldest_deferred_in = NULL;
1689		l_ptr->deferred_inqueue_sz = 0;
1690	}
1691	return buf;
1692}
1693
1694/**
1695 * link_recv_buf_validate - validate basic format of received message
1696 *
1697 * This routine ensures a TIPC message has an acceptable header, and at least
1698 * as much data as the header indicates it should.  The routine also ensures
1699 * that the entire message header is stored in the main fragment of the message
1700 * buffer, to simplify future access to message header fields.
1701 *
1702 * Note: Having extra info present in the message header or data areas is OK.
1703 * TIPC will ignore the excess, under the assumption that it is optional info
1704 * introduced by a later release of the protocol.
1705 */
1706
1707static int link_recv_buf_validate(struct sk_buff *buf)
1708{
1709	static u32 min_data_hdr_size[8] = {
1710		SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1711		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1712		};
1713
1714	struct tipc_msg *msg;
1715	u32 tipc_hdr[2];
1716	u32 size;
1717	u32 hdr_size;
1718	u32 min_hdr_size;
1719
1720	if (unlikely(buf->len < MIN_H_SIZE))
1721		return 0;
1722
1723	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1724	if (msg == NULL)
1725		return 0;
1726
1727	if (unlikely(msg_version(msg) != TIPC_VERSION))
1728		return 0;
1729
1730	size = msg_size(msg);
1731	hdr_size = msg_hdr_sz(msg);
1732	min_hdr_size = msg_isdata(msg) ?
1733		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1734
1735	if (unlikely((hdr_size < min_hdr_size) ||
1736		     (size < hdr_size) ||
1737		     (buf->len < size) ||
1738		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1739		return 0;
1740
1741	return pskb_may_pull(buf, hdr_size);
1742}
1743
1744/**
1745 * tipc_recv_msg - process TIPC messages arriving from off-node
1746 * @head: pointer to message buffer chain
1747 * @tb_ptr: pointer to bearer message arrived on
1748 *
1749 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1750 * structure (i.e. cannot be NULL), but bearer can be inactive.
1751 */
1752
1753void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1754{
1755	read_lock_bh(&tipc_net_lock);
1756	while (head) {
1757		struct bearer *b_ptr = (struct bearer *)tb_ptr;
1758		struct tipc_node *n_ptr;
1759		struct link *l_ptr;
1760		struct sk_buff *crs;
1761		struct sk_buff *buf = head;
1762		struct tipc_msg *msg;
1763		u32 seq_no;
1764		u32 ackd;
1765		u32 released = 0;
1766		int type;
1767
1768		head = head->next;
1769
1770		/* Ensure bearer is still enabled */
1771
1772		if (unlikely(!b_ptr->active))
1773			goto cont;
1774
1775		/* Ensure message is well-formed */
1776
1777		if (unlikely(!link_recv_buf_validate(buf)))
1778			goto cont;
1779
1780		/* Ensure message data is a single contiguous unit */
1781
1782		if (unlikely(buf_linearize(buf))) {
1783			goto cont;
1784		}
1785
1786		/* Handle arrival of a non-unicast link message */
1787
1788		msg = buf_msg(buf);
1789
1790		if (unlikely(msg_non_seq(msg))) {
1791			if (msg_user(msg) ==  LINK_CONFIG)
1792				tipc_disc_recv_msg(buf, b_ptr);
1793			else
1794				tipc_bclink_recv_pkt(buf);
1795			continue;
1796		}
1797
1798		if (unlikely(!msg_short(msg) &&
1799			     (msg_destnode(msg) != tipc_own_addr)))
1800			goto cont;
1801
1802		/* Discard non-routeable messages destined for another node */
1803
1804		if (unlikely(!msg_isdata(msg) &&
1805			     (msg_destnode(msg) != tipc_own_addr))) {
1806			if ((msg_user(msg) != CONN_MANAGER) &&
1807			    (msg_user(msg) != MSG_FRAGMENTER))
1808				goto cont;
1809		}
1810
1811		/* Locate neighboring node that sent message */
1812
1813		n_ptr = tipc_node_find(msg_prevnode(msg));
1814		if (unlikely(!n_ptr))
1815			goto cont;
1816		tipc_node_lock(n_ptr);
1817
1818		/* Don't talk to neighbor during cleanup after last session */
1819
1820		if (n_ptr->cleanup_required) {
1821			tipc_node_unlock(n_ptr);
1822			goto cont;
1823		}
1824
1825		/* Locate unicast link endpoint that should handle message */
1826
1827		l_ptr = n_ptr->links[b_ptr->identity];
1828		if (unlikely(!l_ptr)) {
1829			tipc_node_unlock(n_ptr);
1830			goto cont;
1831		}
1832
1833		/* Validate message sequence number info */
1834
1835		seq_no = msg_seqno(msg);
1836		ackd = msg_ack(msg);
1837
1838		/* Release acked messages */
1839
1840		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1841			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1842				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1843		}
1844
1845		crs = l_ptr->first_out;
1846		while ((crs != l_ptr->next_out) &&
1847		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1848			struct sk_buff *next = crs->next;
1849
1850			buf_discard(crs);
1851			crs = next;
1852			released++;
1853		}
1854		if (released) {
1855			l_ptr->first_out = crs;
1856			l_ptr->out_queue_size -= released;
1857		}
1858
1859		/* Try sending any messages link endpoint has pending */
1860
1861		if (unlikely(l_ptr->next_out))
1862			tipc_link_push_queue(l_ptr);
1863		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1864			tipc_link_wakeup_ports(l_ptr, 0);
1865		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1866			l_ptr->stats.sent_acks++;
1867			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1868		}
1869
1870		/* Now (finally!) process the incoming message */
1871
1872protocol_check:
1873		if (likely(link_working_working(l_ptr))) {
1874			if (likely(seq_no == mod(l_ptr->next_in_no))) {
1875				l_ptr->next_in_no++;
1876				if (unlikely(l_ptr->oldest_deferred_in))
1877					head = link_insert_deferred_queue(l_ptr,
1878									  head);
1879				if (likely(msg_is_dest(msg, tipc_own_addr))) {
1880deliver:
1881					if (likely(msg_isdata(msg))) {
1882						tipc_node_unlock(n_ptr);
1883						tipc_port_recv_msg(buf);
1884						continue;
1885					}
1886					switch (msg_user(msg)) {
1887					case MSG_BUNDLER:
1888						l_ptr->stats.recv_bundles++;
1889						l_ptr->stats.recv_bundled +=
1890							msg_msgcnt(msg);
1891						tipc_node_unlock(n_ptr);
1892						tipc_link_recv_bundle(buf);
1893						continue;
1894					case ROUTE_DISTRIBUTOR:
1895						tipc_node_unlock(n_ptr);
1896						tipc_cltr_recv_routing_table(buf);
1897						continue;
1898					case NAME_DISTRIBUTOR:
1899						tipc_node_unlock(n_ptr);
1900						tipc_named_recv(buf);
1901						continue;
1902					case CONN_MANAGER:
1903						tipc_node_unlock(n_ptr);
1904						tipc_port_recv_proto_msg(buf);
1905						continue;
1906					case MSG_FRAGMENTER:
1907						l_ptr->stats.recv_fragments++;
1908						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1909									    &buf, &msg)) {
1910							l_ptr->stats.recv_fragmented++;
1911							goto deliver;
1912						}
1913						break;
1914					case CHANGEOVER_PROTOCOL:
1915						type = msg_type(msg);
1916						if (link_recv_changeover_msg(&l_ptr, &buf)) {
1917							msg = buf_msg(buf);
1918							seq_no = msg_seqno(msg);
1919							if (type == ORIGINAL_MSG)
1920								goto deliver;
1921							goto protocol_check;
1922						}
1923						break;
1924					}
1925				}
1926				tipc_node_unlock(n_ptr);
1927				tipc_net_route_msg(buf);
1928				continue;
1929			}
1930			link_handle_out_of_seq_msg(l_ptr, buf);
1931			head = link_insert_deferred_queue(l_ptr, head);
1932			tipc_node_unlock(n_ptr);
1933			continue;
1934		}
1935
1936		if (msg_user(msg) == LINK_PROTOCOL) {
1937			link_recv_proto_msg(l_ptr, buf);
1938			head = link_insert_deferred_queue(l_ptr, head);
1939			tipc_node_unlock(n_ptr);
1940			continue;
1941		}
1942		msg_dbg(msg,"NSEQ<REC<");
1943		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1944
1945		if (link_working_working(l_ptr)) {
1946			/* Re-insert in front of queue */
1947			msg_dbg(msg,"RECV-REINS:");
1948			buf->next = head;
1949			head = buf;
1950			tipc_node_unlock(n_ptr);
1951			continue;
1952		}
1953		tipc_node_unlock(n_ptr);
1954cont:
1955		buf_discard(buf);
1956	}
1957	read_unlock_bh(&tipc_net_lock);
1958}
1959
1960/*
1961 * link_defer_buf(): Sort a received out-of-sequence packet
1962 *                   into the deferred reception queue.
1963 * Returns the increase of the queue length,i.e. 0 or 1
1964 */
1965
1966u32 tipc_link_defer_pkt(struct sk_buff **head,
1967			struct sk_buff **tail,
1968			struct sk_buff *buf)
1969{
1970	struct sk_buff *prev = NULL;
1971	struct sk_buff *crs = *head;
1972	u32 seq_no = msg_seqno(buf_msg(buf));
1973
1974	buf->next = NULL;
1975
1976	/* Empty queue ? */
1977	if (*head == NULL) {
1978		*head = *tail = buf;
1979		return 1;
1980	}
1981
1982	/* Last ? */
1983	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1984		(*tail)->next = buf;
1985		*tail = buf;
1986		return 1;
1987	}
1988
1989	/* Scan through queue and sort it in */
1990	do {
1991		struct tipc_msg *msg = buf_msg(crs);
1992
1993		if (less(seq_no, msg_seqno(msg))) {
1994			buf->next = crs;
1995			if (prev)
1996				prev->next = buf;
1997			else
1998				*head = buf;
1999			return 1;
2000		}
2001		if (seq_no == msg_seqno(msg)) {
2002			break;
2003		}
2004		prev = crs;
2005		crs = crs->next;
2006	}
2007	while (crs);
2008
2009	/* Message is a duplicate of an existing message */
2010
2011	buf_discard(buf);
2012	return 0;
2013}
2014
2015/**
2016 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2017 */
2018
2019static void link_handle_out_of_seq_msg(struct link *l_ptr,
2020				       struct sk_buff *buf)
2021{
2022	u32 seq_no = msg_seqno(buf_msg(buf));
2023
2024	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2025		link_recv_proto_msg(l_ptr, buf);
2026		return;
2027	}
2028
2029	dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2030	    seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2031
2032	/* Record OOS packet arrival (force mismatch on next timeout) */
2033
2034	l_ptr->checkpoint--;
2035
2036	/*
2037	 * Discard packet if a duplicate; otherwise add it to deferred queue
2038	 * and notify peer of gap as per protocol specification
2039	 */
2040
2041	if (less(seq_no, mod(l_ptr->next_in_no))) {
2042		l_ptr->stats.duplicates++;
2043		buf_discard(buf);
2044		return;
2045	}
2046
2047	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2048				&l_ptr->newest_deferred_in, buf)) {
2049		l_ptr->deferred_inqueue_sz++;
2050		l_ptr->stats.deferred_recv++;
2051		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2052			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2053	} else
2054		l_ptr->stats.duplicates++;
2055}
2056
2057/*
2058 * Send protocol message to the other endpoint.
2059 */
2060void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2061			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2062{
2063	struct sk_buff *buf = NULL;
2064	struct tipc_msg *msg = l_ptr->pmsg;
2065	u32 msg_size = sizeof(l_ptr->proto_msg);
2066
2067	if (link_blocked(l_ptr))
2068		return;
2069	msg_set_type(msg, msg_typ);
2070	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2071	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2072	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2073
2074	if (msg_typ == STATE_MSG) {
2075		u32 next_sent = mod(l_ptr->next_out_no);
2076
2077		if (!tipc_link_is_up(l_ptr))
2078			return;
2079		if (l_ptr->next_out)
2080			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2081		msg_set_next_sent(msg, next_sent);
2082		if (l_ptr->oldest_deferred_in) {
2083			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2084			gap = mod(rec - mod(l_ptr->next_in_no));
2085		}
2086		msg_set_seq_gap(msg, gap);
2087		if (gap)
2088			l_ptr->stats.sent_nacks++;
2089		msg_set_link_tolerance(msg, tolerance);
2090		msg_set_linkprio(msg, priority);
2091		msg_set_max_pkt(msg, ack_mtu);
2092		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2093		msg_set_probe(msg, probe_msg != 0);
2094		if (probe_msg) {
2095			u32 mtu = l_ptr->max_pkt;
2096
2097			if ((mtu < l_ptr->max_pkt_target) &&
2098			    link_working_working(l_ptr) &&
2099			    l_ptr->fsm_msg_cnt) {
2100				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2101				if (l_ptr->max_pkt_probes == 10) {
2102					l_ptr->max_pkt_target = (msg_size - 4);
2103					l_ptr->max_pkt_probes = 0;
2104					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2105				}
2106				l_ptr->max_pkt_probes++;
2107			}
2108
2109			l_ptr->stats.sent_probes++;
2110		}
2111		l_ptr->stats.sent_states++;
2112	} else {		/* RESET_MSG or ACTIVATE_MSG */
2113		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2114		msg_set_seq_gap(msg, 0);
2115		msg_set_next_sent(msg, 1);
2116		msg_set_link_tolerance(msg, l_ptr->tolerance);
2117		msg_set_linkprio(msg, l_ptr->priority);
2118		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2119	}
2120
2121	if (tipc_node_has_redundant_links(l_ptr->owner)) {
2122		msg_set_redundant_link(msg);
2123	} else {
2124		msg_clear_redundant_link(msg);
2125	}
2126	msg_set_linkprio(msg, l_ptr->priority);
2127
2128	/* Ensure sequence number will not fit : */
2129
2130	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2131
2132	/* Congestion? */
2133
2134	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2135		if (!l_ptr->proto_msg_queue) {
2136			l_ptr->proto_msg_queue =
2137				tipc_buf_acquire(sizeof(l_ptr->proto_msg));
2138		}
2139		buf = l_ptr->proto_msg_queue;
2140		if (!buf)
2141			return;
2142		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2143		return;
2144	}
2145	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2146
2147	/* Message can be sent */
2148
2149	msg_dbg(msg, ">>");
2150
2151	buf = tipc_buf_acquire(msg_size);
2152	if (!buf)
2153		return;
2154
2155	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2156	msg_set_size(buf_msg(buf), msg_size);
2157
2158	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2159		l_ptr->unacked_window = 0;
2160		buf_discard(buf);
2161		return;
2162	}
2163
2164	/* New congestion */
2165	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2166	l_ptr->proto_msg_queue = buf;
2167	l_ptr->stats.bearer_congs++;
2168}
2169
2170/*
2171 * Receive protocol message :
2172 * Note that network plane id propagates through the network, and may
2173 * change at any time. The node with lowest address rules
2174 */
2175
static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 max_pkt_info;
	u32 max_pkt_ack;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	dbg("AT(%u):", jiffies_to_msecs(jiffies));
	msg_dbg(msg, "<<");
	/* Ignore protocol traffic while the link is blocked (changeover) */
	if (link_blocked(l_ptr))
		goto exit;

	/* record unnumbered packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	/* Network plane id propagates; the node with lowest address rules */
	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
		if (tipc_own_addr > msg_prevnode(msg))
			l_ptr->b_ptr->net_plane = msg_net_plane(msg);

	l_ptr->owner->permit_changeover = msg_redundant_link(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		if (!link_working_unknown(l_ptr) &&
		    (l_ptr->peer_session != INVALID_SESSION)) {
			if (msg_session(msg) == l_ptr->peer_session) {
				dbg("Duplicate RESET: %u<->%u\n",
				    msg_session(msg), l_ptr->peer_session);
				break; /* duplicate: ignore */
			}
		}
		/* fall thru' */
	case ACTIVATE_MSG:
		/* Update link settings according to other endpoint's values */

		/* NOTE(review): peer interface name is copied with an
		 * unbounded strcpy() from network data - assumes the sender
		 * always NUL-terminates within bounds; verify */
		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		/* Adopt the larger of the two tolerance values */
		if ((msg_tol = msg_link_tolerance(msg)) &&
		    (msg_tol > l_ptr->tolerance))
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		/* Negotiate MTU: never exceed what the peer advertises */
		max_pkt_info = msg_max_pkt(msg);
		if (max_pkt_info) {
			if (max_pkt_info < l_ptr->max_pkt_target)
				l_ptr->max_pkt_target = max_pkt_info;
			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
				l_ptr->max_pkt = l_ptr->max_pkt_target;
		} else {
			l_ptr->max_pkt = l_ptr->max_pkt_target;
		}
		/* A zero max_pkt field doubles as "no broadcast support" */
		l_ptr->owner->bclink.supported = (max_pkt_info != 0);

		link_state_event(l_ptr, msg_type(msg));

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		/* Synchronize broadcast sequence numbers */
		if (!tipc_node_has_redundant_links(l_ptr->owner)) {
			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
		}
		break;
	case STATE_MSG:

		if ((msg_tol = msg_link_tolerance(msg)))
			link_set_supervision_props(l_ptr, msg_tol);

		/* A priority change forces a link reset to take effect */
		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			warn("Resetting link <%s>, priority change %u->%u\n",
			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
		l_ptr->stats.recv_states++;
		if (link_reset_unknown(l_ptr))
			break;

		/* Gap between what the peer has sent and what we received */
		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
			rec_gap = mod(msg_next_sent(msg) -
				      mod(l_ptr->next_in_no));
		}

		max_pkt_ack = msg_max_pkt(msg);
		if (max_pkt_ack > l_ptr->max_pkt) {
			dbg("Link <%s> updated MTU %u -> %u\n",
			    l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
			l_ptr->max_pkt = max_pkt_ack;
			l_ptr->max_pkt_probes = 0;
		}

		/* Acknowledge an oversized MTU probe by echoing its size */
		max_pkt_ack = 0;
		if (msg_probe(msg)) {
			l_ptr->stats.recv_probes++;
			if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
				max_pkt_ack = msg_size(msg);
			}
		}

		/* Protocol message before retransmits, reduce loss risk */

		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));

		if (rec_gap || (msg_probe(msg))) {
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, rec_gap, 0, 0, max_pkt_ack);
		}
		if (msg_seq_gap(msg)) {
			msg_dbg(msg, "With Gap:");
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, l_ptr->first_out,
					     msg_seq_gap(msg));
		}
		break;
	default:
		msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
	}
exit:
	buf_discard(buf);
}
2304
2305
2306/*
2307 * tipc_link_tunnel(): Send one message via a link belonging to
2308 * another bearer. Owner node is locked.
2309 */
2310static void tipc_link_tunnel(struct link *l_ptr,
2311			     struct tipc_msg *tunnel_hdr,
2312			     struct tipc_msg  *msg,
2313			     u32 selector)
2314{
2315	struct link *tunnel;
2316	struct sk_buff *buf;
2317	u32 length = msg_size(msg);
2318
2319	tunnel = l_ptr->owner->active_links[selector & 1];
2320	if (!tipc_link_is_up(tunnel)) {
2321		warn("Link changeover error, "
2322		     "tunnel link no longer available\n");
2323		return;
2324	}
2325	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2326	buf = tipc_buf_acquire(length + INT_H_SIZE);
2327	if (!buf) {
2328		warn("Link changeover error, "
2329		     "unable to send tunnel msg\n");
2330		return;
2331	}
2332	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2333	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2334	dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2335	msg_dbg(buf_msg(buf), ">SEND>");
2336	tipc_link_send_buf(tunnel, buf);
2337}
2338
2339
2340
2341/*
2342 * changeover(): Send whole message queue via the remaining link
2343 *               Owner node is locked.
2344 */
2345
2346void tipc_link_changeover(struct link *l_ptr)
2347{
2348	u32 msgcount = l_ptr->out_queue_size;
2349	struct sk_buff *crs = l_ptr->first_out;
2350	struct link *tunnel = l_ptr->owner->active_links[0];
2351	struct tipc_msg tunnel_hdr;
2352	int split_bundles;
2353
2354	if (!tunnel)
2355		return;
2356
2357	if (!l_ptr->owner->permit_changeover) {
2358		warn("Link changeover error, "
2359		     "peer did not permit changeover\n");
2360		return;
2361	}
2362
2363	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2364		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2365	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2366	msg_set_msgcnt(&tunnel_hdr, msgcount);
2367	dbg("Link changeover requires %u tunnel messages\n", msgcount);
2368
2369	if (!l_ptr->first_out) {
2370		struct sk_buff *buf;
2371
2372		buf = tipc_buf_acquire(INT_H_SIZE);
2373		if (buf) {
2374			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2375			msg_set_size(&tunnel_hdr, INT_H_SIZE);
2376			dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2377			    tunnel->b_ptr->net_plane);
2378			msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2379			tipc_link_send_buf(tunnel, buf);
2380		} else {
2381			warn("Link changeover error, "
2382			     "unable to send changeover msg\n");
2383		}
2384		return;
2385	}
2386
2387	split_bundles = (l_ptr->owner->active_links[0] !=
2388			 l_ptr->owner->active_links[1]);
2389
2390	while (crs) {
2391		struct tipc_msg *msg = buf_msg(crs);
2392
2393		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2394			struct tipc_msg *m = msg_get_wrapped(msg);
2395			unchar* pos = (unchar*)m;
2396
2397			msgcount = msg_msgcnt(msg);
2398			while (msgcount--) {
2399				msg_set_seqno(m,msg_seqno(msg));
2400				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2401						 msg_link_selector(m));
2402				pos += align(msg_size(m));
2403				m = (struct tipc_msg *)pos;
2404			}
2405		} else {
2406			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2407					 msg_link_selector(msg));
2408		}
2409		crs = crs->next;
2410	}
2411}
2412
void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
{
	struct sk_buff *iter;
	struct tipc_msg tunnel_hdr;

	/* Build a reusable tunnel header for DUPLICATE_MSG packets */
	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	iter = l_ptr->first_out;
	while (iter) {
		struct sk_buff *outbuf;
		struct tipc_msg *msg = buf_msg(iter);
		u32 length = msg_size(msg);

		/* Bundles must not accept more messages once duplicated */
		if (msg_user(msg) == MSG_BUNDLER)
			msg_set_type(msg, CLOSED_MSG);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
		if (outbuf == NULL) {
			warn("Link changeover error, "
			     "unable to send duplicate msg\n");
			return;
		}
		/* Tunnel header followed by the original packet as payload */
		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
					       length);
		dbg("%c->%c:", l_ptr->b_ptr->net_plane,
		    tunnel->b_ptr->net_plane);
		msg_dbg(buf_msg(outbuf), ">SEND>");
		tipc_link_send_buf(tunnel, outbuf);
		/* Stop if sending caused the original link to go down */
		if (!tipc_link_is_up(l_ptr))
			return;
		iter = iter->next;
	}
}
2451
2452
2453
2454/**
2455 * buf_extract - extracts embedded TIPC message from another message
2456 * @skb: encapsulating message buffer
2457 * @from_pos: offset to extract from
2458 *
2459 * Returns a new message buffer containing an embedded message.  The
2460 * encapsulating message itself is left unchanged.
2461 */
2462
2463static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2464{
2465	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2466	u32 size = msg_size(msg);
2467	struct sk_buff *eb;
2468
2469	eb = tipc_buf_acquire(size);
2470	if (eb)
2471		skb_copy_to_linear_data(eb, msg, size);
2472	return eb;
2473}
2474
2475/*
2476 *  link_recv_changeover_msg(): Receive tunneled packet sent
2477 *  via other link. Node is locked. Return extracted buffer.
2478 */
2479
static int link_recv_changeover_msg(struct link **l_ptr,
				    struct sk_buff **buf)
{
	struct sk_buff *tunnel_buf = *buf;
	struct link *dest_link;
	struct tipc_msg *msg;
	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
	u32 msg_typ = msg_type(tunnel_msg);
	u32 msg_count = msg_msgcnt(tunnel_msg);

	/* Redirect to the link the packet was originally queued on */
	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
	if (!dest_link) {
		msg_dbg(tunnel_msg, "NOLINK/<REC<");
		goto exit;
	}
	if (dest_link == *l_ptr) {
		err("Unexpected changeover message on link <%s>\n",
		    (*l_ptr)->name);
		goto exit;
	}
	dbg("%c<-%c:", dest_link->b_ptr->net_plane,
	    (*l_ptr)->b_ptr->net_plane);
	*l_ptr = dest_link;
	msg = msg_get_wrapped(tunnel_msg);

	/* DUPLICATE_MSG: deliver unless already received on dest link */
	if (msg_typ == DUPLICATE_MSG) {
		if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
			msg_dbg(tunnel_msg, "DROP/<REC<");
			goto exit;
		}
		*buf = buf_extract(tunnel_buf,INT_H_SIZE);
		if (*buf == NULL) {
			warn("Link changeover error, duplicate msg dropped\n");
			goto exit;
		}
		msg_dbg(tunnel_msg, "TNL<REC<");
		buf_discard(tunnel_buf);
		return 1;
	}

	/* First original message ?: */

	if (tipc_link_is_up(dest_link)) {
		msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
		info("Resetting link <%s>, changeover initiated by peer\n",
		     dest_link->name);
		tipc_link_reset(dest_link);
		dest_link->exp_msg_count = msg_count;
		dbg("Expecting %u tunnelled messages\n", msg_count);
		if (!msg_count)
			goto exit;
	} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
		msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
		dest_link->exp_msg_count = msg_count;
		dbg("Expecting %u tunnelled messages\n", msg_count);
		if (!msg_count)
			goto exit;
	}

	/* Receive original message */

	if (dest_link->exp_msg_count == 0) {
		warn("Link switchover error, "
		     "got too many tunnelled messages\n");
		msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
		dbg_print_link(dest_link, "LINK:");
		goto exit;
	}
	dest_link->exp_msg_count--;
	/* Drop anything the dest link already received before its reset */
	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
		msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
		goto exit;
	} else {
		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
		if (*buf != NULL) {
			msg_dbg(tunnel_msg, "TNL<REC<");
			buf_discard(tunnel_buf);
			return 1;
		} else {
			warn("Link changeover error, original msg dropped\n");
		}
	}
exit:
	*buf = NULL;
	buf_discard(tunnel_buf);
	return 0;
}
2567
2568/*
2569 *  Bundler functionality:
2570 */
2571void tipc_link_recv_bundle(struct sk_buff *buf)
2572{
2573	u32 msgcount = msg_msgcnt(buf_msg(buf));
2574	u32 pos = INT_H_SIZE;
2575	struct sk_buff *obuf;
2576
2577	msg_dbg(buf_msg(buf), "<BNDL<: ");
2578	while (msgcount--) {
2579		obuf = buf_extract(buf, pos);
2580		if (obuf == NULL) {
2581			warn("Link unable to unbundle message(s)\n");
2582			break;
2583		}
2584		pos += align(msg_size(buf_msg(obuf)));
2585		msg_dbg(buf_msg(obuf), "     /");
2586		tipc_net_route_msg(obuf);
2587	}
2588	buf_discard(buf);
2589}
2590
2591/*
2592 *  Fragmentation/defragmentation:
2593 */
2594
2595
2596/*
2597 * link_send_long_buf: Entry for buffers needing fragmentation.
2598 * The buffer is complete, inclusive total message length.
2599 * Returns user data length.
2600 */
2601static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2602{
2603	struct tipc_msg *inmsg = buf_msg(buf);
2604	struct tipc_msg fragm_hdr;
2605	u32 insize = msg_size(inmsg);
2606	u32 dsz = msg_data_sz(inmsg);
2607	unchar *crs = buf->data;
2608	u32 rest = insize;
2609	u32 pack_sz = l_ptr->max_pkt;
2610	u32 fragm_sz = pack_sz - INT_H_SIZE;
2611	u32 fragm_no = 1;
2612	u32 destaddr;
2613
2614	if (msg_short(inmsg))
2615		destaddr = l_ptr->addr;
2616	else
2617		destaddr = msg_destnode(inmsg);
2618
2619	if (msg_routed(inmsg))
2620		msg_set_prevnode(inmsg, tipc_own_addr);
2621
2622	/* Prepare reusable fragment header: */
2623
2624	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2625		 INT_H_SIZE, destaddr);
2626	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2627	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2628	msg_set_fragm_no(&fragm_hdr, fragm_no);
2629	l_ptr->stats.sent_fragmented++;
2630
2631	/* Chop up message: */
2632
2633	while (rest > 0) {
2634		struct sk_buff *fragm;
2635
2636		if (rest <= fragm_sz) {
2637			fragm_sz = rest;
2638			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2639		}
2640		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2641		if (fragm == NULL) {
2642			warn("Link unable to fragment message\n");
2643			dsz = -ENOMEM;
2644			goto exit;
2645		}
2646		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2647		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2648		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2649					       fragm_sz);
2650		/*  Send queued messages first, if any: */
2651
2652		l_ptr->stats.sent_fragments++;
2653		tipc_link_send_buf(l_ptr, fragm);
2654		if (!tipc_link_is_up(l_ptr))
2655			return dsz;
2656		msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2657		rest -= fragm_sz;
2658		crs += fragm_sz;
2659		msg_set_type(&fragm_hdr, FRAGMENT);
2660	}
2661exit:
2662	buf_discard(buf);
2663	return dsz;
2664}
2665
2666/*
2667 * A pending message being re-assembled must store certain values
2668 * to handle subsequent fragments correctly. The following functions
2669 * help storing these values in unused, available fields in the
2670 * pending message. This makes dynamic memory allocation unecessary.
2671 */
2672
/* Stash the long-message sequence number in the pending buffer's seqno field */
static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
	msg_set_seqno(buf_msg(buf), seqno);
}
2677
/* Fragment size is stashed in the pending buffer's (otherwise unused) ack field */
static u32 get_fragm_size(struct sk_buff *buf)
{
	return msg_ack(buf_msg(buf));
}
2682
/* Stash the fragment size in the pending buffer's (otherwise unused) ack field */
static void set_fragm_size(struct sk_buff *buf, u32 sz)
{
	msg_set_ack(buf_msg(buf), sz);
}
2687
/* Remaining-fragment count is stashed in the (otherwise unused) bcast_ack field */
static u32 get_expected_frags(struct sk_buff *buf)
{
	return msg_bcast_ack(buf_msg(buf));
}
2692
/* Stash the remaining-fragment count in the (otherwise unused) bcast_ack field */
static void set_expected_frags(struct sk_buff *buf, u32 exp)
{
	msg_set_bcast_ack(buf_msg(buf), exp);
}
2697
/* Age of a pending reassembly, kept in the reroute counter field */
static u32 get_timer_cnt(struct sk_buff *buf)
{
	return msg_reroute_cnt(buf_msg(buf));
}
2702
/* Age a pending reassembly by one timer tick (see link_check_defragm_bufs) */
static void incr_timer_cnt(struct sk_buff *buf)
{
	msg_incr_reroute_cnt(buf_msg(buf));
}
2707
2708/*
2709 * tipc_link_recv_fragment(): Called with node lock on. Returns
2710 * the reassembled buffer if message is complete.
2711 */
2712int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2713			    struct tipc_msg **m)
2714{
2715	struct sk_buff *prev = NULL;
2716	struct sk_buff *fbuf = *fb;
2717	struct tipc_msg *fragm = buf_msg(fbuf);
2718	struct sk_buff *pbuf = *pending;
2719	u32 long_msg_seq_no = msg_long_msgno(fragm);
2720
2721	*fb = NULL;
2722	msg_dbg(fragm,"FRG<REC<");
2723
2724	/* Is there an incomplete message waiting for this fragment? */
2725
2726	while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2727			(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2728		prev = pbuf;
2729		pbuf = pbuf->next;
2730	}
2731
2732	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2733		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2734		u32 msg_sz = msg_size(imsg);
2735		u32 fragm_sz = msg_data_sz(fragm);
2736		u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2737		u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2738		if (msg_type(imsg) == TIPC_MCAST_MSG)
2739			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2740		if (msg_size(imsg) > max) {
2741			msg_dbg(fragm,"<REC<Oversized: ");
2742			buf_discard(fbuf);
2743			return 0;
2744		}
2745		pbuf = tipc_buf_acquire(msg_size(imsg));
2746		if (pbuf != NULL) {
2747			pbuf->next = *pending;
2748			*pending = pbuf;
2749			skb_copy_to_linear_data(pbuf, imsg,
2750						msg_data_sz(fragm));
2751			/*  Prepare buffer for subsequent fragments. */
2752
2753			set_long_msg_seqno(pbuf, long_msg_seq_no);
2754			set_fragm_size(pbuf,fragm_sz);
2755			set_expected_frags(pbuf,exp_fragm_cnt - 1);
2756		} else {
2757			warn("Link unable to reassemble fragmented message\n");
2758		}
2759		buf_discard(fbuf);
2760		return 0;
2761	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2762		u32 dsz = msg_data_sz(fragm);
2763		u32 fsz = get_fragm_size(pbuf);
2764		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2765		u32 exp_frags = get_expected_frags(pbuf) - 1;
2766		skb_copy_to_linear_data_offset(pbuf, crs,
2767					       msg_data(fragm), dsz);
2768		buf_discard(fbuf);
2769
2770		/* Is message complete? */
2771
2772		if (exp_frags == 0) {
2773			if (prev)
2774				prev->next = pbuf->next;
2775			else
2776				*pending = pbuf->next;
2777			msg_reset_reroute_cnt(buf_msg(pbuf));
2778			*fb = pbuf;
2779			*m = buf_msg(pbuf);
2780			return 1;
2781		}
2782		set_expected_frags(pbuf,exp_frags);
2783		return 0;
2784	}
2785	dbg(" Discarding orphan fragment %x\n",fbuf);
2786	msg_dbg(fragm,"ORPHAN:");
2787	dbg("Pending long buffers:\n");
2788	dbg_print_buf_chain(*pending);
2789	buf_discard(fbuf);
2790	return 0;
2791}
2792
2793/**
2794 * link_check_defragm_bufs - flush stale incoming message fragments
2795 * @l_ptr: pointer to link
2796 */
2797
2798static void link_check_defragm_bufs(struct link *l_ptr)
2799{
2800	struct sk_buff *prev = NULL;
2801	struct sk_buff *next = NULL;
2802	struct sk_buff *buf = l_ptr->defragm_buf;
2803
2804	if (!buf)
2805		return;
2806	if (!link_working_working(l_ptr))
2807		return;
2808	while (buf) {
2809		u32 cnt = get_timer_cnt(buf);
2810
2811		next = buf->next;
2812		if (cnt < 4) {
2813			incr_timer_cnt(buf);
2814			prev = buf;
2815		} else {
2816			dbg(" Discarding incomplete long buffer\n");
2817			msg_dbg(buf_msg(buf), "LONG:");
2818			dbg_print_link(l_ptr, "curr:");
2819			dbg("Pending long buffers:\n");
2820			dbg_print_buf_chain(l_ptr->defragm_buf);
2821			if (prev)
2822				prev->next = buf->next;
2823			else
2824				l_ptr->defragm_buf = buf->next;
2825			buf_discard(buf);
2826		}
2827		buf = next;
2828	}
2829}
2830
2831
2832
2833static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2834{
2835	l_ptr->tolerance = tolerance;
2836	l_ptr->continuity_interval =
2837		((tolerance / 4) > 500) ? 500 : tolerance / 4;
2838	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2839}
2840
2841
2842void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2843{
2844	/* Data messages from this node, inclusive FIRST_FRAGM */
2845	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2846	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2847	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2848	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2849	/* Transiting data messages,inclusive FIRST_FRAGM */
2850	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2851	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2852	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2853	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2854	l_ptr->queue_limit[CONN_MANAGER] = 1200;
2855	l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2856	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2857	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2858	/* FRAGMENT and LAST_FRAGMENT packets */
2859	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2860}
2861
2862/**
2863 * link_find_link - locate link by name
2864 * @name - ptr to link name string
2865 * @node - ptr to area to be filled with ptr to associated node
2866 *
2867 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2868 * this also prevents link deletion.
2869 *
2870 * Returns pointer to link (or 0 if invalid link name).
2871 */
2872
2873static struct link *link_find_link(const char *name, struct tipc_node **node)
2874{
2875	struct link_name link_name_parts;
2876	struct bearer *b_ptr;
2877	struct link *l_ptr;
2878
2879	if (!link_name_validate(name, &link_name_parts))
2880		return NULL;
2881
2882	b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2883	if (!b_ptr)
2884		return NULL;
2885
2886	*node = tipc_node_find(link_name_parts.addr_peer);
2887	if (!*node)
2888		return NULL;
2889
2890	l_ptr = (*node)->links[b_ptr->identity];
2891	if (!l_ptr || strcmp(l_ptr->name, name))
2892		return NULL;
2893
2894	return l_ptr;
2895}
2896
/**
 * tipc_link_cmd_config - change tolerance, priority, or window of a link
 * @req_tlv_area: request TLV containing a struct tipc_link_config
 * @req_tlv_space: size of request TLV area
 * @cmd: one of TIPC_CMD_SET_LINK_{TOL,PRI,WINDOW}
 *
 * Returns a configuration reply buffer (success, or an error string).
 */
struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
				     u16 cmd)
{
	struct tipc_link_config *args;
	u32 new_value;
	struct link *l_ptr;
	struct tipc_node *node;
	int res;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
	new_value = ntohl(args->value);

	/* Broadcast link: only the window setting may be changed */
	if (!strcmp(args->name, tipc_bclink_name)) {
		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
		    (tipc_bclink_set_queue_limits(new_value) == 0))
			return tipc_cfg_reply_none();
		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
						   " (cannot change setting on broadcast link)");
	}

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(args->name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return tipc_cfg_reply_error_string("link not found");
	}

	/* Validate range, apply the setting, and (for tolerance/priority)
	 * inform the peer via a STATE message */
	tipc_node_lock(node);
	res = -EINVAL;
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		if ((new_value >= TIPC_MIN_LINK_TOL) &&
		    (new_value <= TIPC_MAX_LINK_TOL)) {
			link_set_supervision_props(l_ptr, new_value);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, new_value, 0, 0);
			res = 0;
		}
		break;
	case TIPC_CMD_SET_LINK_PRI:
		if ((new_value >= TIPC_MIN_LINK_PRI) &&
		    (new_value <= TIPC_MAX_LINK_PRI)) {
			l_ptr->priority = new_value;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, 0, new_value, 0);
			res = 0;
		}
		break;
	case TIPC_CMD_SET_LINK_WINDOW:
		if ((new_value >= TIPC_MIN_LINK_WIN) &&
		    (new_value <= TIPC_MAX_LINK_WIN)) {
			tipc_link_set_queue_limits(l_ptr, new_value);
			res = 0;
		}
		break;
	}
	tipc_node_unlock(node);

	read_unlock_bh(&tipc_net_lock);
	if (res)
		return tipc_cfg_reply_error_string("cannot change link setting");

	return tipc_cfg_reply_none();
}
2964
2965/**
2966 * link_reset_statistics - reset link statistics
2967 * @l_ptr: pointer to link
2968 */
2969
static void link_reset_statistics(struct link *l_ptr)
{
	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
	/* Seed info fields so packet counts stay relative to "now" */
	l_ptr->stats.sent_info = l_ptr->next_out_no;
	l_ptr->stats.recv_info = l_ptr->next_in_no;
}
2976
2977struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2978{
2979	char *link_name;
2980	struct link *l_ptr;
2981	struct tipc_node *node;
2982
2983	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2984		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2985
2986	link_name = (char *)TLV_DATA(req_tlv_area);
2987	if (!strcmp(link_name, tipc_bclink_name)) {
2988		if (tipc_bclink_reset_stats())
2989			return tipc_cfg_reply_error_string("link not found");
2990		return tipc_cfg_reply_none();
2991	}
2992
2993	read_lock_bh(&tipc_net_lock);
2994	l_ptr = link_find_link(link_name, &node);
2995	if (!l_ptr) {
2996		read_unlock_bh(&tipc_net_lock);
2997		return tipc_cfg_reply_error_string("link not found");
2998	}
2999
3000	tipc_node_lock(node);
3001	link_reset_statistics(l_ptr);
3002	tipc_node_unlock(node);
3003	read_unlock_bh(&tipc_net_lock);
3004	return tipc_cfg_reply_none();
3005}
3006
3007/**
3008 * percent - convert count to a percentage of total (rounding up or down)
3009 */
3010
3011static u32 percent(u32 count, u32 total)
3012{
3013	return (count * 100 + (total / 2)) / total;
3014}
3015
3016/**
3017 * tipc_link_stats - print link statistics
3018 * @name: link name
3019 * @buf: print buffer area
3020 * @buf_size: size of print buffer area
3021 *
3022 * Returns length of print buffer data string (or 0 if error)
3023 */
3024
static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
	struct print_buf pb;
	struct link *l_ptr;
	struct tipc_node *node;
	char *status;
	u32 profile_total = 0;

	/* Broadcast link statistics are handled by the bclink module */
	if (!strcmp(name, tipc_bclink_name))
		return tipc_bclink_stats(buf, buf_size);

	tipc_printbuf_init(&pb, buf, buf_size);

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return 0;
	}
	tipc_node_lock(node);

	if (tipc_link_is_active(l_ptr))
		status = "ACTIVE";
	else if (tipc_link_is_up(l_ptr))
		status = "STANDBY";
	else
		status = "DEFUNCT";
	tipc_printf(&pb, "Link <%s>\n"
			 "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
			 "  Window:%u packets\n",
		    l_ptr->name, status, l_ptr->max_pkt,
		    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_in_no - l_ptr->stats.recv_info,
		    l_ptr->stats.recv_fragments,
		    l_ptr->stats.recv_fragmented,
		    l_ptr->stats.recv_bundles,
		    l_ptr->stats.recv_bundled);
	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_out_no - l_ptr->stats.sent_info,
		    l_ptr->stats.sent_fragments,
		    l_ptr->stats.sent_fragmented,
		    l_ptr->stats.sent_bundles,
		    l_ptr->stats.sent_bundled);
	/* Avoid dividing by zero when no packets have been profiled */
	profile_total = l_ptr->stats.msg_length_counts;
	if (!profile_total)
		profile_total = 1;
	tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
			 "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
			 "-16354:%u%% -32768:%u%% -66000:%u%%\n",
		    l_ptr->stats.msg_length_counts,
		    l_ptr->stats.msg_lengths_total / profile_total,
		    percent(l_ptr->stats.msg_length_profile[0], profile_total),
		    percent(l_ptr->stats.msg_length_profile[1], profile_total),
		    percent(l_ptr->stats.msg_length_profile[2], profile_total),
		    percent(l_ptr->stats.msg_length_profile[3], profile_total),
		    percent(l_ptr->stats.msg_length_profile[4], profile_total),
		    percent(l_ptr->stats.msg_length_profile[5], profile_total),
		    percent(l_ptr->stats.msg_length_profile[6], profile_total));
	tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
		    l_ptr->stats.recv_states,
		    l_ptr->stats.recv_probes,
		    l_ptr->stats.recv_nacks,
		    l_ptr->stats.deferred_recv,
		    l_ptr->stats.duplicates);
	tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
		    l_ptr->stats.sent_states,
		    l_ptr->stats.sent_probes,
		    l_ptr->stats.sent_nacks,
		    l_ptr->stats.sent_acks,
		    l_ptr->stats.retransmitted);
	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
		    l_ptr->stats.bearer_congs,
		    l_ptr->stats.link_congs,
		    l_ptr->stats.max_queue_sz,
		    l_ptr->stats.queue_sz_counts
		    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
		    : 0);

	tipc_node_unlock(node);
	read_unlock_bh(&tipc_net_lock);
	return tipc_printbuf_validate(&pb);
}
3108
3109#define MAX_LINK_STATS_INFO 2000
3110
3111struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3112{
3113	struct sk_buff *buf;
3114	struct tlv_desc *rep_tlv;
3115	int str_len;
3116
3117	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3118		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3119
3120	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3121	if (!buf)
3122		return NULL;
3123
3124	rep_tlv = (struct tlv_desc *)buf->data;
3125
3126	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3127				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3128	if (!str_len) {
3129		buf_discard(buf);
3130		return tipc_cfg_reply_error_string("link not found");
3131	}
3132
3133	skb_put(buf, TLV_SPACE(str_len));
3134	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3135
3136	return buf;
3137}
3138
3139/**
3140 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3141 * @dest: network address of destination node
3142 * @selector: used to select from set of active links
3143 *
3144 * If no active link can be found, uses default maximum packet size.
3145 */
3146
3147u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3148{
3149	struct tipc_node *n_ptr;
3150	struct link *l_ptr;
3151	u32 res = MAX_PKT_DEFAULT;
3152
3153	if (dest == tipc_own_addr)
3154		return MAX_MSG_SIZE;
3155
3156	read_lock_bh(&tipc_net_lock);
3157	n_ptr = tipc_node_select(dest, selector);
3158	if (n_ptr) {
3159		tipc_node_lock(n_ptr);
3160		l_ptr = n_ptr->active_links[selector & 1];
3161		if (l_ptr)
3162			res = l_ptr->max_pkt;
3163		tipc_node_unlock(n_ptr);
3164	}
3165	read_unlock_bh(&tipc_net_lock);
3166	return res;
3167}
3168
3169static void link_dump_send_queue(struct link *l_ptr)
3170{
3171	if (l_ptr->next_out) {
3172		info("\nContents of unsent queue:\n");
3173		dbg_print_buf_chain(l_ptr->next_out);
3174	}
3175	info("\nContents of send queue:\n");
3176	if (l_ptr->first_out) {
3177		dbg_print_buf_chain(l_ptr->first_out);
3178	}
3179	info("Empty send queue\n");
3180}
3181
3182static void link_print(struct link *l_ptr, struct print_buf *buf,
3183		       const char *str)
3184{
3185	tipc_printf(buf, str);
3186	if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3187		return;
3188	tipc_printf(buf, "Link %x<%s>:",
3189		    l_ptr->addr, l_ptr->b_ptr->publ.name);
3190	tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3191	tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3192	tipc_printf(buf, "SQUE");
3193	if (l_ptr->first_out) {
3194		tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3195		if (l_ptr->next_out)
3196			tipc_printf(buf, "%u..",
3197				    msg_seqno(buf_msg(l_ptr->next_out)));
3198		tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3199		if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3200			 msg_seqno(buf_msg(l_ptr->first_out)))
3201		     != (l_ptr->out_queue_size - 1)) ||
3202		    (l_ptr->last_out->next != NULL)) {
3203			tipc_printf(buf, "\nSend queue inconsistency\n");
3204			tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3205			tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3206			tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3207			link_dump_send_queue(l_ptr);
3208		}
3209	} else
3210		tipc_printf(buf, "[]");
3211	tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3212	if (l_ptr->oldest_deferred_in) {
3213		u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3214		u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3215		tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3216		if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3217			tipc_printf(buf, ":RQSIZ(%u)",
3218				    l_ptr->deferred_inqueue_sz);
3219		}
3220	}
3221	if (link_working_unknown(l_ptr))
3222		tipc_printf(buf, ":WU");
3223	if (link_reset_reset(l_ptr))
3224		tipc_printf(buf, ":RR");
3225	if (link_reset_unknown(l_ptr))
3226		tipc_printf(buf, ":RU");
3227	if (link_working_working(l_ptr))
3228		tipc_printf(buf, ":WW");
3229	tipc_printf(buf, "\n");
3230}
3231
3232