/* port.c revision df4ef33716232077564024baf0e5f2c74a295dfd */
1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 
35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "name_table.h" 45#include "user_reg.h" 46#include "msg.h" 47#include "bcast.h" 48 49/* Connection management: */ 50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 51#define CONFIRMED 0 52#define PROBING 1 53 54#define MAX_REJECT_SIZE 1024 55 56static struct sk_buff *msg_queue_head = NULL; 57static struct sk_buff *msg_queue_tail = NULL; 58 59DEFINE_SPINLOCK(tipc_port_list_lock); 60static DEFINE_SPINLOCK(queue_lock); 61 62static LIST_HEAD(ports); 63static void port_handle_node_down(unsigned long ref); 64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 66static void port_timeout(unsigned long ref); 67 68 69static u32 port_peernode(struct port *p_ptr) 70{ 71 return msg_destnode(&p_ptr->publ.phdr); 72} 73 74static u32 port_peerport(struct port *p_ptr) 75{ 76 return msg_destport(&p_ptr->publ.phdr); 77} 78 79static u32 port_out_seqno(struct port *p_ptr) 80{ 81 return msg_transp_seqno(&p_ptr->publ.phdr); 82} 83 84static void port_incr_out_seqno(struct port *p_ptr) 85{ 86 struct tipc_msg *m = &p_ptr->publ.phdr; 87 88 if (likely(!msg_routed(m))) 89 return; 90 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96 97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 98 u32 num_sect, struct iovec const *msg_sect) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct port_list dports = {0, NULL, }; 104 struct port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 113 hdr = &oport->publ.phdr; 114 msg_set_type(hdr, TIPC_MCAST_MSG); 115 
msg_set_nametype(hdr, seq->type); 116 msg_set_namelower(hdr, seq->lower); 117 msg_set_nameupper(hdr, seq->upper); 118 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 119 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 120 !oport->user_port, &buf); 121 if (unlikely(!buf)) 122 return res; 123 124 /* Figure out where to send multicast message */ 125 126 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 127 TIPC_NODE_SCOPE, &dports); 128 129 /* Send message to destinations (duplicate it only if necessary) */ 130 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 buf_discard(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) { 142 buf_discard(ibuf); 143 } 144 } else { 145 ibuf = buf; 146 } 147 148 if (res >= 0) { 149 if (ibuf) 150 tipc_port_recv_mcast(ibuf, &dports); 151 } else { 152 tipc_port_list_free(&dports); 153 } 154 return res; 155} 156 157/** 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports 159 * 160 * If there is no port list, perform a lookup to create one 161 */ 162 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 164{ 165 struct tipc_msg* msg; 166 struct port_list dports = {0, NULL, }; 167 struct port_list *item = dp; 168 int cnt = 0; 169 170 msg = buf_msg(buf); 171 172 /* Create destination port list, if one wasn't supplied */ 173 174 if (dp == NULL) { 175 tipc_nametbl_mc_translate(msg_nametype(msg), 176 msg_namelower(msg), 177 msg_nameupper(msg), 178 TIPC_CLUSTER_SCOPE, 179 &dports); 180 item = dp = &dports; 181 } 182 183 /* Deliver a copy of message to each destination port */ 184 185 if (dp->count != 0) { 186 if (dp->count == 1) { 187 msg_set_destport(msg, dp->ports[0]); 188 tipc_port_recv_msg(buf); 189 tipc_port_list_free(dp); 190 return; 191 } 192 for (; cnt < dp->count; cnt++) { 193 int index = cnt % PLSIZE; 194 
struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 195 196 if (b == NULL) { 197 warn("Unable to deliver multicast message(s)\n"); 198 msg_dbg(msg, "LOST:"); 199 goto exit; 200 } 201 if ((index == 0) && (cnt != 0)) { 202 item = item->next; 203 } 204 msg_set_destport(buf_msg(b),item->ports[index]); 205 tipc_port_recv_msg(b); 206 } 207 } 208exit: 209 buf_discard(buf); 210 tipc_port_list_free(dp); 211} 212 213/** 214 * tipc_createport_raw - create a generic TIPC port 215 * 216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 217 */ 218 219struct tipc_port *tipc_createport_raw(void *usr_handle, 220 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 221 void (*wakeup)(struct tipc_port *), 222 const u32 importance) 223{ 224 struct port *p_ptr; 225 struct tipc_msg *msg; 226 u32 ref; 227 228 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 229 if (!p_ptr) { 230 warn("Port creation failed, no memory\n"); 231 return NULL; 232 } 233 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 234 if (!ref) { 235 warn("Port creation failed, reference table exhausted\n"); 236 kfree(p_ptr); 237 return NULL; 238 } 239 240 p_ptr->publ.usr_handle = usr_handle; 241 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0); 245 msg_set_origport(msg, ref); 246 p_ptr->last_in_seqno = 41; 247 p_ptr->sent = 1; 248 INIT_LIST_HEAD(&p_ptr->wait_list); 249 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 250 p_ptr->dispatcher = dispatcher; 251 p_ptr->wakeup = wakeup; 252 p_ptr->user_port = NULL; 253 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 254 spin_lock_bh(&tipc_port_list_lock); 255 INIT_LIST_HEAD(&p_ptr->publications); 256 INIT_LIST_HEAD(&p_ptr->port_list); 257 list_add_tail(&p_ptr->port_list, &ports); 258 spin_unlock_bh(&tipc_port_list_lock); 259 return &(p_ptr->publ); 260} 261 262int tipc_deleteport(u32 ref) 263{ 264 struct port *p_ptr; 265 struct sk_buff *buf = 
NULL; 266 267 tipc_withdraw(ref, 0, NULL); 268 p_ptr = tipc_port_lock(ref); 269 if (!p_ptr) 270 return -EINVAL; 271 272 tipc_ref_discard(ref); 273 tipc_port_unlock(p_ptr); 274 275 k_cancel_timer(&p_ptr->timer); 276 if (p_ptr->publ.connected) { 277 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 278 tipc_nodesub_unsubscribe(&p_ptr->subscription); 279 } 280 if (p_ptr->user_port) { 281 tipc_reg_remove_port(p_ptr->user_port); 282 kfree(p_ptr->user_port); 283 } 284 285 spin_lock_bh(&tipc_port_list_lock); 286 list_del(&p_ptr->port_list); 287 list_del(&p_ptr->wait_list); 288 spin_unlock_bh(&tipc_port_list_lock); 289 k_term_timer(&p_ptr->timer); 290 kfree(p_ptr); 291 dbg("Deleted port %u\n", ref); 292 tipc_net_route_msg(buf); 293 return 0; 294} 295 296/** 297 * tipc_get_port() - return port associated with 'ref' 298 * 299 * Note: Port is not locked. 300 */ 301 302struct tipc_port *tipc_get_port(const u32 ref) 303{ 304 return (struct tipc_port *)tipc_ref_deref(ref); 305} 306 307/** 308 * tipc_get_handle - return user handle associated to port 'ref' 309 */ 310 311void *tipc_get_handle(const u32 ref) 312{ 313 struct port *p_ptr; 314 void * handle; 315 316 p_ptr = tipc_port_lock(ref); 317 if (!p_ptr) 318 return NULL; 319 handle = p_ptr->publ.usr_handle; 320 tipc_port_unlock(p_ptr); 321 return handle; 322} 323 324static int port_unreliable(struct port *p_ptr) 325{ 326 return msg_src_droppable(&p_ptr->publ.phdr); 327} 328 329int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 330{ 331 struct port *p_ptr; 332 333 p_ptr = tipc_port_lock(ref); 334 if (!p_ptr) 335 return -EINVAL; 336 *isunreliable = port_unreliable(p_ptr); 337 tipc_port_unlock(p_ptr); 338 return 0; 339} 340 341int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 342{ 343 struct port *p_ptr; 344 345 p_ptr = tipc_port_lock(ref); 346 if (!p_ptr) 347 return -EINVAL; 348 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 349 tipc_port_unlock(p_ptr); 350 return 0; 351} 352 
353static int port_unreturnable(struct port *p_ptr) 354{ 355 return msg_dest_droppable(&p_ptr->publ.phdr); 356} 357 358int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 359{ 360 struct port *p_ptr; 361 362 p_ptr = tipc_port_lock(ref); 363 if (!p_ptr) 364 return -EINVAL; 365 *isunrejectable = port_unreturnable(p_ptr); 366 tipc_port_unlock(p_ptr); 367 return 0; 368} 369 370int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 371{ 372 struct port *p_ptr; 373 374 p_ptr = tipc_port_lock(ref); 375 if (!p_ptr) 376 return -EINVAL; 377 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 378 tipc_port_unlock(p_ptr); 379 return 0; 380} 381 382/* 383 * port_build_proto_msg(): build a port level protocol 384 * or a connection abortion message. Called with 385 * tipc_port lock on. 386 */ 387static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 388 u32 origport, u32 orignode, 389 u32 usr, u32 type, u32 err, 390 u32 seqno, u32 ack) 391{ 392 struct sk_buff *buf; 393 struct tipc_msg *msg; 394 395 buf = buf_acquire(LONG_H_SIZE); 396 if (buf) { 397 msg = buf_msg(buf); 398 msg_init(msg, usr, type, LONG_H_SIZE, destnode); 399 msg_set_errcode(msg, err); 400 msg_set_destport(msg, destport); 401 msg_set_origport(msg, origport); 402 msg_set_orignode(msg, orignode); 403 msg_set_transp_seqno(msg, seqno); 404 msg_set_msgcnt(msg, ack); 405 msg_dbg(msg, "PORT>SEND>:"); 406 } 407 return buf; 408} 409 410int tipc_reject_msg(struct sk_buff *buf, u32 err) 411{ 412 struct tipc_msg *msg = buf_msg(buf); 413 struct sk_buff *rbuf; 414 struct tipc_msg *rmsg; 415 int hdr_sz; 416 u32 imp = msg_importance(msg); 417 u32 data_sz = msg_data_sz(msg); 418 419 if (data_sz > MAX_REJECT_SIZE) 420 data_sz = MAX_REJECT_SIZE; 421 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 422 imp++; 423 msg_dbg(msg, "port->rej: "); 424 425 /* discard rejected message if it shouldn't be returned to sender */ 426 if (msg_errcode(msg) || 
msg_dest_droppable(msg)) { 427 buf_discard(buf); 428 return data_sz; 429 } 430 431 /* construct rejected message */ 432 if (msg_mcast(msg)) 433 hdr_sz = MCAST_H_SIZE; 434 else 435 hdr_sz = LONG_H_SIZE; 436 rbuf = buf_acquire(data_sz + hdr_sz); 437 if (rbuf == NULL) { 438 buf_discard(buf); 439 return data_sz; 440 } 441 rmsg = buf_msg(rbuf); 442 msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg)); 443 msg_set_errcode(rmsg, err); 444 msg_set_destport(rmsg, msg_origport(msg)); 445 msg_set_origport(rmsg, msg_destport(msg)); 446 if (msg_short(msg)) { 447 msg_set_orignode(rmsg, tipc_own_addr); 448 /* leave name type & instance as zeroes */ 449 } else { 450 msg_set_orignode(rmsg, msg_destnode(msg)); 451 msg_set_nametype(rmsg, msg_nametype(msg)); 452 msg_set_nameinst(rmsg, msg_nameinst(msg)); 453 } 454 msg_set_size(rmsg, data_sz + hdr_sz); 455 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 456 457 /* send self-abort message when rejecting on a connected port */ 458 if (msg_connected(msg)) { 459 struct sk_buff *abuf = NULL; 460 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 461 462 if (p_ptr) { 463 if (p_ptr->publ.connected) 464 abuf = port_build_self_abort_msg(p_ptr, err); 465 tipc_port_unlock(p_ptr); 466 } 467 tipc_net_route_msg(abuf); 468 } 469 470 /* send rejected message */ 471 buf_discard(buf); 472 tipc_net_route_msg(rbuf); 473 return data_sz; 474} 475 476int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 477 struct iovec const *msg_sect, u32 num_sect, 478 int err) 479{ 480 struct sk_buff *buf; 481 int res; 482 483 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 484 !p_ptr->user_port, &buf); 485 if (!buf) 486 return res; 487 488 return tipc_reject_msg(buf, err); 489} 490 491static void port_timeout(unsigned long ref) 492{ 493 struct port *p_ptr = tipc_port_lock(ref); 494 struct sk_buff *buf = NULL; 495 496 if (!p_ptr) 497 return; 498 499 if (!p_ptr->publ.connected) { 500 tipc_port_unlock(p_ptr); 501 
return; 502 } 503 504 /* Last probe answered ? */ 505 if (p_ptr->probing_state == PROBING) { 506 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 507 } else { 508 buf = port_build_proto_msg(port_peerport(p_ptr), 509 port_peernode(p_ptr), 510 p_ptr->publ.ref, 511 tipc_own_addr, 512 CONN_MANAGER, 513 CONN_PROBE, 514 TIPC_OK, 515 port_out_seqno(p_ptr), 516 0); 517 port_incr_out_seqno(p_ptr); 518 p_ptr->probing_state = PROBING; 519 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 520 } 521 tipc_port_unlock(p_ptr); 522 tipc_net_route_msg(buf); 523} 524 525 526static void port_handle_node_down(unsigned long ref) 527{ 528 struct port *p_ptr = tipc_port_lock(ref); 529 struct sk_buff* buf = NULL; 530 531 if (!p_ptr) 532 return; 533 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 534 tipc_port_unlock(p_ptr); 535 tipc_net_route_msg(buf); 536} 537 538 539static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 540{ 541 u32 imp = msg_importance(&p_ptr->publ.phdr); 542 543 if (!p_ptr->publ.connected) 544 return NULL; 545 if (imp < TIPC_CRITICAL_IMPORTANCE) 546 imp++; 547 return port_build_proto_msg(p_ptr->publ.ref, 548 tipc_own_addr, 549 port_peerport(p_ptr), 550 port_peernode(p_ptr), 551 imp, 552 TIPC_CONN_MSG, 553 err, 554 p_ptr->last_in_seqno + 1, 555 0); 556} 557 558 559static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 560{ 561 u32 imp = msg_importance(&p_ptr->publ.phdr); 562 563 if (!p_ptr->publ.connected) 564 return NULL; 565 if (imp < TIPC_CRITICAL_IMPORTANCE) 566 imp++; 567 return port_build_proto_msg(port_peerport(p_ptr), 568 port_peernode(p_ptr), 569 p_ptr->publ.ref, 570 tipc_own_addr, 571 imp, 572 TIPC_CONN_MSG, 573 err, 574 port_out_seqno(p_ptr), 575 0); 576} 577 578void tipc_port_recv_proto_msg(struct sk_buff *buf) 579{ 580 struct tipc_msg *msg = buf_msg(buf); 581 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 582 u32 err = TIPC_OK; 583 struct sk_buff *r_buf = NULL; 584 struct sk_buff 
*abort_buf = NULL; 585 586 msg_dbg(msg, "PORT<RECV<:"); 587 588 if (!p_ptr) { 589 err = TIPC_ERR_NO_PORT; 590 } else if (p_ptr->publ.connected) { 591 if (port_peernode(p_ptr) != msg_orignode(msg)) 592 err = TIPC_ERR_NO_PORT; 593 if (port_peerport(p_ptr) != msg_origport(msg)) 594 err = TIPC_ERR_NO_PORT; 595 if (!err && msg_routed(msg)) { 596 u32 seqno = msg_transp_seqno(msg); 597 u32 myno = ++p_ptr->last_in_seqno; 598 if (seqno != myno) { 599 err = TIPC_ERR_NO_PORT; 600 abort_buf = port_build_self_abort_msg(p_ptr, err); 601 } 602 } 603 if (msg_type(msg) == CONN_ACK) { 604 int wakeup = tipc_port_congested(p_ptr) && 605 p_ptr->publ.congested && 606 p_ptr->wakeup; 607 p_ptr->acked += msg_msgcnt(msg); 608 if (tipc_port_congested(p_ptr)) 609 goto exit; 610 p_ptr->publ.congested = 0; 611 if (!wakeup) 612 goto exit; 613 p_ptr->wakeup(&p_ptr->publ); 614 goto exit; 615 } 616 } else if (p_ptr->publ.published) { 617 err = TIPC_ERR_NO_PORT; 618 } 619 if (err) { 620 r_buf = port_build_proto_msg(msg_origport(msg), 621 msg_orignode(msg), 622 msg_destport(msg), 623 tipc_own_addr, 624 TIPC_HIGH_IMPORTANCE, 625 TIPC_CONN_MSG, 626 err, 627 0, 628 0); 629 goto exit; 630 } 631 632 /* All is fine */ 633 if (msg_type(msg) == CONN_PROBE) { 634 r_buf = port_build_proto_msg(msg_origport(msg), 635 msg_orignode(msg), 636 msg_destport(msg), 637 tipc_own_addr, 638 CONN_MANAGER, 639 CONN_PROBE_REPLY, 640 TIPC_OK, 641 port_out_seqno(p_ptr), 642 0); 643 } 644 p_ptr->probing_state = CONFIRMED; 645 port_incr_out_seqno(p_ptr); 646exit: 647 if (p_ptr) 648 tipc_port_unlock(p_ptr); 649 tipc_net_route_msg(r_buf); 650 tipc_net_route_msg(abort_buf); 651 buf_discard(buf); 652} 653 654static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 655{ 656 struct publication *publ; 657 658 if (full_id) 659 tipc_printf(buf, "<%u.%u.%u:%u>:", 660 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 661 tipc_node(tipc_own_addr), p_ptr->publ.ref); 662 else 663 tipc_printf(buf, "%-10u:", 
p_ptr->publ.ref); 664 665 if (p_ptr->publ.connected) { 666 u32 dport = port_peerport(p_ptr); 667 u32 destnode = port_peernode(p_ptr); 668 669 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 670 tipc_zone(destnode), tipc_cluster(destnode), 671 tipc_node(destnode), dport); 672 if (p_ptr->publ.conn_type != 0) 673 tipc_printf(buf, " via {%u,%u}", 674 p_ptr->publ.conn_type, 675 p_ptr->publ.conn_instance); 676 } 677 else if (p_ptr->publ.published) { 678 tipc_printf(buf, " bound to"); 679 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 680 if (publ->lower == publ->upper) 681 tipc_printf(buf, " {%u,%u}", publ->type, 682 publ->lower); 683 else 684 tipc_printf(buf, " {%u,%u,%u}", publ->type, 685 publ->lower, publ->upper); 686 } 687 } 688 tipc_printf(buf, "\n"); 689} 690 691#define MAX_PORT_QUERY 32768 692 693struct sk_buff *tipc_port_get_ports(void) 694{ 695 struct sk_buff *buf; 696 struct tlv_desc *rep_tlv; 697 struct print_buf pb; 698 struct port *p_ptr; 699 int str_len; 700 701 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 702 if (!buf) 703 return NULL; 704 rep_tlv = (struct tlv_desc *)buf->data; 705 706 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 707 spin_lock_bh(&tipc_port_list_lock); 708 list_for_each_entry(p_ptr, &ports, port_list) { 709 spin_lock_bh(p_ptr->publ.lock); 710 port_print(p_ptr, &pb, 0); 711 spin_unlock_bh(p_ptr->publ.lock); 712 } 713 spin_unlock_bh(&tipc_port_list_lock); 714 str_len = tipc_printbuf_validate(&pb); 715 716 skb_put(buf, TLV_SPACE(str_len)); 717 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 718 719 return buf; 720} 721 722#if 0 723 724#define MAX_PORT_STATS 2000 725 726struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space) 727{ 728 u32 ref; 729 struct port *p_ptr; 730 struct sk_buff *buf; 731 struct tlv_desc *rep_tlv; 732 struct print_buf pb; 733 int str_len; 734 735 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF)) 736 return 
cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 737 738 ref = *(u32 *)TLV_DATA(req_tlv_area); 739 ref = ntohl(ref); 740 741 p_ptr = tipc_port_lock(ref); 742 if (!p_ptr) 743 return cfg_reply_error_string("port not found"); 744 745 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS)); 746 if (!buf) { 747 tipc_port_unlock(p_ptr); 748 return NULL; 749 } 750 rep_tlv = (struct tlv_desc *)buf->data; 751 752 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS); 753 port_print(p_ptr, &pb, 1); 754 /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */ 755 tipc_port_unlock(p_ptr); 756 str_len = tipc_printbuf_validate(&pb); 757 758 skb_put(buf, TLV_SPACE(str_len)); 759 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 760 761 return buf; 762} 763 764#endif 765 766void tipc_port_reinit(void) 767{ 768 struct port *p_ptr; 769 struct tipc_msg *msg; 770 771 spin_lock_bh(&tipc_port_list_lock); 772 list_for_each_entry(p_ptr, &ports, port_list) { 773 msg = &p_ptr->publ.phdr; 774 if (msg_orignode(msg) == tipc_own_addr) 775 break; 776 msg_set_prevnode(msg, tipc_own_addr); 777 msg_set_orignode(msg, tipc_own_addr); 778 } 779 spin_unlock_bh(&tipc_port_list_lock); 780} 781 782 783/* 784 * port_dispatcher_sigh(): Signal handler for messages destinated 785 * to the tipc_port interface. 
786 */ 787 788static void port_dispatcher_sigh(void *dummy) 789{ 790 struct sk_buff *buf; 791 792 spin_lock_bh(&queue_lock); 793 buf = msg_queue_head; 794 msg_queue_head = NULL; 795 spin_unlock_bh(&queue_lock); 796 797 while (buf) { 798 struct port *p_ptr; 799 struct user_port *up_ptr; 800 struct tipc_portid orig; 801 struct tipc_name_seq dseq; 802 void *usr_handle; 803 int connected; 804 int published; 805 u32 message_type; 806 807 struct sk_buff *next = buf->next; 808 struct tipc_msg *msg = buf_msg(buf); 809 u32 dref = msg_destport(msg); 810 811 message_type = msg_type(msg); 812 if (message_type > TIPC_DIRECT_MSG) 813 goto reject; /* Unsupported message type */ 814 815 p_ptr = tipc_port_lock(dref); 816 if (!p_ptr) 817 goto reject; /* Port deleted while msg in queue */ 818 819 orig.ref = msg_origport(msg); 820 orig.node = msg_orignode(msg); 821 up_ptr = p_ptr->user_port; 822 usr_handle = up_ptr->usr_handle; 823 connected = p_ptr->publ.connected; 824 published = p_ptr->publ.published; 825 826 if (unlikely(msg_errcode(msg))) 827 goto err; 828 829 switch (message_type) { 830 831 case TIPC_CONN_MSG:{ 832 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 833 u32 peer_port = port_peerport(p_ptr); 834 u32 peer_node = port_peernode(p_ptr); 835 836 tipc_port_unlock(p_ptr); 837 if (unlikely(!cb)) 838 goto reject; 839 if (unlikely(!connected)) { 840 if (tipc_connect2port(dref, &orig)) 841 goto reject; 842 } else if ((msg_origport(msg) != peer_port) || 843 (msg_orignode(msg) != peer_node)) 844 goto reject; 845 if (unlikely(++p_ptr->publ.conn_unacked >= 846 TIPC_FLOW_CONTROL_WIN)) 847 tipc_acknowledge(dref, 848 p_ptr->publ.conn_unacked); 849 skb_pull(buf, msg_hdr_sz(msg)); 850 cb(usr_handle, dref, &buf, msg_data(msg), 851 msg_data_sz(msg)); 852 break; 853 } 854 case TIPC_DIRECT_MSG:{ 855 tipc_msg_event cb = up_ptr->msg_cb; 856 857 tipc_port_unlock(p_ptr); 858 if (unlikely(!cb || connected)) 859 goto reject; 860 skb_pull(buf, msg_hdr_sz(msg)); 861 cb(usr_handle, dref, &buf, 
msg_data(msg), 862 msg_data_sz(msg), msg_importance(msg), 863 &orig); 864 break; 865 } 866 case TIPC_MCAST_MSG: 867 case TIPC_NAMED_MSG:{ 868 tipc_named_msg_event cb = up_ptr->named_msg_cb; 869 870 tipc_port_unlock(p_ptr); 871 if (unlikely(!cb || connected || !published)) 872 goto reject; 873 dseq.type = msg_nametype(msg); 874 dseq.lower = msg_nameinst(msg); 875 dseq.upper = (message_type == TIPC_NAMED_MSG) 876 ? dseq.lower : msg_nameupper(msg); 877 skb_pull(buf, msg_hdr_sz(msg)); 878 cb(usr_handle, dref, &buf, msg_data(msg), 879 msg_data_sz(msg), msg_importance(msg), 880 &orig, &dseq); 881 break; 882 } 883 } 884 if (buf) 885 buf_discard(buf); 886 buf = next; 887 continue; 888err: 889 switch (message_type) { 890 891 case TIPC_CONN_MSG:{ 892 tipc_conn_shutdown_event cb = 893 up_ptr->conn_err_cb; 894 u32 peer_port = port_peerport(p_ptr); 895 u32 peer_node = port_peernode(p_ptr); 896 897 tipc_port_unlock(p_ptr); 898 if (!cb || !connected) 899 break; 900 if ((msg_origport(msg) != peer_port) || 901 (msg_orignode(msg) != peer_node)) 902 break; 903 tipc_disconnect(dref); 904 skb_pull(buf, msg_hdr_sz(msg)); 905 cb(usr_handle, dref, &buf, msg_data(msg), 906 msg_data_sz(msg), msg_errcode(msg)); 907 break; 908 } 909 case TIPC_DIRECT_MSG:{ 910 tipc_msg_err_event cb = up_ptr->err_cb; 911 912 tipc_port_unlock(p_ptr); 913 if (!cb || connected) 914 break; 915 skb_pull(buf, msg_hdr_sz(msg)); 916 cb(usr_handle, dref, &buf, msg_data(msg), 917 msg_data_sz(msg), msg_errcode(msg), &orig); 918 break; 919 } 920 case TIPC_MCAST_MSG: 921 case TIPC_NAMED_MSG:{ 922 tipc_named_msg_err_event cb = 923 up_ptr->named_err_cb; 924 925 tipc_port_unlock(p_ptr); 926 if (!cb || connected) 927 break; 928 dseq.type = msg_nametype(msg); 929 dseq.lower = msg_nameinst(msg); 930 dseq.upper = (message_type == TIPC_NAMED_MSG) 931 ? 
dseq.lower : msg_nameupper(msg); 932 skb_pull(buf, msg_hdr_sz(msg)); 933 cb(usr_handle, dref, &buf, msg_data(msg), 934 msg_data_sz(msg), msg_errcode(msg), &dseq); 935 break; 936 } 937 } 938 if (buf) 939 buf_discard(buf); 940 buf = next; 941 continue; 942reject: 943 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 944 buf = next; 945 } 946} 947 948/* 949 * port_dispatcher(): Dispatcher for messages destinated 950 * to the tipc_port interface. Called with port locked. 951 */ 952 953static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 954{ 955 buf->next = NULL; 956 spin_lock_bh(&queue_lock); 957 if (msg_queue_head) { 958 msg_queue_tail->next = buf; 959 msg_queue_tail = buf; 960 } else { 961 msg_queue_tail = msg_queue_head = buf; 962 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 963 } 964 spin_unlock_bh(&queue_lock); 965 return 0; 966} 967 968/* 969 * Wake up port after congestion: Called with port locked, 970 * 971 */ 972 973static void port_wakeup_sh(unsigned long ref) 974{ 975 struct port *p_ptr; 976 struct user_port *up_ptr; 977 tipc_continue_event cb = NULL; 978 void *uh = NULL; 979 980 p_ptr = tipc_port_lock(ref); 981 if (p_ptr) { 982 up_ptr = p_ptr->user_port; 983 if (up_ptr) { 984 cb = up_ptr->continue_event_cb; 985 uh = up_ptr->usr_handle; 986 } 987 tipc_port_unlock(p_ptr); 988 } 989 if (cb) 990 cb(uh, ref); 991} 992 993 994static void port_wakeup(struct tipc_port *p_ptr) 995{ 996 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 997} 998 999void tipc_acknowledge(u32 ref, u32 ack) 1000{ 1001 struct port *p_ptr; 1002 struct sk_buff *buf = NULL; 1003 1004 p_ptr = tipc_port_lock(ref); 1005 if (!p_ptr) 1006 return; 1007 if (p_ptr->publ.connected) { 1008 p_ptr->publ.conn_unacked -= ack; 1009 buf = port_build_proto_msg(port_peerport(p_ptr), 1010 port_peernode(p_ptr), 1011 ref, 1012 tipc_own_addr, 1013 CONN_MANAGER, 1014 CONN_ACK, 1015 TIPC_OK, 1016 port_out_seqno(p_ptr), 1017 ack); 1018 } 1019 tipc_port_unlock(p_ptr); 1020 tipc_net_route_msg(buf); 
1021} 1022 1023/* 1024 * tipc_createport(): user level call. Will add port to 1025 * registry if non-zero user_ref. 1026 */ 1027 1028int tipc_createport(u32 user_ref, 1029 void *usr_handle, 1030 unsigned int importance, 1031 tipc_msg_err_event error_cb, 1032 tipc_named_msg_err_event named_error_cb, 1033 tipc_conn_shutdown_event conn_error_cb, 1034 tipc_msg_event msg_cb, 1035 tipc_named_msg_event named_msg_cb, 1036 tipc_conn_msg_event conn_msg_cb, 1037 tipc_continue_event continue_event_cb,/* May be zero */ 1038 u32 *portref) 1039{ 1040 struct user_port *up_ptr; 1041 struct port *p_ptr; 1042 1043 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1044 if (!up_ptr) { 1045 warn("Port creation failed, no memory\n"); 1046 return -ENOMEM; 1047 } 1048 p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, 1049 port_wakeup, importance); 1050 if (!p_ptr) { 1051 kfree(up_ptr); 1052 return -ENOMEM; 1053 } 1054 1055 p_ptr->user_port = up_ptr; 1056 up_ptr->user_ref = user_ref; 1057 up_ptr->usr_handle = usr_handle; 1058 up_ptr->ref = p_ptr->publ.ref; 1059 up_ptr->err_cb = error_cb; 1060 up_ptr->named_err_cb = named_error_cb; 1061 up_ptr->conn_err_cb = conn_error_cb; 1062 up_ptr->msg_cb = msg_cb; 1063 up_ptr->named_msg_cb = named_msg_cb; 1064 up_ptr->conn_msg_cb = conn_msg_cb; 1065 up_ptr->continue_event_cb = continue_event_cb; 1066 INIT_LIST_HEAD(&up_ptr->uport_list); 1067 tipc_reg_add_port(up_ptr); 1068 *portref = p_ptr->publ.ref; 1069 tipc_port_unlock(p_ptr); 1070 return 0; 1071} 1072 1073int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1074{ 1075 id->ref = ref; 1076 id->node = tipc_own_addr; 1077 return 0; 1078} 1079 1080int tipc_portimportance(u32 ref, unsigned int *importance) 1081{ 1082 struct port *p_ptr; 1083 1084 p_ptr = tipc_port_lock(ref); 1085 if (!p_ptr) 1086 return -EINVAL; 1087 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1088 tipc_port_unlock(p_ptr); 1089 return 0; 1090} 1091 1092int tipc_set_portimportance(u32 ref, unsigned int imp) 
1093{ 1094 struct port *p_ptr; 1095 1096 if (imp > TIPC_CRITICAL_IMPORTANCE) 1097 return -EINVAL; 1098 1099 p_ptr = tipc_port_lock(ref); 1100 if (!p_ptr) 1101 return -EINVAL; 1102 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1103 tipc_port_unlock(p_ptr); 1104 return 0; 1105} 1106 1107 1108int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1109{ 1110 struct port *p_ptr; 1111 struct publication *publ; 1112 u32 key; 1113 int res = -EINVAL; 1114 1115 p_ptr = tipc_port_lock(ref); 1116 if (!p_ptr) 1117 return -EINVAL; 1118 1119 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1120 "lower = %u, upper = %u\n", 1121 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1122 if (p_ptr->publ.connected) 1123 goto exit; 1124 if (seq->lower > seq->upper) 1125 goto exit; 1126 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1127 goto exit; 1128 key = ref + p_ptr->pub_count + 1; 1129 if (key == ref) { 1130 res = -EADDRINUSE; 1131 goto exit; 1132 } 1133 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1134 scope, p_ptr->publ.ref, key); 1135 if (publ) { 1136 list_add(&publ->pport_list, &p_ptr->publications); 1137 p_ptr->pub_count++; 1138 p_ptr->publ.published = 1; 1139 res = 0; 1140 } 1141exit: 1142 tipc_port_unlock(p_ptr); 1143 return res; 1144} 1145 1146int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1147{ 1148 struct port *p_ptr; 1149 struct publication *publ; 1150 struct publication *tpubl; 1151 int res = -EINVAL; 1152 1153 p_ptr = tipc_port_lock(ref); 1154 if (!p_ptr) 1155 return -EINVAL; 1156 if (!seq) { 1157 list_for_each_entry_safe(publ, tpubl, 1158 &p_ptr->publications, pport_list) { 1159 tipc_nametbl_withdraw(publ->type, publ->lower, 1160 publ->ref, publ->key); 1161 } 1162 res = 0; 1163 } else { 1164 list_for_each_entry_safe(publ, tpubl, 1165 &p_ptr->publications, pport_list) { 1166 if (publ->scope != scope) 1167 continue; 1168 if (publ->type != seq->type) 1169 
continue; 1170 if (publ->lower != seq->lower) 1171 continue; 1172 if (publ->upper != seq->upper) 1173 break; 1174 tipc_nametbl_withdraw(publ->type, publ->lower, 1175 publ->ref, publ->key); 1176 res = 0; 1177 break; 1178 } 1179 } 1180 if (list_empty(&p_ptr->publications)) 1181 p_ptr->publ.published = 0; 1182 tipc_port_unlock(p_ptr); 1183 return res; 1184} 1185 1186int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1187{ 1188 struct port *p_ptr; 1189 struct tipc_msg *msg; 1190 int res = -EINVAL; 1191 1192 p_ptr = tipc_port_lock(ref); 1193 if (!p_ptr) 1194 return -EINVAL; 1195 if (p_ptr->publ.published || p_ptr->publ.connected) 1196 goto exit; 1197 if (!peer->ref) 1198 goto exit; 1199 1200 msg = &p_ptr->publ.phdr; 1201 msg_set_destnode(msg, peer->node); 1202 msg_set_destport(msg, peer->ref); 1203 msg_set_orignode(msg, tipc_own_addr); 1204 msg_set_origport(msg, p_ptr->publ.ref); 1205 msg_set_transp_seqno(msg, 42); 1206 msg_set_type(msg, TIPC_CONN_MSG); 1207 if (!may_route(peer->node)) 1208 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1209 else 1210 msg_set_hdr_sz(msg, LONG_H_SIZE); 1211 1212 p_ptr->probing_interval = PROBING_INTERVAL; 1213 p_ptr->probing_state = CONFIRMED; 1214 p_ptr->publ.connected = 1; 1215 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1216 1217 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1218 (void *)(unsigned long)ref, 1219 (net_ev_handler)port_handle_node_down); 1220 res = 0; 1221exit: 1222 tipc_port_unlock(p_ptr); 1223 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1224 return res; 1225} 1226 1227/** 1228 * tipc_disconnect_port - disconnect port from peer 1229 * 1230 * Port must be locked. 1231 */ 1232 1233int tipc_disconnect_port(struct tipc_port *tp_ptr) 1234{ 1235 int res; 1236 1237 if (tp_ptr->connected) { 1238 tp_ptr->connected = 0; 1239 /* let timer expire on it's own to avoid deadlock! 
*/ 1240 tipc_nodesub_unsubscribe( 1241 &((struct port *)tp_ptr)->subscription); 1242 res = 0; 1243 } else { 1244 res = -ENOTCONN; 1245 } 1246 return res; 1247} 1248 1249/* 1250 * tipc_disconnect(): Disconnect port form peer. 1251 * This is a node local operation. 1252 */ 1253 1254int tipc_disconnect(u32 ref) 1255{ 1256 struct port *p_ptr; 1257 int res; 1258 1259 p_ptr = tipc_port_lock(ref); 1260 if (!p_ptr) 1261 return -EINVAL; 1262 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1263 tipc_port_unlock(p_ptr); 1264 return res; 1265} 1266 1267/* 1268 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1269 */ 1270int tipc_shutdown(u32 ref) 1271{ 1272 struct port *p_ptr; 1273 struct sk_buff *buf = NULL; 1274 1275 p_ptr = tipc_port_lock(ref); 1276 if (!p_ptr) 1277 return -EINVAL; 1278 1279 if (p_ptr->publ.connected) { 1280 u32 imp = msg_importance(&p_ptr->publ.phdr); 1281 if (imp < TIPC_CRITICAL_IMPORTANCE) 1282 imp++; 1283 buf = port_build_proto_msg(port_peerport(p_ptr), 1284 port_peernode(p_ptr), 1285 ref, 1286 tipc_own_addr, 1287 imp, 1288 TIPC_CONN_MSG, 1289 TIPC_CONN_SHUTDOWN, 1290 port_out_seqno(p_ptr), 1291 0); 1292 } 1293 tipc_port_unlock(p_ptr); 1294 tipc_net_route_msg(buf); 1295 return tipc_disconnect(ref); 1296} 1297 1298int tipc_isconnected(u32 ref, int *isconnected) 1299{ 1300 struct port *p_ptr; 1301 1302 p_ptr = tipc_port_lock(ref); 1303 if (!p_ptr) 1304 return -EINVAL; 1305 *isconnected = p_ptr->publ.connected; 1306 tipc_port_unlock(p_ptr); 1307 return 0; 1308} 1309 1310int tipc_peer(u32 ref, struct tipc_portid *peer) 1311{ 1312 struct port *p_ptr; 1313 int res; 1314 1315 p_ptr = tipc_port_lock(ref); 1316 if (!p_ptr) 1317 return -EINVAL; 1318 if (p_ptr->publ.connected) { 1319 peer->ref = port_peerport(p_ptr); 1320 peer->node = port_peernode(p_ptr); 1321 res = 0; 1322 } else 1323 res = -ENOTCONN; 1324 tipc_port_unlock(p_ptr); 1325 return res; 1326} 1327 1328int tipc_ref_valid(u32 ref) 1329{ 1330 /* Works irrespective of type */ 1331 
return !!tipc_ref_deref(ref); 1332} 1333 1334 1335/* 1336 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1337 * message for this node. 1338 */ 1339 1340int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1341 struct iovec const *msg_sect) 1342{ 1343 struct sk_buff *buf; 1344 int res; 1345 1346 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1347 MAX_MSG_SIZE, !sender->user_port, &buf); 1348 if (likely(buf)) 1349 tipc_port_recv_msg(buf); 1350 return res; 1351} 1352 1353/** 1354 * tipc_send - send message sections on connection 1355 */ 1356 1357int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1358{ 1359 struct port *p_ptr; 1360 u32 destnode; 1361 int res; 1362 1363 p_ptr = tipc_port_deref(ref); 1364 if (!p_ptr || !p_ptr->publ.connected) 1365 return -EINVAL; 1366 1367 p_ptr->publ.congested = 1; 1368 if (!tipc_port_congested(p_ptr)) { 1369 destnode = port_peernode(p_ptr); 1370 if (likely(destnode != tipc_own_addr)) 1371 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1372 destnode); 1373 else 1374 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1375 1376 if (likely(res != -ELINKCONG)) { 1377 port_incr_out_seqno(p_ptr); 1378 p_ptr->publ.congested = 0; 1379 p_ptr->sent++; 1380 return res; 1381 } 1382 } 1383 if (port_unreliable(p_ptr)) { 1384 p_ptr->publ.congested = 0; 1385 /* Just calculate msg length and return */ 1386 return msg_calc_data_size(msg_sect, num_sect); 1387 } 1388 return -ELINKCONG; 1389} 1390 1391/** 1392 * tipc_send_buf - send message buffer on connection 1393 */ 1394 1395int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1396{ 1397 struct port *p_ptr; 1398 struct tipc_msg *msg; 1399 u32 destnode; 1400 u32 hsz; 1401 u32 sz; 1402 u32 res; 1403 1404 p_ptr = tipc_port_deref(ref); 1405 if (!p_ptr || !p_ptr->publ.connected) 1406 return -EINVAL; 1407 1408 msg = &p_ptr->publ.phdr; 1409 hsz = msg_hdr_sz(msg); 1410 sz = hsz + dsz; 1411 msg_set_size(msg, 
sz); 1412 if (skb_cow(buf, hsz)) 1413 return -ENOMEM; 1414 1415 skb_push(buf, hsz); 1416 skb_copy_to_linear_data(buf, msg, hsz); 1417 destnode = msg_destnode(msg); 1418 p_ptr->publ.congested = 1; 1419 if (!tipc_port_congested(p_ptr)) { 1420 if (likely(destnode != tipc_own_addr)) 1421 res = tipc_send_buf_fast(buf, destnode); 1422 else { 1423 tipc_port_recv_msg(buf); 1424 res = sz; 1425 } 1426 if (likely(res != -ELINKCONG)) { 1427 port_incr_out_seqno(p_ptr); 1428 p_ptr->sent++; 1429 p_ptr->publ.congested = 0; 1430 return res; 1431 } 1432 } 1433 if (port_unreliable(p_ptr)) { 1434 p_ptr->publ.congested = 0; 1435 return dsz; 1436 } 1437 return -ELINKCONG; 1438} 1439 1440/** 1441 * tipc_forward2name - forward message sections to port name 1442 */ 1443 1444int tipc_forward2name(u32 ref, 1445 struct tipc_name const *name, 1446 u32 domain, 1447 u32 num_sect, 1448 struct iovec const *msg_sect, 1449 struct tipc_portid const *orig, 1450 unsigned int importance) 1451{ 1452 struct port *p_ptr; 1453 struct tipc_msg *msg; 1454 u32 destnode = domain; 1455 u32 destport = 0; 1456 int res; 1457 1458 p_ptr = tipc_port_deref(ref); 1459 if (!p_ptr || p_ptr->publ.connected) 1460 return -EINVAL; 1461 1462 msg = &p_ptr->publ.phdr; 1463 msg_set_type(msg, TIPC_NAMED_MSG); 1464 msg_set_orignode(msg, orig->node); 1465 msg_set_origport(msg, orig->ref); 1466 msg_set_hdr_sz(msg, LONG_H_SIZE); 1467 msg_set_nametype(msg, name->type); 1468 msg_set_nameinst(msg, name->instance); 1469 msg_set_lookup_scope(msg, addr_scope(domain)); 1470 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1471 msg_set_importance(msg,importance); 1472 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1473 msg_set_destnode(msg, destnode); 1474 msg_set_destport(msg, destport); 1475 1476 if (likely(destport || destnode)) { 1477 p_ptr->sent++; 1478 if (likely(destnode == tipc_own_addr)) 1479 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1480 res = tipc_link_send_sections_fast(p_ptr, msg_sect, 
num_sect, 1481 destnode); 1482 if (likely(res != -ELINKCONG)) 1483 return res; 1484 if (port_unreliable(p_ptr)) { 1485 /* Just calculate msg length and return */ 1486 return msg_calc_data_size(msg_sect, num_sect); 1487 } 1488 return -ELINKCONG; 1489 } 1490 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1491 TIPC_ERR_NO_NAME); 1492} 1493 1494/** 1495 * tipc_send2name - send message sections to port name 1496 */ 1497 1498int tipc_send2name(u32 ref, 1499 struct tipc_name const *name, 1500 unsigned int domain, 1501 unsigned int num_sect, 1502 struct iovec const *msg_sect) 1503{ 1504 struct tipc_portid orig; 1505 1506 orig.ref = ref; 1507 orig.node = tipc_own_addr; 1508 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1509 TIPC_PORT_IMPORTANCE); 1510} 1511 1512/** 1513 * tipc_forward_buf2name - forward message buffer to port name 1514 */ 1515 1516int tipc_forward_buf2name(u32 ref, 1517 struct tipc_name const *name, 1518 u32 domain, 1519 struct sk_buff *buf, 1520 unsigned int dsz, 1521 struct tipc_portid const *orig, 1522 unsigned int importance) 1523{ 1524 struct port *p_ptr; 1525 struct tipc_msg *msg; 1526 u32 destnode = domain; 1527 u32 destport = 0; 1528 int res; 1529 1530 p_ptr = (struct port *)tipc_ref_deref(ref); 1531 if (!p_ptr || p_ptr->publ.connected) 1532 return -EINVAL; 1533 1534 msg = &p_ptr->publ.phdr; 1535 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1536 msg_set_importance(msg, importance); 1537 msg_set_type(msg, TIPC_NAMED_MSG); 1538 msg_set_orignode(msg, orig->node); 1539 msg_set_origport(msg, orig->ref); 1540 msg_set_nametype(msg, name->type); 1541 msg_set_nameinst(msg, name->instance); 1542 msg_set_lookup_scope(msg, addr_scope(domain)); 1543 msg_set_hdr_sz(msg, LONG_H_SIZE); 1544 msg_set_size(msg, LONG_H_SIZE + dsz); 1545 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1546 msg_set_destnode(msg, destnode); 1547 msg_set_destport(msg, destport); 1548 msg_dbg(msg, "forw2name ==> "); 1549 if 
(skb_cow(buf, LONG_H_SIZE)) 1550 return -ENOMEM; 1551 skb_push(buf, LONG_H_SIZE); 1552 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1553 msg_dbg(buf_msg(buf),"PREP:"); 1554 if (likely(destport || destnode)) { 1555 p_ptr->sent++; 1556 if (destnode == tipc_own_addr) 1557 return tipc_port_recv_msg(buf); 1558 res = tipc_send_buf_fast(buf, destnode); 1559 if (likely(res != -ELINKCONG)) 1560 return res; 1561 if (port_unreliable(p_ptr)) 1562 return dsz; 1563 return -ELINKCONG; 1564 } 1565 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1566} 1567 1568/** 1569 * tipc_send_buf2name - send message buffer to port name 1570 */ 1571 1572int tipc_send_buf2name(u32 ref, 1573 struct tipc_name const *dest, 1574 u32 domain, 1575 struct sk_buff *buf, 1576 unsigned int dsz) 1577{ 1578 struct tipc_portid orig; 1579 1580 orig.ref = ref; 1581 orig.node = tipc_own_addr; 1582 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1583 TIPC_PORT_IMPORTANCE); 1584} 1585 1586/** 1587 * tipc_forward2port - forward message sections to port identity 1588 */ 1589 1590int tipc_forward2port(u32 ref, 1591 struct tipc_portid const *dest, 1592 unsigned int num_sect, 1593 struct iovec const *msg_sect, 1594 struct tipc_portid const *orig, 1595 unsigned int importance) 1596{ 1597 struct port *p_ptr; 1598 struct tipc_msg *msg; 1599 int res; 1600 1601 p_ptr = tipc_port_deref(ref); 1602 if (!p_ptr || p_ptr->publ.connected) 1603 return -EINVAL; 1604 1605 msg = &p_ptr->publ.phdr; 1606 msg_set_type(msg, TIPC_DIRECT_MSG); 1607 msg_set_orignode(msg, orig->node); 1608 msg_set_origport(msg, orig->ref); 1609 msg_set_destnode(msg, dest->node); 1610 msg_set_destport(msg, dest->ref); 1611 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1612 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1613 msg_set_importance(msg, importance); 1614 p_ptr->sent++; 1615 if (dest->node == tipc_own_addr) 1616 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1617 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 
dest->node); 1618 if (likely(res != -ELINKCONG)) 1619 return res; 1620 if (port_unreliable(p_ptr)) { 1621 /* Just calculate msg length and return */ 1622 return msg_calc_data_size(msg_sect, num_sect); 1623 } 1624 return -ELINKCONG; 1625} 1626 1627/** 1628 * tipc_send2port - send message sections to port identity 1629 */ 1630 1631int tipc_send2port(u32 ref, 1632 struct tipc_portid const *dest, 1633 unsigned int num_sect, 1634 struct iovec const *msg_sect) 1635{ 1636 struct tipc_portid orig; 1637 1638 orig.ref = ref; 1639 orig.node = tipc_own_addr; 1640 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1641 TIPC_PORT_IMPORTANCE); 1642} 1643 1644/** 1645 * tipc_forward_buf2port - forward message buffer to port identity 1646 */ 1647int tipc_forward_buf2port(u32 ref, 1648 struct tipc_portid const *dest, 1649 struct sk_buff *buf, 1650 unsigned int dsz, 1651 struct tipc_portid const *orig, 1652 unsigned int importance) 1653{ 1654 struct port *p_ptr; 1655 struct tipc_msg *msg; 1656 int res; 1657 1658 p_ptr = (struct port *)tipc_ref_deref(ref); 1659 if (!p_ptr || p_ptr->publ.connected) 1660 return -EINVAL; 1661 1662 msg = &p_ptr->publ.phdr; 1663 msg_set_type(msg, TIPC_DIRECT_MSG); 1664 msg_set_orignode(msg, orig->node); 1665 msg_set_origport(msg, orig->ref); 1666 msg_set_destnode(msg, dest->node); 1667 msg_set_destport(msg, dest->ref); 1668 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1669 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1670 msg_set_importance(msg, importance); 1671 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1672 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1673 return -ENOMEM; 1674 1675 skb_push(buf, DIR_MSG_H_SIZE); 1676 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1677 msg_dbg(msg, "buf2port: "); 1678 p_ptr->sent++; 1679 if (dest->node == tipc_own_addr) 1680 return tipc_port_recv_msg(buf); 1681 res = tipc_send_buf_fast(buf, dest->node); 1682 if (likely(res != -ELINKCONG)) 1683 return res; 1684 if (port_unreliable(p_ptr)) 1685 return dsz; 1686 return 
-ELINKCONG; 1687} 1688 1689/** 1690 * tipc_send_buf2port - send message buffer to port identity 1691 */ 1692 1693int tipc_send_buf2port(u32 ref, 1694 struct tipc_portid const *dest, 1695 struct sk_buff *buf, 1696 unsigned int dsz) 1697{ 1698 struct tipc_portid orig; 1699 1700 orig.ref = ref; 1701 orig.node = tipc_own_addr; 1702 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1703 TIPC_PORT_IMPORTANCE); 1704} 1705 1706