lowcomms.c revision 36b71a8bfbc92e1ba164e9aec840c0180ee933b5
1/******************************************************************************
2*******************************************************************************
3**
4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
6**
7**  This copyrighted material is made available to anyone wishing to use,
8**  modify, copy, or redistribute it subject to the terms and conditions
9**  of the GNU General Public License v.2.
10**
11*******************************************************************************
12******************************************************************************/
13
14/*
15 * lowcomms.c
16 *
17 * This is the "low-level" comms layer.
18 *
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
21 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32-bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * them into IP addresses or whatever else it needs for inter-node
 * communication.
28 *
 * The comms level is built on two workqueues. The receive workqueue
 * deals mainly with receiving messages from other nodes and passing
 * them up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core. The send
 * workqueue does all the setting up of connections to remote nodes
 * and the sending of data. Threads are not allowed to send their own
 * data because it may cause them to wait in times of high load. Also,
 * this way, the sending side can collect together messages bound for
 * one node and send them in one block.
38 *
39 * lowcomms will choose to use either TCP or SCTP as its transport layer
40 * depending on the configuration variable 'protocol'. This should be set
41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
43 * for the DLM to function.
44 *
45 */
46
47#include <asm/ioctls.h>
48#include <net/sock.h>
49#include <net/tcp.h>
50#include <linux/pagemap.h>
51#include <linux/file.h>
52#include <linux/mutex.h>
53#include <linux/sctp.h>
54#include <linux/slab.h>
55#include <net/sctp/sctp.h>
56#include <net/sctp/user.h>
57#include <net/ipv6.h>
58
59#include "dlm_internal.h"
60#include "lowcomms.h"
61#include "midcomms.h"
62#include "config.h"
63
64#define NEEDED_RMEM (4*1024*1024)
65#define CONN_HASH_SIZE 32
66
67/* Number of messages to send before rescheduling */
68#define MAX_SEND_MSG_COUNT 25
69
/*
 * Offset bookkeeping for a circular receive buffer. The storage itself is
 * a separate page (see receive_from_sock()); this only records where the
 * unconsumed bytes start and how many there are. The buffer size must be
 * a power of two so that wrap-around can be done with a mask.
 */
struct cbuf {
	unsigned int base;	/* offset of the first unconsumed byte */
	unsigned int len;	/* number of unconsumed bytes */
	unsigned int mask;	/* buffer size - 1 */
};

/* Record that n more bytes have been written after the existing data. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len = cb->len + n;
}

/* Offset (wrapped) at which the next incoming byte should be stored. */
static int cbuf_data(struct cbuf *cb)
{
	unsigned int end = cb->base + cb->len;

	return end & cb->mask;
}

/* Reset to an empty buffer of 'size' bytes; size must be a power of two. */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->mask = size - 1;
	cb->len = 0;
	cb->base = 0;
}

/* Consume n bytes from the front, wrapping base past the end if needed. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base = (cb->base + n) & cb->mask;
}

/* True when no unconsumed bytes remain. */
static bool cbuf_empty(struct cbuf *cb)
{
	return !cb->len;
}
103
/*
 * Per-node connection state, kept in connection_hash. nodeid 0 is used for
 * the local listening socket (tcp_connect_to_sock() refuses to connect it,
 * and the SCTP code sends through it). A second structure, "othercon", may
 * hang off a node's connection to hold an incoming TCP connection that
 * crossed with our outgoing one (see tcp_accept_from_sock()).
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;	/* held around use of sock and rx_page */
	unsigned long flags;	/* CF_* bit numbers below, used with test_bit() etc. */
#define CF_READ_PENDING 1	/* rwork queued */
#define CF_WRITE_PENDING 2	/* swork queued to send data */
#define CF_CONNECT_PENDING 3	/* swork queued to connect */
#define CF_INIT_PENDING 4	/* SCTP: INIT sent by sctp_init_assoc(), awaiting COMM_UP */
#define CF_IS_OTHERCON 5	/* this structure is some connection's othercon */
#define CF_CLOSE 6	/* lowcomms_connect_sock() will not start new connects */
#define CF_APP_LIMITED 7	/* cleared, with sk_write_pending undone, in lowcomms_write_space() */
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;	/* protects writequeue and its entries */
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;	/* backing storage for cb; NULL when idle */
	struct cbuf cb;	/* read/unconsumed offsets within rx_page */
	int retries;	/* connect attempts so far; reset by close_connection() */
#define MAX_CONNECT_RETRIES 3	/* attempts stop once retries++ exceeds this */
	int sctp_assoc;	/* SCTP association id, 0 when none (searched by assoc2con()) */
	struct hlist_node list;	/* chain in connection_hash[nodeid_hash(nodeid)] */
	struct connection *othercon;	/* crossed incoming connection, if any */
	struct work_struct rwork; /* Receive work item, queued on recv_workqueue */
	struct work_struct swork; /* Send work item, queued on send_workqueue */
};
/* Map a struct sock back to its connection via sk_user_data (set in add_sock()). */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
131
/*
 * An entry waiting to be sent. Entries live on a connection's writequeue
 * under writequeue_lock; one is freed (page and all) once it has been
 * fully sent and has no users (see sctp_init_assoc()).
 */
struct writequeue_entry {
	struct list_head list;	/* link in con->writequeue */
	struct page *page;	/* page holding the outgoing data */
	int offset;	/* start of the not-yet-sent data within page */
	int len;	/* number of bytes not yet sent */
	int end;	/* NOTE(review): presumably end of data written into page; maintained outside this view - confirm */
	int users;	/* outstanding users; entry is only freed when 0 */
	struct connection *con;	/* connection this entry is queued on */
};
142
/*
 * Configured addresses of one cluster node, kept on dlm_node_addrs under
 * dlm_node_addrs_spin. addr[0] is the address used both for connecting
 * (nodeid_to_addr()) and for identifying incoming peers (addr_to_nodeid()).
 */
struct dlm_node_addr {
	struct list_head list;	/* link in dlm_node_addrs */
	int nodeid;
	int addr_count;	/* number of valid entries in addr[] */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
149
/* Configured node addresses; list and entries guarded by dlm_node_addrs_spin. */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/* Our own addresses; the family of dlm_local_addr[0] selects IPv4 vs IPv6. */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* Non-zero once incoming TCP connections may be accepted; read under connections_lock. */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* All connections, hashed by nodeid_hash(); the table is guarded by connections_lock. */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;	/* slab cache for struct connection */

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
167
168
169/* This is deliberately very simple because most clusters have simple
170   sequential nodeids, so we should be able to go straight to a connection
171   struct in the array */
172static inline int nodeid_hash(int nodeid)
173{
174	return nodeid & (CONN_HASH_SIZE-1);
175}
176
177static struct connection *__find_con(int nodeid)
178{
179	int r;
180	struct hlist_node *h;
181	struct connection *con;
182
183	r = nodeid_hash(nodeid);
184
185	hlist_for_each_entry(con, h, &connection_hash[r], list) {
186		if (con->nodeid == nodeid)
187			return con;
188	}
189	return NULL;
190}
191
192/*
193 * If 'allocation' is zero then we don't attempt to create a new
194 * connection structure for this node.
195 */
196static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
197{
198	struct connection *con = NULL;
199	int r;
200
201	con = __find_con(nodeid);
202	if (con || !alloc)
203		return con;
204
205	con = kmem_cache_zalloc(con_cache, alloc);
206	if (!con)
207		return NULL;
208
209	r = nodeid_hash(nodeid);
210	hlist_add_head(&con->list, &connection_hash[r]);
211
212	con->nodeid = nodeid;
213	mutex_init(&con->sock_mutex);
214	INIT_LIST_HEAD(&con->writequeue);
215	spin_lock_init(&con->writequeue_lock);
216	INIT_WORK(&con->swork, process_send_sockets);
217	INIT_WORK(&con->rwork, process_recv_sockets);
218
219	/* Setup action pointers for child sockets */
220	if (con->nodeid) {
221		struct connection *zerocon = __find_con(0);
222
223		con->connect_action = zerocon->connect_action;
224		if (!con->rx_action)
225			con->rx_action = zerocon->rx_action;
226	}
227
228	return con;
229}
230
231/* Loop round all connections */
232static void foreach_conn(void (*conn_func)(struct connection *c))
233{
234	int i;
235	struct hlist_node *h, *n;
236	struct connection *con;
237
238	for (i = 0; i < CONN_HASH_SIZE; i++) {
239		hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list){
240			conn_func(con);
241		}
242	}
243}
244
245static struct connection *nodeid2con(int nodeid, gfp_t allocation)
246{
247	struct connection *con;
248
249	mutex_lock(&connections_lock);
250	con = __nodeid2con(nodeid, allocation);
251	mutex_unlock(&connections_lock);
252
253	return con;
254}
255
256/* This is a bit drastic, but only called when things go wrong */
257static struct connection *assoc2con(int assoc_id)
258{
259	int i;
260	struct hlist_node *h;
261	struct connection *con;
262
263	mutex_lock(&connections_lock);
264
265	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
266		hlist_for_each_entry(con, h, &connection_hash[i], list) {
267			if (con->sctp_assoc == assoc_id) {
268				mutex_unlock(&connections_lock);
269				return con;
270			}
271		}
272	}
273	mutex_unlock(&connections_lock);
274	return NULL;
275}
276
277static struct dlm_node_addr *find_node_addr(int nodeid)
278{
279	struct dlm_node_addr *na;
280
281	list_for_each_entry(na, &dlm_node_addrs, list) {
282		if (na->nodeid == nodeid)
283			return na;
284	}
285	return NULL;
286}
287
288static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
289{
290	switch (x->ss_family) {
291	case AF_INET: {
292		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
293		struct sockaddr_in *siny = (struct sockaddr_in *)y;
294		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
295			return 0;
296		if (sinx->sin_port != siny->sin_port)
297			return 0;
298		break;
299	}
300	case AF_INET6: {
301		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
302		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
303		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
304			return 0;
305		if (sinx->sin6_port != siny->sin6_port)
306			return 0;
307		break;
308	}
309	default:
310		return 0;
311	}
312	return 1;
313}
314
315static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
316			  struct sockaddr *sa_out)
317{
318	struct sockaddr_storage sas;
319	struct dlm_node_addr *na;
320
321	if (!dlm_local_count)
322		return -1;
323
324	spin_lock(&dlm_node_addrs_spin);
325	na = find_node_addr(nodeid);
326	if (na && na->addr_count)
327		memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
328	spin_unlock(&dlm_node_addrs_spin);
329
330	if (!na)
331		return -EEXIST;
332
333	if (!na->addr_count)
334		return -ENOENT;
335
336	if (sas_out)
337		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
338
339	if (!sa_out)
340		return 0;
341
342	if (dlm_local_addr[0]->ss_family == AF_INET) {
343		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
344		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
345		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
346	} else {
347		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
348		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
349		ret6->sin6_addr = in6->sin6_addr;
350	}
351
352	return 0;
353}
354
355static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
356{
357	struct dlm_node_addr *na;
358	int rv = -EEXIST;
359
360	spin_lock(&dlm_node_addrs_spin);
361	list_for_each_entry(na, &dlm_node_addrs, list) {
362		if (!na->addr_count)
363			continue;
364
365		if (!addr_compare(na->addr[0], addr))
366			continue;
367
368		*nodeid = na->nodeid;
369		rv = 0;
370		break;
371	}
372	spin_unlock(&dlm_node_addrs_spin);
373	return rv;
374}
375
376int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
377{
378	struct sockaddr_storage *new_addr;
379	struct dlm_node_addr *new_node, *na;
380
381	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
382	if (!new_node)
383		return -ENOMEM;
384
385	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
386	if (!new_addr) {
387		kfree(new_node);
388		return -ENOMEM;
389	}
390
391	memcpy(new_addr, addr, len);
392
393	spin_lock(&dlm_node_addrs_spin);
394	na = find_node_addr(nodeid);
395	if (!na) {
396		new_node->nodeid = nodeid;
397		new_node->addr[0] = new_addr;
398		new_node->addr_count = 1;
399		list_add(&new_node->list, &dlm_node_addrs);
400		spin_unlock(&dlm_node_addrs_spin);
401		return 0;
402	}
403
404	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
405		spin_unlock(&dlm_node_addrs_spin);
406		kfree(new_addr);
407		kfree(new_node);
408		return -ENOSPC;
409	}
410
411	na->addr[na->addr_count++] = new_addr;
412	spin_unlock(&dlm_node_addrs_spin);
413	kfree(new_node);
414	return 0;
415}
416
417/* Data available on socket or listen socket received a connect */
418static void lowcomms_data_ready(struct sock *sk, int count_unused)
419{
420	struct connection *con = sock2con(sk);
421	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
422		queue_work(recv_workqueue, &con->rwork);
423}
424
425static void lowcomms_write_space(struct sock *sk)
426{
427	struct connection *con = sock2con(sk);
428
429	if (!con)
430		return;
431
432	clear_bit(SOCK_NOSPACE, &con->sock->flags);
433
434	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
435		con->sock->sk->sk_write_pending--;
436		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
437	}
438
439	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
440		queue_work(send_workqueue, &con->swork);
441}
442
443static inline void lowcomms_connect_sock(struct connection *con)
444{
445	if (test_bit(CF_CLOSE, &con->flags))
446		return;
447	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
448		queue_work(send_workqueue, &con->swork);
449}
450
451static void lowcomms_state_change(struct sock *sk)
452{
453	if (sk->sk_state == TCP_ESTABLISHED)
454		lowcomms_write_space(sk);
455}
456
457int dlm_lowcomms_connect_node(int nodeid)
458{
459	struct connection *con;
460
461	/* with sctp there's no connecting without sending */
462	if (dlm_config.ci_protocol != 0)
463		return 0;
464
465	if (nodeid == dlm_our_nodeid())
466		return 0;
467
468	con = nodeid2con(nodeid, GFP_NOFS);
469	if (!con)
470		return -ENOMEM;
471	lowcomms_connect_sock(con);
472	return 0;
473}
474
475/* Make a socket active */
476static int add_sock(struct socket *sock, struct connection *con)
477{
478	con->sock = sock;
479
480	/* Install a data_ready callback */
481	con->sock->sk->sk_data_ready = lowcomms_data_ready;
482	con->sock->sk->sk_write_space = lowcomms_write_space;
483	con->sock->sk->sk_state_change = lowcomms_state_change;
484	con->sock->sk->sk_user_data = con;
485	con->sock->sk->sk_allocation = GFP_NOFS;
486	return 0;
487}
488
489/* Add the port number to an IPv6 or 4 sockaddr and return the address
490   length */
491static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
492			  int *addr_len)
493{
494	saddr->ss_family =  dlm_local_addr[0]->ss_family;
495	if (saddr->ss_family == AF_INET) {
496		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
497		in4_addr->sin_port = cpu_to_be16(port);
498		*addr_len = sizeof(struct sockaddr_in);
499		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
500	} else {
501		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
502		in6_addr->sin6_port = cpu_to_be16(port);
503		*addr_len = sizeof(struct sockaddr_in6);
504	}
505	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
506}
507
508/* Close a remote connection and tidy up */
509static void close_connection(struct connection *con, bool and_other)
510{
511	mutex_lock(&con->sock_mutex);
512
513	if (con->sock) {
514		sock_release(con->sock);
515		con->sock = NULL;
516	}
517	if (con->othercon && and_other) {
518		/* Will only re-enter once. */
519		close_connection(con->othercon, false);
520	}
521	if (con->rx_page) {
522		__free_page(con->rx_page);
523		con->rx_page = NULL;
524	}
525
526	con->retries = 0;
527	mutex_unlock(&con->sock_mutex);
528}
529
/*
 * Shut down SCTP association 'associd' by sending an empty message whose
 * SCTP_SNDRCV ancillary data carries MSG_EOF, over the nodeid 0
 * connection's socket. We only send shutdown messages to nodes that are
 * not part of the cluster (see process_sctp_notification()). Failure is
 * only logged.
 *
 * NOTE(review): outcmsg is static, so concurrent callers would share it;
 * the only visible caller runs from the notification path - confirm
 * callers are serialised.
 */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	con = nodeid2con(0,0);
	BUG_ON(con == NULL);

	/* No destination address: the association id selects the peer. */
	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	/* Full control buffer first, so CMSG_FIRSTHDR() below finds room. */
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	/* Then shrink it to the single cmsg actually built. */
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	/* Zero-length payload: only the ancillary data matters. */
	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	/* With nothing to send, success presumably returns 0; anything else is logged. */
	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}
565
566static void sctp_init_failed_foreach(struct connection *con)
567{
568	con->sctp_assoc = 0;
569	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
570		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
571			queue_work(send_workqueue, &con->swork);
572	}
573}
574
575/* INIT failed but we don't know which node...
576   restart INIT on all pending nodes */
577static void sctp_init_failed(void)
578{
579	mutex_lock(&connections_lock);
580
581	foreach_conn(sctp_init_failed_foreach);
582
583	mutex_unlock(&connections_lock);
584}
585
586/* Something happened to an association */
587static void process_sctp_notification(struct connection *con,
588				      struct msghdr *msg, char *buf)
589{
590	union sctp_notification *sn = (union sctp_notification *)buf;
591
592	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
593		switch (sn->sn_assoc_change.sac_state) {
594
595		case SCTP_COMM_UP:
596		case SCTP_RESTART:
597		{
598			/* Check that the new node is in the lockspace */
599			struct sctp_prim prim;
600			int nodeid;
601			int prim_len, ret;
602			int addr_len;
603			struct connection *new_con;
604
605			/*
606			 * We get this before any data for an association.
607			 * We verify that the node is in the cluster and
608			 * then peel off a socket for it.
609			 */
610			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
611				log_print("COMM_UP for invalid assoc ID %d",
612					 (int)sn->sn_assoc_change.sac_assoc_id);
613				sctp_init_failed();
614				return;
615			}
616			memset(&prim, 0, sizeof(struct sctp_prim));
617			prim_len = sizeof(struct sctp_prim);
618			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
619
620			ret = kernel_getsockopt(con->sock,
621						IPPROTO_SCTP,
622						SCTP_PRIMARY_ADDR,
623						(char*)&prim,
624						&prim_len);
625			if (ret < 0) {
626				log_print("getsockopt/sctp_primary_addr on "
627					  "new assoc %d failed : %d",
628					  (int)sn->sn_assoc_change.sac_assoc_id,
629					  ret);
630
631				/* Retry INIT later */
632				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
633				if (new_con)
634					clear_bit(CF_CONNECT_PENDING, &con->flags);
635				return;
636			}
637			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
638			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
639				unsigned char *b=(unsigned char *)&prim.ssp_addr;
640				log_print("reject connect from unknown addr");
641				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
642						     b, sizeof(struct sockaddr_storage));
643				sctp_send_shutdown(prim.ssp_assoc_id);
644				return;
645			}
646
647			new_con = nodeid2con(nodeid, GFP_NOFS);
648			if (!new_con)
649				return;
650
651			/* Peel off a new sock */
652			sctp_lock_sock(con->sock->sk);
653			ret = sctp_do_peeloff(con->sock->sk,
654				sn->sn_assoc_change.sac_assoc_id,
655				&new_con->sock);
656			sctp_release_sock(con->sock->sk);
657			if (ret < 0) {
658				log_print("Can't peel off a socket for "
659					  "connection %d to node %d: err=%d",
660					  (int)sn->sn_assoc_change.sac_assoc_id,
661					  nodeid, ret);
662				return;
663			}
664			add_sock(new_con->sock, new_con);
665
666			log_print("connecting to %d sctp association %d",
667				 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
668
669			/* Send any pending writes */
670			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
671			clear_bit(CF_INIT_PENDING, &con->flags);
672			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
673				queue_work(send_workqueue, &new_con->swork);
674			}
675			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
676				queue_work(recv_workqueue, &new_con->rwork);
677		}
678		break;
679
680		case SCTP_COMM_LOST:
681		case SCTP_SHUTDOWN_COMP:
682		{
683			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
684			if (con) {
685				con->sctp_assoc = 0;
686			}
687		}
688		break;
689
690		/* We don't know which INIT failed, so clear the PENDING flags
691		 * on them all.  if assoc_id is zero then it will then try
692		 * again */
693
694		case SCTP_CANT_STR_ASSOC:
695		{
696			log_print("Can't start SCTP association - retrying");
697			sctp_init_failed();
698		}
699		break;
700
701		default:
702			log_print("unexpected SCTP assoc change id=%d state=%d",
703				  (int)sn->sn_assoc_change.sac_assoc_id,
704				  sn->sn_assoc_change.sac_state);
705		}
706	}
707}
708
/*
 * Data received from remote end. Used as a connection's rx_action: read
 * whatever is available from con->sock into the circular buffer backed by
 * con->rx_page and pass the buffered bytes to dlm_process_incoming_buffer().
 * SCTP notifications are diverted to process_sctp_notification() instead.
 *
 * Returns 0 when done; -EAGAIN if the socket is gone, the work was
 * requeued, or the peer closed (EOF); any other negative value is an
 * error after which the connection has been closed.
 */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;	/* raw recvmsg result, kept for the -EBADMSG log */
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * Free space starts at the current end point (cb.base + cb.len,
	 * wrapped). Assume first that the data has wrapped, so the free
	 * space is the single run from there up to cb.base.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * Otherwise (including the empty case) the free space is split:
	 * iov[0] runs from the end point to the end of the page, and iov[1]
	 * from the start of the page up to the start of the used section
	 * (cb.base).
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	/* 0 is EOF; negative (including -EAGAIN) is handled at out_close */
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		/*
		 * NOTE(review): the notification was received at
		 * cbuf_data(), which equals cb.base only while the buffer
		 * is empty - confirm that always holds here.
		 */
		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	BUG_ON(con->nodeid == 0);

	/* The free space was filled completely: more may be waiting. */
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	/* Returns the number of bytes consumed, or a negative error. */
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	/* Nothing left over and no more expected: give the page back. */
	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}
827
828/* Listening socket is busy, accept a connection */
829static int tcp_accept_from_sock(struct connection *con)
830{
831	int result;
832	struct sockaddr_storage peeraddr;
833	struct socket *newsock;
834	int len;
835	int nodeid;
836	struct connection *newcon;
837	struct connection *addcon;
838
839	mutex_lock(&connections_lock);
840	if (!dlm_allow_conn) {
841		mutex_unlock(&connections_lock);
842		return -1;
843	}
844	mutex_unlock(&connections_lock);
845
846	memset(&peeraddr, 0, sizeof(peeraddr));
847	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
848				  IPPROTO_TCP, &newsock);
849	if (result < 0)
850		return -ENOMEM;
851
852	mutex_lock_nested(&con->sock_mutex, 0);
853
854	result = -ENOTCONN;
855	if (con->sock == NULL)
856		goto accept_err;
857
858	newsock->type = con->sock->type;
859	newsock->ops = con->sock->ops;
860
861	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
862	if (result < 0)
863		goto accept_err;
864
865	/* Get the connected socket's peer */
866	memset(&peeraddr, 0, sizeof(peeraddr));
867	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
868				  &len, 2)) {
869		result = -ECONNABORTED;
870		goto accept_err;
871	}
872
873	/* Get the new node's NODEID */
874	make_sockaddr(&peeraddr, 0, &len);
875	if (addr_to_nodeid(&peeraddr, &nodeid)) {
876		unsigned char *b=(unsigned char *)&peeraddr;
877		log_print("connect from non cluster node");
878		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
879				     b, sizeof(struct sockaddr_storage));
880		sock_release(newsock);
881		mutex_unlock(&con->sock_mutex);
882		return -1;
883	}
884
885	log_print("got connection from %d", nodeid);
886
887	/*  Check to see if we already have a connection to this node. This
888	 *  could happen if the two nodes initiate a connection at roughly
889	 *  the same time and the connections cross on the wire.
890	 *  In this case we store the incoming one in "othercon"
891	 */
892	newcon = nodeid2con(nodeid, GFP_NOFS);
893	if (!newcon) {
894		result = -ENOMEM;
895		goto accept_err;
896	}
897	mutex_lock_nested(&newcon->sock_mutex, 1);
898	if (newcon->sock) {
899		struct connection *othercon = newcon->othercon;
900
901		if (!othercon) {
902			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
903			if (!othercon) {
904				log_print("failed to allocate incoming socket");
905				mutex_unlock(&newcon->sock_mutex);
906				result = -ENOMEM;
907				goto accept_err;
908			}
909			othercon->nodeid = nodeid;
910			othercon->rx_action = receive_from_sock;
911			mutex_init(&othercon->sock_mutex);
912			INIT_WORK(&othercon->swork, process_send_sockets);
913			INIT_WORK(&othercon->rwork, process_recv_sockets);
914			set_bit(CF_IS_OTHERCON, &othercon->flags);
915		}
916		if (!othercon->sock) {
917			newcon->othercon = othercon;
918			othercon->sock = newsock;
919			newsock->sk->sk_user_data = othercon;
920			add_sock(newsock, othercon);
921			addcon = othercon;
922		}
923		else {
924			printk("Extra connection from node %d attempted\n", nodeid);
925			result = -EAGAIN;
926			mutex_unlock(&newcon->sock_mutex);
927			goto accept_err;
928		}
929	}
930	else {
931		newsock->sk->sk_user_data = newcon;
932		newcon->rx_action = receive_from_sock;
933		add_sock(newsock, newcon);
934		addcon = newcon;
935	}
936
937	mutex_unlock(&newcon->sock_mutex);
938
939	/*
940	 * Add it to the active queue in case we got data
941	 * between processing the accept adding the socket
942	 * to the read_sockets list
943	 */
944	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
945		queue_work(recv_workqueue, &addcon->rwork);
946	mutex_unlock(&con->sock_mutex);
947
948	return 0;
949
950accept_err:
951	mutex_unlock(&con->sock_mutex);
952	sock_release(newsock);
953
954	if (result != -EAGAIN)
955		log_print("error accepting connection from node: %d", result);
956	return result;
957}
958
959static void free_entry(struct writequeue_entry *e)
960{
961	__free_page(e->page);
962	kfree(e);
963}
964
965/* Initiate an SCTP association.
966   This is a special case of send_to_sock() in that we don't yet have a
967   peeled-off socket for this association, so we use the listening socket
968   and add the primary IP address of the remote node.
969 */
970static void sctp_init_assoc(struct connection *con)
971{
972	struct sockaddr_storage rem_addr;
973	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
974	struct msghdr outmessage;
975	struct cmsghdr *cmsg;
976	struct sctp_sndrcvinfo *sinfo;
977	struct connection *base_con;
978	struct writequeue_entry *e;
979	int len, offset;
980	int ret;
981	int addrlen;
982	struct kvec iov[1];
983
984	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
985		return;
986
987	if (con->retries++ > MAX_CONNECT_RETRIES)
988		return;
989
990	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
991		log_print("no address for nodeid %d", con->nodeid);
992		return;
993	}
994	base_con = nodeid2con(0, 0);
995	BUG_ON(base_con == NULL);
996
997	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
998
999	outmessage.msg_name = &rem_addr;
1000	outmessage.msg_namelen = addrlen;
1001	outmessage.msg_control = outcmsg;
1002	outmessage.msg_controllen = sizeof(outcmsg);
1003	outmessage.msg_flags = MSG_EOR;
1004
1005	spin_lock(&con->writequeue_lock);
1006
1007	if (list_empty(&con->writequeue)) {
1008		spin_unlock(&con->writequeue_lock);
1009		log_print("writequeue empty for nodeid %d", con->nodeid);
1010		return;
1011	}
1012
1013	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1014	len = e->len;
1015	offset = e->offset;
1016	spin_unlock(&con->writequeue_lock);
1017
1018	/* Send the first block off the write queue */
1019	iov[0].iov_base = page_address(e->page)+offset;
1020	iov[0].iov_len = len;
1021
1022	cmsg = CMSG_FIRSTHDR(&outmessage);
1023	cmsg->cmsg_level = IPPROTO_SCTP;
1024	cmsg->cmsg_type = SCTP_SNDRCV;
1025	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1026	sinfo = CMSG_DATA(cmsg);
1027	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1028	sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
1029	outmessage.msg_controllen = cmsg->cmsg_len;
1030
1031	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
1032	if (ret < 0) {
1033		log_print("Send first packet to node %d failed: %d",
1034			  con->nodeid, ret);
1035
1036		/* Try again later */
1037		clear_bit(CF_CONNECT_PENDING, &con->flags);
1038		clear_bit(CF_INIT_PENDING, &con->flags);
1039	}
1040	else {
1041		spin_lock(&con->writequeue_lock);
1042		e->offset += ret;
1043		e->len -= ret;
1044
1045		if (e->len == 0 && e->users == 0) {
1046			list_del(&e->list);
1047			free_entry(e);
1048		}
1049		spin_unlock(&con->writequeue_lock);
1050	}
1051}
1052
/*
 * Connect a new socket to its peer. TCP connect_action: create a socket
 * for con, attach it with add_sock(), bind it to our first local address
 * and start a non-blocking connect to the node's configured address on
 * dlm_config.ci_tcp_port.
 *
 * Gives up silently once retries exceeds MAX_CONNECT_RETRIES. On failure
 * with an errno outside the "fatal" list below, the socket is released,
 * sock_mutex is dropped, and after a one-second sleep another connect is
 * queued via lowcomms_connect_sock().
 */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	/* From here on con->sock owns the socket (add_sock() sets it). */
	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	/* Non-blocking: -EINPROGRESS just means the connect is under way. */
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/*
	 * con->sock is only set once add_sock() ran (it was NULL on entry),
	 * so release via con if attached, else the bare socket if created.
	 */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		/* Drop the mutex before sleeping and requeueing the connect. */
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
1146
1147static struct socket *tcp_create_listen_sock(struct connection *con,
1148					     struct sockaddr_storage *saddr)
1149{
1150	struct socket *sock = NULL;
1151	int result = 0;
1152	int one = 1;
1153	int addr_len;
1154
1155	if (dlm_local_addr[0]->ss_family == AF_INET)
1156		addr_len = sizeof(struct sockaddr_in);
1157	else
1158		addr_len = sizeof(struct sockaddr_in6);
1159
1160	/* Create a socket to communicate with */
1161	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1162				  IPPROTO_TCP, &sock);
1163	if (result < 0) {
1164		log_print("Can't create listening comms socket");
1165		goto create_out;
1166	}
1167
1168	/* Turn off Nagle's algorithm */
1169	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1170			  sizeof(one));
1171
1172	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1173				   (char *)&one, sizeof(one));
1174
1175	if (result < 0) {
1176		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1177	}
1178	sock->sk->sk_user_data = con;
1179	con->rx_action = tcp_accept_from_sock;
1180	con->connect_action = tcp_connect_to_sock;
1181	con->sock = sock;
1182
1183	/* Bind to our port */
1184	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1185	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1186	if (result < 0) {
1187		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1188		sock_release(sock);
1189		sock = NULL;
1190		con->sock = NULL;
1191		goto create_out;
1192	}
1193	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1194				 (char *)&one, sizeof(one));
1195	if (result < 0) {
1196		log_print("Set keepalive failed: %d", result);
1197	}
1198
1199	result = sock->ops->listen(sock, 5);
1200	if (result < 0) {
1201		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1202		sock_release(sock);
1203		sock = NULL;
1204		goto create_out;
1205	}
1206
1207create_out:
1208	return sock;
1209}
1210
1211/* Get local addresses */
1212static void init_local(void)
1213{
1214	struct sockaddr_storage sas, *addr;
1215	int i;
1216
1217	dlm_local_count = 0;
1218	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1219		if (dlm_our_addr(&sas, i))
1220			break;
1221
1222		addr = kmalloc(sizeof(*addr), GFP_NOFS);
1223		if (!addr)
1224			break;
1225		memcpy(addr, &sas, sizeof(*addr));
1226		dlm_local_addr[dlm_local_count++] = addr;
1227	}
1228}
1229
1230/* Bind to an IP address. SCTP allows multiple address so it can do
1231   multi-homing */
1232static int add_sctp_bind_addr(struct connection *sctp_con,
1233			      struct sockaddr_storage *addr,
1234			      int addr_len, int num)
1235{
1236	int result = 0;
1237
1238	if (num == 1)
1239		result = kernel_bind(sctp_con->sock,
1240				     (struct sockaddr *) addr,
1241				     addr_len);
1242	else
1243		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1244					   SCTP_SOCKOPT_BINDX_ADD,
1245					   (char *)addr, addr_len);
1246
1247	if (result < 0)
1248		log_print("Can't bind to port %d addr number %d",
1249			  dlm_config.ci_tcp_port, num);
1250
1251	return result;
1252}
1253
1254/* Initialise SCTP socket and bind to all interfaces */
1255static int sctp_listen_for_all(void)
1256{
1257	struct socket *sock = NULL;
1258	struct sockaddr_storage localaddr;
1259	struct sctp_event_subscribe subscribe;
1260	int result = -EINVAL, num = 1, i, addr_len;
1261	struct connection *con = nodeid2con(0, GFP_NOFS);
1262	int bufsize = NEEDED_RMEM;
1263
1264	if (!con)
1265		return -ENOMEM;
1266
1267	log_print("Using SCTP for communications");
1268
1269	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1270				  IPPROTO_SCTP, &sock);
1271	if (result < 0) {
1272		log_print("Can't create comms socket, check SCTP is loaded");
1273		goto out;
1274	}
1275
1276	/* Listen for events */
1277	memset(&subscribe, 0, sizeof(subscribe));
1278	subscribe.sctp_data_io_event = 1;
1279	subscribe.sctp_association_event = 1;
1280	subscribe.sctp_send_failure_event = 1;
1281	subscribe.sctp_shutdown_event = 1;
1282	subscribe.sctp_partial_delivery_event = 1;
1283
1284	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1285				 (char *)&bufsize, sizeof(bufsize));
1286	if (result)
1287		log_print("Error increasing buffer space on socket %d", result);
1288
1289	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1290				   (char *)&subscribe, sizeof(subscribe));
1291	if (result < 0) {
1292		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1293			  result);
1294		goto create_delsock;
1295	}
1296
1297	/* Init con struct */
1298	sock->sk->sk_user_data = con;
1299	con->sock = sock;
1300	con->sock->sk->sk_data_ready = lowcomms_data_ready;
1301	con->rx_action = receive_from_sock;
1302	con->connect_action = sctp_init_assoc;
1303
1304	/* Bind to all interfaces. */
1305	for (i = 0; i < dlm_local_count; i++) {
1306		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1307		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1308
1309		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1310		if (result)
1311			goto create_delsock;
1312		++num;
1313	}
1314
1315	result = sock->ops->listen(sock, 5);
1316	if (result < 0) {
1317		log_print("Can't set socket listening");
1318		goto create_delsock;
1319	}
1320
1321	return 0;
1322
1323create_delsock:
1324	sock_release(sock);
1325	con->sock = NULL;
1326out:
1327	return result;
1328}
1329
1330static int tcp_listen_for_all(void)
1331{
1332	struct socket *sock = NULL;
1333	struct connection *con = nodeid2con(0, GFP_NOFS);
1334	int result = -EINVAL;
1335
1336	if (!con)
1337		return -ENOMEM;
1338
1339	/* We don't support multi-homed hosts */
1340	if (dlm_local_addr[1] != NULL) {
1341		log_print("TCP protocol can't handle multi-homed hosts, "
1342			  "try SCTP");
1343		return -EINVAL;
1344	}
1345
1346	log_print("Using TCP for communications");
1347
1348	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1349	if (sock) {
1350		add_sock(sock, con);
1351		result = 0;
1352	}
1353	else {
1354		result = -EADDRINUSE;
1355	}
1356
1357	return result;
1358}
1359
1360
1361
1362static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1363						     gfp_t allocation)
1364{
1365	struct writequeue_entry *entry;
1366
1367	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1368	if (!entry)
1369		return NULL;
1370
1371	entry->page = alloc_page(allocation);
1372	if (!entry->page) {
1373		kfree(entry);
1374		return NULL;
1375	}
1376
1377	entry->offset = 0;
1378	entry->len = 0;
1379	entry->end = 0;
1380	entry->users = 0;
1381	entry->con = con;
1382
1383	return entry;
1384}
1385
1386void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1387{
1388	struct connection *con;
1389	struct writequeue_entry *e;
1390	int offset = 0;
1391	int users = 0;
1392
1393	con = nodeid2con(nodeid, allocation);
1394	if (!con)
1395		return NULL;
1396
1397	spin_lock(&con->writequeue_lock);
1398	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1399	if ((&e->list == &con->writequeue) ||
1400	    (PAGE_CACHE_SIZE - e->end < len)) {
1401		e = NULL;
1402	} else {
1403		offset = e->end;
1404		e->end += len;
1405		users = e->users++;
1406	}
1407	spin_unlock(&con->writequeue_lock);
1408
1409	if (e) {
1410	got_one:
1411		*ppc = page_address(e->page) + offset;
1412		return e;
1413	}
1414
1415	e = new_writequeue_entry(con, allocation);
1416	if (e) {
1417		spin_lock(&con->writequeue_lock);
1418		offset = e->end;
1419		e->end += len;
1420		users = e->users++;
1421		list_add_tail(&e->list, &con->writequeue);
1422		spin_unlock(&con->writequeue_lock);
1423		goto got_one;
1424	}
1425	return NULL;
1426}
1427
1428void dlm_lowcomms_commit_buffer(void *mh)
1429{
1430	struct writequeue_entry *e = (struct writequeue_entry *)mh;
1431	struct connection *con = e->con;
1432	int users;
1433
1434	spin_lock(&con->writequeue_lock);
1435	users = --e->users;
1436	if (users)
1437		goto out;
1438	e->len = e->end - e->offset;
1439	spin_unlock(&con->writequeue_lock);
1440
1441	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1442		queue_work(send_workqueue, &con->swork);
1443	}
1444	return;
1445
1446out:
1447	spin_unlock(&con->writequeue_lock);
1448	return;
1449}
1450
1451/* Send a message */
1452static void send_to_sock(struct connection *con)
1453{
1454	int ret = 0;
1455	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1456	struct writequeue_entry *e;
1457	int len, offset;
1458	int count = 0;
1459
1460	mutex_lock(&con->sock_mutex);
1461	if (con->sock == NULL)
1462		goto out_connect;
1463
1464	spin_lock(&con->writequeue_lock);
1465	for (;;) {
1466		e = list_entry(con->writequeue.next, struct writequeue_entry,
1467			       list);
1468		if ((struct list_head *) e == &con->writequeue)
1469			break;
1470
1471		len = e->len;
1472		offset = e->offset;
1473		BUG_ON(len == 0 && e->users == 0);
1474		spin_unlock(&con->writequeue_lock);
1475
1476		ret = 0;
1477		if (len) {
1478			ret = kernel_sendpage(con->sock, e->page, offset, len,
1479					      msg_flags);
1480			if (ret == -EAGAIN || ret == 0) {
1481				if (ret == -EAGAIN &&
1482				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1483				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1484					/* Notify TCP that we're limited by the
1485					 * application window size.
1486					 */
1487					set_bit(SOCK_NOSPACE, &con->sock->flags);
1488					con->sock->sk->sk_write_pending++;
1489				}
1490				cond_resched();
1491				goto out;
1492			}
1493			if (ret <= 0)
1494				goto send_error;
1495		}
1496
1497		/* Don't starve people filling buffers */
1498		if (++count >= MAX_SEND_MSG_COUNT) {
1499			cond_resched();
1500			count = 0;
1501		}
1502
1503		spin_lock(&con->writequeue_lock);
1504		e->offset += ret;
1505		e->len -= ret;
1506
1507		if (e->len == 0 && e->users == 0) {
1508			list_del(&e->list);
1509			free_entry(e);
1510			continue;
1511		}
1512	}
1513	spin_unlock(&con->writequeue_lock);
1514out:
1515	mutex_unlock(&con->sock_mutex);
1516	return;
1517
1518send_error:
1519	mutex_unlock(&con->sock_mutex);
1520	close_connection(con, false);
1521	lowcomms_connect_sock(con);
1522	return;
1523
1524out_connect:
1525	mutex_unlock(&con->sock_mutex);
1526	if (!test_bit(CF_INIT_PENDING, &con->flags))
1527		lowcomms_connect_sock(con);
1528	return;
1529}
1530
1531static void clean_one_writequeue(struct connection *con)
1532{
1533	struct writequeue_entry *e, *safe;
1534
1535	spin_lock(&con->writequeue_lock);
1536	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1537		list_del(&e->list);
1538		free_entry(e);
1539	}
1540	spin_unlock(&con->writequeue_lock);
1541}
1542
1543/* Called from recovery when it knows that a node has
1544   left the cluster */
1545int dlm_lowcomms_close(int nodeid)
1546{
1547	struct connection *con;
1548	struct dlm_node_addr *na;
1549
1550	log_print("closing connection to node %d", nodeid);
1551	con = nodeid2con(nodeid, 0);
1552	if (con) {
1553		clear_bit(CF_CONNECT_PENDING, &con->flags);
1554		clear_bit(CF_WRITE_PENDING, &con->flags);
1555		set_bit(CF_CLOSE, &con->flags);
1556		if (cancel_work_sync(&con->swork))
1557			log_print("canceled swork for node %d", nodeid);
1558		if (cancel_work_sync(&con->rwork))
1559			log_print("canceled rwork for node %d", nodeid);
1560		clean_one_writequeue(con);
1561		close_connection(con, true);
1562	}
1563
1564	spin_lock(&dlm_node_addrs_spin);
1565	na = find_node_addr(nodeid);
1566	if (na) {
1567		list_del(&na->list);
1568		while (na->addr_count--)
1569			kfree(na->addr[na->addr_count]);
1570		kfree(na);
1571	}
1572	spin_unlock(&dlm_node_addrs_spin);
1573
1574	return 0;
1575}
1576
1577/* Receive workqueue function */
1578static void process_recv_sockets(struct work_struct *work)
1579{
1580	struct connection *con = container_of(work, struct connection, rwork);
1581	int err;
1582
1583	clear_bit(CF_READ_PENDING, &con->flags);
1584	do {
1585		err = con->rx_action(con);
1586	} while (!err);
1587}
1588
1589/* Send workqueue function */
1590static void process_send_sockets(struct work_struct *work)
1591{
1592	struct connection *con = container_of(work, struct connection, swork);
1593
1594	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1595		con->connect_action(con);
1596		set_bit(CF_WRITE_PENDING, &con->flags);
1597	}
1598	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1599		send_to_sock(con);
1600}
1601
1602
1603/* Discard all entries on the write queues */
1604static void clean_writequeues(void)
1605{
1606	foreach_conn(clean_one_writequeue);
1607}
1608
1609static void work_stop(void)
1610{
1611	destroy_workqueue(recv_workqueue);
1612	destroy_workqueue(send_workqueue);
1613}
1614
1615static int work_start(void)
1616{
1617	recv_workqueue = alloc_workqueue("dlm_recv",
1618					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1619	if (!recv_workqueue) {
1620		log_print("can't start dlm_recv");
1621		return -ENOMEM;
1622	}
1623
1624	send_workqueue = alloc_workqueue("dlm_send",
1625					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1626	if (!send_workqueue) {
1627		log_print("can't start dlm_send");
1628		destroy_workqueue(recv_workqueue);
1629		return -ENOMEM;
1630	}
1631
1632	return 0;
1633}
1634
1635static void stop_conn(struct connection *con)
1636{
1637	con->flags |= 0x0F;
1638	if (con->sock && con->sock->sk)
1639		con->sock->sk->sk_user_data = NULL;
1640}
1641
1642static void free_conn(struct connection *con)
1643{
1644	close_connection(con, true);
1645	if (con->othercon)
1646		kmem_cache_free(con_cache, con->othercon);
1647	hlist_del(&con->list);
1648	kmem_cache_free(con_cache, con);
1649}
1650
1651void dlm_lowcomms_stop(void)
1652{
1653	/* Set all the flags to prevent any
1654	   socket activity.
1655	*/
1656	mutex_lock(&connections_lock);
1657	dlm_allow_conn = 0;
1658	foreach_conn(stop_conn);
1659	mutex_unlock(&connections_lock);
1660
1661	work_stop();
1662
1663	mutex_lock(&connections_lock);
1664	clean_writequeues();
1665
1666	foreach_conn(free_conn);
1667
1668	mutex_unlock(&connections_lock);
1669	kmem_cache_destroy(con_cache);
1670}
1671
1672int dlm_lowcomms_start(void)
1673{
1674	int error = -EINVAL;
1675	struct connection *con;
1676	int i;
1677
1678	for (i = 0; i < CONN_HASH_SIZE; i++)
1679		INIT_HLIST_HEAD(&connection_hash[i]);
1680
1681	init_local();
1682	if (!dlm_local_count) {
1683		error = -ENOTCONN;
1684		log_print("no local IP address has been set");
1685		goto fail;
1686	}
1687
1688	error = -ENOMEM;
1689	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1690				      __alignof__(struct connection), 0,
1691				      NULL);
1692	if (!con_cache)
1693		goto fail;
1694
1695	error = work_start();
1696	if (error)
1697		goto fail_destroy;
1698
1699	dlm_allow_conn = 1;
1700
1701	/* Start listening */
1702	if (dlm_config.ci_protocol == 0)
1703		error = tcp_listen_for_all();
1704	else
1705		error = sctp_listen_for_all();
1706	if (error)
1707		goto fail_unlisten;
1708
1709	return 0;
1710
1711fail_unlisten:
1712	dlm_allow_conn = 0;
1713	con = nodeid2con(0,0);
1714	if (con) {
1715		close_connection(con, false);
1716		kmem_cache_free(con_cache, con);
1717	}
1718fail_destroy:
1719	kmem_cache_destroy(con_cache);
1720fail:
1721	return error;
1722}
1723
1724void dlm_lowcomms_exit(void)
1725{
1726	struct dlm_node_addr *na, *safe;
1727
1728	spin_lock(&dlm_node_addrs_spin);
1729	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1730		list_del(&na->list);
1731		while (na->addr_count--)
1732			kfree(na->addr[na->addr_count]);
1733		kfree(na);
1734	}
1735	spin_unlock(&dlm_node_addrs_spin);
1736}
1737