bcast.c revision 3ac90216abc7d39e694533aec2805efeb06bf8ac
/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "msg.h"
#include "dbg.h"
#include "link.h"
#include "net.h"
#include "node.h"
#include "port.h"
#include "addr.h"
#include "node_subscr.h"
#include "name_distr.h"
#include "bearer.h"
#include "name_table.h"
#include "bcast.h"

#define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */

#define BCLINK_WIN_DEFAULT 20		/* bcast link window size (default) */

#define BCLINK_LOG_BUF_SIZE 0

/*
 * Loss rate for incoming broadcast frames; used to test retransmission code.
 * Set to N to cause every N'th frame to be discarded; 0 => don't discard any.
 */

#define TIPC_BCAST_LOSS_RATE 0

/**
 * struct bcbearer_pair - a pair of bearers used by broadcast link
 * @primary: pointer to primary bearer
 * @secondary: pointer to secondary bearer
 *
 * Bearers must have the same priority and the same set of reachable
 * destinations to be paired.
 */

struct bcbearer_pair {
	struct bearer *primary;
	struct bearer *secondary;
};

/**
 * struct bcbearer - bearer used by broadcast link
 * @bearer: (non-standard) broadcast bearer structure
 * @media: (non-standard) broadcast media structure
 * @bpairs: array of bearer pairs
 * @bpairs_temp: array of bearer pairs used during creation of "bpairs"
 */

struct bcbearer {
	struct bearer bearer;
	struct media media;
	struct bcbearer_pair bpairs[MAX_BEARERS];
	struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
};

/**
 * struct bclink - link used for broadcast messages
 * @link: (non-standard) broadcast link structure
 * @node: (non-standard) node structure representing b'cast link's peer node
 *
 * Handles sequence numbering, fragmentation, bundling, etc.
 */

struct bclink {
	struct link link;
	struct node node;
};


static struct bcbearer *bcbearer = NULL;
static struct bclink *bclink = NULL;
static struct link *bcl = NULL;
static DEFINE_SPINLOCK(bc_lock);

char tipc_bclink_name[] = "multicast-link";


static u32 buf_seqno(struct sk_buff *buf)
{
	return msg_seqno(buf_msg(buf));
}

static u32 bcbuf_acks(struct sk_buff *buf)
{
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}

static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
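
/*
 * Example (illustrative numbers): a broadcast buffer sent while the
 * cluster map lists three peer nodes starts life as
 *
 *	bcbuf_set_acks(buf, 3);
 *
 * and each bcbuf_decr_acks() call, made as a node acknowledges the
 * packet, brings the count toward zero; once it reaches zero,
 * tipc_bclink_acknowledge() can release the buffer from the broadcast
 * link's transmit queue.
 */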


/**
 * bclink_set_gap - set gap according to contents of current deferred pkt queue
 *
 * Called with 'node' locked, bc_lock unlocked
 */

static void bclink_set_gap(struct node *n_ptr)
{
	struct sk_buff *buf = n_ptr->bclink.deferred_head;

	n_ptr->bclink.gap_after = n_ptr->bclink.gap_to =
		mod(n_ptr->bclink.last_in);
	if (unlikely(buf != NULL))
		n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1);
}
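
/*
 * Example (illustrative numbers): if last_in is 38 and the deferred
 * queue holds packets 42, 43 and 45, gap_after becomes 38 and gap_to
 * becomes 41, i.e. the gap covers exactly the missing packets 39-41
 * that precede the first deferred packet.
 */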

/**
 * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment
 *
 * This mechanism endeavours to prevent all nodes in the network from trying
 * to ACK or NACK at the same time.
 *
 * Note: TIPC uses a different trigger to distribute ACKs than it does to
 *       distribute NACKs, but tries to use the same spacing (divide by 16).
 */

static int bclink_ack_allowed(u32 n)
{
	return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag;
}
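
/*
 * Example (illustrative numbers): with TIPC_MIN_LINK_WIN equal to 16,
 * a node whose tipc_own_tag is 3 gets a "yes" only when n % 16 == 3
 * (n = 3, 19, 35, ...), so roughly one node in sixteen reacts to any
 * given packet and ACK/NACK traffic is spread across the cluster
 * instead of bursting from every node at once.
 */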


/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bc_lock locked
 */

static void bclink_retransmit_pkt(u32 after, u32 to)
{
	struct sk_buff *buf;

	buf = bcl->first_out;
	while (buf && less_eq(buf_seqno(buf), after)) {
		buf = buf->next;
	}
	tipc_link_retransmit(bcl, buf, mod(to - after));
}
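
/*
 * Example (illustrative numbers, assuming 16-bit sequence arithmetic):
 * for after = 65534 and to = 1, the walk skips everything up to 65534
 * and mod(to - after) = 3, so packets 65535, 0 and 1 are retransmitted
 * across the wrap-around.
 */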

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bc_lock unlocked.
 */

void tipc_bclink_acknowledge(struct node *n_ptr, u32 acked)
{
	struct sk_buff *crs;
	struct sk_buff *next;
	unsigned int released = 0;

	if (less_eq(acked, n_ptr->bclink.acked))
		return;

	spin_lock_bh(&bc_lock);

	/* Skip over packets that node has previously acknowledged */

	crs = bcl->first_out;
	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) {
		crs = crs->next;
	}

	/* Update packets that node is now acknowledging */

	while (crs && less_eq(buf_seqno(crs), acked)) {
		next = crs->next;
		bcbuf_decr_acks(crs);
		if (bcbuf_acks(crs) == 0) {
			bcl->first_out = next;
			bcl->out_queue_size--;
			buf_discard(crs);
			released = 1;
		}
		crs = next;
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */

	if (unlikely(bcl->next_out))
		tipc_link_push_queue(bcl);
	if (unlikely(released && !list_empty(&bcl->waiting_ports)))
		tipc_link_wakeup_ports(bcl, 0);
	spin_unlock_bh(&bc_lock);
}
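
/*
 * Example (illustrative scenario): buffers 10-12 each await acks from
 * nodes A, B and C (bcbuf_acks == 3).  If node A, which had previously
 * acked up to 9, now acks 11, buffers 10 and 11 drop to an ack count
 * of 2 and stay queued; only after B and C also ack them does
 * first_out advance and the buffers get discarded.
 */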

/**
 * bclink_send_ack - unicast an ACK msg
 *
 * tipc_net_lock and node lock set
 */

static void bclink_send_ack(struct node *n_ptr)
{
	struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1];

	if (l_ptr != NULL)
		tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
}

/**
 * bclink_send_nack - broadcast a NACK msg
 *
 * tipc_net_lock and node lock set
 */

static void bclink_send_nack(struct node *n_ptr)
{
	struct sk_buff *buf;
	struct tipc_msg *msg;

	if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to))
		return;

	buf = buf_acquire(INT_H_SIZE);
	if (buf) {
		msg = buf_msg(buf);
		msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
			 TIPC_OK, INT_H_SIZE, n_ptr->addr);
		msg_set_mc_netid(msg, tipc_net_id);
		msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in));
		msg_set_bcgap_after(msg, n_ptr->bclink.gap_after);
		msg_set_bcgap_to(msg, n_ptr->bclink.gap_to);
		msg_set_bcast_tag(msg, tipc_own_tag);

		if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) {
			bcl->stats.sent_nacks++;
			buf_discard(buf);
		} else {
			tipc_bearer_schedule(bcl->b_ptr, bcl);
			bcl->proto_msg_queue = buf;
			bcl->stats.bearer_congs++;
		}

		/*
		 * Ensure we don't send another NACK msg to the node
		 * until 16 more deferred messages arrive from it
		 * (i.e. helps prevent all nodes from NACK'ing at the same time)
		 */

		n_ptr->bclink.nack_sync = tipc_own_tag;
	}
}

/**
 * tipc_bclink_check_gap - send a NACK if a sequence gap exists
 *
 * tipc_net_lock and node lock set
 */

void tipc_bclink_check_gap(struct node *n_ptr, u32 last_sent)
{
	if (!n_ptr->bclink.supported ||
	    less_eq(last_sent, mod(n_ptr->bclink.last_in)))
		return;

	bclink_set_gap(n_ptr);
	if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to)
		n_ptr->bclink.gap_to = last_sent;
	bclink_send_nack(n_ptr);
}

/**
 * tipc_bclink_peek_nack - process a NACK msg meant for another node
 *
 * Only tipc_net_lock set.
 */

static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to)
{
	struct node *n_ptr = tipc_node_find(dest);
	u32 my_after, my_to;

	if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr)))
		return;
	tipc_node_lock(n_ptr);
	/*
	 * Modify gap to suppress unnecessary NACKs from this node
	 */
	my_after = n_ptr->bclink.gap_after;
	my_to = n_ptr->bclink.gap_to;

	if (less_eq(gap_after, my_after)) {
		if (less(my_after, gap_to) && less(gap_to, my_to))
			n_ptr->bclink.gap_after = gap_to;
		else if (less_eq(my_to, gap_to))
			n_ptr->bclink.gap_to = n_ptr->bclink.gap_after;
	} else if (less_eq(gap_after, my_to)) {
		if (less_eq(my_to, gap_to))
			n_ptr->bclink.gap_to = gap_after;
	} else {
		/*
		 * Expand gap if missing bufs are not in deferred queue:
		 */
		struct sk_buff *buf = n_ptr->bclink.deferred_head;
		u32 prev = n_ptr->bclink.gap_to;

		for (; buf; buf = buf->next) {
			u32 seqno = buf_seqno(buf);

			if (mod(seqno - prev) != 1) {
				buf = NULL;
				break;
			}
			if (seqno == gap_after)
				break;
			prev = seqno;
		}
		if (buf == NULL)
			n_ptr->bclink.gap_to = gap_after;
	}
	/*
	 * Some nodes may send a complementary NACK now:
	 */
	if (bclink_ack_allowed(sender_tag + 1)) {
		if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) {
			bclink_send_nack(n_ptr);
			bclink_set_gap(n_ptr);
		}
	}
	tipc_node_unlock(n_ptr);
}
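
/*
 * Example (illustrative numbers): if this node's own gap is 20-29 and
 * it overhears a NACK asking for 20-24, the first branch above shrinks
 * its gap to 24-29; a later overheard NACK covering 24-29 empties the
 * gap entirely (gap_to == gap_after), so this node sends no duplicate
 * NACK of its own.
 */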

/**
 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster
 */

int tipc_bclink_send_msg(struct sk_buff *buf)
{
	int res;

	spin_lock_bh(&bc_lock);

	res = tipc_link_send_buf(bcl, buf);
	if (unlikely(res == -ELINKCONG))
		buf_discard(buf);
	else
		bcl->stats.sent_info++;

	if (bcl->out_queue_size > bcl->stats.max_queue_sz)
		bcl->stats.max_queue_sz = bcl->out_queue_size;
	bcl->stats.queue_sz_counts++;
	bcl->stats.accu_queue_sz += bcl->out_queue_size;

	spin_unlock_bh(&bc_lock);
	return res;
}

/**
 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards
 *
 * tipc_net_lock is read_locked, no other locks set
 */

void tipc_bclink_recv_pkt(struct sk_buff *buf)
{
#if (TIPC_BCAST_LOSS_RATE)
	static int rx_count = 0;
#endif
	struct tipc_msg *msg = buf_msg(buf);
	struct node *node = tipc_node_find(msg_prevnode(msg));
	u32 next_in;
	u32 seqno;
	struct sk_buff *deferred;

	msg_dbg(msg, "<BC<<<");

	if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported ||
		     (msg_mc_netid(msg) != tipc_net_id))) {
		buf_discard(buf);
		return;
	}

	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
		msg_dbg(msg, "<BCNACK<<<");
		if (msg_destnode(msg) == tipc_own_addr) {
			tipc_node_lock(node);
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_node_unlock(node);
			spin_lock_bh(&bc_lock);
			bcl->stats.recv_nacks++;
			bcl->owner->next = node;   /* remember requestor */
			bclink_retransmit_pkt(msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
			bcl->owner->next = NULL;
			spin_unlock_bh(&bc_lock);
		} else {
			tipc_bclink_peek_nack(msg_destnode(msg),
					      msg_bcast_tag(msg),
					      msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
		}
		buf_discard(buf);
		return;
	}

#if (TIPC_BCAST_LOSS_RATE)
	if (++rx_count == TIPC_BCAST_LOSS_RATE) {
		rx_count = 0;
		buf_discard(buf);
		return;
	}
#endif

	tipc_node_lock(node);
receive:
	deferred = node->bclink.deferred_head;
	next_in = mod(node->bclink.last_in + 1);
	seqno = msg_seqno(msg);

	if (likely(seqno == next_in)) {
		bcl->stats.recv_info++;
		node->bclink.last_in++;
		bclink_set_gap(node);
		if (unlikely(bclink_ack_allowed(seqno))) {
			bclink_send_ack(node);
			bcl->stats.sent_acks++;
		}
		if (likely(msg_isdata(msg))) {
			tipc_node_unlock(node);
			tipc_port_recv_mcast(buf, NULL);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			tipc_node_unlock(node);
			tipc_link_recv_bundle(buf);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			bcl->stats.recv_fragments++;
			if (tipc_link_recv_fragment(&node->bclink.defragm,
						    &buf, &msg))
				bcl->stats.recv_fragmented++;
			tipc_node_unlock(node);
			tipc_net_route_msg(buf);
		} else {
			tipc_node_unlock(node);
			tipc_net_route_msg(buf);
		}
		if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) {
			tipc_node_lock(node);
			buf = deferred;
			msg = buf_msg(buf);
			node->bclink.deferred_head = deferred->next;
			goto receive;
		}
		return;
	} else if (less(next_in, seqno)) {
		u32 gap_after = node->bclink.gap_after;
		u32 gap_to = node->bclink.gap_to;

		if (tipc_link_defer_pkt(&node->bclink.deferred_head,
					&node->bclink.deferred_tail,
					buf)) {
			node->bclink.nack_sync++;
			bcl->stats.deferred_recv++;
			if (seqno == mod(gap_after + 1))
				node->bclink.gap_after = seqno;
			else if (less(gap_after, seqno) && less(seqno, gap_to))
				node->bclink.gap_to = seqno;
		}
		if (bclink_ack_allowed(node->bclink.nack_sync)) {
			if (gap_to != gap_after)
				bclink_send_nack(node);
			bclink_set_gap(node);
		}
	} else {
		bcl->stats.duplicates++;
		buf_discard(buf);
	}
	tipc_node_unlock(node);
}

u32 tipc_bclink_get_last_sent(void)
{
	u32 last_sent = mod(bcl->next_out_no - 1);

	if (bcl->next_out)
		last_sent = mod(buf_seqno(bcl->next_out) - 1);
	return last_sent;
}

u32 tipc_bclink_acks_missing(struct node *n_ptr)
{
	return (n_ptr->bclink.supported &&
		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
}


/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send through as many bearers as necessary to reach all nodes
 * that support TIPC multicasting.
 *
 * Returns 0 if packet sent successfully, non-zero if not
 */

static int tipc_bcbearer_send(struct sk_buff *buf,
			      struct tipc_bearer *unused1,
			      struct tipc_media_addr *unused2)
{
	static int send_count = 0;

	struct node_map *remains;
	struct node_map *remains_new;
	struct node_map *remains_tmp;
	int bp_index;
	int swap_time;
	int err;

	/* Prepare buffer for broadcasting (if first time trying to send it) */

	if (likely(!msg_non_seq(buf_msg(buf)))) {
		struct tipc_msg *msg;

		assert(tipc_cltr_bcast_nodes.count != 0);
		bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count);
		msg = buf_msg(buf);
		msg_set_non_seq(msg);
		msg_set_mc_netid(msg, tipc_net_id);
	}

	/* Determine if bearer pairs should be swapped following this attempt */

	swap_time = (++send_count >= 10);
	if (swap_time)
		send_count = 0;

	/* Send buffer over bearers until all targets reached */

	remains = kmalloc(sizeof(struct node_map), GFP_ATOMIC);
	remains_new = kmalloc(sizeof(struct node_map), GFP_ATOMIC);
	if (!remains || !remains_new) {
		err = -ENOMEM;
		goto out;
	}
	*remains = tipc_cltr_bcast_nodes;

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
		struct bearer *p = bcbearer->bpairs[bp_index].primary;
		struct bearer *s = bcbearer->bpairs[bp_index].secondary;

		if (!p)
			break;	/* no more bearers to try */

		tipc_nmap_diff(remains, &p->nodes, remains_new);
		if (remains_new->count == remains->count)
			continue;	/* bearer pair doesn't add anything */

		if (!p->publ.blocked &&
		    !p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) {
			if (swap_time && s && !s->publ.blocked)
				goto swap;
			else
				goto update;
		}

		if (!s || s->publ.blocked ||
		    s->media->send_msg(buf, &s->publ, &s->media->bcast_addr))
			continue;	/* unable to send using bearer pair */
swap:
		bcbearer->bpairs[bp_index].primary = s;
		bcbearer->bpairs[bp_index].secondary = p;
update:
		if (remains_new->count == 0) {
			err = TIPC_OK;
			goto out;
		}

		/* swap map pointers so "remains" holds the reduced set */
		remains_tmp = remains;
		remains = remains_new;
		remains_new = remains_tmp;
	}

	/* Unable to reach all targets */

	bcbearer->bearer.publ.blocked = 1;
	bcl->stats.bearer_congs++;
	err = ~TIPC_OK;

 out:
	kfree(remains_new);
	kfree(remains);
	return err;
}
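
/*
 * Example (illustrative scenario): with a primary and a secondary
 * bearer at the same priority, nine consecutive broadcasts go out via
 * the primary; on the tenth, send_count reaches 10, swap_time is set,
 * and a successful send also swaps the pair, so subsequent traffic
 * alternates between the two bearers instead of wearing one path
 * exclusively.
 */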

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */

void tipc_bcbearer_sort(void)
{
	struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct bcbearer_pair *bp_curr;
	int b_index;
	int pri;

	spin_lock_bh(&bc_lock);

	/* Group bearers by priority (can assume max of two per priority) */

	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
		struct bearer *b = &tipc_bearers[b_index];

		if (!b->active || !b->nodes.count)
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}

	/* Create array of bearer pairs for broadcasting */

	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

	spin_unlock_bh(&bc_lock);
}
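
/*
 * Example (illustrative scenario): four active bearers at priorities
 * 10, 10, 9 and 8 are first grouped into bp_temp by priority; walking
 * from TIPC_MAX_LINK_PRI downwards then yields bpairs[0] = the two
 * priority-10 bearers as a pair (provided they reach the same node
 * set), bpairs[1] = the priority-9 bearer alone, and bpairs[2] = the
 * priority-8 bearer alone.
 */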

/**
 * tipc_bcbearer_push - resolve bearer congestion
 *
 * Forces bclink to push out any unsent packets, until all packets are gone
 * or congestion reoccurs.
 * No locks set when function called
 */

void tipc_bcbearer_push(void)
{
	struct bearer *b_ptr;

	spin_lock_bh(&bc_lock);
	b_ptr = &bcbearer->bearer;
	if (b_ptr->publ.blocked) {
		b_ptr->publ.blocked = 0;
		tipc_bearer_lock_push(b_ptr);
	}
	spin_unlock_bh(&bc_lock);
}


int tipc_bclink_stats(char *buf, const u32 buf_size)
{
	struct print_buf pb;

	if (!bcl)
		return 0;

	tipc_printbuf_init(&pb, buf, buf_size);

	spin_lock_bh(&bc_lock);

	tipc_printf(&pb, "Link <%s>\n"
		         "  Window:%u packets\n",
		    bcl->name, bcl->queue_limit[0]);
	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    bcl->stats.recv_info,
		    bcl->stats.recv_fragments,
		    bcl->stats.recv_fragmented,
		    bcl->stats.recv_bundles,
		    bcl->stats.recv_bundled);
	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    bcl->stats.sent_info,
		    bcl->stats.sent_fragments,
		    bcl->stats.sent_fragmented,
		    bcl->stats.sent_bundles,
		    bcl->stats.sent_bundled);
	tipc_printf(&pb, "  RX naks:%u defs:%u dups:%u\n",
		    bcl->stats.recv_nacks,
		    bcl->stats.deferred_recv,
		    bcl->stats.duplicates);
	tipc_printf(&pb, "  TX naks:%u acks:%u dups:%u\n",
		    bcl->stats.sent_nacks,
		    bcl->stats.sent_acks,
		    bcl->stats.retransmitted);
	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
		    bcl->stats.bearer_congs,
		    bcl->stats.link_congs,
		    bcl->stats.max_queue_sz,
		    bcl->stats.queue_sz_counts
		    ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts)
		    : 0);

	spin_unlock_bh(&bc_lock);
	return tipc_printbuf_validate(&pb);
}

int tipc_bclink_reset_stats(void)
{
	if (!bcl)
		return -ENOPROTOOPT;

	spin_lock_bh(&bc_lock);
	memset(&bcl->stats, 0, sizeof(bcl->stats));
	spin_unlock_bh(&bc_lock);
	return TIPC_OK;
}

int tipc_bclink_set_queue_limits(u32 limit)
{
	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

	spin_lock_bh(&bc_lock);
	tipc_link_set_queue_limits(bcl, limit);
	spin_unlock_bh(&bc_lock);
	return TIPC_OK;
}

int tipc_bclink_init(void)
{
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bcbearer || !bclink) {
 nomem:
		warn("Memory squeeze; failed to create multicast link\n");
		kfree(bcbearer);
		bcbearer = NULL;
		kfree(bclink);
		bclink = NULL;
		return -ENOMEM;
	}

	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
	bcbearer->bearer.media = &bcbearer->media;
	bcbearer->media.send_msg = tipc_bcbearer_send;
	sprintf(bcbearer->media.name, "tipc-multicast");

	bcl = &bclink->link;
	INIT_LIST_HEAD(&bcl->waiting_ports);
	bcl->next_out_no = 1;
	spin_lock_init(&bclink->node.lock);
	bcl->owner = &bclink->node;
	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
	bcl->b_ptr = &bcbearer->bearer;
	bcl->state = WORKING_WORKING;
	strcpy(bcl->name, tipc_bclink_name);

	if (BCLINK_LOG_BUF_SIZE) {
		char *pb = kmalloc(BCLINK_LOG_BUF_SIZE, GFP_ATOMIC);

		if (!pb)
			goto nomem;
		tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE);
	}

	return TIPC_OK;
}

void tipc_bclink_stop(void)
{
	spin_lock_bh(&bc_lock);
	if (bcbearer) {
		tipc_link_stop(bcl);
		if (BCLINK_LOG_BUF_SIZE)
			kfree(bcl->print_buf.buf);
		bcl = NULL;
		kfree(bclink);
		bclink = NULL;
		kfree(bcbearer);
		bcbearer = NULL;
	}
	spin_unlock_bh(&bc_lock);
}