lowcomms.c revision 36b71a8bfbc92e1ba164e9aec840c0180ee933b5
1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 6** 7** This copyrighted material is made available to anyone wishing to use, 8** modify, copy, or redistribute it subject to the terms and conditions 9** of the GNU General Public License v.2. 10** 11******************************************************************************* 12******************************************************************************/ 13 14/* 15 * lowcomms.c 16 * 17 * This is the "low-level" comms layer. 18 * 19 * It is responsible for sending/receiving messages 20 * from other nodes in the cluster. 21 * 22 * Cluster nodes are referred to by their nodeids. nodeids are 23 * simply 32 bit numbers to the locking module - if they need to 24 * be expanded for the cluster infrastructure then that is its 25 * responsibility. It is this layer's 26 * responsibility to resolve these into IP address or 27 * whatever it needs for inter-node communication. 28 * 29 * The comms level is two kernel threads that deal mainly with 30 * the receiving of messages from other nodes and passing them 31 * up to the mid-level comms layer (which understands the 32 * message format) for execution by the locking core, and 33 * a send thread which does all the setting up of connections 34 * to remote nodes and the sending of data. Threads are not allowed 35 * to send their own data because it may cause them to wait in times 36 * of high load. Also, this way, the sending thread can collect together 37 * messages bound for one node and send them in one block. 38 * 39 * lowcomms will choose to use either TCP or SCTP as its transport layer 40 * depending on the configuration variable 'protocol'. This should be set 41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 42 * cluster-wide mechanism as it must be the same on all nodes of the cluster 43 * for the DLM to function. 44 * 45 */ 46 47#include <asm/ioctls.h> 48#include <net/sock.h> 49#include <net/tcp.h> 50#include <linux/pagemap.h> 51#include <linux/file.h> 52#include <linux/mutex.h> 53#include <linux/sctp.h> 54#include <linux/slab.h> 55#include <net/sctp/sctp.h> 56#include <net/sctp/user.h> 57#include <net/ipv6.h> 58 59#include "dlm_internal.h" 60#include "lowcomms.h" 61#include "midcomms.h" 62#include "config.h" 63 64#define NEEDED_RMEM (4*1024*1024) 65#define CONN_HASH_SIZE 32 66 67/* Number of messages to send before rescheduling */ 68#define MAX_SEND_MSG_COUNT 25 69 70struct cbuf { 71 unsigned int base; 72 unsigned int len; 73 unsigned int mask; 74}; 75 76static void cbuf_add(struct cbuf *cb, int n) 77{ 78 cb->len += n; 79} 80 81static int cbuf_data(struct cbuf *cb) 82{ 83 return ((cb->base + cb->len) & cb->mask); 84} 85 86static void cbuf_init(struct cbuf *cb, int size) 87{ 88 cb->base = cb->len = 0; 89 cb->mask = size-1; 90} 91 92static void cbuf_eat(struct cbuf *cb, int n) 93{ 94 cb->len -= n; 95 cb->base += n; 96 cb->base &= cb->mask; 97} 98 99static bool cbuf_empty(struct cbuf *cb) 100{ 101 return cb->len == 0; 102} 103 104struct connection { 105 struct socket *sock; /* NULL if not connected */ 106 uint32_t nodeid; /* So we know who we are in the list */ 107 struct mutex sock_mutex; 108 unsigned long flags; 109#define CF_READ_PENDING 1 110#define CF_WRITE_PENDING 2 111#define CF_CONNECT_PENDING 3 112#define CF_INIT_PENDING 4 113#define CF_IS_OTHERCON 5 114#define CF_CLOSE 6 115#define CF_APP_LIMITED 7 116 struct list_head writequeue; /* List of outgoing writequeue_entries */ 117 spinlock_t writequeue_lock; 118 int (*rx_action) (struct connection *); /* What to do when active */ 119 void (*connect_action) (struct connection *); /* What to do to connect */ 120 struct page *rx_page; 121 struct cbuf cb; 122 int retries; 123#define MAX_CONNECT_RETRIES 3 124 int sctp_assoc; 125 struct hlist_node list; 126 struct connection *othercon; 127 struct work_struct rwork; /* Receive workqueue */ 128 struct work_struct swork; /* Send workqueue */ 129}; 130#define sock2con(x) ((struct connection *)(x)->sk_user_data) 131 132/* An entry waiting to be sent */ 133struct writequeue_entry { 134 struct list_head list; 135 struct page *page; 136 int offset; 137 int len; 138 int end; 139 int users; 140 struct connection *con; 141}; 142 143struct dlm_node_addr { 144 struct list_head list; 145 int nodeid; 146 int addr_count; 147 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; 148}; 149 150static LIST_HEAD(dlm_node_addrs); 151static DEFINE_SPINLOCK(dlm_node_addrs_spin); 152 153static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 154static int dlm_local_count; 155static int dlm_allow_conn; 156 157/* Work queues */ 158static struct workqueue_struct *recv_workqueue; 159static struct workqueue_struct *send_workqueue; 160 161static struct hlist_head connection_hash[CONN_HASH_SIZE]; 162static DEFINE_MUTEX(connections_lock); 163static struct kmem_cache *con_cache; 164 165static void process_recv_sockets(struct work_struct *work); 166static void process_send_sockets(struct work_struct *work); 167 168 169/* This is deliberately very simple because most clusters have simple 170 sequential nodeids, so we should be able to go straight to a connection 171 struct in the array */ 172static inline int nodeid_hash(int nodeid) 173{ 174 return nodeid & (CONN_HASH_SIZE-1); 175} 176 177static struct connection *__find_con(int nodeid) 178{ 179 int r; 180 struct hlist_node *h; 181 struct connection *con; 182 183 r = nodeid_hash(nodeid); 184 185 hlist_for_each_entry(con, h, &connection_hash[r], list) { 186 if (con->nodeid == nodeid) 187 return con; 188 } 189 return NULL; 190} 191 192/* 193 * If 'allocation' is zero then we don't attempt to create a new 194 * connection structure for this node. 195 */ 196static struct connection *__nodeid2con(int nodeid, gfp_t alloc) 197{ 198 struct connection *con = NULL; 199 int r; 200 201 con = __find_con(nodeid); 202 if (con || !alloc) 203 return con; 204 205 con = kmem_cache_zalloc(con_cache, alloc); 206 if (!con) 207 return NULL; 208 209 r = nodeid_hash(nodeid); 210 hlist_add_head(&con->list, &connection_hash[r]); 211 212 con->nodeid = nodeid; 213 mutex_init(&con->sock_mutex); 214 INIT_LIST_HEAD(&con->writequeue); 215 spin_lock_init(&con->writequeue_lock); 216 INIT_WORK(&con->swork, process_send_sockets); 217 INIT_WORK(&con->rwork, process_recv_sockets); 218 219 /* Setup action pointers for child sockets */ 220 if (con->nodeid) { 221 struct connection *zerocon = __find_con(0); 222 223 con->connect_action = zerocon->connect_action; 224 if (!con->rx_action) 225 con->rx_action = zerocon->rx_action; 226 } 227 228 return con; 229} 230 231/* Loop round all connections */ 232static void foreach_conn(void (*conn_func)(struct connection *c)) 233{ 234 int i; 235 struct hlist_node *h, *n; 236 struct connection *con; 237 238 for (i = 0; i < CONN_HASH_SIZE; i++) { 239 hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){ 240 conn_func(con); 241 } 242 } 243} 244 245static struct connection *nodeid2con(int nodeid, gfp_t allocation) 246{ 247 struct connection *con; 248 249 mutex_lock(&connections_lock); 250 con = __nodeid2con(nodeid, allocation); 251 mutex_unlock(&connections_lock); 252 253 return con; 254} 255 256/* This is a bit drastic, but only called when things go wrong */ 257static struct connection *assoc2con(int assoc_id) 258{ 259 int i; 260 struct hlist_node *h; 261 struct connection *con; 262 263 mutex_lock(&connections_lock); 264 265 for (i = 0 ; i < CONN_HASH_SIZE; i++) { 266 hlist_for_each_entry(con, h, &connection_hash[i], list) { 267 if (con->sctp_assoc == assoc_id) { 268 mutex_unlock(&connections_lock); 269 return con; 270 } 271 } 272 } 273 mutex_unlock(&connections_lock); 274 return NULL; 275} 276 277static struct dlm_node_addr *find_node_addr(int nodeid) 278{ 279 struct dlm_node_addr *na; 280 281 list_for_each_entry(na, &dlm_node_addrs, list) { 282 if (na->nodeid == nodeid) 283 return na; 284 } 285 return NULL; 286} 287 288static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) 289{ 290 switch (x->ss_family) { 291 case AF_INET: { 292 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 293 struct sockaddr_in *siny = (struct sockaddr_in *)y; 294 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 295 return 0; 296 if (sinx->sin_port != siny->sin_port) 297 return 0; 298 break; 299 } 300 case AF_INET6: { 301 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 302 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 303 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 304 return 0; 305 if (sinx->sin6_port != siny->sin6_port) 306 return 0; 307 break; 308 } 309 default: 310 return 0; 311 } 312 return 1; 313} 314 315static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 316 struct sockaddr *sa_out) 317{ 318 struct sockaddr_storage sas; 319 struct dlm_node_addr *na; 320 321 if (!dlm_local_count) 322 return -1; 323 324 spin_lock(&dlm_node_addrs_spin); 325 na = find_node_addr(nodeid); 326 if (na && na->addr_count) 327 memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage)); 328 spin_unlock(&dlm_node_addrs_spin); 329 330 if (!na) 331 return -EEXIST; 332 333 if (!na->addr_count) 334 return -ENOENT; 335 336 if (sas_out) 337 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 338 339 if (!sa_out) 340 return 0; 341 342 if (dlm_local_addr[0]->ss_family == AF_INET) { 343 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 344 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 345 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 346 } else { 347 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 348 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 349 ret6->sin6_addr = in6->sin6_addr; 350 } 351 352 return 0; 353} 354 355static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) 356{ 357 struct dlm_node_addr *na; 358 int rv = -EEXIST; 359 360 spin_lock(&dlm_node_addrs_spin); 361 list_for_each_entry(na, &dlm_node_addrs, list) { 362 if (!na->addr_count) 363 continue; 364 365 if (!addr_compare(na->addr[0], addr)) 366 continue; 367 368 *nodeid = na->nodeid; 369 rv = 0; 370 break; 371 } 372 spin_unlock(&dlm_node_addrs_spin); 373 return rv; 374} 375 376int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 377{ 378 struct sockaddr_storage *new_addr; 379 struct dlm_node_addr *new_node, *na; 380 381 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); 382 if (!new_node) 383 return -ENOMEM; 384 385 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); 386 if (!new_addr) { 387 kfree(new_node); 388 return -ENOMEM; 389 } 390 391 memcpy(new_addr, addr, len); 392 393 spin_lock(&dlm_node_addrs_spin); 394 na = find_node_addr(nodeid); 395 if (!na) { 396 new_node->nodeid = nodeid; 397 new_node->addr[0] = new_addr; 398 new_node->addr_count = 1; 399 list_add(&new_node->list, &dlm_node_addrs); 400 spin_unlock(&dlm_node_addrs_spin); 401 return 0; 402 } 403 404 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { 405 spin_unlock(&dlm_node_addrs_spin); 406 kfree(new_addr); 407 kfree(new_node); 408 return -ENOSPC; 409 } 410 411 na->addr[na->addr_count++] = new_addr; 412 spin_unlock(&dlm_node_addrs_spin); 413 kfree(new_node); 414 return 0; 415} 416 417/* Data available on socket or listen socket received a connect */ 418static void lowcomms_data_ready(struct sock *sk, int count_unused) 419{ 420 struct connection *con = sock2con(sk); 421 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 422 queue_work(recv_workqueue, &con->rwork); 423} 424 425static void lowcomms_write_space(struct sock *sk) 426{ 427 struct connection *con = sock2con(sk); 428 429 if (!con) 430 return; 431 432 clear_bit(SOCK_NOSPACE, &con->sock->flags); 433 434 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 435 con->sock->sk->sk_write_pending--; 436 clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags); 437 } 438 439 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 440 queue_work(send_workqueue, &con->swork); 441} 442 443static inline void lowcomms_connect_sock(struct connection *con) 444{ 445 if (test_bit(CF_CLOSE, &con->flags)) 446 return; 447 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 448 queue_work(send_workqueue, &con->swork); 449} 450 451static void lowcomms_state_change(struct sock *sk) 452{ 453 if (sk->sk_state == TCP_ESTABLISHED) 454 lowcomms_write_space(sk); 455} 456 457int dlm_lowcomms_connect_node(int nodeid) 458{ 459 struct connection *con; 460 461 /* with sctp there's no connecting without sending */ 462 if (dlm_config.ci_protocol != 0) 463 return 0; 464 465 if (nodeid == dlm_our_nodeid()) 466 return 0; 467 468 con = nodeid2con(nodeid, GFP_NOFS); 469 if (!con) 470 return -ENOMEM; 471 lowcomms_connect_sock(con); 472 return 0; 473} 474 475/* Make a socket active */ 476static int add_sock(struct socket *sock, struct connection *con) 477{ 478 con->sock = sock; 479 480 /* Install a data_ready callback */ 481 con->sock->sk->sk_data_ready = lowcomms_data_ready; 482 con->sock->sk->sk_write_space = lowcomms_write_space; 483 con->sock->sk->sk_state_change = lowcomms_state_change; 484 con->sock->sk->sk_user_data = con; 485 con->sock->sk->sk_allocation = GFP_NOFS; 486 return 0; 487} 488 489/* Add the port number to an IPv6 or 4 sockaddr and return the address 490 length */ 491static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 492 int *addr_len) 493{ 494 saddr->ss_family = dlm_local_addr[0]->ss_family; 495 if (saddr->ss_family == AF_INET) { 496 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 497 in4_addr->sin_port = cpu_to_be16(port); 498 *addr_len = sizeof(struct sockaddr_in); 499 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 500 } else { 501 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 502 in6_addr->sin6_port = cpu_to_be16(port); 503 *addr_len = sizeof(struct sockaddr_in6); 504 } 505 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 506} 507 508/* Close a remote connection and tidy up */ 509static void close_connection(struct connection *con, bool and_other) 510{ 511 mutex_lock(&con->sock_mutex); 512 513 if (con->sock) { 514 sock_release(con->sock); 515 con->sock = NULL; 516 } 517 if (con->othercon && and_other) { 518 /* Will only re-enter once. */ 519 close_connection(con->othercon, false); 520 } 521 if (con->rx_page) { 522 __free_page(con->rx_page); 523 con->rx_page = NULL; 524 } 525 526 con->retries = 0; 527 mutex_unlock(&con->sock_mutex); 528} 529 530/* We only send shutdown messages to nodes that are not part of the cluster */ 531static void sctp_send_shutdown(sctp_assoc_t associd) 532{ 533 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 534 struct msghdr outmessage; 535 struct cmsghdr *cmsg; 536 struct sctp_sndrcvinfo *sinfo; 537 int ret; 538 struct connection *con; 539 540 con = nodeid2con(0,0); 541 BUG_ON(con == NULL); 542 543 outmessage.msg_name = NULL; 544 outmessage.msg_namelen = 0; 545 outmessage.msg_control = outcmsg; 546 outmessage.msg_controllen = sizeof(outcmsg); 547 outmessage.msg_flags = MSG_EOR; 548 549 cmsg = CMSG_FIRSTHDR(&outmessage); 550 cmsg->cmsg_level = IPPROTO_SCTP; 551 cmsg->cmsg_type = SCTP_SNDRCV; 552 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 553 outmessage.msg_controllen = cmsg->cmsg_len; 554 sinfo = CMSG_DATA(cmsg); 555 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 556 557 sinfo->sinfo_flags |= MSG_EOF; 558 sinfo->sinfo_assoc_id = associd; 559 560 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); 561 562 if (ret != 0) 563 log_print("send EOF to node failed: %d", ret); 564} 565 566static void sctp_init_failed_foreach(struct connection *con) 567{ 568 con->sctp_assoc = 0; 569 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 570 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 571 queue_work(send_workqueue, &con->swork); 572 } 573} 574 575/* INIT failed but we don't know which node... 576 restart INIT on all pending nodes */ 577static void sctp_init_failed(void) 578{ 579 mutex_lock(&connections_lock); 580 581 foreach_conn(sctp_init_failed_foreach); 582 583 mutex_unlock(&connections_lock); 584} 585 586/* Something happened to an association */ 587static void process_sctp_notification(struct connection *con, 588 struct msghdr *msg, char *buf) 589{ 590 union sctp_notification *sn = (union sctp_notification *)buf; 591 592 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 593 switch (sn->sn_assoc_change.sac_state) { 594 595 case SCTP_COMM_UP: 596 case SCTP_RESTART: 597 { 598 /* Check that the new node is in the lockspace */ 599 struct sctp_prim prim; 600 int nodeid; 601 int prim_len, ret; 602 int addr_len; 603 struct connection *new_con; 604 605 /* 606 * We get this before any data for an association. 607 * We verify that the node is in the cluster and 608 * then peel off a socket for it. 609 */ 610 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 611 log_print("COMM_UP for invalid assoc ID %d", 612 (int)sn->sn_assoc_change.sac_assoc_id); 613 sctp_init_failed(); 614 return; 615 } 616 memset(&prim, 0, sizeof(struct sctp_prim)); 617 prim_len = sizeof(struct sctp_prim); 618 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; 619 620 ret = kernel_getsockopt(con->sock, 621 IPPROTO_SCTP, 622 SCTP_PRIMARY_ADDR, 623 (char*)&prim, 624 &prim_len); 625 if (ret < 0) { 626 log_print("getsockopt/sctp_primary_addr on " 627 "new assoc %d failed : %d", 628 (int)sn->sn_assoc_change.sac_assoc_id, 629 ret); 630 631 /* Retry INIT later */ 632 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 633 if (new_con) 634 clear_bit(CF_CONNECT_PENDING, &con->flags); 635 return; 636 } 637 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 638 if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 639 unsigned char *b=(unsigned char *)&prim.ssp_addr; 640 log_print("reject connect from unknown addr"); 641 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 642 b, sizeof(struct sockaddr_storage)); 643 sctp_send_shutdown(prim.ssp_assoc_id); 644 return; 645 } 646 647 new_con = nodeid2con(nodeid, GFP_NOFS); 648 if (!new_con) 649 return; 650 651 /* Peel off a new sock */ 652 sctp_lock_sock(con->sock->sk); 653 ret = sctp_do_peeloff(con->sock->sk, 654 sn->sn_assoc_change.sac_assoc_id, 655 &new_con->sock); 656 sctp_release_sock(con->sock->sk); 657 if (ret < 0) { 658 log_print("Can't peel off a socket for " 659 "connection %d to node %d: err=%d", 660 (int)sn->sn_assoc_change.sac_assoc_id, 661 nodeid, ret); 662 return; 663 } 664 add_sock(new_con->sock, new_con); 665 666 log_print("connecting to %d sctp association %d", 667 nodeid, (int)sn->sn_assoc_change.sac_assoc_id); 668 669 /* Send any pending writes */ 670 clear_bit(CF_CONNECT_PENDING, &new_con->flags); 671 clear_bit(CF_INIT_PENDING, &con->flags); 672 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { 673 queue_work(send_workqueue, &new_con->swork); 674 } 675 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) 676 queue_work(recv_workqueue, &new_con->rwork); 677 } 678 break; 679 680 case SCTP_COMM_LOST: 681 case SCTP_SHUTDOWN_COMP: 682 { 683 con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 684 if (con) { 685 con->sctp_assoc = 0; 686 } 687 } 688 break; 689 690 /* We don't know which INIT failed, so clear the PENDING flags 691 * on them all. if assoc_id is zero then it will then try 692 * again */ 693 694 case SCTP_CANT_STR_ASSOC: 695 { 696 log_print("Can't start SCTP association - retrying"); 697 sctp_init_failed(); 698 } 699 break; 700 701 default: 702 log_print("unexpected SCTP assoc change id=%d state=%d", 703 (int)sn->sn_assoc_change.sac_assoc_id, 704 sn->sn_assoc_change.sac_state); 705 } 706 } 707} 708 709/* Data received from remote end */ 710static int receive_from_sock(struct connection *con) 711{ 712 int ret = 0; 713 struct msghdr msg = {}; 714 struct kvec iov[2]; 715 unsigned len; 716 int r; 717 int call_again_soon = 0; 718 int nvec; 719 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 720 721 mutex_lock(&con->sock_mutex); 722 723 if (con->sock == NULL) { 724 ret = -EAGAIN; 725 goto out_close; 726 } 727 728 if (con->rx_page == NULL) { 729 /* 730 * This doesn't need to be atomic, but I think it should 731 * improve performance if it is. 732 */ 733 con->rx_page = alloc_page(GFP_ATOMIC); 734 if (con->rx_page == NULL) 735 goto out_resched; 736 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 737 } 738 739 /* Only SCTP needs these really */ 740 memset(&incmsg, 0, sizeof(incmsg)); 741 msg.msg_control = incmsg; 742 msg.msg_controllen = sizeof(incmsg); 743 744 /* 745 * iov[0] is the bit of the circular buffer between the current end 746 * point (cb.base + cb.len) and the end of the buffer. 747 */ 748 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); 749 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); 750 iov[1].iov_len = 0; 751 nvec = 1; 752 753 /* 754 * iov[1] is the bit of the circular buffer between the start of the 755 * buffer and the start of the currently used section (cb.base) 756 */ 757 if (cbuf_data(&con->cb) >= con->cb.base) { 758 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); 759 iov[1].iov_len = con->cb.base; 760 iov[1].iov_base = page_address(con->rx_page); 761 nvec = 2; 762 } 763 len = iov[0].iov_len + iov[1].iov_len; 764 765 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 766 MSG_DONTWAIT | MSG_NOSIGNAL); 767 if (ret <= 0) 768 goto out_close; 769 770 /* Process SCTP notifications */ 771 if (msg.msg_flags & MSG_NOTIFICATION) { 772 msg.msg_control = incmsg; 773 msg.msg_controllen = sizeof(incmsg); 774 775 process_sctp_notification(con, &msg, 776 page_address(con->rx_page) + con->cb.base); 777 mutex_unlock(&con->sock_mutex); 778 return 0; 779 } 780 BUG_ON(con->nodeid == 0); 781 782 if (ret == len) 783 call_again_soon = 1; 784 cbuf_add(&con->cb, ret); 785 ret = dlm_process_incoming_buffer(con->nodeid, 786 page_address(con->rx_page), 787 con->cb.base, con->cb.len, 788 PAGE_CACHE_SIZE); 789 if (ret == -EBADMSG) { 790 log_print("lowcomms: addr=%p, base=%u, len=%u, " 791 "iov_len=%u, iov_base[0]=%p, read=%d", 792 page_address(con->rx_page), con->cb.base, con->cb.len, 793 len, iov[0].iov_base, r); 794 } 795 if (ret < 0) 796 goto out_close; 797 cbuf_eat(&con->cb, ret); 798 799 if (cbuf_empty(&con->cb) && !call_again_soon) { 800 __free_page(con->rx_page); 801 con->rx_page = NULL; 802 } 803 804 if (call_again_soon) 805 goto out_resched; 806 mutex_unlock(&con->sock_mutex); 807 return 0; 808 809out_resched: 810 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 811 queue_work(recv_workqueue, &con->rwork); 812 mutex_unlock(&con->sock_mutex); 813 return -EAGAIN; 814 815out_close: 816 mutex_unlock(&con->sock_mutex); 817 if (ret != -EAGAIN) { 818 close_connection(con, false); 819 /* Reconnect when there is something to send */ 820 } 821 /* Don't return success if we really got EOF */ 822 if (ret == 0) 823 ret = -EAGAIN; 824 825 return ret; 826} 827 828/* Listening socket is busy, accept a connection */ 829static int tcp_accept_from_sock(struct connection *con) 830{ 831 int result; 832 struct sockaddr_storage peeraddr; 833 struct socket *newsock; 834 int len; 835 int nodeid; 836 struct connection *newcon; 837 struct connection *addcon; 838 839 mutex_lock(&connections_lock); 840 if (!dlm_allow_conn) { 841 mutex_unlock(&connections_lock); 842 return -1; 843 } 844 mutex_unlock(&connections_lock); 845 846 memset(&peeraddr, 0, sizeof(peeraddr)); 847 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 848 IPPROTO_TCP, &newsock); 849 if (result < 0) 850 return -ENOMEM; 851 852 mutex_lock_nested(&con->sock_mutex, 0); 853 854 result = -ENOTCONN; 855 if (con->sock == NULL) 856 goto accept_err; 857 858 newsock->type = con->sock->type; 859 newsock->ops = con->sock->ops; 860 861 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); 862 if (result < 0) 863 goto accept_err; 864 865 /* Get the connected socket's peer */ 866 memset(&peeraddr, 0, sizeof(peeraddr)); 867 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 868 &len, 2)) { 869 result = -ECONNABORTED; 870 goto accept_err; 871 } 872 873 /* Get the new node's NODEID */ 874 make_sockaddr(&peeraddr, 0, &len); 875 if (addr_to_nodeid(&peeraddr, &nodeid)) { 876 unsigned char *b=(unsigned char *)&peeraddr; 877 log_print("connect from non cluster node"); 878 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 879 b, sizeof(struct sockaddr_storage)); 880 sock_release(newsock); 881 mutex_unlock(&con->sock_mutex); 882 return -1; 883 } 884 885 log_print("got connection from %d", nodeid); 886 887 /* Check to see if we already have a connection to this node. This 888 * could happen if the two nodes initiate a connection at roughly 889 * the same time and the connections cross on the wire. 890 * In this case we store the incoming one in "othercon" 891 */ 892 newcon = nodeid2con(nodeid, GFP_NOFS); 893 if (!newcon) { 894 result = -ENOMEM; 895 goto accept_err; 896 } 897 mutex_lock_nested(&newcon->sock_mutex, 1); 898 if (newcon->sock) { 899 struct connection *othercon = newcon->othercon; 900 901 if (!othercon) { 902 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); 903 if (!othercon) { 904 log_print("failed to allocate incoming socket"); 905 mutex_unlock(&newcon->sock_mutex); 906 result = -ENOMEM; 907 goto accept_err; 908 } 909 othercon->nodeid = nodeid; 910 othercon->rx_action = receive_from_sock; 911 mutex_init(&othercon->sock_mutex); 912 INIT_WORK(&othercon->swork, process_send_sockets); 913 INIT_WORK(&othercon->rwork, process_recv_sockets); 914 set_bit(CF_IS_OTHERCON, &othercon->flags); 915 } 916 if (!othercon->sock) { 917 newcon->othercon = othercon; 918 othercon->sock = newsock; 919 newsock->sk->sk_user_data = othercon; 920 add_sock(newsock, othercon); 921 addcon = othercon; 922 } 923 else { 924 printk("Extra connection from node %d attempted\n", nodeid); 925 result = -EAGAIN; 926 mutex_unlock(&newcon->sock_mutex); 927 goto accept_err; 928 } 929 } 930 else { 931 newsock->sk->sk_user_data = newcon; 932 newcon->rx_action = receive_from_sock; 933 add_sock(newsock, newcon); 934 addcon = newcon; 935 } 936 937 mutex_unlock(&newcon->sock_mutex); 938 939 /* 940 * Add it to the active queue in case we got data 941 * between processing the accept adding the socket 942 * to the read_sockets list 943 */ 944 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 945 queue_work(recv_workqueue, &addcon->rwork); 946 mutex_unlock(&con->sock_mutex); 947 948 return 0; 949 950accept_err: 951 mutex_unlock(&con->sock_mutex); 952 sock_release(newsock); 953 954 if (result != -EAGAIN) 955 log_print("error accepting connection from node: %d", result); 956 return result; 957} 958 959static void free_entry(struct writequeue_entry *e) 960{ 961 __free_page(e->page); 962 kfree(e); 963} 964 965/* Initiate an SCTP association. 966 This is a special case of send_to_sock() in that we don't yet have a 967 peeled-off socket for this association, so we use the listening socket 968 and add the primary IP address of the remote node. 969 */ 970static void sctp_init_assoc(struct connection *con) 971{ 972 struct sockaddr_storage rem_addr; 973 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 974 struct msghdr outmessage; 975 struct cmsghdr *cmsg; 976 struct sctp_sndrcvinfo *sinfo; 977 struct connection *base_con; 978 struct writequeue_entry *e; 979 int len, offset; 980 int ret; 981 int addrlen; 982 struct kvec iov[1]; 983 984 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 985 return; 986 987 if (con->retries++ > MAX_CONNECT_RETRIES) 988 return; 989 990 if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) { 991 log_print("no address for nodeid %d", con->nodeid); 992 return; 993 } 994 base_con = nodeid2con(0, 0); 995 BUG_ON(base_con == NULL); 996 997 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); 998 999 outmessage.msg_name = &rem_addr; 1000 outmessage.msg_namelen = addrlen; 1001 outmessage.msg_control = outcmsg; 1002 outmessage.msg_controllen = sizeof(outcmsg); 1003 outmessage.msg_flags = MSG_EOR; 1004 1005 spin_lock(&con->writequeue_lock); 1006 1007 if (list_empty(&con->writequeue)) { 1008 spin_unlock(&con->writequeue_lock); 1009 log_print("writequeue empty for nodeid %d", con->nodeid); 1010 return; 1011 } 1012 1013 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 1014 len = e->len; 1015 offset = e->offset; 1016 spin_unlock(&con->writequeue_lock); 1017 1018 /* Send the first block off the write queue */ 1019 iov[0].iov_base = page_address(e->page)+offset; 1020 iov[0].iov_len = len; 1021 1022 cmsg = CMSG_FIRSTHDR(&outmessage); 1023 cmsg->cmsg_level = IPPROTO_SCTP; 1024 cmsg->cmsg_type = SCTP_SNDRCV; 1025 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 1026 sinfo = CMSG_DATA(cmsg); 1027 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 1028 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); 1029 outmessage.msg_controllen = cmsg->cmsg_len; 1030 1031 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); 1032 if (ret < 0) { 1033 log_print("Send first packet to node %d failed: %d", 1034 con->nodeid, ret); 1035 1036 /* Try again later */ 1037 clear_bit(CF_CONNECT_PENDING, &con->flags); 1038 clear_bit(CF_INIT_PENDING, &con->flags); 1039 } 1040 else { 1041 spin_lock(&con->writequeue_lock); 1042 e->offset += ret; 1043 e->len -= ret; 1044 1045 if (e->len == 0 && e->users == 0) { 1046 list_del(&e->list); 1047 free_entry(e); 1048 } 1049 spin_unlock(&con->writequeue_lock); 1050 } 1051} 1052 1053/* Connect a new socket to its peer */ 1054static void tcp_connect_to_sock(struct connection *con) 1055{ 1056 struct sockaddr_storage saddr, src_addr; 1057 int addr_len; 1058 struct socket *sock = NULL; 1059 int one = 1; 1060 int result; 1061 1062 if (con->nodeid == 0) { 1063 log_print("attempt to connect sock 0 foiled"); 1064 return; 1065 } 1066 1067 mutex_lock(&con->sock_mutex); 1068 if (con->retries++ > MAX_CONNECT_RETRIES) 1069 goto out; 1070 1071 /* Some odd races can cause double-connects, ignore them */ 1072 if (con->sock) 1073 goto out; 1074 1075 /* Create a socket to communicate with */ 1076 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1077 IPPROTO_TCP, &sock); 1078 if (result < 0) 1079 goto out_err; 1080 1081 memset(&saddr, 0, sizeof(saddr)); 1082 result = nodeid_to_addr(con->nodeid, &saddr, NULL); 1083 if (result < 0) { 1084 log_print("no address for nodeid %d", con->nodeid); 1085 goto out_err; 1086 } 1087 1088 sock->sk->sk_user_data = con; 1089 con->rx_action = receive_from_sock; 1090 con->connect_action = tcp_connect_to_sock; 1091 add_sock(sock, con); 1092 1093 /* Bind to our cluster-known address connecting to avoid 1094 routing problems */ 1095 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 1096 make_sockaddr(&src_addr, 0, &addr_len); 1097 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, 1098 addr_len); 1099 if (result < 0) { 1100 log_print("could not bind for connect: %d", result); 1101 /* This *may* not indicate a critical error */ 1102 } 1103 1104 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 1105 1106 log_print("connecting to %d", con->nodeid); 1107 1108 /* Turn off Nagle's algorithm */ 1109 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, 1110 sizeof(one)); 1111 1112 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 1113 O_NONBLOCK); 1114 if (result == -EINPROGRESS) 1115 result = 0; 1116 if (result == 0) 1117 goto out; 1118 1119out_err: 1120 if (con->sock) { 1121 sock_release(con->sock); 1122 con->sock = NULL; 1123 } else if (sock) { 1124 sock_release(sock); 1125 } 1126 /* 1127 * Some errors are fatal and this list might need adjusting. For other 1128 * errors we try again until the max number of retries is reached. 1129 */ 1130 if (result != -EHOSTUNREACH && 1131 result != -ENETUNREACH && 1132 result != -ENETDOWN && 1133 result != -EINVAL && 1134 result != -EPROTONOSUPPORT) { 1135 log_print("connect %d try %d error %d", con->nodeid, 1136 con->retries, result); 1137 mutex_unlock(&con->sock_mutex); 1138 msleep(1000); 1139 lowcomms_connect_sock(con); 1140 return; 1141 } 1142out: 1143 mutex_unlock(&con->sock_mutex); 1144 return; 1145} 1146 1147static struct socket *tcp_create_listen_sock(struct connection *con, 1148 struct sockaddr_storage *saddr) 1149{ 1150 struct socket *sock = NULL; 1151 int result = 0; 1152 int one = 1; 1153 int addr_len; 1154 1155 if (dlm_local_addr[0]->ss_family == AF_INET) 1156 addr_len = sizeof(struct sockaddr_in); 1157 else 1158 addr_len = sizeof(struct sockaddr_in6); 1159 1160 /* Create a socket to communicate with */ 1161 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1162 IPPROTO_TCP, &sock); 1163 if (result < 0) { 1164 log_print("Can't create listening comms socket"); 1165 goto create_out; 1166 } 1167 1168 /* Turn off Nagle's algorithm */ 1169 kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one, 1170 sizeof(one)); 1171 1172 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1173 (char *)&one, sizeof(one)); 1174 1175 if (result < 0) { 1176 log_print("Failed to set SO_REUSEADDR on socket: %d", result); 1177 } 1178 sock->sk->sk_user_data = con; 1179 con->rx_action = tcp_accept_from_sock; 1180 con->connect_action = tcp_connect_to_sock; 1181 con->sock = sock; 1182 1183 /* Bind to our port */ 1184 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1185 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1186 if (result < 0) { 1187 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1188 sock_release(sock); 1189 sock = NULL; 1190 con->sock = NULL; 1191 goto create_out; 1192 } 1193 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1194 (char *)&one, sizeof(one)); 1195 if (result < 0) { 1196 log_print("Set keepalive failed: %d", result); 1197 } 1198 1199 result = sock->ops->listen(sock, 5); 1200 if (result < 0) { 1201 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1202 sock_release(sock); 1203 sock = NULL; 1204 goto create_out; 1205 } 1206 1207create_out: 1208 return sock; 1209} 1210 1211/* Get local addresses */ 1212static void init_local(void) 1213{ 1214 struct sockaddr_storage sas, *addr; 1215 int i; 1216 1217 dlm_local_count = 0; 1218 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1219 if (dlm_our_addr(&sas, i)) 1220 break; 1221 1222 addr = kmalloc(sizeof(*addr), GFP_NOFS); 1223 if (!addr) 1224 break; 1225 memcpy(addr, &sas, sizeof(*addr)); 1226 dlm_local_addr[dlm_local_count++] = addr; 1227 } 1228} 1229 1230/* Bind to an IP address. SCTP allows multiple address so it can do 1231 multi-homing */ 1232static int add_sctp_bind_addr(struct connection *sctp_con, 1233 struct sockaddr_storage *addr, 1234 int addr_len, int num) 1235{ 1236 int result = 0; 1237 1238 if (num == 1) 1239 result = kernel_bind(sctp_con->sock, 1240 (struct sockaddr *) addr, 1241 addr_len); 1242 else 1243 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, 1244 SCTP_SOCKOPT_BINDX_ADD, 1245 (char *)addr, addr_len); 1246 1247 if (result < 0) 1248 log_print("Can't bind to port %d addr number %d", 1249 dlm_config.ci_tcp_port, num); 1250 1251 return result; 1252} 1253 1254/* Initialise SCTP socket and bind to all interfaces */ 1255static int sctp_listen_for_all(void) 1256{ 1257 struct socket *sock = NULL; 1258 struct sockaddr_storage localaddr; 1259 struct sctp_event_subscribe subscribe; 1260 int result = -EINVAL, num = 1, i, addr_len; 1261 struct connection *con = nodeid2con(0, GFP_NOFS); 1262 int bufsize = NEEDED_RMEM; 1263 1264 if (!con) 1265 return -ENOMEM; 1266 1267 log_print("Using SCTP for communications"); 1268 1269 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, 1270 IPPROTO_SCTP, &sock); 1271 if (result < 0) { 1272 log_print("Can't create comms socket, check SCTP is loaded"); 1273 goto out; 1274 } 1275 1276 /* Listen for events */ 1277 memset(&subscribe, 0, sizeof(subscribe)); 1278 subscribe.sctp_data_io_event = 1; 1279 subscribe.sctp_association_event = 1; 1280 subscribe.sctp_send_failure_event = 1; 1281 subscribe.sctp_shutdown_event = 1; 1282 subscribe.sctp_partial_delivery_event = 1; 1283 1284 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1285 (char *)&bufsize, sizeof(bufsize)); 1286 if (result) 1287 log_print("Error increasing buffer space on socket %d", result); 1288 1289 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, 1290 (char *)&subscribe, sizeof(subscribe)); 1291 if (result < 0) { 1292 log_print("Failed to set SCTP_EVENTS on socket: result=%d", 1293 result); 1294 goto create_delsock; 1295 } 1296 1297 /* Init con struct */ 1298 sock->sk->sk_user_data = con; 1299 con->sock = sock; 1300 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1301 con->rx_action = receive_from_sock; 1302 con->connect_action = sctp_init_assoc; 1303 1304 /* Bind to all interfaces. */ 1305 for (i = 0; i < dlm_local_count; i++) { 1306 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 1307 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); 1308 1309 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1310 if (result) 1311 goto create_delsock; 1312 ++num; 1313 } 1314 1315 result = sock->ops->listen(sock, 5); 1316 if (result < 0) { 1317 log_print("Can't set socket listening"); 1318 goto create_delsock; 1319 } 1320 1321 return 0; 1322 1323create_delsock: 1324 sock_release(sock); 1325 con->sock = NULL; 1326out: 1327 return result; 1328} 1329 1330static int tcp_listen_for_all(void) 1331{ 1332 struct socket *sock = NULL; 1333 struct connection *con = nodeid2con(0, GFP_NOFS); 1334 int result = -EINVAL; 1335 1336 if (!con) 1337 return -ENOMEM; 1338 1339 /* We don't support multi-homed hosts */ 1340 if (dlm_local_addr[1] != NULL) { 1341 log_print("TCP protocol can't handle multi-homed hosts, " 1342 "try SCTP"); 1343 return -EINVAL; 1344 } 1345 1346 log_print("Using TCP for communications"); 1347 1348 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1349 if (sock) { 1350 add_sock(sock, con); 1351 result = 0; 1352 } 1353 else { 1354 result = -EADDRINUSE; 1355 } 1356 1357 return result; 1358} 1359 1360 1361 1362static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1363 gfp_t allocation) 1364{ 1365 struct writequeue_entry *entry; 1366 1367 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1368 if (!entry) 1369 return NULL; 1370 1371 entry->page = alloc_page(allocation); 1372 if (!entry->page) { 1373 kfree(entry); 1374 return NULL; 1375 } 1376 1377 entry->offset = 0; 1378 entry->len = 0; 1379 entry->end = 0; 1380 entry->users = 0; 1381 entry->con = con; 1382 1383 return entry; 1384} 1385 1386void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1387{ 1388 struct connection *con; 1389 struct writequeue_entry *e; 1390 int offset = 0; 1391 int users = 0; 1392 1393 con = nodeid2con(nodeid, allocation); 1394 if (!con) 1395 return NULL; 1396 1397 spin_lock(&con->writequeue_lock); 1398 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1399 if ((&e->list == &con->writequeue) || 1400 (PAGE_CACHE_SIZE - e->end < len)) { 1401 e = NULL; 1402 } else { 1403 offset = e->end; 1404 e->end += len; 1405 users = e->users++; 1406 } 1407 spin_unlock(&con->writequeue_lock); 1408 1409 if (e) { 1410 got_one: 1411 *ppc = page_address(e->page) + offset; 1412 return e; 1413 } 1414 1415 e = new_writequeue_entry(con, allocation); 1416 if (e) { 1417 spin_lock(&con->writequeue_lock); 1418 offset = e->end; 1419 e->end += len; 1420 users = e->users++; 1421 list_add_tail(&e->list, &con->writequeue); 1422 spin_unlock(&con->writequeue_lock); 1423 goto got_one; 1424 } 1425 return NULL; 1426} 1427 1428void dlm_lowcomms_commit_buffer(void *mh) 1429{ 1430 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1431 struct connection *con = e->con; 1432 int users; 1433 1434 spin_lock(&con->writequeue_lock); 1435 users = --e->users; 1436 if (users) 1437 goto out; 1438 e->len = e->end - e->offset; 1439 spin_unlock(&con->writequeue_lock); 1440 1441 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { 1442 queue_work(send_workqueue, &con->swork); 1443 } 1444 return; 1445 1446out: 1447 spin_unlock(&con->writequeue_lock); 1448 return; 1449} 1450 1451/* Send a message */ 1452static void send_to_sock(struct connection *con) 1453{ 1454 int ret = 0; 1455 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1456 struct writequeue_entry *e; 1457 int len, offset; 1458 int count = 0; 1459 1460 mutex_lock(&con->sock_mutex); 1461 if (con->sock == NULL) 1462 goto out_connect; 1463 1464 spin_lock(&con->writequeue_lock); 1465 for (;;) { 1466 e = list_entry(con->writequeue.next, struct writequeue_entry, 1467 list); 1468 if ((struct list_head *) e == &con->writequeue) 1469 break; 1470 1471 len = e->len; 1472 offset = e->offset; 1473 BUG_ON(len == 0 && e->users == 0); 1474 spin_unlock(&con->writequeue_lock); 1475 1476 ret = 0; 1477 if (len) { 1478 ret = kernel_sendpage(con->sock, e->page, offset, len, 1479 msg_flags); 1480 if (ret == -EAGAIN || ret == 0) { 1481 if (ret == -EAGAIN && 1482 test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) && 1483 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1484 /* Notify TCP that we're limited by the 1485 * application window size. 1486 */ 1487 set_bit(SOCK_NOSPACE, &con->sock->flags); 1488 con->sock->sk->sk_write_pending++; 1489 } 1490 cond_resched(); 1491 goto out; 1492 } 1493 if (ret <= 0) 1494 goto send_error; 1495 } 1496 1497 /* Don't starve people filling buffers */ 1498 if (++count >= MAX_SEND_MSG_COUNT) { 1499 cond_resched(); 1500 count = 0; 1501 } 1502 1503 spin_lock(&con->writequeue_lock); 1504 e->offset += ret; 1505 e->len -= ret; 1506 1507 if (e->len == 0 && e->users == 0) { 1508 list_del(&e->list); 1509 free_entry(e); 1510 continue; 1511 } 1512 } 1513 spin_unlock(&con->writequeue_lock); 1514out: 1515 mutex_unlock(&con->sock_mutex); 1516 return; 1517 1518send_error: 1519 mutex_unlock(&con->sock_mutex); 1520 close_connection(con, false); 1521 lowcomms_connect_sock(con); 1522 return; 1523 1524out_connect: 1525 mutex_unlock(&con->sock_mutex); 1526 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1527 lowcomms_connect_sock(con); 1528 return; 1529} 1530 1531static void clean_one_writequeue(struct connection *con) 1532{ 1533 struct writequeue_entry *e, *safe; 1534 1535 spin_lock(&con->writequeue_lock); 1536 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1537 list_del(&e->list); 1538 free_entry(e); 1539 } 1540 spin_unlock(&con->writequeue_lock); 1541} 1542 1543/* Called from recovery when it knows that a node has 1544 left the cluster */ 1545int dlm_lowcomms_close(int nodeid) 1546{ 1547 struct connection *con; 1548 struct dlm_node_addr *na; 1549 1550 log_print("closing connection to node %d", nodeid); 1551 con = nodeid2con(nodeid, 0); 1552 if (con) { 1553 clear_bit(CF_CONNECT_PENDING, &con->flags); 1554 clear_bit(CF_WRITE_PENDING, &con->flags); 1555 set_bit(CF_CLOSE, &con->flags); 1556 if (cancel_work_sync(&con->swork)) 1557 log_print("canceled swork for node %d", nodeid); 1558 if (cancel_work_sync(&con->rwork)) 1559 log_print("canceled rwork for node %d", nodeid); 1560 clean_one_writequeue(con); 1561 close_connection(con, true); 1562 } 1563 1564 spin_lock(&dlm_node_addrs_spin); 1565 na = find_node_addr(nodeid); 1566 if (na) { 1567 list_del(&na->list); 1568 while (na->addr_count--) 1569 kfree(na->addr[na->addr_count]); 1570 kfree(na); 1571 } 1572 spin_unlock(&dlm_node_addrs_spin); 1573 1574 return 0; 1575} 1576 1577/* Receive workqueue function */ 1578static void process_recv_sockets(struct work_struct *work) 1579{ 1580 struct connection *con = container_of(work, struct connection, rwork); 1581 int err; 1582 1583 clear_bit(CF_READ_PENDING, &con->flags); 1584 do { 1585 err = con->rx_action(con); 1586 } while (!err); 1587} 1588 1589/* Send workqueue function */ 1590static void process_send_sockets(struct work_struct *work) 1591{ 1592 struct connection *con = container_of(work, struct connection, swork); 1593 1594 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1595 con->connect_action(con); 1596 set_bit(CF_WRITE_PENDING, &con->flags); 1597 } 1598 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) 1599 send_to_sock(con); 1600} 1601 1602 1603/* Discard all entries on the write queues */ 1604static void clean_writequeues(void) 1605{ 1606 foreach_conn(clean_one_writequeue); 1607} 1608 1609static void work_stop(void) 1610{ 1611 destroy_workqueue(recv_workqueue); 1612 destroy_workqueue(send_workqueue); 1613} 1614 1615static int work_start(void) 1616{ 1617 recv_workqueue = alloc_workqueue("dlm_recv", 1618 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1619 if (!recv_workqueue) { 1620 log_print("can't start dlm_recv"); 1621 return -ENOMEM; 1622 } 1623 1624 send_workqueue = alloc_workqueue("dlm_send", 1625 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1626 if (!send_workqueue) { 1627 log_print("can't start dlm_send"); 1628 destroy_workqueue(recv_workqueue); 1629 return -ENOMEM; 1630 } 1631 1632 return 0; 1633} 1634 1635static void stop_conn(struct connection *con) 1636{ 1637 con->flags |= 0x0F; 1638 if (con->sock && con->sock->sk) 1639 con->sock->sk->sk_user_data = NULL; 1640} 1641 1642static void free_conn(struct connection *con) 1643{ 1644 close_connection(con, true); 1645 if (con->othercon) 1646 kmem_cache_free(con_cache, con->othercon); 1647 hlist_del(&con->list); 1648 kmem_cache_free(con_cache, con); 1649} 1650 1651void dlm_lowcomms_stop(void) 1652{ 1653 /* Set all the flags to prevent any 1654 socket activity. 1655 */ 1656 mutex_lock(&connections_lock); 1657 dlm_allow_conn = 0; 1658 foreach_conn(stop_conn); 1659 mutex_unlock(&connections_lock); 1660 1661 work_stop(); 1662 1663 mutex_lock(&connections_lock); 1664 clean_writequeues(); 1665 1666 foreach_conn(free_conn); 1667 1668 mutex_unlock(&connections_lock); 1669 kmem_cache_destroy(con_cache); 1670} 1671 1672int dlm_lowcomms_start(void) 1673{ 1674 int error = -EINVAL; 1675 struct connection *con; 1676 int i; 1677 1678 for (i = 0; i < CONN_HASH_SIZE; i++) 1679 INIT_HLIST_HEAD(&connection_hash[i]); 1680 1681 init_local(); 1682 if (!dlm_local_count) { 1683 error = -ENOTCONN; 1684 log_print("no local IP address has been set"); 1685 goto fail; 1686 } 1687 1688 error = -ENOMEM; 1689 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1690 __alignof__(struct connection), 0, 1691 NULL); 1692 if (!con_cache) 1693 goto fail; 1694 1695 error = work_start(); 1696 if (error) 1697 goto fail_destroy; 1698 1699 dlm_allow_conn = 1; 1700 1701 /* Start listening */ 1702 if (dlm_config.ci_protocol == 0) 1703 error = tcp_listen_for_all(); 1704 else 1705 error = sctp_listen_for_all(); 1706 if (error) 1707 goto fail_unlisten; 1708 1709 return 0; 1710 1711fail_unlisten: 1712 dlm_allow_conn = 0; 1713 con = nodeid2con(0,0); 1714 if (con) { 1715 close_connection(con, false); 1716 kmem_cache_free(con_cache, con); 1717 } 1718fail_destroy: 1719 kmem_cache_destroy(con_cache); 1720fail: 1721 return error; 1722} 1723 1724void dlm_lowcomms_exit(void) 1725{ 1726 struct dlm_node_addr *na, *safe; 1727 1728 spin_lock(&dlm_node_addrs_spin); 1729 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1730 list_del(&na->list); 1731 while (na->addr_count--) 1732 kfree(na->addr[na->addr_count]); 1733 kfree(na); 1734 } 1735 spin_unlock(&dlm_node_addrs_spin); 1736} 1737