/* lowcomms.c revision 04bedd79a7037ee7af816b06c60c738144475c4a */
1/****************************************************************************** 2******************************************************************************* 3** 4** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 6** 7** This copyrighted material is made available to anyone wishing to use, 8** modify, copy, or redistribute it subject to the terms and conditions 9** of the GNU General Public License v.2. 10** 11******************************************************************************* 12******************************************************************************/ 13 14/* 15 * lowcomms.c 16 * 17 * This is the "low-level" comms layer. 18 * 19 * It is responsible for sending/receiving messages 20 * from other nodes in the cluster. 21 * 22 * Cluster nodes are referred to by their nodeids. nodeids are 23 * simply 32 bit numbers to the locking module - if they need to 24 * be expanded for the cluster infrastructure then that is its 25 * responsibility. It is this layer's 26 * responsibility to resolve these into IP address or 27 * whatever it needs for inter-node communication. 28 * 29 * The comms level is two kernel threads that deal mainly with 30 * the receiving of messages from other nodes and passing them 31 * up to the mid-level comms layer (which understands the 32 * message format) for execution by the locking core, and 33 * a send thread which does all the setting up of connections 34 * to remote nodes and the sending of data. Threads are not allowed 35 * to send their own data because it may cause them to wait in times 36 * of high load. Also, this way, the sending thread can collect together 37 * messages bound for one node and send them in one block. 38 * 39 * lowcomms will choose to use either TCP or SCTP as its transport layer 40 * depending on the configuration variable 'protocol'. This should be set 41 * to 0 (default) for TCP or 1 for SCTP. 
It should be configured using a 42 * cluster-wide mechanism as it must be the same on all nodes of the cluster 43 * for the DLM to function. 44 * 45 */ 46 47#include <asm/ioctls.h> 48#include <net/sock.h> 49#include <net/tcp.h> 50#include <linux/pagemap.h> 51#include <linux/file.h> 52#include <linux/mutex.h> 53#include <linux/sctp.h> 54#include <net/sctp/user.h> 55#include <net/ipv6.h> 56 57#include "dlm_internal.h" 58#include "lowcomms.h" 59#include "midcomms.h" 60#include "config.h" 61 62#define NEEDED_RMEM (4*1024*1024) 63#define CONN_HASH_SIZE 32 64 65struct cbuf { 66 unsigned int base; 67 unsigned int len; 68 unsigned int mask; 69}; 70 71static void cbuf_add(struct cbuf *cb, int n) 72{ 73 cb->len += n; 74} 75 76static int cbuf_data(struct cbuf *cb) 77{ 78 return ((cb->base + cb->len) & cb->mask); 79} 80 81static void cbuf_init(struct cbuf *cb, int size) 82{ 83 cb->base = cb->len = 0; 84 cb->mask = size-1; 85} 86 87static void cbuf_eat(struct cbuf *cb, int n) 88{ 89 cb->len -= n; 90 cb->base += n; 91 cb->base &= cb->mask; 92} 93 94static bool cbuf_empty(struct cbuf *cb) 95{ 96 return cb->len == 0; 97} 98 99struct connection { 100 struct socket *sock; /* NULL if not connected */ 101 uint32_t nodeid; /* So we know who we are in the list */ 102 struct mutex sock_mutex; 103 unsigned long flags; 104#define CF_READ_PENDING 1 105#define CF_WRITE_PENDING 2 106#define CF_CONNECT_PENDING 3 107#define CF_INIT_PENDING 4 108#define CF_IS_OTHERCON 5 109#define CF_CLOSE 6 110 struct list_head writequeue; /* List of outgoing writequeue_entries */ 111 spinlock_t writequeue_lock; 112 int (*rx_action) (struct connection *); /* What to do when active */ 113 void (*connect_action) (struct connection *); /* What to do to connect */ 114 struct page *rx_page; 115 struct cbuf cb; 116 int retries; 117#define MAX_CONNECT_RETRIES 3 118 int sctp_assoc; 119 struct hlist_node list; 120 struct connection *othercon; 121 struct work_struct rwork; /* Receive workqueue */ 122 struct work_struct 
swork; /* Send workqueue */ 123}; 124#define sock2con(x) ((struct connection *)(x)->sk_user_data) 125 126/* An entry waiting to be sent */ 127struct writequeue_entry { 128 struct list_head list; 129 struct page *page; 130 int offset; 131 int len; 132 int end; 133 int users; 134 struct connection *con; 135}; 136 137static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 138static int dlm_local_count; 139 140/* Work queues */ 141static struct workqueue_struct *recv_workqueue; 142static struct workqueue_struct *send_workqueue; 143 144static struct hlist_head connection_hash[CONN_HASH_SIZE]; 145static DEFINE_MUTEX(connections_lock); 146static struct kmem_cache *con_cache; 147 148static void process_recv_sockets(struct work_struct *work); 149static void process_send_sockets(struct work_struct *work); 150 151 152/* This is deliberately very simple because most clusters have simple 153 sequential nodeids, so we should be able to go straight to a connection 154 struct in the array */ 155static inline int nodeid_hash(int nodeid) 156{ 157 return nodeid & (CONN_HASH_SIZE-1); 158} 159 160static struct connection *__find_con(int nodeid) 161{ 162 int r; 163 struct hlist_node *h; 164 struct connection *con; 165 166 r = nodeid_hash(nodeid); 167 168 hlist_for_each_entry(con, h, &connection_hash[r], list) { 169 if (con->nodeid == nodeid) 170 return con; 171 } 172 return NULL; 173} 174 175/* 176 * If 'allocation' is zero then we don't attempt to create a new 177 * connection structure for this node. 
178 */ 179static struct connection *__nodeid2con(int nodeid, gfp_t alloc) 180{ 181 struct connection *con = NULL; 182 int r; 183 184 con = __find_con(nodeid); 185 if (con || !alloc) 186 return con; 187 188 con = kmem_cache_zalloc(con_cache, alloc); 189 if (!con) 190 return NULL; 191 192 r = nodeid_hash(nodeid); 193 hlist_add_head(&con->list, &connection_hash[r]); 194 195 con->nodeid = nodeid; 196 mutex_init(&con->sock_mutex); 197 INIT_LIST_HEAD(&con->writequeue); 198 spin_lock_init(&con->writequeue_lock); 199 INIT_WORK(&con->swork, process_send_sockets); 200 INIT_WORK(&con->rwork, process_recv_sockets); 201 202 /* Setup action pointers for child sockets */ 203 if (con->nodeid) { 204 struct connection *zerocon = __find_con(0); 205 206 con->connect_action = zerocon->connect_action; 207 if (!con->rx_action) 208 con->rx_action = zerocon->rx_action; 209 } 210 211 return con; 212} 213 214/* Loop round all connections */ 215static void foreach_conn(void (*conn_func)(struct connection *c)) 216{ 217 int i; 218 struct hlist_node *h, *n; 219 struct connection *con; 220 221 for (i = 0; i < CONN_HASH_SIZE; i++) { 222 hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){ 223 conn_func(con); 224 } 225 } 226} 227 228static struct connection *nodeid2con(int nodeid, gfp_t allocation) 229{ 230 struct connection *con; 231 232 mutex_lock(&connections_lock); 233 con = __nodeid2con(nodeid, allocation); 234 mutex_unlock(&connections_lock); 235 236 return con; 237} 238 239/* This is a bit drastic, but only called when things go wrong */ 240static struct connection *assoc2con(int assoc_id) 241{ 242 int i; 243 struct hlist_node *h; 244 struct connection *con; 245 246 mutex_lock(&connections_lock); 247 248 for (i = 0 ; i < CONN_HASH_SIZE; i++) { 249 hlist_for_each_entry(con, h, &connection_hash[i], list) { 250 if (con && con->sctp_assoc == assoc_id) { 251 mutex_unlock(&connections_lock); 252 return con; 253 } 254 } 255 } 256 mutex_unlock(&connections_lock); 257 return NULL; 258} 
259 260static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr) 261{ 262 struct sockaddr_storage addr; 263 int error; 264 265 if (!dlm_local_count) 266 return -1; 267 268 error = dlm_nodeid_to_addr(nodeid, &addr); 269 if (error) 270 return error; 271 272 if (dlm_local_addr[0]->ss_family == AF_INET) { 273 struct sockaddr_in *in4 = (struct sockaddr_in *) &addr; 274 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr; 275 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 276 } else { 277 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr; 278 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr; 279 ipv6_addr_copy(&ret6->sin6_addr, &in6->sin6_addr); 280 } 281 282 return 0; 283} 284 285/* Data available on socket or listen socket received a connect */ 286static void lowcomms_data_ready(struct sock *sk, int count_unused) 287{ 288 struct connection *con = sock2con(sk); 289 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 290 queue_work(recv_workqueue, &con->rwork); 291} 292 293static void lowcomms_write_space(struct sock *sk) 294{ 295 struct connection *con = sock2con(sk); 296 297 if (con && !test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 298 queue_work(send_workqueue, &con->swork); 299} 300 301static inline void lowcomms_connect_sock(struct connection *con) 302{ 303 if (test_bit(CF_CLOSE, &con->flags)) 304 return; 305 if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags)) 306 queue_work(send_workqueue, &con->swork); 307} 308 309static void lowcomms_state_change(struct sock *sk) 310{ 311 if (sk->sk_state == TCP_ESTABLISHED) 312 lowcomms_write_space(sk); 313} 314 315int dlm_lowcomms_connect_node(int nodeid) 316{ 317 struct connection *con; 318 319 /* with sctp there's no connecting without sending */ 320 if (dlm_config.ci_protocol != 0) 321 return 0; 322 323 if (nodeid == dlm_our_nodeid()) 324 return 0; 325 326 con = nodeid2con(nodeid, GFP_NOFS); 327 if (!con) 328 return -ENOMEM; 329 lowcomms_connect_sock(con); 330 return 0; 331} 332 
333/* Make a socket active */ 334static int add_sock(struct socket *sock, struct connection *con) 335{ 336 con->sock = sock; 337 338 /* Install a data_ready callback */ 339 con->sock->sk->sk_data_ready = lowcomms_data_ready; 340 con->sock->sk->sk_write_space = lowcomms_write_space; 341 con->sock->sk->sk_state_change = lowcomms_state_change; 342 con->sock->sk->sk_user_data = con; 343 con->sock->sk->sk_allocation = GFP_NOFS; 344 return 0; 345} 346 347/* Add the port number to an IPv6 or 4 sockaddr and return the address 348 length */ 349static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 350 int *addr_len) 351{ 352 saddr->ss_family = dlm_local_addr[0]->ss_family; 353 if (saddr->ss_family == AF_INET) { 354 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 355 in4_addr->sin_port = cpu_to_be16(port); 356 *addr_len = sizeof(struct sockaddr_in); 357 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 358 } else { 359 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 360 in6_addr->sin6_port = cpu_to_be16(port); 361 *addr_len = sizeof(struct sockaddr_in6); 362 } 363 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 364} 365 366/* Close a remote connection and tidy up */ 367static void close_connection(struct connection *con, bool and_other) 368{ 369 mutex_lock(&con->sock_mutex); 370 371 if (con->sock) { 372 sock_release(con->sock); 373 con->sock = NULL; 374 } 375 if (con->othercon && and_other) { 376 /* Will only re-enter once. 
*/ 377 close_connection(con->othercon, false); 378 } 379 if (con->rx_page) { 380 __free_page(con->rx_page); 381 con->rx_page = NULL; 382 } 383 384 con->retries = 0; 385 mutex_unlock(&con->sock_mutex); 386} 387 388/* We only send shutdown messages to nodes that are not part of the cluster */ 389static void sctp_send_shutdown(sctp_assoc_t associd) 390{ 391 static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 392 struct msghdr outmessage; 393 struct cmsghdr *cmsg; 394 struct sctp_sndrcvinfo *sinfo; 395 int ret; 396 struct connection *con; 397 398 con = nodeid2con(0,0); 399 BUG_ON(con == NULL); 400 401 outmessage.msg_name = NULL; 402 outmessage.msg_namelen = 0; 403 outmessage.msg_control = outcmsg; 404 outmessage.msg_controllen = sizeof(outcmsg); 405 outmessage.msg_flags = MSG_EOR; 406 407 cmsg = CMSG_FIRSTHDR(&outmessage); 408 cmsg->cmsg_level = IPPROTO_SCTP; 409 cmsg->cmsg_type = SCTP_SNDRCV; 410 cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo)); 411 outmessage.msg_controllen = cmsg->cmsg_len; 412 sinfo = CMSG_DATA(cmsg); 413 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 414 415 sinfo->sinfo_flags |= MSG_EOF; 416 sinfo->sinfo_assoc_id = associd; 417 418 ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0); 419 420 if (ret != 0) 421 log_print("send EOF to node failed: %d", ret); 422} 423 424static void sctp_init_failed_foreach(struct connection *con) 425{ 426 con->sctp_assoc = 0; 427 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 428 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) 429 queue_work(send_workqueue, &con->swork); 430 } 431} 432 433/* INIT failed but we don't know which node... 
434 restart INIT on all pending nodes */ 435static void sctp_init_failed(void) 436{ 437 mutex_lock(&connections_lock); 438 439 foreach_conn(sctp_init_failed_foreach); 440 441 mutex_unlock(&connections_lock); 442} 443 444/* Something happened to an association */ 445static void process_sctp_notification(struct connection *con, 446 struct msghdr *msg, char *buf) 447{ 448 union sctp_notification *sn = (union sctp_notification *)buf; 449 450 if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) { 451 switch (sn->sn_assoc_change.sac_state) { 452 453 case SCTP_COMM_UP: 454 case SCTP_RESTART: 455 { 456 /* Check that the new node is in the lockspace */ 457 struct sctp_prim prim; 458 int nodeid; 459 int prim_len, ret; 460 int addr_len; 461 struct connection *new_con; 462 struct file *file; 463 sctp_peeloff_arg_t parg; 464 int parglen = sizeof(parg); 465 466 /* 467 * We get this before any data for an association. 468 * We verify that the node is in the cluster and 469 * then peel off a socket for it. 470 */ 471 if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) { 472 log_print("COMM_UP for invalid assoc ID %d", 473 (int)sn->sn_assoc_change.sac_assoc_id); 474 sctp_init_failed(); 475 return; 476 } 477 memset(&prim, 0, sizeof(struct sctp_prim)); 478 prim_len = sizeof(struct sctp_prim); 479 prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id; 480 481 ret = kernel_getsockopt(con->sock, 482 IPPROTO_SCTP, 483 SCTP_PRIMARY_ADDR, 484 (char*)&prim, 485 &prim_len); 486 if (ret < 0) { 487 log_print("getsockopt/sctp_primary_addr on " 488 "new assoc %d failed : %d", 489 (int)sn->sn_assoc_change.sac_assoc_id, 490 ret); 491 492 /* Retry INIT later */ 493 new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 494 if (new_con) 495 clear_bit(CF_CONNECT_PENDING, &con->flags); 496 return; 497 } 498 make_sockaddr(&prim.ssp_addr, 0, &addr_len); 499 if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) { 500 int i; 501 unsigned char *b=(unsigned char *)&prim.ssp_addr; 502 log_print("reject connect from unknown 
addr"); 503 for (i=0; i<sizeof(struct sockaddr_storage);i++) 504 printk("%02x ", b[i]); 505 printk("\n"); 506 sctp_send_shutdown(prim.ssp_assoc_id); 507 return; 508 } 509 510 new_con = nodeid2con(nodeid, GFP_NOFS); 511 if (!new_con) 512 return; 513 514 /* Peel off a new sock */ 515 parg.associd = sn->sn_assoc_change.sac_assoc_id; 516 ret = kernel_getsockopt(con->sock, IPPROTO_SCTP, 517 SCTP_SOCKOPT_PEELOFF, 518 (void *)&parg, &parglen); 519 if (ret) { 520 log_print("Can't peel off a socket for " 521 "connection %d to node %d: err=%d\n", 522 parg.associd, nodeid, ret); 523 } 524 file = fget(parg.sd); 525 new_con->sock = SOCKET_I(file->f_dentry->d_inode); 526 add_sock(new_con->sock, new_con); 527 fput(file); 528 put_unused_fd(parg.sd); 529 530 log_print("got new/restarted association %d nodeid %d", 531 (int)sn->sn_assoc_change.sac_assoc_id, nodeid); 532 533 /* Send any pending writes */ 534 clear_bit(CF_CONNECT_PENDING, &new_con->flags); 535 clear_bit(CF_INIT_PENDING, &con->flags); 536 if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) { 537 queue_work(send_workqueue, &new_con->swork); 538 } 539 if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags)) 540 queue_work(recv_workqueue, &new_con->rwork); 541 } 542 break; 543 544 case SCTP_COMM_LOST: 545 case SCTP_SHUTDOWN_COMP: 546 { 547 con = assoc2con(sn->sn_assoc_change.sac_assoc_id); 548 if (con) { 549 con->sctp_assoc = 0; 550 } 551 } 552 break; 553 554 /* We don't know which INIT failed, so clear the PENDING flags 555 * on them all. 
if assoc_id is zero then it will then try 556 * again */ 557 558 case SCTP_CANT_STR_ASSOC: 559 { 560 log_print("Can't start SCTP association - retrying"); 561 sctp_init_failed(); 562 } 563 break; 564 565 default: 566 log_print("unexpected SCTP assoc change id=%d state=%d", 567 (int)sn->sn_assoc_change.sac_assoc_id, 568 sn->sn_assoc_change.sac_state); 569 } 570 } 571} 572 573/* Data received from remote end */ 574static int receive_from_sock(struct connection *con) 575{ 576 int ret = 0; 577 struct msghdr msg = {}; 578 struct kvec iov[2]; 579 unsigned len; 580 int r; 581 int call_again_soon = 0; 582 int nvec; 583 char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 584 585 mutex_lock(&con->sock_mutex); 586 587 if (con->sock == NULL) { 588 ret = -EAGAIN; 589 goto out_close; 590 } 591 592 if (con->rx_page == NULL) { 593 /* 594 * This doesn't need to be atomic, but I think it should 595 * improve performance if it is. 596 */ 597 con->rx_page = alloc_page(GFP_ATOMIC); 598 if (con->rx_page == NULL) 599 goto out_resched; 600 cbuf_init(&con->cb, PAGE_CACHE_SIZE); 601 } 602 603 /* Only SCTP needs these really */ 604 memset(&incmsg, 0, sizeof(incmsg)); 605 msg.msg_control = incmsg; 606 msg.msg_controllen = sizeof(incmsg); 607 608 /* 609 * iov[0] is the bit of the circular buffer between the current end 610 * point (cb.base + cb.len) and the end of the buffer. 
611 */ 612 iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); 613 iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); 614 iov[1].iov_len = 0; 615 nvec = 1; 616 617 /* 618 * iov[1] is the bit of the circular buffer between the start of the 619 * buffer and the start of the currently used section (cb.base) 620 */ 621 if (cbuf_data(&con->cb) >= con->cb.base) { 622 iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb); 623 iov[1].iov_len = con->cb.base; 624 iov[1].iov_base = page_address(con->rx_page); 625 nvec = 2; 626 } 627 len = iov[0].iov_len + iov[1].iov_len; 628 629 r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len, 630 MSG_DONTWAIT | MSG_NOSIGNAL); 631 if (ret <= 0) 632 goto out_close; 633 634 /* Process SCTP notifications */ 635 if (msg.msg_flags & MSG_NOTIFICATION) { 636 msg.msg_control = incmsg; 637 msg.msg_controllen = sizeof(incmsg); 638 639 process_sctp_notification(con, &msg, 640 page_address(con->rx_page) + con->cb.base); 641 mutex_unlock(&con->sock_mutex); 642 return 0; 643 } 644 BUG_ON(con->nodeid == 0); 645 646 if (ret == len) 647 call_again_soon = 1; 648 cbuf_add(&con->cb, ret); 649 ret = dlm_process_incoming_buffer(con->nodeid, 650 page_address(con->rx_page), 651 con->cb.base, con->cb.len, 652 PAGE_CACHE_SIZE); 653 if (ret == -EBADMSG) { 654 log_print("lowcomms: addr=%p, base=%u, len=%u, " 655 "iov_len=%u, iov_base[0]=%p, read=%d", 656 page_address(con->rx_page), con->cb.base, con->cb.len, 657 len, iov[0].iov_base, r); 658 } 659 if (ret < 0) 660 goto out_close; 661 cbuf_eat(&con->cb, ret); 662 663 if (cbuf_empty(&con->cb) && !call_again_soon) { 664 __free_page(con->rx_page); 665 con->rx_page = NULL; 666 } 667 668 if (call_again_soon) 669 goto out_resched; 670 mutex_unlock(&con->sock_mutex); 671 return 0; 672 673out_resched: 674 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 675 queue_work(recv_workqueue, &con->rwork); 676 mutex_unlock(&con->sock_mutex); 677 return -EAGAIN; 678 679out_close: 680 
mutex_unlock(&con->sock_mutex); 681 if (ret != -EAGAIN) { 682 close_connection(con, false); 683 /* Reconnect when there is something to send */ 684 } 685 /* Don't return success if we really got EOF */ 686 if (ret == 0) 687 ret = -EAGAIN; 688 689 return ret; 690} 691 692/* Listening socket is busy, accept a connection */ 693static int tcp_accept_from_sock(struct connection *con) 694{ 695 int result; 696 struct sockaddr_storage peeraddr; 697 struct socket *newsock; 698 int len; 699 int nodeid; 700 struct connection *newcon; 701 struct connection *addcon; 702 703 memset(&peeraddr, 0, sizeof(peeraddr)); 704 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 705 IPPROTO_TCP, &newsock); 706 if (result < 0) 707 return -ENOMEM; 708 709 mutex_lock_nested(&con->sock_mutex, 0); 710 711 result = -ENOTCONN; 712 if (con->sock == NULL) 713 goto accept_err; 714 715 newsock->type = con->sock->type; 716 newsock->ops = con->sock->ops; 717 718 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK); 719 if (result < 0) 720 goto accept_err; 721 722 /* Get the connected socket's peer */ 723 memset(&peeraddr, 0, sizeof(peeraddr)); 724 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 725 &len, 2)) { 726 result = -ECONNABORTED; 727 goto accept_err; 728 } 729 730 /* Get the new node's NODEID */ 731 make_sockaddr(&peeraddr, 0, &len); 732 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) { 733 log_print("connect from non cluster node"); 734 sock_release(newsock); 735 mutex_unlock(&con->sock_mutex); 736 return -1; 737 } 738 739 log_print("got connection from %d", nodeid); 740 741 /* Check to see if we already have a connection to this node. This 742 * could happen if the two nodes initiate a connection at roughly 743 * the same time and the connections cross on the wire. 
744 * In this case we store the incoming one in "othercon" 745 */ 746 newcon = nodeid2con(nodeid, GFP_NOFS); 747 if (!newcon) { 748 result = -ENOMEM; 749 goto accept_err; 750 } 751 mutex_lock_nested(&newcon->sock_mutex, 1); 752 if (newcon->sock) { 753 struct connection *othercon = newcon->othercon; 754 755 if (!othercon) { 756 othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); 757 if (!othercon) { 758 log_print("failed to allocate incoming socket"); 759 mutex_unlock(&newcon->sock_mutex); 760 result = -ENOMEM; 761 goto accept_err; 762 } 763 othercon->nodeid = nodeid; 764 othercon->rx_action = receive_from_sock; 765 mutex_init(&othercon->sock_mutex); 766 INIT_WORK(&othercon->swork, process_send_sockets); 767 INIT_WORK(&othercon->rwork, process_recv_sockets); 768 set_bit(CF_IS_OTHERCON, &othercon->flags); 769 } 770 if (!othercon->sock) { 771 newcon->othercon = othercon; 772 othercon->sock = newsock; 773 newsock->sk->sk_user_data = othercon; 774 add_sock(newsock, othercon); 775 addcon = othercon; 776 } 777 else { 778 printk("Extra connection from node %d attempted\n", nodeid); 779 result = -EAGAIN; 780 mutex_unlock(&newcon->sock_mutex); 781 goto accept_err; 782 } 783 } 784 else { 785 newsock->sk->sk_user_data = newcon; 786 newcon->rx_action = receive_from_sock; 787 add_sock(newsock, newcon); 788 addcon = newcon; 789 } 790 791 mutex_unlock(&newcon->sock_mutex); 792 793 /* 794 * Add it to the active queue in case we got data 795 * beween processing the accept adding the socket 796 * to the read_sockets list 797 */ 798 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 799 queue_work(recv_workqueue, &addcon->rwork); 800 mutex_unlock(&con->sock_mutex); 801 802 return 0; 803 804accept_err: 805 mutex_unlock(&con->sock_mutex); 806 sock_release(newsock); 807 808 if (result != -EAGAIN) 809 log_print("error accepting connection from node: %d", result); 810 return result; 811} 812 813static void free_entry(struct writequeue_entry *e) 814{ 815 __free_page(e->page); 816 
kfree(e); 817} 818 819/* Initiate an SCTP association. 820 This is a special case of send_to_sock() in that we don't yet have a 821 peeled-off socket for this association, so we use the listening socket 822 and add the primary IP address of the remote node. 823 */ 824static void sctp_init_assoc(struct connection *con) 825{ 826 struct sockaddr_storage rem_addr; 827 char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))]; 828 struct msghdr outmessage; 829 struct cmsghdr *cmsg; 830 struct sctp_sndrcvinfo *sinfo; 831 struct connection *base_con; 832 struct writequeue_entry *e; 833 int len, offset; 834 int ret; 835 int addrlen; 836 struct kvec iov[1]; 837 838 if (test_and_set_bit(CF_INIT_PENDING, &con->flags)) 839 return; 840 841 if (con->retries++ > MAX_CONNECT_RETRIES) 842 return; 843 844 log_print("Initiating association with node %d", con->nodeid); 845 846 if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) { 847 log_print("no address for nodeid %d", con->nodeid); 848 return; 849 } 850 base_con = nodeid2con(0, 0); 851 BUG_ON(base_con == NULL); 852 853 make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen); 854 855 outmessage.msg_name = &rem_addr; 856 outmessage.msg_namelen = addrlen; 857 outmessage.msg_control = outcmsg; 858 outmessage.msg_controllen = sizeof(outcmsg); 859 outmessage.msg_flags = MSG_EOR; 860 861 spin_lock(&con->writequeue_lock); 862 863 if (list_empty(&con->writequeue)) { 864 spin_unlock(&con->writequeue_lock); 865 log_print("writequeue empty for nodeid %d", con->nodeid); 866 return; 867 } 868 869 e = list_first_entry(&con->writequeue, struct writequeue_entry, list); 870 len = e->len; 871 offset = e->offset; 872 spin_unlock(&con->writequeue_lock); 873 874 /* Send the first block off the write queue */ 875 iov[0].iov_base = page_address(e->page)+offset; 876 iov[0].iov_len = len; 877 878 cmsg = CMSG_FIRSTHDR(&outmessage); 879 cmsg->cmsg_level = IPPROTO_SCTP; 880 cmsg->cmsg_type = SCTP_SNDRCV; 881 cmsg->cmsg_len = CMSG_LEN(sizeof(struct 
sctp_sndrcvinfo)); 882 sinfo = CMSG_DATA(cmsg); 883 memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo)); 884 sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid()); 885 outmessage.msg_controllen = cmsg->cmsg_len; 886 887 ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len); 888 if (ret < 0) { 889 log_print("Send first packet to node %d failed: %d", 890 con->nodeid, ret); 891 892 /* Try again later */ 893 clear_bit(CF_CONNECT_PENDING, &con->flags); 894 clear_bit(CF_INIT_PENDING, &con->flags); 895 } 896 else { 897 spin_lock(&con->writequeue_lock); 898 e->offset += ret; 899 e->len -= ret; 900 901 if (e->len == 0 && e->users == 0) { 902 list_del(&e->list); 903 free_entry(e); 904 } 905 spin_unlock(&con->writequeue_lock); 906 } 907} 908 909/* Connect a new socket to its peer */ 910static void tcp_connect_to_sock(struct connection *con) 911{ 912 int result = -EHOSTUNREACH; 913 struct sockaddr_storage saddr, src_addr; 914 int addr_len; 915 struct socket *sock = NULL; 916 917 if (con->nodeid == 0) { 918 log_print("attempt to connect sock 0 foiled"); 919 return; 920 } 921 922 mutex_lock(&con->sock_mutex); 923 if (con->retries++ > MAX_CONNECT_RETRIES) 924 goto out; 925 926 /* Some odd races can cause double-connects, ignore them */ 927 if (con->sock) { 928 result = 0; 929 goto out; 930 } 931 932 /* Create a socket to communicate with */ 933 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 934 IPPROTO_TCP, &sock); 935 if (result < 0) 936 goto out_err; 937 938 memset(&saddr, 0, sizeof(saddr)); 939 if (dlm_nodeid_to_addr(con->nodeid, &saddr)) 940 goto out_err; 941 942 sock->sk->sk_user_data = con; 943 con->rx_action = receive_from_sock; 944 con->connect_action = tcp_connect_to_sock; 945 add_sock(sock, con); 946 947 /* Bind to our cluster-known address connecting to avoid 948 routing problems */ 949 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 950 make_sockaddr(&src_addr, 0, &addr_len); 951 result = sock->ops->bind(sock, (struct sockaddr *) 
&src_addr, 952 addr_len); 953 if (result < 0) { 954 log_print("could not bind for connect: %d", result); 955 /* This *may* not indicate a critical error */ 956 } 957 958 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 959 960 log_print("connecting to %d", con->nodeid); 961 result = 962 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 963 O_NONBLOCK); 964 if (result == -EINPROGRESS) 965 result = 0; 966 if (result == 0) 967 goto out; 968 969out_err: 970 if (con->sock) { 971 sock_release(con->sock); 972 con->sock = NULL; 973 } else if (sock) { 974 sock_release(sock); 975 } 976 /* 977 * Some errors are fatal and this list might need adjusting. For other 978 * errors we try again until the max number of retries is reached. 979 */ 980 if (result != -EHOSTUNREACH && result != -ENETUNREACH && 981 result != -ENETDOWN && result != -EINVAL 982 && result != -EPROTONOSUPPORT) { 983 lowcomms_connect_sock(con); 984 result = 0; 985 } 986out: 987 mutex_unlock(&con->sock_mutex); 988 return; 989} 990 991static struct socket *tcp_create_listen_sock(struct connection *con, 992 struct sockaddr_storage *saddr) 993{ 994 struct socket *sock = NULL; 995 int result = 0; 996 int one = 1; 997 int addr_len; 998 999 if (dlm_local_addr[0]->ss_family == AF_INET) 1000 addr_len = sizeof(struct sockaddr_in); 1001 else 1002 addr_len = sizeof(struct sockaddr_in6); 1003 1004 /* Create a socket to communicate with */ 1005 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM, 1006 IPPROTO_TCP, &sock); 1007 if (result < 0) { 1008 log_print("Can't create listening comms socket"); 1009 goto create_out; 1010 } 1011 1012 result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1013 (char *)&one, sizeof(one)); 1014 1015 if (result < 0) { 1016 log_print("Failed to set SO_REUSEADDR on socket: %d", result); 1017 } 1018 sock->sk->sk_user_data = con; 1019 con->rx_action = tcp_accept_from_sock; 1020 con->connect_action = tcp_connect_to_sock; 1021 con->sock = sock; 1022 1023 /* 
Bind to our port */ 1024 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1025 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1026 if (result < 0) { 1027 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1028 sock_release(sock); 1029 sock = NULL; 1030 con->sock = NULL; 1031 goto create_out; 1032 } 1033 result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 1034 (char *)&one, sizeof(one)); 1035 if (result < 0) { 1036 log_print("Set keepalive failed: %d", result); 1037 } 1038 1039 result = sock->ops->listen(sock, 5); 1040 if (result < 0) { 1041 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1042 sock_release(sock); 1043 sock = NULL; 1044 goto create_out; 1045 } 1046 1047create_out: 1048 return sock; 1049} 1050 1051/* Get local addresses */ 1052static void init_local(void) 1053{ 1054 struct sockaddr_storage sas, *addr; 1055 int i; 1056 1057 dlm_local_count = 0; 1058 for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) { 1059 if (dlm_our_addr(&sas, i)) 1060 break; 1061 1062 addr = kmalloc(sizeof(*addr), GFP_KERNEL); 1063 if (!addr) 1064 break; 1065 memcpy(addr, &sas, sizeof(*addr)); 1066 dlm_local_addr[dlm_local_count++] = addr; 1067 } 1068} 1069 1070/* Bind to an IP address. 
SCTP allows multiple address so it can do 1071 multi-homing */ 1072static int add_sctp_bind_addr(struct connection *sctp_con, 1073 struct sockaddr_storage *addr, 1074 int addr_len, int num) 1075{ 1076 int result = 0; 1077 1078 if (num == 1) 1079 result = kernel_bind(sctp_con->sock, 1080 (struct sockaddr *) addr, 1081 addr_len); 1082 else 1083 result = kernel_setsockopt(sctp_con->sock, SOL_SCTP, 1084 SCTP_SOCKOPT_BINDX_ADD, 1085 (char *)addr, addr_len); 1086 1087 if (result < 0) 1088 log_print("Can't bind to port %d addr number %d", 1089 dlm_config.ci_tcp_port, num); 1090 1091 return result; 1092} 1093 1094/* Initialise SCTP socket and bind to all interfaces */ 1095static int sctp_listen_for_all(void) 1096{ 1097 struct socket *sock = NULL; 1098 struct sockaddr_storage localaddr; 1099 struct sctp_event_subscribe subscribe; 1100 int result = -EINVAL, num = 1, i, addr_len; 1101 struct connection *con = nodeid2con(0, GFP_KERNEL); 1102 int bufsize = NEEDED_RMEM; 1103 1104 if (!con) 1105 return -ENOMEM; 1106 1107 log_print("Using SCTP for communications"); 1108 1109 result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET, 1110 IPPROTO_SCTP, &sock); 1111 if (result < 0) { 1112 log_print("Can't create comms socket, check SCTP is loaded"); 1113 goto out; 1114 } 1115 1116 /* Listen for events */ 1117 memset(&subscribe, 0, sizeof(subscribe)); 1118 subscribe.sctp_data_io_event = 1; 1119 subscribe.sctp_association_event = 1; 1120 subscribe.sctp_send_failure_event = 1; 1121 subscribe.sctp_shutdown_event = 1; 1122 subscribe.sctp_partial_delivery_event = 1; 1123 1124 result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE, 1125 (char *)&bufsize, sizeof(bufsize)); 1126 if (result) 1127 log_print("Error increasing buffer space on socket %d", result); 1128 1129 result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS, 1130 (char *)&subscribe, sizeof(subscribe)); 1131 if (result < 0) { 1132 log_print("Failed to set SCTP_EVENTS on socket: result=%d", 1133 result); 1134 
goto create_delsock; 1135 } 1136 1137 /* Init con struct */ 1138 sock->sk->sk_user_data = con; 1139 con->sock = sock; 1140 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1141 con->rx_action = receive_from_sock; 1142 con->connect_action = sctp_init_assoc; 1143 1144 /* Bind to all interfaces. */ 1145 for (i = 0; i < dlm_local_count; i++) { 1146 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 1147 make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len); 1148 1149 result = add_sctp_bind_addr(con, &localaddr, addr_len, num); 1150 if (result) 1151 goto create_delsock; 1152 ++num; 1153 } 1154 1155 result = sock->ops->listen(sock, 5); 1156 if (result < 0) { 1157 log_print("Can't set socket listening"); 1158 goto create_delsock; 1159 } 1160 1161 return 0; 1162 1163create_delsock: 1164 sock_release(sock); 1165 con->sock = NULL; 1166out: 1167 return result; 1168} 1169 1170static int tcp_listen_for_all(void) 1171{ 1172 struct socket *sock = NULL; 1173 struct connection *con = nodeid2con(0, GFP_KERNEL); 1174 int result = -EINVAL; 1175 1176 if (!con) 1177 return -ENOMEM; 1178 1179 /* We don't support multi-homed hosts */ 1180 if (dlm_local_addr[1] != NULL) { 1181 log_print("TCP protocol can't handle multi-homed hosts, " 1182 "try SCTP"); 1183 return -EINVAL; 1184 } 1185 1186 log_print("Using TCP for communications"); 1187 1188 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1189 if (sock) { 1190 add_sock(sock, con); 1191 result = 0; 1192 } 1193 else { 1194 result = -EADDRINUSE; 1195 } 1196 1197 return result; 1198} 1199 1200 1201 1202static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1203 gfp_t allocation) 1204{ 1205 struct writequeue_entry *entry; 1206 1207 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1208 if (!entry) 1209 return NULL; 1210 1211 entry->page = alloc_page(allocation); 1212 if (!entry->page) { 1213 kfree(entry); 1214 return NULL; 1215 } 1216 1217 entry->offset = 0; 1218 entry->len = 0; 1219 
entry->end = 0; 1220 entry->users = 0; 1221 entry->con = con; 1222 1223 return entry; 1224} 1225 1226void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1227{ 1228 struct connection *con; 1229 struct writequeue_entry *e; 1230 int offset = 0; 1231 int users = 0; 1232 1233 con = nodeid2con(nodeid, allocation); 1234 if (!con) 1235 return NULL; 1236 1237 spin_lock(&con->writequeue_lock); 1238 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1239 if ((&e->list == &con->writequeue) || 1240 (PAGE_CACHE_SIZE - e->end < len)) { 1241 e = NULL; 1242 } else { 1243 offset = e->end; 1244 e->end += len; 1245 users = e->users++; 1246 } 1247 spin_unlock(&con->writequeue_lock); 1248 1249 if (e) { 1250 got_one: 1251 *ppc = page_address(e->page) + offset; 1252 return e; 1253 } 1254 1255 e = new_writequeue_entry(con, allocation); 1256 if (e) { 1257 spin_lock(&con->writequeue_lock); 1258 offset = e->end; 1259 e->end += len; 1260 users = e->users++; 1261 list_add_tail(&e->list, &con->writequeue); 1262 spin_unlock(&con->writequeue_lock); 1263 goto got_one; 1264 } 1265 return NULL; 1266} 1267 1268void dlm_lowcomms_commit_buffer(void *mh) 1269{ 1270 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1271 struct connection *con = e->con; 1272 int users; 1273 1274 spin_lock(&con->writequeue_lock); 1275 users = --e->users; 1276 if (users) 1277 goto out; 1278 e->len = e->end - e->offset; 1279 spin_unlock(&con->writequeue_lock); 1280 1281 if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) { 1282 queue_work(send_workqueue, &con->swork); 1283 } 1284 return; 1285 1286out: 1287 spin_unlock(&con->writequeue_lock); 1288 return; 1289} 1290 1291/* Send a message */ 1292static void send_to_sock(struct connection *con) 1293{ 1294 int ret = 0; 1295 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1296 struct writequeue_entry *e; 1297 int len, offset; 1298 1299 mutex_lock(&con->sock_mutex); 1300 if (con->sock == NULL) 1301 goto out_connect; 1302 
1303 spin_lock(&con->writequeue_lock); 1304 for (;;) { 1305 e = list_entry(con->writequeue.next, struct writequeue_entry, 1306 list); 1307 if ((struct list_head *) e == &con->writequeue) 1308 break; 1309 1310 len = e->len; 1311 offset = e->offset; 1312 BUG_ON(len == 0 && e->users == 0); 1313 spin_unlock(&con->writequeue_lock); 1314 1315 ret = 0; 1316 if (len) { 1317 ret = kernel_sendpage(con->sock, e->page, offset, len, 1318 msg_flags); 1319 if (ret == -EAGAIN || ret == 0) { 1320 cond_resched(); 1321 goto out; 1322 } 1323 if (ret <= 0) 1324 goto send_error; 1325 } 1326 /* Don't starve people filling buffers */ 1327 cond_resched(); 1328 1329 spin_lock(&con->writequeue_lock); 1330 e->offset += ret; 1331 e->len -= ret; 1332 1333 if (e->len == 0 && e->users == 0) { 1334 list_del(&e->list); 1335 free_entry(e); 1336 continue; 1337 } 1338 } 1339 spin_unlock(&con->writequeue_lock); 1340out: 1341 mutex_unlock(&con->sock_mutex); 1342 return; 1343 1344send_error: 1345 mutex_unlock(&con->sock_mutex); 1346 close_connection(con, false); 1347 lowcomms_connect_sock(con); 1348 return; 1349 1350out_connect: 1351 mutex_unlock(&con->sock_mutex); 1352 if (!test_bit(CF_INIT_PENDING, &con->flags)) 1353 lowcomms_connect_sock(con); 1354 return; 1355} 1356 1357static void clean_one_writequeue(struct connection *con) 1358{ 1359 struct writequeue_entry *e, *safe; 1360 1361 spin_lock(&con->writequeue_lock); 1362 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1363 list_del(&e->list); 1364 free_entry(e); 1365 } 1366 spin_unlock(&con->writequeue_lock); 1367} 1368 1369/* Called from recovery when it knows that a node has 1370 left the cluster */ 1371int dlm_lowcomms_close(int nodeid) 1372{ 1373 struct connection *con; 1374 1375 log_print("closing connection to node %d", nodeid); 1376 con = nodeid2con(nodeid, 0); 1377 if (con) { 1378 clear_bit(CF_CONNECT_PENDING, &con->flags); 1379 clear_bit(CF_WRITE_PENDING, &con->flags); 1380 set_bit(CF_CLOSE, &con->flags); 1381 if 
(cancel_work_sync(&con->swork)) 1382 log_print("canceled swork for node %d", nodeid); 1383 if (cancel_work_sync(&con->rwork)) 1384 log_print("canceled rwork for node %d", nodeid); 1385 clean_one_writequeue(con); 1386 close_connection(con, true); 1387 } 1388 return 0; 1389} 1390 1391/* Receive workqueue function */ 1392static void process_recv_sockets(struct work_struct *work) 1393{ 1394 struct connection *con = container_of(work, struct connection, rwork); 1395 int err; 1396 1397 clear_bit(CF_READ_PENDING, &con->flags); 1398 do { 1399 err = con->rx_action(con); 1400 } while (!err); 1401} 1402 1403/* Send workqueue function */ 1404static void process_send_sockets(struct work_struct *work) 1405{ 1406 struct connection *con = container_of(work, struct connection, swork); 1407 1408 if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) { 1409 con->connect_action(con); 1410 set_bit(CF_WRITE_PENDING, &con->flags); 1411 } 1412 if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) 1413 send_to_sock(con); 1414} 1415 1416 1417/* Discard all entries on the write queues */ 1418static void clean_writequeues(void) 1419{ 1420 foreach_conn(clean_one_writequeue); 1421} 1422 1423static void work_stop(void) 1424{ 1425 destroy_workqueue(recv_workqueue); 1426 destroy_workqueue(send_workqueue); 1427} 1428 1429static int work_start(void) 1430{ 1431 int error; 1432 recv_workqueue = create_workqueue("dlm_recv"); 1433 error = IS_ERR(recv_workqueue); 1434 if (error) { 1435 log_print("can't start dlm_recv %d", error); 1436 return error; 1437 } 1438 1439 send_workqueue = create_singlethread_workqueue("dlm_send"); 1440 error = IS_ERR(send_workqueue); 1441 if (error) { 1442 log_print("can't start dlm_send %d", error); 1443 destroy_workqueue(recv_workqueue); 1444 return error; 1445 } 1446 1447 return 0; 1448} 1449 1450static void stop_conn(struct connection *con) 1451{ 1452 con->flags |= 0x0F; 1453 if (con->sock && con->sock->sk) 1454 con->sock->sk->sk_user_data = NULL; 1455} 1456 1457static 
void free_conn(struct connection *con) 1458{ 1459 close_connection(con, true); 1460 if (con->othercon) 1461 kmem_cache_free(con_cache, con->othercon); 1462 hlist_del(&con->list); 1463 kmem_cache_free(con_cache, con); 1464} 1465 1466void dlm_lowcomms_stop(void) 1467{ 1468 /* Set all the flags to prevent any 1469 socket activity. 1470 */ 1471 mutex_lock(&connections_lock); 1472 foreach_conn(stop_conn); 1473 mutex_unlock(&connections_lock); 1474 1475 work_stop(); 1476 1477 mutex_lock(&connections_lock); 1478 clean_writequeues(); 1479 1480 foreach_conn(free_conn); 1481 1482 mutex_unlock(&connections_lock); 1483 kmem_cache_destroy(con_cache); 1484} 1485 1486int dlm_lowcomms_start(void) 1487{ 1488 int error = -EINVAL; 1489 struct connection *con; 1490 int i; 1491 1492 for (i = 0; i < CONN_HASH_SIZE; i++) 1493 INIT_HLIST_HEAD(&connection_hash[i]); 1494 1495 init_local(); 1496 if (!dlm_local_count) { 1497 error = -ENOTCONN; 1498 log_print("no local IP address has been set"); 1499 goto out; 1500 } 1501 1502 error = -ENOMEM; 1503 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), 1504 __alignof__(struct connection), 0, 1505 NULL); 1506 if (!con_cache) 1507 goto out; 1508 1509 /* Start listening */ 1510 if (dlm_config.ci_protocol == 0) 1511 error = tcp_listen_for_all(); 1512 else 1513 error = sctp_listen_for_all(); 1514 if (error) 1515 goto fail_unlisten; 1516 1517 error = work_start(); 1518 if (error) 1519 goto fail_unlisten; 1520 1521 return 0; 1522 1523fail_unlisten: 1524 con = nodeid2con(0,0); 1525 if (con) { 1526 close_connection(con, false); 1527 kmem_cache_free(con_cache, con); 1528 } 1529 kmem_cache_destroy(con_cache); 1530 1531out: 1532 return error; 1533} 1534