bcast.c revision 3ac90216abc7d39e694533aec2805efeb06bf8ac
1/* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38#include "core.h" 39#include "msg.h" 40#include "dbg.h" 41#include "link.h" 42#include "net.h" 43#include "node.h" 44#include "port.h" 45#include "addr.h" 46#include "node_subscr.h" 47#include "name_distr.h" 48#include "bearer.h" 49#include "name_table.h" 50#include "bcast.h" 51 52#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 53 54#define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 55 56#define BCLINK_LOG_BUF_SIZE 0 57 58/* 59 * Loss rate for incoming broadcast frames; used to test retransmission code. 60 * Set to N to cause every N'th frame to be discarded; 0 => don't discard any. 61 */ 62 63#define TIPC_BCAST_LOSS_RATE 0 64 65/** 66 * struct bcbearer_pair - a pair of bearers used by broadcast link 67 * @primary: pointer to primary bearer 68 * @secondary: pointer to secondary bearer 69 * 70 * Bearers must have same priority and same set of reachable destinations 71 * to be paired. 72 */ 73 74struct bcbearer_pair { 75 struct bearer *primary; 76 struct bearer *secondary; 77}; 78 79/** 80 * struct bcbearer - bearer used by broadcast link 81 * @bearer: (non-standard) broadcast bearer structure 82 * @media: (non-standard) broadcast media structure 83 * @bpairs: array of bearer pairs 84 * @bpairs_temp: array of bearer pairs used during creation of "bpairs" 85 */ 86 87struct bcbearer { 88 struct bearer bearer; 89 struct media media; 90 struct bcbearer_pair bpairs[MAX_BEARERS]; 91 struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1]; 92}; 93 94/** 95 * struct bclink - link used for broadcast messages 96 * @link: (non-standard) broadcast link structure 97 * @node: (non-standard) node structure representing b'cast link's peer node 98 * 99 * Handles sequence numbering, fragmentation, bundling, etc. 100 */ 101 102struct bclink { 103 struct link link; 104 struct node node; 105}; 106 107 108static struct bcbearer *bcbearer = NULL; 109static struct bclink *bclink = NULL; 110static struct link *bcl = NULL; 111static spinlock_t bc_lock = SPIN_LOCK_UNLOCKED; 112 113char tipc_bclink_name[] = "multicast-link"; 114 115 116static u32 buf_seqno(struct sk_buff *buf) 117{ 118 return msg_seqno(buf_msg(buf)); 119} 120 121static u32 bcbuf_acks(struct sk_buff *buf) 122{ 123 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 124} 125 126static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 127{ 128 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 129} 130 131static void bcbuf_decr_acks(struct sk_buff *buf) 132{ 133 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 134} 135 136 137/** 138 * bclink_set_gap - set gap according to contents of current deferred pkt queue 139 * 140 * Called with 'node' locked, bc_lock unlocked 141 */ 142 143static void bclink_set_gap(struct node *n_ptr) 144{ 145 struct sk_buff *buf = n_ptr->bclink.deferred_head; 146 147 n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 148 mod(n_ptr->bclink.last_in); 149 if (unlikely(buf != NULL)) 150 n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1); 151} 152 153/** 154 * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment 155 * 156 * This mechanism endeavours to prevent all nodes in network from trying 157 * to ACK or NACK at the same time. 158 * 159 * Note: TIPC uses a different trigger to distribute ACKs than it does to 160 * distribute NACKs, but tries to use the same spacing (divide by 16). 161 */ 162 163static int bclink_ack_allowed(u32 n) 164{ 165 return((n % TIPC_MIN_LINK_WIN) == tipc_own_tag); 166} 167 168 169/** 170 * bclink_retransmit_pkt - retransmit broadcast packets 171 * @after: sequence number of last packet to *not* retransmit 172 * @to: sequence number of last packet to retransmit 173 * 174 * Called with bc_lock locked 175 */ 176 177static void bclink_retransmit_pkt(u32 after, u32 to) 178{ 179 struct sk_buff *buf; 180 181 buf = bcl->first_out; 182 while (buf && less_eq(buf_seqno(buf), after)) { 183 buf = buf->next; 184 } 185 tipc_link_retransmit(bcl, buf, mod(to - after)); 186} 187 188/** 189 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 190 * @n_ptr: node that sent acknowledgement info 191 * @acked: broadcast sequence # that has been acknowledged 192 * 193 * Node is locked, bc_lock unlocked. 194 */ 195 196void tipc_bclink_acknowledge(struct node *n_ptr, u32 acked) 197{ 198 struct sk_buff *crs; 199 struct sk_buff *next; 200 unsigned int released = 0; 201 202 if (less_eq(acked, n_ptr->bclink.acked)) 203 return; 204 205 spin_lock_bh(&bc_lock); 206 207 /* Skip over packets that node has previously acknowledged */ 208 209 crs = bcl->first_out; 210 while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) { 211 crs = crs->next; 212 } 213 214 /* Update packets that node is now acknowledging */ 215 216 while (crs && less_eq(buf_seqno(crs), acked)) { 217 next = crs->next; 218 bcbuf_decr_acks(crs); 219 if (bcbuf_acks(crs) == 0) { 220 bcl->first_out = next; 221 bcl->out_queue_size--; 222 buf_discard(crs); 223 released = 1; 224 } 225 crs = next; 226 } 227 n_ptr->bclink.acked = acked; 228 229 /* Try resolving broadcast link congestion, if necessary */ 230 231 if (unlikely(bcl->next_out)) 232 tipc_link_push_queue(bcl); 233 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 234 tipc_link_wakeup_ports(bcl, 0); 235 spin_unlock_bh(&bc_lock); 236} 237 238/** 239 * bclink_send_ack - unicast an ACK msg 240 * 241 * tipc_net_lock and node lock set 242 */ 243 244static void bclink_send_ack(struct node *n_ptr) 245{ 246 struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 247 248 if (l_ptr != NULL) 249 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 250} 251 252/** 253 * bclink_send_nack- broadcast a NACK msg 254 * 255 * tipc_net_lock and node lock set 256 */ 257 258static void bclink_send_nack(struct node *n_ptr) 259{ 260 struct sk_buff *buf; 261 struct tipc_msg *msg; 262 263 if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) 264 return; 265 266 buf = buf_acquire(INT_H_SIZE); 267 if (buf) { 268 msg = buf_msg(buf); 269 msg_init(msg, BCAST_PROTOCOL, STATE_MSG, 270 TIPC_OK, INT_H_SIZE, n_ptr->addr); 271 msg_set_mc_netid(msg, tipc_net_id); 272 msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); 273 msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); 274 msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); 275 msg_set_bcast_tag(msg, tipc_own_tag); 276 277 if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) { 278 bcl->stats.sent_nacks++; 279 buf_discard(buf); 280 } else { 281 tipc_bearer_schedule(bcl->b_ptr, bcl); 282 bcl->proto_msg_queue = buf; 283 bcl->stats.bearer_congs++; 284 } 285 286 /* 287 * Ensure we doesn't send another NACK msg to the node 288 * until 16 more deferred messages arrive from it 289 * (i.e. helps prevent all nodes from NACK'ing at same time) 290 */ 291 292 n_ptr->bclink.nack_sync = tipc_own_tag; 293 } 294} 295 296/** 297 * tipc_bclink_check_gap - send a NACK if a sequence gap exists 298 * 299 * tipc_net_lock and node lock set 300 */ 301 302void tipc_bclink_check_gap(struct node *n_ptr, u32 last_sent) 303{ 304 if (!n_ptr->bclink.supported || 305 less_eq(last_sent, mod(n_ptr->bclink.last_in))) 306 return; 307 308 bclink_set_gap(n_ptr); 309 if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to) 310 n_ptr->bclink.gap_to = last_sent; 311 bclink_send_nack(n_ptr); 312} 313 314/** 315 * tipc_bclink_peek_nack - process a NACK msg meant for another node 316 * 317 * Only tipc_net_lock set. 318 */ 319 320static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) 321{ 322 struct node *n_ptr = tipc_node_find(dest); 323 u32 my_after, my_to; 324 325 if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) 326 return; 327 tipc_node_lock(n_ptr); 328 /* 329 * Modify gap to suppress unnecessary NACKs from this node 330 */ 331 my_after = n_ptr->bclink.gap_after; 332 my_to = n_ptr->bclink.gap_to; 333 334 if (less_eq(gap_after, my_after)) { 335 if (less(my_after, gap_to) && less(gap_to, my_to)) 336 n_ptr->bclink.gap_after = gap_to; 337 else if (less_eq(my_to, gap_to)) 338 n_ptr->bclink.gap_to = n_ptr->bclink.gap_after; 339 } else if (less_eq(gap_after, my_to)) { 340 if (less_eq(my_to, gap_to)) 341 n_ptr->bclink.gap_to = gap_after; 342 } else { 343 /* 344 * Expand gap if missing bufs not in deferred queue: 345 */ 346 struct sk_buff *buf = n_ptr->bclink.deferred_head; 347 u32 prev = n_ptr->bclink.gap_to; 348 349 for (; buf; buf = buf->next) { 350 u32 seqno = buf_seqno(buf); 351 352 if (mod(seqno - prev) != 1) { 353 buf = NULL; 354 break; 355 } 356 if (seqno == gap_after) 357 break; 358 prev = seqno; 359 } 360 if (buf == NULL) 361 n_ptr->bclink.gap_to = gap_after; 362 } 363 /* 364 * Some nodes may send a complementary NACK now: 365 */ 366 if (bclink_ack_allowed(sender_tag + 1)) { 367 if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) { 368 bclink_send_nack(n_ptr); 369 bclink_set_gap(n_ptr); 370 } 371 } 372 tipc_node_unlock(n_ptr); 373} 374 375/** 376 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster 377 */ 378 379int tipc_bclink_send_msg(struct sk_buff *buf) 380{ 381 int res; 382 383 spin_lock_bh(&bc_lock); 384 385 res = tipc_link_send_buf(bcl, buf); 386 if (unlikely(res == -ELINKCONG)) 387 buf_discard(buf); 388 else 389 bcl->stats.sent_info++; 390 391 if (bcl->out_queue_size > bcl->stats.max_queue_sz) 392 bcl->stats.max_queue_sz = bcl->out_queue_size; 393 bcl->stats.queue_sz_counts++; 394 bcl->stats.accu_queue_sz += bcl->out_queue_size; 395 396 spin_unlock_bh(&bc_lock); 397 return res; 398} 399 400/** 401 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 402 * 403 * tipc_net_lock is read_locked, no other locks set 404 */ 405 406void tipc_bclink_recv_pkt(struct sk_buff *buf) 407{ 408#if (TIPC_BCAST_LOSS_RATE) 409 static int rx_count = 0; 410#endif 411 struct tipc_msg *msg = buf_msg(buf); 412 struct node* node = tipc_node_find(msg_prevnode(msg)); 413 u32 next_in; 414 u32 seqno; 415 struct sk_buff *deferred; 416 417 msg_dbg(msg, "<BC<<<"); 418 419 if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported || 420 (msg_mc_netid(msg) != tipc_net_id))) { 421 buf_discard(buf); 422 return; 423 } 424 425 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 426 msg_dbg(msg, "<BCNACK<<<"); 427 if (msg_destnode(msg) == tipc_own_addr) { 428 tipc_node_lock(node); 429 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 430 tipc_node_unlock(node); 431 spin_lock_bh(&bc_lock); 432 bcl->stats.recv_nacks++; 433 bcl->owner->next = node; /* remember requestor */ 434 bclink_retransmit_pkt(msg_bcgap_after(msg), 435 msg_bcgap_to(msg)); 436 bcl->owner->next = NULL; 437 spin_unlock_bh(&bc_lock); 438 } else { 439 tipc_bclink_peek_nack(msg_destnode(msg), 440 msg_bcast_tag(msg), 441 msg_bcgap_after(msg), 442 msg_bcgap_to(msg)); 443 } 444 buf_discard(buf); 445 return; 446 } 447 448#if (TIPC_BCAST_LOSS_RATE) 449 if (++rx_count == TIPC_BCAST_LOSS_RATE) { 450 rx_count = 0; 451 buf_discard(buf); 452 return; 453 } 454#endif 455 456 tipc_node_lock(node); 457receive: 458 deferred = node->bclink.deferred_head; 459 next_in = mod(node->bclink.last_in + 1); 460 seqno = msg_seqno(msg); 461 462 if (likely(seqno == next_in)) { 463 bcl->stats.recv_info++; 464 node->bclink.last_in++; 465 bclink_set_gap(node); 466 if (unlikely(bclink_ack_allowed(seqno))) { 467 bclink_send_ack(node); 468 bcl->stats.sent_acks++; 469 } 470 if (likely(msg_isdata(msg))) { 471 tipc_node_unlock(node); 472 tipc_port_recv_mcast(buf, NULL); 473 } else if (msg_user(msg) == MSG_BUNDLER) { 474 bcl->stats.recv_bundles++; 475 bcl->stats.recv_bundled += msg_msgcnt(msg); 476 tipc_node_unlock(node); 477 tipc_link_recv_bundle(buf); 478 } else if (msg_user(msg) == MSG_FRAGMENTER) { 479 bcl->stats.recv_fragments++; 480 if (tipc_link_recv_fragment(&node->bclink.defragm, 481 &buf, &msg)) 482 bcl->stats.recv_fragmented++; 483 tipc_node_unlock(node); 484 tipc_net_route_msg(buf); 485 } else { 486 tipc_node_unlock(node); 487 tipc_net_route_msg(buf); 488 } 489 if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { 490 tipc_node_lock(node); 491 buf = deferred; 492 msg = buf_msg(buf); 493 node->bclink.deferred_head = deferred->next; 494 goto receive; 495 } 496 return; 497 } else if (less(next_in, seqno)) { 498 u32 gap_after = node->bclink.gap_after; 499 u32 gap_to = node->bclink.gap_to; 500 501 if (tipc_link_defer_pkt(&node->bclink.deferred_head, 502 &node->bclink.deferred_tail, 503 buf)) { 504 node->bclink.nack_sync++; 505 bcl->stats.deferred_recv++; 506 if (seqno == mod(gap_after + 1)) 507 node->bclink.gap_after = seqno; 508 else if (less(gap_after, seqno) && less(seqno, gap_to)) 509 node->bclink.gap_to = seqno; 510 } 511 if (bclink_ack_allowed(node->bclink.nack_sync)) { 512 if (gap_to != gap_after) 513 bclink_send_nack(node); 514 bclink_set_gap(node); 515 } 516 } else { 517 bcl->stats.duplicates++; 518 buf_discard(buf); 519 } 520 tipc_node_unlock(node); 521} 522 523u32 tipc_bclink_get_last_sent(void) 524{ 525 u32 last_sent = mod(bcl->next_out_no - 1); 526 527 if (bcl->next_out) 528 last_sent = mod(buf_seqno(bcl->next_out) - 1); 529 return last_sent; 530} 531 532u32 tipc_bclink_acks_missing(struct node *n_ptr) 533{ 534 return (n_ptr->bclink.supported && 535 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); 536} 537 538 539/** 540 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 541 * 542 * Send through as many bearers as necessary to reach all nodes 543 * that support TIPC multicasting. 544 * 545 * Returns 0 if packet sent successfully, non-zero if not 546 */ 547 548static int tipc_bcbearer_send(struct sk_buff *buf, 549 struct tipc_bearer *unused1, 550 struct tipc_media_addr *unused2) 551{ 552 static int send_count = 0; 553 554 struct node_map *remains; 555 struct node_map *remains_new; 556 struct node_map *remains_tmp; 557 int bp_index; 558 int swap_time; 559 int err; 560 561 /* Prepare buffer for broadcasting (if first time trying to send it) */ 562 563 if (likely(!msg_non_seq(buf_msg(buf)))) { 564 struct tipc_msg *msg; 565 566 assert(tipc_cltr_bcast_nodes.count != 0); 567 bcbuf_set_acks(buf, tipc_cltr_bcast_nodes.count); 568 msg = buf_msg(buf); 569 msg_set_non_seq(msg); 570 msg_set_mc_netid(msg, tipc_net_id); 571 } 572 573 /* Determine if bearer pairs should be swapped following this attempt */ 574 575 if ((swap_time = (++send_count >= 10))) 576 send_count = 0; 577 578 /* Send buffer over bearers until all targets reached */ 579 580 remains = kmalloc(sizeof(struct node_map), GFP_ATOMIC); 581 remains_new = kmalloc(sizeof(struct node_map), GFP_ATOMIC); 582 *remains = tipc_cltr_bcast_nodes; 583 584 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 585 struct bearer *p = bcbearer->bpairs[bp_index].primary; 586 struct bearer *s = bcbearer->bpairs[bp_index].secondary; 587 588 if (!p) 589 break; /* no more bearers to try */ 590 591 tipc_nmap_diff(remains, &p->nodes, remains_new); 592 if (remains_new->count == remains->count) 593 continue; /* bearer pair doesn't add anything */ 594 595 if (!p->publ.blocked && 596 !p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) { 597 if (swap_time && s && !s->publ.blocked) 598 goto swap; 599 else 600 goto update; 601 } 602 603 if (!s || s->publ.blocked || 604 s->media->send_msg(buf, &s->publ, &s->media->bcast_addr)) 605 continue; /* unable to send using bearer pair */ 606swap: 607 bcbearer->bpairs[bp_index].primary = s; 608 bcbearer->bpairs[bp_index].secondary = p; 609update: 610 if (remains_new->count == 0) { 611 err = TIPC_OK; 612 goto out; 613 } 614 615 /* swap map */ 616 remains_tmp = remains; 617 remains = remains_new; 618 remains_new = remains_tmp; 619 } 620 621 /* Unable to reach all targets */ 622 623 bcbearer->bearer.publ.blocked = 1; 624 bcl->stats.bearer_congs++; 625 err = ~TIPC_OK; 626 627 out: 628 kfree(remains_new); 629 kfree(remains); 630 return err; 631} 632 633/** 634 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 635 */ 636 637void tipc_bcbearer_sort(void) 638{ 639 struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 640 struct bcbearer_pair *bp_curr; 641 int b_index; 642 int pri; 643 644 spin_lock_bh(&bc_lock); 645 646 /* Group bearers by priority (can assume max of two per priority) */ 647 648 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 649 650 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 651 struct bearer *b = &tipc_bearers[b_index]; 652 653 if (!b->active || !b->nodes.count) 654 continue; 655 656 if (!bp_temp[b->priority].primary) 657 bp_temp[b->priority].primary = b; 658 else 659 bp_temp[b->priority].secondary = b; 660 } 661 662 /* Create array of bearer pairs for broadcasting */ 663 664 bp_curr = bcbearer->bpairs; 665 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 666 667 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 668 669 if (!bp_temp[pri].primary) 670 continue; 671 672 bp_curr->primary = bp_temp[pri].primary; 673 674 if (bp_temp[pri].secondary) { 675 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 676 &bp_temp[pri].secondary->nodes)) { 677 bp_curr->secondary = bp_temp[pri].secondary; 678 } else { 679 bp_curr++; 680 bp_curr->primary = bp_temp[pri].secondary; 681 } 682 } 683 684 bp_curr++; 685 } 686 687 spin_unlock_bh(&bc_lock); 688} 689 690/** 691 * tipc_bcbearer_push - resolve bearer congestion 692 * 693 * Forces bclink to push out any unsent packets, until all packets are gone 694 * or congestion reoccurs. 695 * No locks set when function called 696 */ 697 698void tipc_bcbearer_push(void) 699{ 700 struct bearer *b_ptr; 701 702 spin_lock_bh(&bc_lock); 703 b_ptr = &bcbearer->bearer; 704 if (b_ptr->publ.blocked) { 705 b_ptr->publ.blocked = 0; 706 tipc_bearer_lock_push(b_ptr); 707 } 708 spin_unlock_bh(&bc_lock); 709} 710 711 712int tipc_bclink_stats(char *buf, const u32 buf_size) 713{ 714 struct print_buf pb; 715 716 if (!bcl) 717 return 0; 718 719 tipc_printbuf_init(&pb, buf, buf_size); 720 721 spin_lock_bh(&bc_lock); 722 723 tipc_printf(&pb, "Link <%s>\n" 724 " Window:%u packets\n", 725 bcl->name, bcl->queue_limit[0]); 726 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 727 bcl->stats.recv_info, 728 bcl->stats.recv_fragments, 729 bcl->stats.recv_fragmented, 730 bcl->stats.recv_bundles, 731 bcl->stats.recv_bundled); 732 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 733 bcl->stats.sent_info, 734 bcl->stats.sent_fragments, 735 bcl->stats.sent_fragmented, 736 bcl->stats.sent_bundles, 737 bcl->stats.sent_bundled); 738 tipc_printf(&pb, " RX naks:%u defs:%u dups:%u\n", 739 bcl->stats.recv_nacks, 740 bcl->stats.deferred_recv, 741 bcl->stats.duplicates); 742 tipc_printf(&pb, " TX naks:%u acks:%u dups:%u\n", 743 bcl->stats.sent_nacks, 744 bcl->stats.sent_acks, 745 bcl->stats.retransmitted); 746 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 747 bcl->stats.bearer_congs, 748 bcl->stats.link_congs, 749 bcl->stats.max_queue_sz, 750 bcl->stats.queue_sz_counts 751 ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts) 752 : 0); 753 754 spin_unlock_bh(&bc_lock); 755 return tipc_printbuf_validate(&pb); 756} 757 758int tipc_bclink_reset_stats(void) 759{ 760 if (!bcl) 761 return -ENOPROTOOPT; 762 763 spin_lock_bh(&bc_lock); 764 memset(&bcl->stats, 0, sizeof(bcl->stats)); 765 spin_unlock_bh(&bc_lock); 766 return TIPC_OK; 767} 768 769int tipc_bclink_set_queue_limits(u32 limit) 770{ 771 if (!bcl) 772 return -ENOPROTOOPT; 773 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 774 return -EINVAL; 775 776 spin_lock_bh(&bc_lock); 777 tipc_link_set_queue_limits(bcl, limit); 778 spin_unlock_bh(&bc_lock); 779 return TIPC_OK; 780} 781 782int tipc_bclink_init(void) 783{ 784 bcbearer = kmalloc(sizeof(*bcbearer), GFP_ATOMIC); 785 bclink = kmalloc(sizeof(*bclink), GFP_ATOMIC); 786 if (!bcbearer || !bclink) { 787 nomem: 788 warn("Memory squeeze; Failed to create multicast link\n"); 789 kfree(bcbearer); 790 bcbearer = NULL; 791 kfree(bclink); 792 bclink = NULL; 793 return -ENOMEM; 794 } 795 796 memset(bcbearer, 0, sizeof(struct bcbearer)); 797 INIT_LIST_HEAD(&bcbearer->bearer.cong_links); 798 bcbearer->bearer.media = &bcbearer->media; 799 bcbearer->media.send_msg = tipc_bcbearer_send; 800 sprintf(bcbearer->media.name, "tipc-multicast"); 801 802 bcl = &bclink->link; 803 memset(bclink, 0, sizeof(struct bclink)); 804 INIT_LIST_HEAD(&bcl->waiting_ports); 805 bcl->next_out_no = 1; 806 bclink->node.lock = SPIN_LOCK_UNLOCKED; 807 bcl->owner = &bclink->node; 808 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 809 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 810 bcl->b_ptr = &bcbearer->bearer; 811 bcl->state = WORKING_WORKING; 812 sprintf(bcl->name, tipc_bclink_name); 813 814 if (BCLINK_LOG_BUF_SIZE) { 815 char *pb = kmalloc(BCLINK_LOG_BUF_SIZE, GFP_ATOMIC); 816 817 if (!pb) 818 goto nomem; 819 tipc_printbuf_init(&bcl->print_buf, pb, BCLINK_LOG_BUF_SIZE); 820 } 821 822 return TIPC_OK; 823} 824 825void tipc_bclink_stop(void) 826{ 827 spin_lock_bh(&bc_lock); 828 if (bcbearer) { 829 tipc_link_stop(bcl); 830 if (BCLINK_LOG_BUF_SIZE) 831 kfree(bcl->print_buf.buf); 832 bcl = NULL; 833 kfree(bclink); 834 bclink = NULL; 835 kfree(bcbearer); 836 bcbearer = NULL; 837 } 838 spin_unlock_bh(&bc_lock); 839} 840 841