/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
24
25
26#include <linux/module.h>
27
28#include <asm/uaccess.h>
29#include <net/sock.h>
30
31#include <linux/drbd.h>
32#include <linux/fs.h>
33#include <linux/file.h>
34#include <linux/in.h>
35#include <linux/mm.h>
36#include <linux/memcontrol.h>
37#include <linux/mm_inline.h>
38#include <linux/slab.h>
39#include <linux/pkt_sched.h>
40#define __KERNEL_SYSCALLS__
41#include <linux/unistd.h>
42#include <linux/vmalloc.h>
43#include <linux/random.h>
44#include <linux/string.h>
45#include <linux/scatterlist.h>
46#include "drbd_int.h"
47#include "drbd_protocol.h"
48#include "drbd_req.h"
49#include "drbd_vli.h"
50
51#define PRO_FEATURES (FF_TRIM)
52
53struct packet_info {
54	enum drbd_packet cmd;
55	unsigned int size;
56	unsigned int vnr;
57	void *data;
58};
59
/* Result type of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
65
/* Forward declarations of file-local functions that are used before
 * their definition. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *peer_device);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev);
static int e_end_block(struct drbd_work *w, int unused);


/* gfp mask used for the opportunistic alloc_page() in __drbd_alloc_pages():
 * highmem pages are acceptable, and failures must not be logged. */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
80
81/* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
84 */
85static struct page *page_chain_del(struct page **head, int n)
86{
87	struct page *page;
88	struct page *tmp;
89
90	BUG_ON(!n);
91	BUG_ON(!head);
92
93	page = *head;
94
95	if (!page)
96		return NULL;
97
98	while (page) {
99		tmp = page_chain_next(page);
100		if (--n == 0)
101			break; /* found sufficient pages */
102		if (tmp == NULL)
103			/* insufficient pages, don't use any of them. */
104			return NULL;
105		page = tmp;
106	}
107
108	/* add end of list marker for the returned list */
109	set_page_private(page, 0);
110	/* actual return value, and adjustment of head */
111	page = *head;
112	*head = tmp;
113	return page;
114}
115
116/* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119static struct page *page_chain_tail(struct page *page, int *len)
120{
121	struct page *tmp;
122	int i = 1;
123	while ((tmp = page_chain_next(page)))
124		++i, page = tmp;
125	if (len)
126		*len = i;
127	return page;
128}
129
130static int page_chain_free(struct page *page)
131{
132	struct page *tmp;
133	int i = 0;
134	page_chain_for_each_safe(page, tmp) {
135		put_page(page);
136		++i;
137	}
138	return i;
139}
140
141static void page_chain_add(struct page **head,
142		struct page *chain_first, struct page *chain_last)
143{
144#if 1
145	struct page *tmp;
146	tmp = page_chain_tail(chain_first, NULL);
147	BUG_ON(tmp != chain_last);
148#endif
149
150	/* add chain to head */
151	set_page_private(chain_last, (unsigned long)*head);
152	*head = chain_first;
153}
154
155static struct page *__drbd_alloc_pages(struct drbd_device *device,
156				       unsigned int number)
157{
158	struct page *page = NULL;
159	struct page *tmp = NULL;
160	unsigned int i = 0;
161
162	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163	 * So what. It saves a spin_lock. */
164	if (drbd_pp_vacant >= number) {
165		spin_lock(&drbd_pp_lock);
166		page = page_chain_del(&drbd_pp_pool, number);
167		if (page)
168			drbd_pp_vacant -= number;
169		spin_unlock(&drbd_pp_lock);
170		if (page)
171			return page;
172	}
173
174	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176	 * which in turn might block on the other node at this very place.  */
177	for (i = 0; i < number; i++) {
178		tmp = alloc_page(GFP_TRY);
179		if (!tmp)
180			break;
181		set_page_private(tmp, (unsigned long)page);
182		page = tmp;
183	}
184
185	if (i == number)
186		return page;
187
188	/* Not enough pages immediately available this time.
189	 * No need to jump around here, drbd_alloc_pages will retry this
190	 * function "soon". */
191	if (page) {
192		tmp = page_chain_tail(page, NULL);
193		spin_lock(&drbd_pp_lock);
194		page_chain_add(&drbd_pp_pool, page, tmp);
195		drbd_pp_vacant += i;
196		spin_unlock(&drbd_pp_lock);
197	}
198	return NULL;
199}
200
201static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202					   struct list_head *to_be_freed)
203{
204	struct drbd_peer_request *peer_req, *tmp;
205
206	/* The EEs are always appended to the end of the list. Since
207	   they are sent in order over the wire, they have to finish
208	   in order. As soon as we see the first not finished we can
209	   stop to examine the list... */
210
211	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212		if (drbd_peer_req_has_active_page(peer_req))
213			break;
214		list_move(&peer_req->w.list, to_be_freed);
215	}
216}
217
218static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219{
220	LIST_HEAD(reclaimed);
221	struct drbd_peer_request *peer_req, *t;
222
223	spin_lock_irq(&device->resource->req_lock);
224	reclaim_finished_net_peer_reqs(device, &reclaimed);
225	spin_unlock_irq(&device->resource->req_lock);
226
227	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228		drbd_free_net_peer_req(device, peer_req);
229}
230
231/**
232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233 * @device:	DRBD device.
234 * @number:	number of pages requested
235 * @retry:	whether to retry, if not enough pages are available right now
236 *
237 * Tries to allocate number pages, first from our own page pool, then from
238 * the kernel.
239 * Possibly retry until DRBD frees sufficient pages somewhere else.
240 *
241 * If this allocation would exceed the max_buffers setting, we throttle
242 * allocation (schedule_timeout) to give the system some room to breathe.
243 *
244 * We do not use max-buffers as hard limit, because it could lead to
245 * congestion and further to a distributed deadlock during online-verify or
246 * (checksum based) resync, if the max-buffers, socket buffer sizes and
247 * resync-rate settings are mis-configured.
248 *
249 * Returns a page chain linked via page->private.
250 */
251struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252			      bool retry)
253{
254	struct drbd_device *device = peer_device->device;
255	struct page *page = NULL;
256	struct net_conf *nc;
257	DEFINE_WAIT(wait);
258	unsigned int mxb;
259
260	rcu_read_lock();
261	nc = rcu_dereference(peer_device->connection->net_conf);
262	mxb = nc ? nc->max_buffers : 1000000;
263	rcu_read_unlock();
264
265	if (atomic_read(&device->pp_in_use) < mxb)
266		page = __drbd_alloc_pages(device, number);
267
268	while (page == NULL) {
269		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270
271		drbd_kick_lo_and_reclaim_net(device);
272
273		if (atomic_read(&device->pp_in_use) < mxb) {
274			page = __drbd_alloc_pages(device, number);
275			if (page)
276				break;
277		}
278
279		if (!retry)
280			break;
281
282		if (signal_pending(current)) {
283			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284			break;
285		}
286
287		if (schedule_timeout(HZ/10) == 0)
288			mxb = UINT_MAX;
289	}
290	finish_wait(&drbd_pp_wait, &wait);
291
292	if (page)
293		atomic_add(number, &device->pp_in_use);
294	return page;
295}
296
297/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
299 * Either links the page chain back to the global pool,
300 * or returns all pages to the system. */
301static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302{
303	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304	int i;
305
306	if (page == NULL)
307		return;
308
309	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310		i = page_chain_free(page);
311	else {
312		struct page *tmp;
313		tmp = page_chain_tail(page, &i);
314		spin_lock(&drbd_pp_lock);
315		page_chain_add(&drbd_pp_pool, page, tmp);
316		drbd_pp_vacant += i;
317		spin_unlock(&drbd_pp_lock);
318	}
319	i = atomic_sub_return(i, a);
320	if (i < 0)
321		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323	wake_up(&drbd_pp_wait);
324}
325
/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_free_peer_req()
 *  drbd_alloc_peer_req()
 *  drbd_free_peer_reqs()
 *  drbd_ee_fix_bhs()
 *  drbd_finish_peer_reqs()
 *  drbd_clear_done_ee()
 *  drbd_wait_ee_list_empty()
 */
339
340struct drbd_peer_request *
341drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343{
344	struct drbd_device *device = peer_device->device;
345	struct drbd_peer_request *peer_req;
346	struct page *page = NULL;
347	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348
349	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350		return NULL;
351
352	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353	if (!peer_req) {
354		if (!(gfp_mask & __GFP_NOWARN))
355			drbd_err(device, "%s: allocation failed\n", __func__);
356		return NULL;
357	}
358
359	if (has_payload && data_size) {
360		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
361		if (!page)
362			goto fail;
363	}
364
365	memset(peer_req, 0, sizeof(*peer_req));
366	INIT_LIST_HEAD(&peer_req->w.list);
367	drbd_clear_interval(&peer_req->i);
368	peer_req->i.size = data_size;
369	peer_req->i.sector = sector;
370	peer_req->submit_jif = jiffies;
371	peer_req->peer_device = peer_device;
372	peer_req->pages = page;
373	/*
374	 * The block_id is opaque to the receiver.  It is not endianness
375	 * converted, and sent back to the sender unchanged.
376	 */
377	peer_req->block_id = id;
378
379	return peer_req;
380
381 fail:
382	mempool_free(peer_req, drbd_ee_mempool);
383	return NULL;
384}
385
386void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
387		       int is_net)
388{
389	might_sleep();
390	if (peer_req->flags & EE_HAS_DIGEST)
391		kfree(peer_req->digest);
392	drbd_free_pages(device, peer_req->pages, is_net);
393	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
394	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
395	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
396		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
397		drbd_al_complete_io(device, &peer_req->i);
398	}
399	mempool_free(peer_req, drbd_ee_mempool);
400}
401
402int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
403{
404	LIST_HEAD(work_list);
405	struct drbd_peer_request *peer_req, *t;
406	int count = 0;
407	int is_net = list == &device->net_ee;
408
409	spin_lock_irq(&device->resource->req_lock);
410	list_splice_init(list, &work_list);
411	spin_unlock_irq(&device->resource->req_lock);
412
413	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
414		__drbd_free_peer_req(device, peer_req, is_net);
415		count++;
416	}
417	return count;
418}
419
420/*
421 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
422 */
423static int drbd_finish_peer_reqs(struct drbd_device *device)
424{
425	LIST_HEAD(work_list);
426	LIST_HEAD(reclaimed);
427	struct drbd_peer_request *peer_req, *t;
428	int err = 0;
429
430	spin_lock_irq(&device->resource->req_lock);
431	reclaim_finished_net_peer_reqs(device, &reclaimed);
432	list_splice_init(&device->done_ee, &work_list);
433	spin_unlock_irq(&device->resource->req_lock);
434
435	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
436		drbd_free_net_peer_req(device, peer_req);
437
438	/* possible callbacks here:
439	 * e_end_block, and e_end_resync_block, e_send_superseded.
440	 * all ignore the last argument.
441	 */
442	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
443		int err2;
444
445		/* list_del not necessary, next/prev members not touched */
446		err2 = peer_req->w.cb(&peer_req->w, !!err);
447		if (!err)
448			err = err2;
449		drbd_free_peer_req(device, peer_req);
450	}
451	wake_up(&device->ee_wait);
452
453	return err;
454}
455
456static void _drbd_wait_ee_list_empty(struct drbd_device *device,
457				     struct list_head *head)
458{
459	DEFINE_WAIT(wait);
460
461	/* avoids spin_lock/unlock
462	 * and calling prepare_to_wait in the fast path */
463	while (!list_empty(head)) {
464		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
465		spin_unlock_irq(&device->resource->req_lock);
466		io_schedule();
467		finish_wait(&device->ee_wait, &wait);
468		spin_lock_irq(&device->resource->req_lock);
469	}
470}
471
472static void drbd_wait_ee_list_empty(struct drbd_device *device,
473				    struct list_head *head)
474{
475	spin_lock_irq(&device->resource->req_lock);
476	_drbd_wait_ee_list_empty(device, head);
477	spin_unlock_irq(&device->resource->req_lock);
478}
479
480static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
481{
482	struct kvec iov = {
483		.iov_base = buf,
484		.iov_len = size,
485	};
486	struct msghdr msg = {
487		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
488	};
489	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
490}
491
492static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
493{
494	int rv;
495
496	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497
498	if (rv < 0) {
499		if (rv == -ECONNRESET)
500			drbd_info(connection, "sock was reset by peer\n");
501		else if (rv != -ERESTARTSYS)
502			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503	} else if (rv == 0) {
504		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505			long t;
506			rcu_read_lock();
507			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508			rcu_read_unlock();
509
510			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
511
512			if (t)
513				goto out;
514		}
515		drbd_info(connection, "sock was shut down by peer\n");
516	}
517
518	if (rv != size)
519		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
520
521out:
522	return rv;
523}
524
525static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
526{
527	int err;
528
529	err = drbd_recv(connection, buf, size);
530	if (err != size) {
531		if (err >= 0)
532			err = -EIO;
533	} else
534		err = 0;
535	return err;
536}
537
538static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
539{
540	int err;
541
542	err = drbd_recv_all(connection, buf, size);
543	if (err && !signal_pending(current))
544		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
545	return err;
546}
547
548/* quoting tcp(7):
549 *   On individual connections, the socket buffer size must be set prior to the
550 *   listen(2) or connect(2) calls in order to have it take effect.
551 * This is our wrapper to do so.
552 */
553static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554		unsigned int rcv)
555{
556	/* open coded SO_SNDBUF, SO_RCVBUF */
557	if (snd) {
558		sock->sk->sk_sndbuf = snd;
559		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560	}
561	if (rcv) {
562		sock->sk->sk_rcvbuf = rcv;
563		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
564	}
565}
566
/* Actively try to open one TCP connection to the peer.
 *
 * The socket is bound to the configured local address (with port 0) and
 * then connected to the configured peer address; send and receive timeouts
 * are set to connect_int seconds.
 *
 * Returns the connected socket, or NULL if there is no net_conf or any step
 * failed.  On failures that are not merely "peer not reachable right now",
 * an error is logged; if such a failure happens before the connect() call,
 * the connection is additionally forced to C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* copy what we need out of net_conf, so we can drop the RCU
	 * read lock before doing anything that may sleep */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	/* force the source port to 0, see "bind before connect" below */
	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* clamp against the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	/* 'what' names the step in progress, for the error message below */
	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
654
655struct accept_wait_data {
656	struct drbd_connection *connection;
657	struct socket *s_listen;
658	struct completion door_bell;
659	void (*original_sk_state_change)(struct sock *sk);
660
661};
662
663static void drbd_incoming_connection(struct sock *sk)
664{
665	struct accept_wait_data *ad = sk->sk_user_data;
666	void (*state_change)(struct sock *sk);
667
668	state_change = ad->original_sk_state_change;
669	if (sk->sk_state == TCP_ESTABLISHED)
670		complete(&ad->door_bell);
671	state_change(sk);
672}
673
674static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
675{
676	int err, sndbuf_size, rcvbuf_size, my_addr_len;
677	struct sockaddr_in6 my_addr;
678	struct socket *s_listen;
679	struct net_conf *nc;
680	const char *what;
681
682	rcu_read_lock();
683	nc = rcu_dereference(connection->net_conf);
684	if (!nc) {
685		rcu_read_unlock();
686		return -EIO;
687	}
688	sndbuf_size = nc->sndbuf_size;
689	rcvbuf_size = nc->rcvbuf_size;
690	rcu_read_unlock();
691
692	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693	memcpy(&my_addr, &connection->my_addr, my_addr_len);
694
695	what = "sock_create_kern";
696	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
697			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
698	if (err) {
699		s_listen = NULL;
700		goto out;
701	}
702
703	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
705
706	what = "bind before listen";
707	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
708	if (err < 0)
709		goto out;
710
711	ad->s_listen = s_listen;
712	write_lock_bh(&s_listen->sk->sk_callback_lock);
713	ad->original_sk_state_change = s_listen->sk->sk_state_change;
714	s_listen->sk->sk_state_change = drbd_incoming_connection;
715	s_listen->sk->sk_user_data = ad;
716	write_unlock_bh(&s_listen->sk->sk_callback_lock);
717
718	what = "listen";
719	err = s_listen->ops->listen(s_listen, 5);
720	if (err < 0)
721		goto out;
722
723	return 0;
724out:
725	if (s_listen)
726		sock_release(s_listen);
727	if (err < 0) {
728		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729			drbd_err(connection, "%s failed, err = %d\n", what, err);
730			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
731		}
732	}
733
734	return -EIO;
735}
736
737static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
738{
739	write_lock_bh(&sk->sk_callback_lock);
740	sk->sk_state_change = ad->original_sk_state_change;
741	sk->sk_user_data = NULL;
742	write_unlock_bh(&sk->sk_callback_lock);
743}
744
745static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
746{
747	int timeo, connect_int, err = 0;
748	struct socket *s_estab = NULL;
749	struct net_conf *nc;
750
751	rcu_read_lock();
752	nc = rcu_dereference(connection->net_conf);
753	if (!nc) {
754		rcu_read_unlock();
755		return NULL;
756	}
757	connect_int = nc->connect_int;
758	rcu_read_unlock();
759
760	timeo = connect_int * HZ;
761	/* 28.5% random jitter */
762	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
763
764	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
765	if (err <= 0)
766		return NULL;
767
768	err = kernel_accept(ad->s_listen, &s_estab, 0);
769	if (err < 0) {
770		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771			drbd_err(connection, "accept failed, err = %d\n", err);
772			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
773		}
774	}
775
776	if (s_estab)
777		unregister_state_change(s_estab->sk, ad);
778
779	return s_estab;
780}
781
782static int decode_header(struct drbd_connection *, void *, struct packet_info *);
783
784static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785			     enum drbd_packet cmd)
786{
787	if (!conn_prepare_command(connection, sock))
788		return -EIO;
789	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
790}
791
792static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
793{
794	unsigned int header_size = drbd_header_size(connection);
795	struct packet_info pi;
796	struct net_conf *nc;
797	int err;
798
799	rcu_read_lock();
800	nc = rcu_dereference(connection->net_conf);
801	if (!nc) {
802		rcu_read_unlock();
803		return -EIO;
804	}
805	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
806	rcu_read_unlock();
807
808	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
809	if (err != header_size) {
810		if (err >= 0)
811			err = -EIO;
812		return err;
813	}
814	err = decode_header(connection, connection->data.rbuf, &pi);
815	if (err)
816		return err;
817	return pi.cmd;
818}
819
820/**
821 * drbd_socket_okay() - Free the socket if its connection is not okay
822 * @sock:	pointer to the pointer to the socket.
823 */
824static bool drbd_socket_okay(struct socket **sock)
825{
826	int rr;
827	char tb[4];
828
829	if (!*sock)
830		return false;
831
832	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
833
834	if (rr > 0 || rr == -EAGAIN) {
835		return true;
836	} else {
837		sock_release(*sock);
838		*sock = NULL;
839		return false;
840	}
841}
842
843static bool connection_established(struct drbd_connection *connection,
844				   struct socket **sock1,
845				   struct socket **sock2)
846{
847	struct net_conf *nc;
848	int timeout;
849	bool ok;
850
851	if (!*sock1 || !*sock2)
852		return false;
853
854	rcu_read_lock();
855	nc = rcu_dereference(connection->net_conf);
856	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
857	rcu_read_unlock();
858	schedule_timeout_interruptible(timeout);
859
860	ok = drbd_socket_okay(sock1);
861	ok = drbd_socket_okay(sock2) && ok;
862
863	return ok;
864}
865
866/* Gets called if a connection is established, or if a new minor gets created
867   in a connection */
868int drbd_connected(struct drbd_peer_device *peer_device)
869{
870	struct drbd_device *device = peer_device->device;
871	int err;
872
873	atomic_set(&device->packet_seq, 0);
874	device->peer_seq = 0;
875
876	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
877		&peer_device->connection->cstate_mutex :
878		&device->own_state_mutex;
879
880	err = drbd_send_sync_param(peer_device);
881	if (!err)
882		err = drbd_send_sizes(peer_device, 0, 0);
883	if (!err)
884		err = drbd_send_uuids(peer_device);
885	if (!err)
886		err = drbd_send_current_state(peer_device);
887	clear_bit(USE_DEGR_WFC_T, &device->flags);
888	clear_bit(RESIZE_PENDING, &device->flags);
889	atomic_set(&device->ap_in_flight, 0);
890	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
891	return err;
892}
893
894/*
895 * return values:
896 *   1 yes, we have a valid connection
897 *   0 oops, did not work out, please try again
898 *  -1 peer talks different language,
899 *     no point in trying again, please go standalone.
900 *  -2 We do not have a network config...
901 */
902static int conn_connect(struct drbd_connection *connection)
903{
904	struct drbd_socket sock, msock;
905	struct drbd_peer_device *peer_device;
906	struct net_conf *nc;
907	int vnr, timeout, h;
908	bool discard_my_data, ok;
909	enum drbd_state_rv rv;
910	struct accept_wait_data ad = {
911		.connection = connection,
912		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
913	};
914
915	clear_bit(DISCONNECT_SENT, &connection->flags);
916	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
917		return -2;
918
919	mutex_init(&sock.mutex);
920	sock.sbuf = connection->data.sbuf;
921	sock.rbuf = connection->data.rbuf;
922	sock.socket = NULL;
923	mutex_init(&msock.mutex);
924	msock.sbuf = connection->meta.sbuf;
925	msock.rbuf = connection->meta.rbuf;
926	msock.socket = NULL;
927
928	/* Assume that the peer only understands protocol 80 until we know better.  */
929	connection->agreed_pro_version = 80;
930
931	if (prepare_listen_socket(connection, &ad))
932		return 0;
933
934	do {
935		struct socket *s;
936
937		s = drbd_try_connect(connection);
938		if (s) {
939			if (!sock.socket) {
940				sock.socket = s;
941				send_first_packet(connection, &sock, P_INITIAL_DATA);
942			} else if (!msock.socket) {
943				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
944				msock.socket = s;
945				send_first_packet(connection, &msock, P_INITIAL_META);
946			} else {
947				drbd_err(connection, "Logic error in conn_connect()\n");
948				goto out_release_sockets;
949			}
950		}
951
952		if (connection_established(connection, &sock.socket, &msock.socket))
953			break;
954
955retry:
956		s = drbd_wait_for_connect(connection, &ad);
957		if (s) {
958			int fp = receive_first_packet(connection, s);
959			drbd_socket_okay(&sock.socket);
960			drbd_socket_okay(&msock.socket);
961			switch (fp) {
962			case P_INITIAL_DATA:
963				if (sock.socket) {
964					drbd_warn(connection, "initial packet S crossed\n");
965					sock_release(sock.socket);
966					sock.socket = s;
967					goto randomize;
968				}
969				sock.socket = s;
970				break;
971			case P_INITIAL_META:
972				set_bit(RESOLVE_CONFLICTS, &connection->flags);
973				if (msock.socket) {
974					drbd_warn(connection, "initial packet M crossed\n");
975					sock_release(msock.socket);
976					msock.socket = s;
977					goto randomize;
978				}
979				msock.socket = s;
980				break;
981			default:
982				drbd_warn(connection, "Error receiving initial packet\n");
983				sock_release(s);
984randomize:
985				if (prandom_u32() & 1)
986					goto retry;
987			}
988		}
989
990		if (connection->cstate <= C_DISCONNECTING)
991			goto out_release_sockets;
992		if (signal_pending(current)) {
993			flush_signals(current);
994			smp_rmb();
995			if (get_t_state(&connection->receiver) == EXITING)
996				goto out_release_sockets;
997		}
998
999		ok = connection_established(connection, &sock.socket, &msock.socket);
1000	} while (!ok);
1001
1002	if (ad.s_listen)
1003		sock_release(ad.s_listen);
1004
1005	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1006	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007
1008	sock.socket->sk->sk_allocation = GFP_NOIO;
1009	msock.socket->sk->sk_allocation = GFP_NOIO;
1010
1011	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1012	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1013
1014	/* NOT YET ...
1015	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1016	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1017	 * first set it to the P_CONNECTION_FEATURES timeout,
1018	 * which we set to 4x the configured ping_timeout. */
1019	rcu_read_lock();
1020	nc = rcu_dereference(connection->net_conf);
1021
1022	sock.socket->sk->sk_sndtimeo =
1023	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1024
1025	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1026	timeout = nc->timeout * HZ / 10;
1027	discard_my_data = nc->discard_my_data;
1028	rcu_read_unlock();
1029
1030	msock.socket->sk->sk_sndtimeo = timeout;
1031
1032	/* we don't want delays.
1033	 * we use TCP_CORK where appropriate, though */
1034	drbd_tcp_nodelay(sock.socket);
1035	drbd_tcp_nodelay(msock.socket);
1036
1037	connection->data.socket = sock.socket;
1038	connection->meta.socket = msock.socket;
1039	connection->last_received = jiffies;
1040
1041	h = drbd_do_features(connection);
1042	if (h <= 0)
1043		return h;
1044
1045	if (connection->cram_hmac_tfm) {
1046		/* drbd_request_state(device, NS(conn, WFAuth)); */
1047		switch (drbd_do_auth(connection)) {
1048		case -1:
1049			drbd_err(connection, "Authentication of peer failed\n");
1050			return -1;
1051		case 0:
1052			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1053			return 0;
1054		}
1055	}
1056
1057	connection->data.socket->sk->sk_sndtimeo = timeout;
1058	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1059
1060	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1061		return -1;
1062
1063	/* Prevent a race between resync-handshake and
1064	 * being promoted to Primary.
1065	 *
1066	 * Grab and release the state mutex, so we know that any current
1067	 * drbd_set_role() is finished, and any incoming drbd_set_role
1068	 * will see the STATE_SENT flag, and wait for it to be cleared.
1069	 */
1070	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1071		mutex_lock(peer_device->device->state_mutex);
1072
1073	set_bit(STATE_SENT, &connection->flags);
1074
1075	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1076		mutex_unlock(peer_device->device->state_mutex);
1077
1078	rcu_read_lock();
1079	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1080		struct drbd_device *device = peer_device->device;
1081		kref_get(&device->kref);
1082		rcu_read_unlock();
1083
1084		if (discard_my_data)
1085			set_bit(DISCARD_MY_DATA, &device->flags);
1086		else
1087			clear_bit(DISCARD_MY_DATA, &device->flags);
1088
1089		drbd_connected(peer_device);
1090		kref_put(&device->kref, drbd_destroy_device);
1091		rcu_read_lock();
1092	}
1093	rcu_read_unlock();
1094
1095	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1096	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1097		clear_bit(STATE_SENT, &connection->flags);
1098		return 0;
1099	}
1100
1101	drbd_thread_start(&connection->asender);
1102
1103	mutex_lock(&connection->resource->conf_update);
1104	/* The discard_my_data flag is a single-shot modifier to the next
1105	 * connection attempt, the handshake of which is now well underway.
1106	 * No need for rcu style copying of the whole struct
1107	 * just to clear a single value. */
1108	connection->net_conf->discard_my_data = 0;
1109	mutex_unlock(&connection->resource->conf_update);
1110
1111	return h;
1112
1113out_release_sockets:
1114	if (ad.s_listen)
1115		sock_release(ad.s_listen);
1116	if (sock.socket)
1117		sock_release(sock.socket);
1118	if (msock.socket)
1119		sock_release(msock.socket);
1120	return -1;
1121}
1122
1123static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1124{
1125	unsigned int header_size = drbd_header_size(connection);
1126
1127	if (header_size == sizeof(struct p_header100) &&
1128	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1129		struct p_header100 *h = header;
1130		if (h->pad != 0) {
1131			drbd_err(connection, "Header padding is not zero\n");
1132			return -EINVAL;
1133		}
1134		pi->vnr = be16_to_cpu(h->volume);
1135		pi->cmd = be16_to_cpu(h->command);
1136		pi->size = be32_to_cpu(h->length);
1137	} else if (header_size == sizeof(struct p_header95) &&
1138		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1139		struct p_header95 *h = header;
1140		pi->cmd = be16_to_cpu(h->command);
1141		pi->size = be32_to_cpu(h->length);
1142		pi->vnr = 0;
1143	} else if (header_size == sizeof(struct p_header80) &&
1144		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1145		struct p_header80 *h = header;
1146		pi->cmd = be16_to_cpu(h->command);
1147		pi->size = be16_to_cpu(h->length);
1148		pi->vnr = 0;
1149	} else {
1150		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1151			 be32_to_cpu(*(__be32 *)header),
1152			 connection->agreed_pro_version);
1153		return -EINVAL;
1154	}
1155	pi->data = header + header_size;
1156	return 0;
1157}
1158
1159static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1160{
1161	void *buffer = connection->data.rbuf;
1162	int err;
1163
1164	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1165	if (err)
1166		return err;
1167
1168	err = decode_header(connection, buffer, pi);
1169	connection->last_received = jiffies;
1170
1171	return err;
1172}
1173
1174static void drbd_flush(struct drbd_connection *connection)
1175{
1176	int rv;
1177	struct drbd_peer_device *peer_device;
1178	int vnr;
1179
1180	if (connection->resource->write_ordering >= WO_bdev_flush) {
1181		rcu_read_lock();
1182		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1183			struct drbd_device *device = peer_device->device;
1184
1185			if (!get_ldev(device))
1186				continue;
1187			kref_get(&device->kref);
1188			rcu_read_unlock();
1189
1190			/* Right now, we have only this one synchronous code path
1191			 * for flushes between request epochs.
1192			 * We may want to make those asynchronous,
1193			 * or at least parallelize the flushes to the volume devices.
1194			 */
1195			device->flush_jif = jiffies;
1196			set_bit(FLUSH_PENDING, &device->flags);
1197			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1198					GFP_NOIO, NULL);
1199			clear_bit(FLUSH_PENDING, &device->flags);
1200			if (rv) {
1201				drbd_info(device, "local disk flush failed with status %d\n", rv);
1202				/* would rather check on EOPNOTSUPP, but that is not reliable.
1203				 * don't try again for ANY return value != 0
1204				 * if (rv == -EOPNOTSUPP) */
1205				drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1206			}
1207			put_ldev(device);
1208			kref_put(&device->kref, drbd_destroy_device);
1209
1210			rcu_read_lock();
1211			if (rv)
1212				break;
1213		}
1214		rcu_read_unlock();
1215	}
1216}
1217
1218/**
1219 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1220 * @device:	DRBD device.
1221 * @epoch:	Epoch object.
1222 * @ev:		Epoch event.
1223 */
1224static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1225					       struct drbd_epoch *epoch,
1226					       enum epoch_event ev)
1227{
1228	int epoch_size;
1229	struct drbd_epoch *next_epoch;
1230	enum finish_epoch rv = FE_STILL_LIVE;
1231
1232	spin_lock(&connection->epoch_lock);
1233	do {
1234		next_epoch = NULL;
1235
1236		epoch_size = atomic_read(&epoch->epoch_size);
1237
1238		switch (ev & ~EV_CLEANUP) {
1239		case EV_PUT:
1240			atomic_dec(&epoch->active);
1241			break;
1242		case EV_GOT_BARRIER_NR:
1243			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1244			break;
1245		case EV_BECAME_LAST:
1246			/* nothing to do*/
1247			break;
1248		}
1249
1250		if (epoch_size != 0 &&
1251		    atomic_read(&epoch->active) == 0 &&
1252		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1253			if (!(ev & EV_CLEANUP)) {
1254				spin_unlock(&connection->epoch_lock);
1255				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1256				spin_lock(&connection->epoch_lock);
1257			}
1258#if 0
1259			/* FIXME: dec unacked on connection, once we have
1260			 * something to count pending connection packets in. */
1261			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1262				dec_unacked(epoch->connection);
1263#endif
1264
1265			if (connection->current_epoch != epoch) {
1266				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1267				list_del(&epoch->list);
1268				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1269				connection->epochs--;
1270				kfree(epoch);
1271
1272				if (rv == FE_STILL_LIVE)
1273					rv = FE_DESTROYED;
1274			} else {
1275				epoch->flags = 0;
1276				atomic_set(&epoch->epoch_size, 0);
1277				/* atomic_set(&epoch->active, 0); is already zero */
1278				if (rv == FE_STILL_LIVE)
1279					rv = FE_RECYCLED;
1280			}
1281		}
1282
1283		if (!next_epoch)
1284			break;
1285
1286		epoch = next_epoch;
1287	} while (1);
1288
1289	spin_unlock(&connection->epoch_lock);
1290
1291	return rv;
1292}
1293
1294static enum write_ordering_e
1295max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1296{
1297	struct disk_conf *dc;
1298
1299	dc = rcu_dereference(bdev->disk_conf);
1300
1301	if (wo == WO_bdev_flush && !dc->disk_flushes)
1302		wo = WO_drain_io;
1303	if (wo == WO_drain_io && !dc->disk_drain)
1304		wo = WO_none;
1305
1306	return wo;
1307}
1308
1309/**
1310 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1311 * @connection:	DRBD connection.
1312 * @wo:		Write ordering method to try.
1313 */
1314void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1315			      enum write_ordering_e wo)
1316{
1317	struct drbd_device *device;
1318	enum write_ordering_e pwo;
1319	int vnr;
1320	static char *write_ordering_str[] = {
1321		[WO_none] = "none",
1322		[WO_drain_io] = "drain",
1323		[WO_bdev_flush] = "flush",
1324	};
1325
1326	pwo = resource->write_ordering;
1327	if (wo != WO_bdev_flush)
1328		wo = min(pwo, wo);
1329	rcu_read_lock();
1330	idr_for_each_entry(&resource->devices, device, vnr) {
1331		if (get_ldev(device)) {
1332			wo = max_allowed_wo(device->ldev, wo);
1333			if (device->ldev == bdev)
1334				bdev = NULL;
1335			put_ldev(device);
1336		}
1337	}
1338
1339	if (bdev)
1340		wo = max_allowed_wo(bdev, wo);
1341
1342	rcu_read_unlock();
1343
1344	resource->write_ordering = wo;
1345	if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1346		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1347}
1348
1349/**
1350 * drbd_submit_peer_request()
1351 * @device:	DRBD device.
1352 * @peer_req:	peer request
1353 * @rw:		flag field, see bio->bi_rw
1354 *
1355 * May spread the pages to multiple bios,
1356 * depending on bio_add_page restrictions.
1357 *
1358 * Returns 0 if all bios have been submitted,
1359 * -ENOMEM if we could not allocate enough bios,
1360 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1361 *  single page to an empty bio (which should never happen and likely indicates
1362 *  that the lower level IO stack is in some way broken). This has been observed
1363 *  on certain Xen deployments.
1364 */
1365/* TODO allocate from our own bio_set. */
1366int drbd_submit_peer_request(struct drbd_device *device,
1367			     struct drbd_peer_request *peer_req,
1368			     const unsigned rw, const int fault_type)
1369{
1370	struct bio *bios = NULL;
1371	struct bio *bio;
1372	struct page *page = peer_req->pages;
1373	sector_t sector = peer_req->i.sector;
1374	unsigned data_size = peer_req->i.size;
1375	unsigned n_bios = 0;
1376	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1377	int err = -ENOMEM;
1378
1379	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1380		/* wait for all pending IO completions, before we start
1381		 * zeroing things out. */
1382		conn_wait_active_ee_empty(first_peer_device(device)->connection);
1383		/* add it to the active list now,
1384		 * so we can find it to present it in debugfs */
1385		peer_req->submit_jif = jiffies;
1386		peer_req->flags |= EE_SUBMITTED;
1387		spin_lock_irq(&device->resource->req_lock);
1388		list_add_tail(&peer_req->w.list, &device->active_ee);
1389		spin_unlock_irq(&device->resource->req_lock);
1390		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1391			sector, data_size >> 9, GFP_NOIO))
1392			peer_req->flags |= EE_WAS_ERROR;
1393		drbd_endio_write_sec_final(peer_req);
1394		return 0;
1395	}
1396
1397	/* Discards don't have any payload.
1398	 * But the scsi layer still expects a bio_vec it can use internally,
1399	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1400	if (peer_req->flags & EE_IS_TRIM)
1401		nr_pages = 1;
1402
1403	/* In most cases, we will only need one bio.  But in case the lower
1404	 * level restrictions happen to be different at this offset on this
1405	 * side than those of the sending peer, we may need to submit the
1406	 * request in more than one bio.
1407	 *
1408	 * Plain bio_alloc is good enough here, this is no DRBD internally
1409	 * generated bio, but a bio allocated on behalf of the peer.
1410	 */
1411next_bio:
1412	bio = bio_alloc(GFP_NOIO, nr_pages);
1413	if (!bio) {
1414		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1415		goto fail;
1416	}
1417	/* > peer_req->i.sector, unless this is the first bio */
1418	bio->bi_iter.bi_sector = sector;
1419	bio->bi_bdev = device->ldev->backing_bdev;
1420	bio->bi_rw = rw;
1421	bio->bi_private = peer_req;
1422	bio->bi_end_io = drbd_peer_request_endio;
1423
1424	bio->bi_next = bios;
1425	bios = bio;
1426	++n_bios;
1427
1428	if (rw & REQ_DISCARD) {
1429		bio->bi_iter.bi_size = data_size;
1430		goto submit;
1431	}
1432
1433	page_chain_for_each(page) {
1434		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1435		if (!bio_add_page(bio, page, len, 0)) {
1436			/* A single page must always be possible!
1437			 * But in case it fails anyways,
1438			 * we deal with it, and complain (below). */
1439			if (bio->bi_vcnt == 0) {
1440				drbd_err(device,
1441					"bio_add_page failed for len=%u, "
1442					"bi_vcnt=0 (bi_sector=%llu)\n",
1443					len, (uint64_t)bio->bi_iter.bi_sector);
1444				err = -ENOSPC;
1445				goto fail;
1446			}
1447			goto next_bio;
1448		}
1449		data_size -= len;
1450		sector += len >> 9;
1451		--nr_pages;
1452	}
1453	D_ASSERT(device, data_size == 0);
1454submit:
1455	D_ASSERT(device, page == NULL);
1456
1457	atomic_set(&peer_req->pending_bios, n_bios);
1458	/* for debugfs: update timestamp, mark as submitted */
1459	peer_req->submit_jif = jiffies;
1460	peer_req->flags |= EE_SUBMITTED;
1461	do {
1462		bio = bios;
1463		bios = bios->bi_next;
1464		bio->bi_next = NULL;
1465
1466		drbd_generic_make_request(device, fault_type, bio);
1467	} while (bios);
1468	return 0;
1469
1470fail:
1471	while (bios) {
1472		bio = bios;
1473		bios = bios->bi_next;
1474		bio_put(bio);
1475	}
1476	return err;
1477}
1478
1479static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1480					     struct drbd_peer_request *peer_req)
1481{
1482	struct drbd_interval *i = &peer_req->i;
1483
1484	drbd_remove_interval(&device->write_requests, i);
1485	drbd_clear_interval(i);
1486
1487	/* Wake up any processes waiting for this peer request to complete.  */
1488	if (i->waiting)
1489		wake_up(&device->misc_wait);
1490}
1491
1492static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1493{
1494	struct drbd_peer_device *peer_device;
1495	int vnr;
1496
1497	rcu_read_lock();
1498	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1499		struct drbd_device *device = peer_device->device;
1500
1501		kref_get(&device->kref);
1502		rcu_read_unlock();
1503		drbd_wait_ee_list_empty(device, &device->active_ee);
1504		kref_put(&device->kref, drbd_destroy_device);
1505		rcu_read_lock();
1506	}
1507	rcu_read_unlock();
1508}
1509
1510static struct drbd_peer_device *
1511conn_peer_device(struct drbd_connection *connection, int volume_number)
1512{
1513	return idr_find(&connection->peer_devices, volume_number);
1514}
1515
/*
 * receive_Barrier() - handle a barrier packet from the peer
 * @connection:	connection the packet arrived on
 * @pi:		decoded packet; pi->data is interpreted as struct p_barrier
 *
 * Records the barrier number in the current epoch and lets
 * drbd_may_finish_epoch() process EV_GOT_BARRIER_NR.  Depending on the
 * resource's write ordering method, a new epoch object may then be allocated
 * and made the current epoch.
 *
 * Returns 0, or -EIO if the write ordering method has an unknown value.
 */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_none:
		/* the current epoch was finished and reset, it can be reused */
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		/* a new epoch is only needed if the current one is still in use */
		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* current epoch is empty, or the allocation failed:
		 * keep using the current epoch */
		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	/* only reached with a freshly allocated epoch */
	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
1583
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data
 *
 * Allocates a peer request for the block described by @pi and, unless the
 * packet is a P_TRIM, receives its payload into the request's page chain.
 * If peer_integrity_tfm is set for the connection (and this is not a trim),
 * a digest is received ahead of the payload and verified against it.
 *
 * For P_TRIM the size is taken from the trim packet itself and no payload
 * is received.
 *
 * Returns the peer request, or NULL on any failure: receive error, size
 * that is unaligned or too large, request beyond the end of the local
 * disk, allocation failure, or digest mismatch. */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;

	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		/* pi->size covers the digest and the data */
		data_size -= digest_size;
	}

	if (trim) {
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	/* nothing to receive for a trim */
	if (trim)
		return peer_req;

	/* receive the payload, at most one page at a time */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		/* compare the digest of what we received with what the peer sent */
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
1676
1677/* drbd_drain_block() just takes a data block
1678 * out of the socket input buffer, and discards it.
1679 */
1680static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1681{
1682	struct page *page;
1683	int err = 0;
1684	void *data;
1685
1686	if (!data_size)
1687		return 0;
1688
1689	page = drbd_alloc_pages(peer_device, 1, 1);
1690
1691	data = kmap(page);
1692	while (data_size) {
1693		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1694
1695		err = drbd_recv_all_warn(peer_device->connection, data, len);
1696		if (err)
1697			break;
1698		data_size -= len;
1699	}
1700	kunmap(page);
1701	drbd_free_pages(peer_device->device, page, 0);
1702	return err;
1703}
1704
1705static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1706			   sector_t sector, int data_size)
1707{
1708	struct bio_vec bvec;
1709	struct bvec_iter iter;
1710	struct bio *bio;
1711	int digest_size, err, expect;
1712	void *dig_in = peer_device->connection->int_dig_in;
1713	void *dig_vv = peer_device->connection->int_dig_vv;
1714
1715	digest_size = 0;
1716	if (peer_device->connection->peer_integrity_tfm) {
1717		digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1718		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1719		if (err)
1720			return err;
1721		data_size -= digest_size;
1722	}
1723
1724	/* optimistically update recv_cnt.  if receiving fails below,
1725	 * we disconnect anyways, and counters will be reset. */
1726	peer_device->device->recv_cnt += data_size>>9;
1727
1728	bio = req->master_bio;
1729	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1730
1731	bio_for_each_segment(bvec, bio, iter) {
1732		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1733		expect = min_t(int, data_size, bvec.bv_len);
1734		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1735		kunmap(bvec.bv_page);
1736		if (err)
1737			return err;
1738		data_size -= expect;
1739	}
1740
1741	if (digest_size) {
1742		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1743		if (memcmp(dig_in, dig_vv, digest_size)) {
1744			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1745			return -EINVAL;
1746		}
1747	}
1748
1749	D_ASSERT(peer_device->device, data_size == 0);
1750	return 0;
1751}
1752
1753/*
1754 * e_end_resync_block() is called in asender context via
1755 * drbd_finish_peer_reqs().
1756 */
1757static int e_end_resync_block(struct drbd_work *w, int unused)
1758{
1759	struct drbd_peer_request *peer_req =
1760		container_of(w, struct drbd_peer_request, w);
1761	struct drbd_peer_device *peer_device = peer_req->peer_device;
1762	struct drbd_device *device = peer_device->device;
1763	sector_t sector = peer_req->i.sector;
1764	int err;
1765
1766	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1767
1768	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1769		drbd_set_in_sync(device, sector, peer_req->i.size);
1770		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1771	} else {
1772		/* Record failure to sync */
1773		drbd_rs_failed_io(device, sector, peer_req->i.size);
1774
1775		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1776	}
1777	dec_unacked(device);
1778
1779	return err;
1780}
1781
1782static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1783			    struct packet_info *pi) __releases(local)
1784{
1785	struct drbd_device *device = peer_device->device;
1786	struct drbd_peer_request *peer_req;
1787
1788	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1789	if (!peer_req)
1790		goto fail;
1791
1792	dec_rs_pending(device);
1793
1794	inc_unacked(device);
1795	/* corresponding dec_unacked() in e_end_resync_block()
1796	 * respective _drbd_clear_done_ee */
1797
1798	peer_req->w.cb = e_end_resync_block;
1799	peer_req->submit_jif = jiffies;
1800
1801	spin_lock_irq(&device->resource->req_lock);
1802	list_add_tail(&peer_req->w.list, &device->sync_ee);
1803	spin_unlock_irq(&device->resource->req_lock);
1804
1805	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1806	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1807		return 0;
1808
1809	/* don't care for the reason here */
1810	drbd_err(device, "submit failed, triggering re-connect\n");
1811	spin_lock_irq(&device->resource->req_lock);
1812	list_del(&peer_req->w.list);
1813	spin_unlock_irq(&device->resource->req_lock);
1814
1815	drbd_free_peer_req(device, peer_req);
1816fail:
1817	put_ldev(device);
1818	return -EIO;
1819}
1820
1821static struct drbd_request *
1822find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1823	     sector_t sector, bool missing_ok, const char *func)
1824{
1825	struct drbd_request *req;
1826
1827	/* Request object according to our peer */
1828	req = (struct drbd_request *)(unsigned long)id;
1829	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1830		return req;
1831	if (!missing_ok) {
1832		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1833			(unsigned long)id, (unsigned long long)sector);
1834	}
1835	return NULL;
1836}
1837
1838static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1839{
1840	struct drbd_peer_device *peer_device;
1841	struct drbd_device *device;
1842	struct drbd_request *req;
1843	sector_t sector;
1844	int err;
1845	struct p_data *p = pi->data;
1846
1847	peer_device = conn_peer_device(connection, pi->vnr);
1848	if (!peer_device)
1849		return -EIO;
1850	device = peer_device->device;
1851
1852	sector = be64_to_cpu(p->sector);
1853
1854	spin_lock_irq(&device->resource->req_lock);
1855	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1856	spin_unlock_irq(&device->resource->req_lock);
1857	if (unlikely(!req))
1858		return -EIO;
1859
1860	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1861	 * special casing it there for the various failure cases.
1862	 * still no race with drbd_fail_pending_reads */
1863	err = recv_dless_read(peer_device, req, sector, pi->size);
1864	if (!err)
1865		req_mod(req, DATA_RECEIVED);
1866	/* else: nothing. handled from drbd_disconnect...
1867	 * I don't think we may complete this just yet
1868	 * in case we are "on-disconnect: freeze" */
1869
1870	return err;
1871}
1872
1873static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1874{
1875	struct drbd_peer_device *peer_device;
1876	struct drbd_device *device;
1877	sector_t sector;
1878	int err;
1879	struct p_data *p = pi->data;
1880
1881	peer_device = conn_peer_device(connection, pi->vnr);
1882	if (!peer_device)
1883		return -EIO;
1884	device = peer_device->device;
1885
1886	sector = be64_to_cpu(p->sector);
1887	D_ASSERT(device, p->block_id == ID_SYNCER);
1888
1889	if (get_ldev(device)) {
1890		/* data is submitted to disk within recv_resync_read.
1891		 * corresponding put_ldev done below on error,
1892		 * or in drbd_peer_request_endio. */
1893		err = recv_resync_read(peer_device, sector, pi);
1894	} else {
1895		if (__ratelimit(&drbd_ratelimit_state))
1896			drbd_err(device, "Can not write resync data to local disk.\n");
1897
1898		err = drbd_drain_block(peer_device, pi->size);
1899
1900		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1901	}
1902
1903	atomic_add(pi->size >> 9, &device->rs_sect_in);
1904
1905	return err;
1906}
1907
1908static void restart_conflicting_writes(struct drbd_device *device,
1909				       sector_t sector, int size)
1910{
1911	struct drbd_interval *i;
1912	struct drbd_request *req;
1913
1914	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1915		if (!i->local)
1916			continue;
1917		req = container_of(i, struct drbd_request, i);
1918		if (req->rq_state & RQ_LOCAL_PENDING ||
1919		    !(req->rq_state & RQ_POSTPONED))
1920			continue;
1921		/* as it is RQ_POSTPONED, this will cause it to
1922		 * be queued on the retry workqueue. */
1923		__req_mod(req, CONFLICT_RESOLVED, NULL);
1924	}
1925}
1926
1927/*
1928 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1929 */
1930static int e_end_block(struct drbd_work *w, int cancel)
1931{
1932	struct drbd_peer_request *peer_req =
1933		container_of(w, struct drbd_peer_request, w);
1934	struct drbd_peer_device *peer_device = peer_req->peer_device;
1935	struct drbd_device *device = peer_device->device;
1936	sector_t sector = peer_req->i.sector;
1937	int err = 0, pcmd;
1938
1939	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1940		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1941			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1942				device->state.conn <= C_PAUSED_SYNC_T &&
1943				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1944				P_RS_WRITE_ACK : P_WRITE_ACK;
1945			err = drbd_send_ack(peer_device, pcmd, peer_req);
1946			if (pcmd == P_RS_WRITE_ACK)
1947				drbd_set_in_sync(device, sector, peer_req->i.size);
1948		} else {
1949			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1950			/* we expect it to be marked out of sync anyways...
1951			 * maybe assert this?  */
1952		}
1953		dec_unacked(device);
1954	}
1955
1956	/* we delete from the conflict detection hash _after_ we sent out the
1957	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1958	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1959		spin_lock_irq(&device->resource->req_lock);
1960		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1961		drbd_remove_epoch_entry_interval(device, peer_req);
1962		if (peer_req->flags & EE_RESTART_REQUESTS)
1963			restart_conflicting_writes(device, sector, peer_req->i.size);
1964		spin_unlock_irq(&device->resource->req_lock);
1965	} else
1966		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1967
1968	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1969
1970	return err;
1971}
1972
1973static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1974{
1975	struct drbd_peer_request *peer_req =
1976		container_of(w, struct drbd_peer_request, w);
1977	struct drbd_peer_device *peer_device = peer_req->peer_device;
1978	int err;
1979
1980	err = drbd_send_ack(peer_device, ack, peer_req);
1981	dec_unacked(peer_device->device);
1982
1983	return err;
1984}
1985
1986static int e_send_superseded(struct drbd_work *w, int unused)
1987{
1988	return e_send_ack(w, P_SUPERSEDED);
1989}
1990
1991static int e_send_retry_write(struct drbd_work *w, int unused)
1992{
1993	struct drbd_peer_request *peer_req =
1994		container_of(w, struct drbd_peer_request, w);
1995	struct drbd_connection *connection = peer_req->peer_device->connection;
1996
1997	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1998			     P_RETRY_WRITE : P_SUPERSEDED);
1999}
2000
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers
 *
 * Returns true if @a is ahead of @b by 1 .. 2^31 - 1 steps, modulo 2^32.
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 *
	 * Subtract as unsigned, where wrap-around is well defined, and only
	 * then look at the sign of the distance.  Subtracting the operands
	 * after converting each to s32 can overflow the signed type.
	 */
	return (s32)(a - b) > 0;
}
2010
2011static u32 seq_max(u32 a, u32 b)
2012{
2013	return seq_greater(a, b) ? a : b;
2014}
2015
2016static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2017{
2018	struct drbd_device *device = peer_device->device;
2019	unsigned int newest_peer_seq;
2020
2021	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2022		spin_lock(&device->peer_seq_lock);
2023		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2024		device->peer_seq = newest_peer_seq;
2025		spin_unlock(&device->peer_seq_lock);
2026		/* wake up only if we actually changed device->peer_seq */
2027		if (peer_seq == newest_peer_seq)
2028			wake_up(&device->seq_wait);
2029	}
2030}
2031
/*
 * Return 1 if the two ranges share at least one sector, 0 otherwise.
 * Start positions @s1 and @s2 are in sectors, lengths @l1 and @l2 in bytes.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	/* first sector past each range */
	const sector_t end1 = s1 + (l1 >> 9);
	const sector_t end2 = s2 + (l2 >> 9);

	return end1 > s2 && s1 < end2;
}
2036
2037/* maybe change sync_ee into interval trees as well? */
2038static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2039{
2040	struct drbd_peer_request *rs_req;
2041	bool rv = 0;
2042
2043	spin_lock_irq(&device->resource->req_lock);
2044	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2045		if (overlaps(peer_req->i.sector, peer_req->i.size,
2046			     rs_req->i.sector, rs_req->i.size)) {
2047			rv = 1;
2048			break;
2049		}
2050	}
2051	spin_unlock_irq(&device->resource->req_lock);
2052
2053	return rv;
2054}
2055
2056/* Called from receive_Data.
2057 * Synchronize packets on sock with packets on msock.
2058 *
2059 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2060 * packet traveling on msock, they are still processed in the order they have
2061 * been sent.
2062 *
2063 * Note: we don't care for Ack packets overtaking P_DATA packets.
2064 *
2065 * In case packet_seq is larger than device->peer_seq number, there are
2066 * outstanding packets on the msock. We wait for them to arrive.
2067 * In case we are the logically next packet, we update device->peer_seq
2068 * ourselves. Correctly handles 32bit wrap around.
2069 *
2070 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2071 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2072 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2073 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2074 *
2075 * returns 0 if we may process the packet,
2076 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2077static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2078{
2079	struct drbd_device *device = peer_device->device;
2080	DEFINE_WAIT(wait);
2081	long timeout;
2082	int ret = 0, tp;
2083
2084	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2085		return 0;
2086
2087	spin_lock(&device->peer_seq_lock);
2088	for (;;) {
2089		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2090			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2091			break;
2092		}
2093
2094		if (signal_pending(current)) {
2095			ret = -ERESTARTSYS;
2096			break;
2097		}
2098
2099		rcu_read_lock();
2100		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2101		rcu_read_unlock();
2102
2103		if (!tp)
2104			break;
2105
2106		/* Only need to wait if two_primaries is enabled */
2107		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2108		spin_unlock(&device->peer_seq_lock);
2109		rcu_read_lock();
2110		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2111		rcu_read_unlock();
2112		timeout = schedule_timeout(timeout);
2113		spin_lock(&device->peer_seq_lock);
2114		if (!timeout) {
2115			ret = -ETIMEDOUT;
2116			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2117			break;
2118		}
2119	}
2120	spin_unlock(&device->peer_seq_lock);
2121	finish_wait(&device->seq_wait, &wait);
2122	return ret;
2123}
2124
2125/* see also bio_flags_to_wire()
2126 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2127 * flags and back. We may replicate to other kernel versions. */
2128static unsigned long wire_flags_to_bio(u32 dpf)
2129{
2130	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2131		(dpf & DP_FUA ? REQ_FUA : 0) |
2132		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2133		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2134}
2135
/*
 * fail_postponed_requests() - negatively acknowledge postponed local writes
 * @device: device whose write_requests tree is searched
 * @sector: start sector of the range of interest
 * @size:   size of the range, passed on to drbd_for_each_overlap()
 *
 * For every local request overlapping the given range that has
 * RQ_POSTPONED set, clear that flag and apply the NEG_ACKED event.
 * If that yields a master bio, it is completed.
 *
 * Must be called with device->resource->req_lock held: the lock is dropped
 * while a master bio is completed and re-acquired afterwards.
 */
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		/* only local requests are of interest here */
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		/* complete the master bio without holding the req_lock */
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		/* the lock was dropped, so start the scan over
		 * (presumably because the tree may have changed meanwhile) */
		goto repeat;
	}
}
2160
/*
 * handle_write_conflicts() - sort out a peer write against overlapping writes
 * @device:   device the peer request is for
 * @peer_req: the freshly received peer write request
 *
 * Inserts @peer_req into device->write_requests and then looks at every
 * other, not yet completed interval overlapping it.
 *
 * Called from receive_Data() with device->resource->req_lock held.
 *
 * Returns
 *   0        if the peer request may be submitted (it stays in the tree),
 *   -ENOENT  if RESOLVE_CONFLICTS is set on the connection and the peer
 *            request was queued on done_ee to be answered with
 *            e_send_superseded or e_send_retry_write,
 *   other    the error returned by drbd_wait_misc().
 * For any non-zero return the interval has been removed from the tree again
 * via drbd_remove_epoch_entry_interval().
 */
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		/* skip ourselves and anything already completed */
		if (i == &peer_req->i)
			continue;
		if (i->completed)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			/* rescan from the start after having waited */
			goto repeat;
		}

		/* identical extents are not reported as concurrent writes below */
		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			/* do not submit; queue the answer and wake the asender */
			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			wake_asender(connection);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					/* waiting failed: request C_TIMEOUT and
					 * fail what we had postponed in this range */
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	/* on failure the peer request must not stay in the tree */
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
2268
2269/* mirrored write */
2270static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2271{
2272	struct drbd_peer_device *peer_device;
2273	struct drbd_device *device;
2274	struct net_conf *nc;
2275	sector_t sector;
2276	struct drbd_peer_request *peer_req;
2277	struct p_data *p = pi->data;
2278	u32 peer_seq = be32_to_cpu(p->seq_num);
2279	int rw = WRITE;
2280	u32 dp_flags;
2281	int err, tp;
2282
2283	peer_device = conn_peer_device(connection, pi->vnr);
2284	if (!peer_device)
2285		return -EIO;
2286	device = peer_device->device;
2287
2288	if (!get_ldev(device)) {
2289		int err2;
2290
2291		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2292		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2293		atomic_inc(&connection->current_epoch->epoch_size);
2294		err2 = drbd_drain_block(peer_device, pi->size);
2295		if (!err)
2296			err = err2;
2297		return err;
2298	}
2299
2300	/*
2301	 * Corresponding put_ldev done either below (on various errors), or in
2302	 * drbd_peer_request_endio, if we successfully submit the data at the
2303	 * end of this function.
2304	 */
2305
2306	sector = be64_to_cpu(p->sector);
2307	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2308	if (!peer_req) {
2309		put_ldev(device);
2310		return -EIO;
2311	}
2312
2313	peer_req->w.cb = e_end_block;
2314	peer_req->submit_jif = jiffies;
2315	peer_req->flags |= EE_APPLICATION;
2316
2317	dp_flags = be32_to_cpu(p->dp_flags);
2318	rw |= wire_flags_to_bio(dp_flags);
2319	if (pi->cmd == P_TRIM) {
2320		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2321		peer_req->flags |= EE_IS_TRIM;
2322		if (!blk_queue_discard(q))
2323			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2324		D_ASSERT(peer_device, peer_req->i.size > 0);
2325		D_ASSERT(peer_device, rw & REQ_DISCARD);
2326		D_ASSERT(peer_device, peer_req->pages == NULL);
2327	} else if (peer_req->pages == NULL) {
2328		D_ASSERT(device, peer_req->i.size == 0);
2329		D_ASSERT(device, dp_flags & DP_FLUSH);
2330	}
2331
2332	if (dp_flags & DP_MAY_SET_IN_SYNC)
2333		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2334
2335	spin_lock(&connection->epoch_lock);
2336	peer_req->epoch = connection->current_epoch;
2337	atomic_inc(&peer_req->epoch->epoch_size);
2338	atomic_inc(&peer_req->epoch->active);
2339	spin_unlock(&connection->epoch_lock);
2340
2341	rcu_read_lock();
2342	nc = rcu_dereference(peer_device->connection->net_conf);
2343	tp = nc->two_primaries;
2344	if (peer_device->connection->agreed_pro_version < 100) {
2345		switch (nc->wire_protocol) {
2346		case DRBD_PROT_C:
2347			dp_flags |= DP_SEND_WRITE_ACK;
2348			break;
2349		case DRBD_PROT_B:
2350			dp_flags |= DP_SEND_RECEIVE_ACK;
2351			break;
2352		}
2353	}
2354	rcu_read_unlock();
2355
2356	if (dp_flags & DP_SEND_WRITE_ACK) {
2357		peer_req->flags |= EE_SEND_WRITE_ACK;
2358		inc_unacked(device);
2359		/* corresponding dec_unacked() in e_end_block()
2360		 * respective _drbd_clear_done_ee */
2361	}
2362
2363	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2364		/* I really don't like it that the receiver thread
2365		 * sends on the msock, but anyways */
2366		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2367	}
2368
2369	if (tp) {
2370		/* two primaries implies protocol C */
2371		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2372		peer_req->flags |= EE_IN_INTERVAL_TREE;
2373		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2374		if (err)
2375			goto out_interrupted;
2376		spin_lock_irq(&device->resource->req_lock);
2377		err = handle_write_conflicts(device, peer_req);
2378		if (err) {
2379			spin_unlock_irq(&device->resource->req_lock);
2380			if (err == -ENOENT) {
2381				put_ldev(device);
2382				return 0;
2383			}
2384			goto out_interrupted;
2385		}
2386	} else {
2387		update_peer_seq(peer_device, peer_seq);
2388		spin_lock_irq(&device->resource->req_lock);
2389	}
2390	/* if we use the zeroout fallback code, we process synchronously
2391	 * and we wait for all pending requests, respectively wait for
2392	 * active_ee to become empty in drbd_submit_peer_request();
2393	 * better not add ourselves here. */
2394	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2395		list_add_tail(&peer_req->w.list, &device->active_ee);
2396	spin_unlock_irq(&device->resource->req_lock);
2397
2398	if (device->state.conn == C_SYNC_TARGET)
2399		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2400
2401	if (device->state.pdsk < D_INCONSISTENT) {
2402		/* In case we have the only disk of the cluster, */
2403		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2404		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2405		drbd_al_begin_io(device, &peer_req->i);
2406		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2407	}
2408
2409	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2410	if (!err)
2411		return 0;
2412
2413	/* don't care for the reason here */
2414	drbd_err(device, "submit failed, triggering re-connect\n");
2415	spin_lock_irq(&device->resource->req_lock);
2416	list_del(&peer_req->w.list);
2417	drbd_remove_epoch_entry_interval(device, peer_req);
2418	spin_unlock_irq(&device->resource->req_lock);
2419	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2420		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2421		drbd_al_complete_io(device, &peer_req->i);
2422	}
2423
2424out_interrupted:
2425	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2426	put_ldev(device);
2427	drbd_free_peer_req(device, peer_req);
2428	return err;
2429}
2430
2431/* We may throttle resync, if the lower device seems to be busy,
2432 * and current sync rate is above c_min_rate.
2433 *
2434 * To decide whether or not the lower device is busy, we use a scheme similar
2435 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2436 * (more than 64 sectors) of activity we cannot account for with our own resync
2437 * activity, it obviously is "busy".
2438 *
2439 * The current sync rate used here uses only the most recent two step marks,
2440 * to have a short time average so we can react faster.
2441 */
2442bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2443		bool throttle_if_app_is_waiting)
2444{
2445	struct lc_element *tmp;
2446	bool throttle = drbd_rs_c_min_rate_throttle(device);
2447
2448	if (!throttle || throttle_if_app_is_waiting)
2449		return throttle;
2450
2451	spin_lock_irq(&device->al_lock);
2452	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2453	if (tmp) {
2454		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2455		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2456			throttle = false;
2457		/* Do not slow down if app IO is already waiting for this extent,
2458		 * and our progress is necessary for application IO to complete. */
2459	}
2460	spin_unlock_irq(&device->al_lock);
2461
2462	return throttle;
2463}
2464
2465bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2466{
2467	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2468	unsigned long db, dt, dbdt;
2469	unsigned int c_min_rate;
2470	int curr_events;
2471
2472	rcu_read_lock();
2473	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2474	rcu_read_unlock();
2475
2476	/* feature disabled? */
2477	if (c_min_rate == 0)
2478		return false;
2479
2480	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2481		      (int)part_stat_read(&disk->part0, sectors[1]) -
2482			atomic_read(&device->rs_sect_ev);
2483
2484	if (atomic_read(&device->ap_actlog_cnt)
2485	    || !device->rs_last_events || curr_events - device->rs_last_events > 64) {
2486		unsigned long rs_left;
2487		int i;
2488
2489		device->rs_last_events = curr_events;
2490
2491		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2492		 * approx. */
2493		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2494
2495		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2496			rs_left = device->ov_left;
2497		else
2498			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2499
2500		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2501		if (!dt)
2502			dt++;
2503		db = device->rs_mark_left[i] - rs_left;
2504		dbdt = Bit2KB(db/dt);
2505
2506		if (dbdt > c_min_rate)
2507			return true;
2508	}
2509	return false;
2510}
2511
/*
 * receive_DataRequest() - handle a read-type request sent by the peer
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data points to a struct p_block_req,
 *              pi->cmd is one of P_DATA_REQUEST, P_RS_DATA_REQUEST,
 *              P_CSUM_RS_REQUEST, P_OV_REQUEST or P_OV_REPLY (anything else
 *              hits BUG()), pi->size is the size of a trailing payload
 *
 * Validates sector and size, allocates a peer request, picks the completion
 * callback matching the packet type (receiving the digest payload where
 * there is one) and submits a READ to the backing device.
 *
 * Returns 0 if the read was submitted; -EIO if the volume is unknown or
 * anything failed after the peer request was allocated; -EINVAL for an
 * invalid size or an out-of-range sector; -ENOMEM if no peer request could
 * be allocated.  Without up-to-date local data a negative reply is sent and
 * the result of draining the payload is returned.
 */
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p =	pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = drbd_get_capacity(device->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	/* size must be positive, a multiple of 512 and at most DRBD_MAX_BIO_SIZE */
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	/* the requested range must not extend beyond the device */
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		/* verb: whether to log the (rate limited) error message below */
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
			break;
		case P_OV_REPLY:
			/* answered with P_OV_RESULT / ID_IN_SYNC, without logging */
			verb = 0;
			dec_rs_pending(device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			true /* has real payload */, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		peer_req->flags |= EE_APPLICATION;
		/* NOTE(review): this jump also skips the list_add_tail() to
		 * read_ee further down - confirm that this is intended */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		/* digest_info header and the digest bytes in one allocation */
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		/* attached to peer_req before receiving the payload; presumably
		 * EE_HAS_DIGEST makes drbd_free_peer_req() release it - TODO confirm */
		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* remember to report stats in drbd_resync_finished */
			device->use_csums = true;
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		/* first verify request on this device (ov_start_sector still
		 * unset): initialize the online verify progress state */
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */

	/* Even though this may be a resync request, we do add to "read_ee";
	 * "sync_ee" is only used for resync WRITEs.
	 * Add to list early, so debugfs can find this request
	 * even if we have to sleep below. */
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
	if (device->state.peer != R_PRIMARY
	&& drbd_rs_should_slow_down(device, sector, false))
		schedule_timeout_uninterruptible(HZ/10);
	update_receiver_timing_details(connection, drbd_rs_begin_io);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	/* account the sectors we are about to read ourselves */
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	update_receiver_timing_details(connection, drbd_submit_peer_request);
	inc_unacked(device);
	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");

out_free_e:
	/* NOTE(review): reached also for requests never put on read_ee;
	 * presumably fine because w.list is initialized by
	 * drbd_alloc_peer_req() - TODO confirm */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}
2715
/**
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 * @peer_device: peer device whose connection's net_conf holds the configured
 *               after_sb_0p policy
 *
 * Returns -1 if the local data is to be discarded, 1 if the peer's data is
 * to be discarded, and -100 if no decision was reached (policy is
 * ASB_DISCONNECT, or a policy that is not valid for this situation).
 *
 * Several policies deliberately fall through into the next one when they
 * cannot decide on their own.
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	/* lowest bit of the bitmap UUID, local and as reported by the peer */
	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	/* presumably the amount of changes on each side, used by the
	 * zero-changes / least-changes policies - TODO confirm */
	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		/* these policies are not valid for the 0-primaries case */
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		/* fall through */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
		/* fall through */
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			/* tie: decided by the RESOLVE_CONFLICTS flag */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		/* both sides have changes: only continue if we got here by
		 * falling through from one of the policies above */
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
		/* fall through */
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
2796
2797/**
2798 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2799 */
2800static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2801{
2802	struct drbd_device *device = peer_device->device;
2803	int hg, rv = -100;
2804	enum drbd_after_sb_p after_sb_1p;
2805
2806	rcu_read_lock();
2807	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2808	rcu_read_unlock();
2809	switch (after_sb_1p) {
2810	case ASB_DISCARD_YOUNGER_PRI:
2811	case ASB_DISCARD_OLDER_PRI:
2812	case ASB_DISCARD_LEAST_CHG:
2813	case ASB_DISCARD_LOCAL:
2814	case ASB_DISCARD_REMOTE:
2815	case ASB_DISCARD_ZERO_CHG:
2816		drbd_err(device, "Configuration error.\n");
2817		break;
2818	case ASB_DISCONNECT:
2819		break;
2820	case ASB_CONSENSUS:
2821		hg = drbd_asb_recover_0p(peer_device);
2822		if (hg == -1 && device->state.role == R_SECONDARY)
2823			rv = hg;
2824		if (hg == 1  && device->state.role == R_PRIMARY)
2825			rv = hg;
2826		break;
2827	case ASB_VIOLENTLY:
2828		rv = drbd_asb_recover_0p(peer_device);
2829		break;
2830	case ASB_DISCARD_SECONDARY:
2831		return device->state.role == R_PRIMARY ? 1 : -1;
2832	case ASB_CALL_HELPER:
2833		hg = drbd_asb_recover_0p(peer_device);
2834		if (hg == -1 && device->state.role == R_PRIMARY) {
2835			enum drbd_state_rv rv2;
2836
2837			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2838			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2839			  * we do not need to wait for the after state change work either. */
2840			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2841			if (rv2 != SS_SUCCESS) {
2842				drbd_khelper(device, "pri-lost-after-sb");
2843			} else {
2844				drbd_warn(device, "Successfully gave up primary role.\n");
2845				rv = hg;
2846			}
2847		} else
2848			rv = hg;
2849	}
2850
2851	return rv;
2852}
2853
2854/**
2855 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2856 */
2857static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2858{
2859	struct drbd_device *device = peer_device->device;
2860	int hg, rv = -100;
2861	enum drbd_after_sb_p after_sb_2p;
2862
2863	rcu_read_lock();
2864	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2865	rcu_read_unlock();
2866	switch (after_sb_2p) {
2867	case ASB_DISCARD_YOUNGER_PRI:
2868	case ASB_DISCARD_OLDER_PRI:
2869	case ASB_DISCARD_LEAST_CHG:
2870	case ASB_DISCARD_LOCAL:
2871	case ASB_DISCARD_REMOTE:
2872	case ASB_CONSENSUS:
2873	case ASB_DISCARD_SECONDARY:
2874	case ASB_DISCARD_ZERO_CHG:
2875		drbd_err(device, "Configuration error.\n");
2876		break;
2877	case ASB_VIOLENTLY:
2878		rv = drbd_asb_recover_0p(peer_device);
2879		break;
2880	case ASB_DISCONNECT:
2881		break;
2882	case ASB_CALL_HELPER:
2883		hg = drbd_asb_recover_0p(peer_device);
2884		if (hg == -1) {
2885			enum drbd_state_rv rv2;
2886
2887			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2888			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2889			  * we do not need to wait for the after state change work either. */
2890			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2891			if (rv2 != SS_SUCCESS) {
2892				drbd_khelper(device, "pri-lost-after-sb");
2893			} else {
2894				drbd_warn(device, "Successfully gave up primary role.\n");
2895				rv = hg;
2896			}
2897		} else
2898			rv = hg;
2899	}
2900
2901	return rv;
2902}
2903
2904static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2905			   u64 bits, u64 flags)
2906{
2907	if (!uuid) {
2908		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2909		return;
2910	}
2911	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2912	     text,
2913	     (unsigned long long)uuid[UI_CURRENT],
2914	     (unsigned long long)uuid[UI_BITMAP],
2915	     (unsigned long long)uuid[UI_HISTORY_START],
2916	     (unsigned long long)uuid[UI_HISTORY_END],
2917	     (unsigned long long)bits,
2918	     (unsigned long long)flags);
2919}
2920
2921/*
2922  100	after split brain try auto recover
2923    2	C_SYNC_SOURCE set BitMap
2924    1	C_SYNC_SOURCE use BitMap
2925    0	no Sync
2926   -1	C_SYNC_TARGET use BitMap
2927   -2	C_SYNC_TARGET set BitMap
2928 -100	after split brain, disconnect
2929-1000	unrelated data
2930-1091   requires proto 91
2931-1096   requires proto 96
2932 */
/*
 * drbd_uuid_compare() - compare our UUID set with the peer's UUID set
 * @device:  device whose local (device->ldev->md.uuid) and peer
 *           (device->p_uuid) UUID arrays are compared
 * @rule_nr: out parameter; holds the number of the rule marker that was set
 *           most recently before returning (the early returns at the top of
 *           the "self == peer" branch still carry 30)
 *
 * All comparisons mask out the lowest bit of each UUID.  Some branches
 * correct the stored UUIDs (ours, or our copy of the peer's) before
 * returning.
 *
 * NOTE(review): @connection is NULL when first_peer_device() returns NULL,
 * yet it is dereferenced unconditionally below -- presumably callers
 * guarantee a peer device exists; confirm.
 */
static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	/* current UUIDs of both sides, lowest bit ignored */
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	/* rule 10: both sides still carry the "just created" UUID */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* rule 20: we are just created (or zero), the peer is not */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* rule 30: mirror image of rule 20 */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	/* identical current UUIDs */
	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* the peer has no bitmap UUID, but we still have one */
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			/* our bitmap UUID and first history UUID line up with the
			 * peer's first two history UUIDs: rotate our own UUIDs */
			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* mirror case: we have no bitmap UUID, but the peer still has one */
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			/* our first two history UUIDs line up with the peer's bitmap
			 * and first history UUID: rotate our copy of the peer's UUIDs */
			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			/* both were primary: the RESOLVE_CONFLICTS connection
			 * flag breaks the tie */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	/* rule 50: our current UUID equals the peer's bitmap UUID */
	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* rule 51: our current UUID equals the peer's first history UUID */
	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		/* the extra condition depends on the agreed protocol version:
		 * before 96 compare history entries, from 96 on check the
		 * peer's bitmap UUID against peer + UUID_NEW_BM_OFFSET */
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* rule 60: our current UUID shows up in the peer's history */
	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* rule 70: our bitmap UUID equals the peer's current UUID */
	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* rule 71: mirror image of rule 51, applied to our own UUIDs */
	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}


	/* rule 80: the peer's current UUID shows up in our history */
	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* rule 90: both sides have the same, non-zero bitmap UUID */
	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* rule 100: any entry common to both history lists */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* nothing matched at all */
	return -1000;
}
3116
3117/* drbd_sync_handshake() returns the new conn state on success, or
3118   CONN_MASK (-1) on failure.
3119 */
3120static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3121					   enum drbd_role peer_role,
3122					   enum drbd_disk_state peer_disk) __must_hold(local)
3123{
3124	struct drbd_device *device = peer_device->device;
3125	enum drbd_conns rv = C_MASK;
3126	enum drbd_disk_state mydisk;
3127	struct net_conf *nc;
3128	int hg, rule_nr, rr_conflict, tentative;
3129
3130	mydisk = device->state.disk;
3131	if (mydisk == D_NEGOTIATING)
3132		mydisk = device->new_state_tmp.disk;
3133
3134	drbd_info(device, "drbd_sync_handshake:\n");
3135
3136	spin_lock_irq(&device->ldev->md.uuid_lock);
3137	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3138	drbd_uuid_dump(device, "peer", device->p_uuid,
3139		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3140
3141	hg = drbd_uuid_compare(device, &rule_nr);
3142	spin_unlock_irq(&device->ldev->md.uuid_lock);
3143
3144	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3145
3146	if (hg == -1000) {
3147		drbd_alert(device, "Unrelated data, aborting!\n");
3148		return C_MASK;
3149	}
3150	if (hg < -1000) {
3151		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3152		return C_MASK;
3153	}
3154
3155	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3156	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3157		int f = (hg == -100) || abs(hg) == 2;
3158		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3159		if (f)
3160			hg = hg*2;
3161		drbd_info(device, "Becoming sync %s due to disk states.\n",
3162		     hg > 0 ? "source" : "target");
3163	}
3164
3165	if (abs(hg) == 100)
3166		drbd_khelper(device, "initial-split-brain");
3167
3168	rcu_read_lock();
3169	nc = rcu_dereference(peer_device->connection->net_conf);
3170
3171	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3172		int pcount = (device->state.role == R_PRIMARY)
3173			   + (peer_role == R_PRIMARY);
3174		int forced = (hg == -100);
3175
3176		switch (pcount) {
3177		case 0:
3178			hg = drbd_asb_recover_0p(peer_device);
3179			break;
3180		case 1:
3181			hg = drbd_asb_recover_1p(peer_device);
3182			break;
3183		case 2:
3184			hg = drbd_asb_recover_2p(peer_device);
3185			break;
3186		}
3187		if (abs(hg) < 100) {
3188			drbd_warn(device, "Split-Brain detected, %d primaries, "
3189			     "automatically solved. Sync from %s node\n",
3190			     pcount, (hg < 0) ? "peer" : "this");
3191			if (forced) {
3192				drbd_warn(device, "Doing a full sync, since"
3193				     " UUIDs where ambiguous.\n");
3194				hg = hg*2;
3195			}
3196		}
3197	}
3198
3199	if (hg == -100) {
3200		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3201			hg = -1;
3202		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3203			hg = 1;
3204
3205		if (abs(hg) < 100)
3206			drbd_warn(device, "Split-Brain detected, manually solved. "
3207			     "Sync from %s node\n",
3208			     (hg < 0) ? "peer" : "this");
3209	}
3210	rr_conflict = nc->rr_conflict;
3211	tentative = nc->tentative;
3212	rcu_read_unlock();
3213
3214	if (hg == -100) {
3215		/* FIXME this log message is not correct if we end up here
3216		 * after an attempted attach on a diskless node.
3217		 * We just refuse to attach -- well, we drop the "connection"
3218		 * to that disk, in a way... */
3219		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3220		drbd_khelper(device, "split-brain");
3221		return C_MASK;
3222	}
3223
3224	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3225		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3226		return C_MASK;
3227	}
3228
3229	if (hg < 0 && /* by intention we do not use mydisk here. */
3230	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3231		switch (rr_conflict) {
3232		case ASB_CALL_HELPER:
3233			drbd_khelper(device, "pri-lost");
3234			/* fall through */
3235		case ASB_DISCONNECT:
3236			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3237			return C_MASK;
3238		case ASB_VIOLENTLY:
3239			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3240			     "assumption\n");
3241		}
3242	}
3243
3244	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3245		if (hg == 0)
3246			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3247		else
3248			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3249				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3250				 abs(hg) >= 2 ? "full" : "bit-map based");
3251		return C_MASK;
3252	}
3253
3254	if (abs(hg) >= 2) {
3255		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3256		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3257					BM_LOCKED_SET_ALLOWED))
3258			return C_MASK;
3259	}
3260
3261	if (hg > 0) { /* become sync source. */
3262		rv = C_WF_BITMAP_S;
3263	} else if (hg < 0) { /* become sync target */
3264		rv = C_WF_BITMAP_T;
3265	} else {
3266		rv = C_CONNECTED;
3267		if (drbd_bm_total_weight(device)) {
3268			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3269			     drbd_bm_total_weight(device));
3270		}
3271	}
3272
3273	return rv;
3274}
3275
3276static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3277{
3278	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3279	if (peer == ASB_DISCARD_REMOTE)
3280		return ASB_DISCARD_LOCAL;
3281
3282	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3283	if (peer == ASB_DISCARD_LOCAL)
3284		return ASB_DISCARD_REMOTE;
3285
3286	/* everything else is valid if they are equal on both sides. */
3287	return peer;
3288}
3289
3290static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3291{
3292	struct p_protocol *p = pi->data;
3293	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3294	int p_proto, p_discard_my_data, p_two_primaries, cf;
3295	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3296	char integrity_alg[SHARED_SECRET_MAX] = "";
3297	struct crypto_hash *peer_integrity_tfm = NULL;
3298	void *int_dig_in = NULL, *int_dig_vv = NULL;
3299
3300	p_proto		= be32_to_cpu(p->protocol);
3301	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3302	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3303	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3304	p_two_primaries = be32_to_cpu(p->two_primaries);
3305	cf		= be32_to_cpu(p->conn_flags);
3306	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3307
3308	if (connection->agreed_pro_version >= 87) {
3309		int err;
3310
3311		if (pi->size > sizeof(integrity_alg))
3312			return -EIO;
3313		err = drbd_recv_all(connection, integrity_alg, pi->size);
3314		if (err)
3315			return err;
3316		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3317	}
3318
3319	if (pi->cmd != P_PROTOCOL_UPDATE) {
3320		clear_bit(CONN_DRY_RUN, &connection->flags);
3321
3322		if (cf & CF_DRY_RUN)
3323			set_bit(CONN_DRY_RUN, &connection->flags);
3324
3325		rcu_read_lock();
3326		nc = rcu_dereference(connection->net_conf);
3327
3328		if (p_proto != nc->wire_protocol) {
3329			drbd_err(connection, "incompatible %s settings\n", "protocol");
3330			goto disconnect_rcu_unlock;
3331		}
3332
3333		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3334			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3335			goto disconnect_rcu_unlock;
3336		}
3337
3338		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3339			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3340			goto disconnect_rcu_unlock;
3341		}
3342
3343		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3344			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3345			goto disconnect_rcu_unlock;
3346		}
3347
3348		if (p_discard_my_data && nc->discard_my_data) {
3349			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3350			goto disconnect_rcu_unlock;
3351		}
3352
3353		if (p_two_primaries != nc->two_primaries) {
3354			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3355			goto disconnect_rcu_unlock;
3356		}
3357
3358		if (strcmp(integrity_alg, nc->integrity_alg)) {
3359			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3360			goto disconnect_rcu_unlock;
3361		}
3362
3363		rcu_read_unlock();
3364	}
3365
3366	if (integrity_alg[0]) {
3367		int hash_size;
3368
3369		/*
3370		 * We can only change the peer data integrity algorithm
3371		 * here.  Changing our own data integrity algorithm
3372		 * requires that we send a P_PROTOCOL_UPDATE packet at
3373		 * the same time; otherwise, the peer has no way to
3374		 * tell between which packets the algorithm should
3375		 * change.
3376		 */
3377
3378		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3379		if (!peer_integrity_tfm) {
3380			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3381				 integrity_alg);
3382			goto disconnect;
3383		}
3384
3385		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3386		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3387		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3388		if (!(int_dig_in && int_dig_vv)) {
3389			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3390			goto disconnect;
3391		}
3392	}
3393
3394	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3395	if (!new_net_conf) {
3396		drbd_err(connection, "Allocation of new net_conf failed\n");
3397		goto disconnect;
3398	}
3399
3400	mutex_lock(&connection->data.mutex);
3401	mutex_lock(&connection->resource->conf_update);
3402	old_net_conf = connection->net_conf;
3403	*new_net_conf = *old_net_conf;
3404
3405	new_net_conf->wire_protocol = p_proto;
3406	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3407	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3408	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3409	new_net_conf->two_primaries = p_two_primaries;
3410
3411	rcu_assign_pointer(connection->net_conf, new_net_conf);
3412	mutex_unlock(&connection->resource->conf_update);
3413	mutex_unlock(&connection->data.mutex);
3414
3415	crypto_free_hash(connection->peer_integrity_tfm);
3416	kfree(connection->int_dig_in);
3417	kfree(connection->int_dig_vv);
3418	connection->peer_integrity_tfm = peer_integrity_tfm;
3419	connection->int_dig_in = int_dig_in;
3420	connection->int_dig_vv = int_dig_vv;
3421
3422	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3423		drbd_info(connection, "peer data-integrity-alg: %s\n",
3424			  integrity_alg[0] ? integrity_alg : "(none)");
3425
3426	synchronize_rcu();
3427	kfree(old_net_conf);
3428	return 0;
3429
3430disconnect_rcu_unlock:
3431	rcu_read_unlock();
3432disconnect:
3433	crypto_free_hash(peer_integrity_tfm);
3434	kfree(int_dig_in);
3435	kfree(int_dig_vv);
3436	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3437	return -EIO;
3438}
3439
3440/* helper function
3441 * input: alg name, feature name
3442 * return: NULL (alg name was "")
3443 *         ERR_PTR(error) if something goes wrong
3444 *         or the crypto hash ptr, if it worked out ok. */
3445static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3446		const char *alg, const char *name)
3447{
3448	struct crypto_hash *tfm;
3449
3450	if (!alg[0])
3451		return NULL;
3452
3453	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3454	if (IS_ERR(tfm)) {
3455		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3456			alg, name, PTR_ERR(tfm));
3457		return tfm;
3458	}
3459	return tfm;
3460}
3461
3462static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3463{
3464	void *buffer = connection->data.rbuf;
3465	int size = pi->size;
3466
3467	while (size) {
3468		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3469		s = drbd_recv(connection, buffer, s);
3470		if (s <= 0) {
3471			if (s < 0)
3472				return s;
3473			break;
3474		}
3475		size -= s;
3476	}
3477	if (size)
3478		return -EIO;
3479	return 0;
3480}
3481
3482/*
3483 * config_unknown_volume  -  device configuration command for unknown volume
3484 *
3485 * When a device is added to an existing connection, the node on which the
3486 * device is added first will send configuration commands to its peer but the
3487 * peer will not know about the device yet.  It will warn and ignore these
3488 * commands.  Once the device is added on the second node, the second node will
3489 * send the same device configuration commands, but in the other direction.
3490 *
3491 * (We can also end up here if drbd is misconfigured.)
3492 */
3493static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3494{
3495	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3496		  cmdname(pi->cmd), pi->vnr);
3497	return ignore_remaining_packet(connection, pi);
3498}
3499
3500static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3501{
3502	struct drbd_peer_device *peer_device;
3503	struct drbd_device *device;
3504	struct p_rs_param_95 *p;
3505	unsigned int header_size, data_size, exp_max_sz;
3506	struct crypto_hash *verify_tfm = NULL;
3507	struct crypto_hash *csums_tfm = NULL;
3508	struct net_conf *old_net_conf, *new_net_conf = NULL;
3509	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3510	const int apv = connection->agreed_pro_version;
3511	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3512	int fifo_size = 0;
3513	int err;
3514
3515	peer_device = conn_peer_device(connection, pi->vnr);
3516	if (!peer_device)
3517		return config_unknown_volume(connection, pi);
3518	device = peer_device->device;
3519
3520	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3521		    : apv == 88 ? sizeof(struct p_rs_param)
3522					+ SHARED_SECRET_MAX
3523		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3524		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3525
3526	if (pi->size > exp_max_sz) {
3527		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3528		    pi->size, exp_max_sz);
3529		return -EIO;
3530	}
3531
3532	if (apv <= 88) {
3533		header_size = sizeof(struct p_rs_param);
3534		data_size = pi->size - header_size;
3535	} else if (apv <= 94) {
3536		header_size = sizeof(struct p_rs_param_89);
3537		data_size = pi->size - header_size;
3538		D_ASSERT(device, data_size == 0);
3539	} else {
3540		header_size = sizeof(struct p_rs_param_95);
3541		data_size = pi->size - header_size;
3542		D_ASSERT(device, data_size == 0);
3543	}
3544
3545	/* initialize verify_alg and csums_alg */
3546	p = pi->data;
3547	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3548
3549	err = drbd_recv_all(peer_device->connection, p, header_size);
3550	if (err)
3551		return err;
3552
3553	mutex_lock(&connection->resource->conf_update);
3554	old_net_conf = peer_device->connection->net_conf;
3555	if (get_ldev(device)) {
3556		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3557		if (!new_disk_conf) {
3558			put_ldev(device);
3559			mutex_unlock(&connection->resource->conf_update);
3560			drbd_err(device, "Allocation of new disk_conf failed\n");
3561			return -ENOMEM;
3562		}
3563
3564		old_disk_conf = device->ldev->disk_conf;
3565		*new_disk_conf = *old_disk_conf;
3566
3567		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3568	}
3569
3570	if (apv >= 88) {
3571		if (apv == 88) {
3572			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3573				drbd_err(device, "verify-alg of wrong size, "
3574					"peer wants %u, accepting only up to %u byte\n",
3575					data_size, SHARED_SECRET_MAX);
3576				err = -EIO;
3577				goto reconnect;
3578			}
3579
3580			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3581			if (err)
3582				goto reconnect;
3583			/* we expect NUL terminated string */
3584			/* but just in case someone tries to be evil */
3585			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3586			p->verify_alg[data_size-1] = 0;
3587
3588		} else /* apv >= 89 */ {
3589			/* we still expect NUL terminated strings */
3590			/* but just in case someone tries to be evil */
3591			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3592			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3593			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3594			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3595		}
3596
3597		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3598			if (device->state.conn == C_WF_REPORT_PARAMS) {
3599				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3600				    old_net_conf->verify_alg, p->verify_alg);
3601				goto disconnect;
3602			}
3603			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3604					p->verify_alg, "verify-alg");
3605			if (IS_ERR(verify_tfm)) {
3606				verify_tfm = NULL;
3607				goto disconnect;
3608			}
3609		}
3610
3611		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3612			if (device->state.conn == C_WF_REPORT_PARAMS) {
3613				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3614				    old_net_conf->csums_alg, p->csums_alg);
3615				goto disconnect;
3616			}
3617			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3618					p->csums_alg, "csums-alg");
3619			if (IS_ERR(csums_tfm)) {
3620				csums_tfm = NULL;
3621				goto disconnect;
3622			}
3623		}
3624
3625		if (apv > 94 && new_disk_conf) {
3626			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3627			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3628			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3629			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3630
3631			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3632			if (fifo_size != device->rs_plan_s->size) {
3633				new_plan = fifo_alloc(fifo_size);
3634				if (!new_plan) {
3635					drbd_err(device, "kmalloc of fifo_buffer failed");
3636					put_ldev(device);
3637					goto disconnect;
3638				}
3639			}
3640		}
3641
3642		if (verify_tfm || csums_tfm) {
3643			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3644			if (!new_net_conf) {
3645				drbd_err(device, "Allocation of new net_conf failed\n");
3646				goto disconnect;
3647			}
3648
3649			*new_net_conf = *old_net_conf;
3650
3651			if (verify_tfm) {
3652				strcpy(new_net_conf->verify_alg, p->verify_alg);
3653				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3654				crypto_free_hash(peer_device->connection->verify_tfm);
3655				peer_device->connection->verify_tfm = verify_tfm;
3656				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3657			}
3658			if (csums_tfm) {
3659				strcpy(new_net_conf->csums_alg, p->csums_alg);
3660				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3661				crypto_free_hash(peer_device->connection->csums_tfm);
3662				peer_device->connection->csums_tfm = csums_tfm;
3663				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3664			}
3665			rcu_assign_pointer(connection->net_conf, new_net_conf);
3666		}
3667	}
3668
3669	if (new_disk_conf) {
3670		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3671		put_ldev(device);
3672	}
3673
3674	if (new_plan) {
3675		old_plan = device->rs_plan_s;
3676		rcu_assign_pointer(device->rs_plan_s, new_plan);
3677	}
3678
3679	mutex_unlock(&connection->resource->conf_update);
3680	synchronize_rcu();
3681	if (new_net_conf)
3682		kfree(old_net_conf);
3683	kfree(old_disk_conf);
3684	kfree(old_plan);
3685
3686	return 0;
3687
3688reconnect:
3689	if (new_disk_conf) {
3690		put_ldev(device);
3691		kfree(new_disk_conf);
3692	}
3693	mutex_unlock(&connection->resource->conf_update);
3694	return -EIO;
3695
3696disconnect:
3697	kfree(new_plan);
3698	if (new_disk_conf) {
3699		put_ldev(device);
3700		kfree(new_disk_conf);
3701	}
3702	mutex_unlock(&connection->resource->conf_update);
3703	/* just for completeness: actually not needed,
3704	 * as this is not reached if csums_tfm was ok. */
3705	crypto_free_hash(csums_tfm);
3706	/* but free the verify_tfm again, if csums_tfm did not work out */
3707	crypto_free_hash(verify_tfm);
3708	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3709	return -EIO;
3710}
3711
3712/* warn if the arguments differ by more than 12.5% */
3713static void warn_if_differ_considerably(struct drbd_device *device,
3714	const char *s, sector_t a, sector_t b)
3715{
3716	sector_t d;
3717	if (a == 0 || b == 0)
3718		return;
3719	d = (a > b) ? (a - b) : (b - a);
3720	if (d > (a>>3) || d > (b>>3))
3721		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3722		     (unsigned long long)a, (unsigned long long)b);
3723}
3724
3725static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3726{
3727	struct drbd_peer_device *peer_device;
3728	struct drbd_device *device;
3729	struct p_sizes *p = pi->data;
3730	enum determine_dev_size dd = DS_UNCHANGED;
3731	sector_t p_size, p_usize, p_csize, my_usize;
3732	int ldsc = 0; /* local disk size changed */
3733	enum dds_flags ddsf;
3734
3735	peer_device = conn_peer_device(connection, pi->vnr);
3736	if (!peer_device)
3737		return config_unknown_volume(connection, pi);
3738	device = peer_device->device;
3739
3740	p_size = be64_to_cpu(p->d_size);
3741	p_usize = be64_to_cpu(p->u_size);
3742	p_csize = be64_to_cpu(p->c_size);
3743
3744	/* just store the peer's disk size for now.
3745	 * we still need to figure out whether we accept that. */
3746	device->p_size = p_size;
3747
3748	if (get_ldev(device)) {
3749		rcu_read_lock();
3750		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3751		rcu_read_unlock();
3752
3753		warn_if_differ_considerably(device, "lower level device sizes",
3754			   p_size, drbd_get_max_capacity(device->ldev));
3755		warn_if_differ_considerably(device, "user requested size",
3756					    p_usize, my_usize);
3757
3758		/* if this is the first connect, or an otherwise expected
3759		 * param exchange, choose the minimum */
3760		if (device->state.conn == C_WF_REPORT_PARAMS)
3761			p_usize = min_not_zero(my_usize, p_usize);
3762
3763		/* Never shrink a device with usable data during connect.
3764		   But allow online shrinking if we are connected. */
3765		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3766		    drbd_get_capacity(device->this_bdev) &&
3767		    device->state.disk >= D_OUTDATED &&
3768		    device->state.conn < C_CONNECTED) {
3769			drbd_err(device, "The peer's disk size is too small!\n");
3770			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3771			put_ldev(device);
3772			return -EIO;
3773		}
3774
3775		if (my_usize != p_usize) {
3776			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3777
3778			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3779			if (!new_disk_conf) {
3780				drbd_err(device, "Allocation of new disk_conf failed\n");
3781				put_ldev(device);
3782				return -ENOMEM;
3783			}
3784
3785			mutex_lock(&connection->resource->conf_update);
3786			old_disk_conf = device->ldev->disk_conf;
3787			*new_disk_conf = *old_disk_conf;
3788			new_disk_conf->disk_size = p_usize;
3789
3790			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3791			mutex_unlock(&connection->resource->conf_update);
3792			synchronize_rcu();
3793			kfree(old_disk_conf);
3794
3795			drbd_info(device, "Peer sets u_size to %lu sectors\n",
3796				 (unsigned long)my_usize);
3797		}
3798
3799		put_ldev(device);
3800	}
3801
3802	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3803	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3804	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3805	   drbd_reconsider_max_bio_size(), we can be sure that after
3806	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3807
3808	ddsf = be16_to_cpu(p->dds_flags);
3809	if (get_ldev(device)) {
3810		drbd_reconsider_max_bio_size(device, device->ldev);
3811		dd = drbd_determine_dev_size(device, ddsf, NULL);
3812		put_ldev(device);
3813		if (dd == DS_ERROR)
3814			return -EIO;
3815		drbd_md_sync(device);
3816	} else {
3817		/*
3818		 * I am diskless, need to accept the peer's *current* size.
3819		 * I must NOT accept the peers backing disk size,
3820		 * it may have been larger than mine all along...
3821		 *
3822		 * At this point, the peer knows more about my disk, or at
3823		 * least about what we last agreed upon, than myself.
3824		 * So if his c_size is less than his d_size, the most likely
3825		 * reason is that *my* d_size was smaller last time we checked.
3826		 *
3827		 * However, if he sends a zero current size,
3828		 * take his (user-capped or) backing disk size anyways.
3829		 */
3830		drbd_reconsider_max_bio_size(device, NULL);
3831		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3832	}
3833
3834	if (get_ldev(device)) {
3835		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3836			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3837			ldsc = 1;
3838		}
3839
3840		put_ldev(device);
3841	}
3842
3843	if (device->state.conn > C_WF_REPORT_PARAMS) {
3844		if (be64_to_cpu(p->c_size) !=
3845		    drbd_get_capacity(device->this_bdev) || ldsc) {
3846			/* we have different sizes, probably peer
3847			 * needs to know my new size... */
3848			drbd_send_sizes(peer_device, 0, ddsf);
3849		}
3850		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3851		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3852			if (device->state.pdsk >= D_INCONSISTENT &&
3853			    device->state.disk >= D_INCONSISTENT) {
3854				if (ddsf & DDSF_NO_RESYNC)
3855					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3856				else
3857					resync_after_online_grow(device);
3858			} else
3859				set_bit(RESYNC_AFTER_NEG, &device->flags);
3860		}
3861	}
3862
3863	return 0;
3864}
3865
3866static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3867{
3868	struct drbd_peer_device *peer_device;
3869	struct drbd_device *device;
3870	struct p_uuids *p = pi->data;
3871	u64 *p_uuid;
3872	int i, updated_uuids = 0;
3873
3874	peer_device = conn_peer_device(connection, pi->vnr);
3875	if (!peer_device)
3876		return config_unknown_volume(connection, pi);
3877	device = peer_device->device;
3878
3879	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3880	if (!p_uuid) {
3881		drbd_err(device, "kmalloc of p_uuid failed\n");
3882		return false;
3883	}
3884
3885	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3886		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3887
3888	kfree(device->p_uuid);
3889	device->p_uuid = p_uuid;
3890
3891	if (device->state.conn < C_CONNECTED &&
3892	    device->state.disk < D_INCONSISTENT &&
3893	    device->state.role == R_PRIMARY &&
3894	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3895		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3896		    (unsigned long long)device->ed_uuid);
3897		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3898		return -EIO;
3899	}
3900
3901	if (get_ldev(device)) {
3902		int skip_initial_sync =
3903			device->state.conn == C_CONNECTED &&
3904			peer_device->connection->agreed_pro_version >= 90 &&
3905			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3906			(p_uuid[UI_FLAGS] & 8);
3907		if (skip_initial_sync) {
3908			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3909			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3910					"clear_n_write from receive_uuids",
3911					BM_LOCKED_TEST_ALLOWED);
3912			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3913			_drbd_uuid_set(device, UI_BITMAP, 0);
3914			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3915					CS_VERBOSE, NULL);
3916			drbd_md_sync(device);
3917			updated_uuids = 1;
3918		}
3919		put_ldev(device);
3920	} else if (device->state.disk < D_INCONSISTENT &&
3921		   device->state.role == R_PRIMARY) {
3922		/* I am a diskless primary, the peer just created a new current UUID
3923		   for me. */
3924		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3925	}
3926
3927	/* Before we test for the disk state, we should wait until an eventually
3928	   ongoing cluster wide state change is finished. That is important if
3929	   we are primary and are detaching from our disk. We need to see the
3930	   new disk state... */
3931	mutex_lock(device->state_mutex);
3932	mutex_unlock(device->state_mutex);
3933	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3934		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3935
3936	if (updated_uuids)
3937		drbd_print_uuids(device, "receiver updated UUIDs to");
3938
3939	return 0;
3940}
3941
3942/**
3943 * convert_state() - Converts the peer's view of the cluster state to our point of view
3944 * @ps:		The state as seen by the peer.
3945 */
3946static union drbd_state convert_state(union drbd_state ps)
3947{
3948	union drbd_state ms;
3949
3950	static enum drbd_conns c_tab[] = {
3951		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3952		[C_CONNECTED] = C_CONNECTED,
3953
3954		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3955		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3956		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3957		[C_VERIFY_S]       = C_VERIFY_T,
3958		[C_MASK]   = C_MASK,
3959	};
3960
3961	ms.i = ps.i;
3962
3963	ms.conn = c_tab[ps.conn];
3964	ms.peer = ps.role;
3965	ms.role = ps.peer;
3966	ms.pdsk = ps.disk;
3967	ms.disk = ps.pdsk;
3968	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3969
3970	return ms;
3971}
3972
3973static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3974{
3975	struct drbd_peer_device *peer_device;
3976	struct drbd_device *device;
3977	struct p_req_state *p = pi->data;
3978	union drbd_state mask, val;
3979	enum drbd_state_rv rv;
3980
3981	peer_device = conn_peer_device(connection, pi->vnr);
3982	if (!peer_device)
3983		return -EIO;
3984	device = peer_device->device;
3985
3986	mask.i = be32_to_cpu(p->mask);
3987	val.i = be32_to_cpu(p->val);
3988
3989	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3990	    mutex_is_locked(device->state_mutex)) {
3991		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3992		return 0;
3993	}
3994
3995	mask = convert_state(mask);
3996	val = convert_state(val);
3997
3998	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3999	drbd_send_sr_reply(peer_device, rv);
4000
4001	drbd_md_sync(device);
4002
4003	return 0;
4004}
4005
4006static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4007{
4008	struct p_req_state *p = pi->data;
4009	union drbd_state mask, val;
4010	enum drbd_state_rv rv;
4011
4012	mask.i = be32_to_cpu(p->mask);
4013	val.i = be32_to_cpu(p->val);
4014
4015	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4016	    mutex_is_locked(&connection->cstate_mutex)) {
4017		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4018		return 0;
4019	}
4020
4021	mask = convert_state(mask);
4022	val = convert_state(val);
4023
4024	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4025	conn_send_sr_reply(connection, rv);
4026
4027	return 0;
4028}
4029
/*
 * Handle a P_STATE packet: the peer reports its current state.
 *
 * Takes a snapshot of our own state (os), derives the new state (ns) from it
 * and from the peer's report, possibly running the sync handshake, and then
 * applies ns under req_lock.  If our state changed between the snapshot and
 * re-taking the lock, the whole evaluation is repeated (label "retry").
 *
 * Returns 0 on success (including the early "resync/verify finished" exits),
 * the result of config_unknown_volume() for an unknown volume, -ECONNRESET if
 * the connection is already being torn down, and -EIO if the connection has
 * to be aborted.
 */
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/* NOTE(review): device->p_uuid is dereferenced without a NULL
		 * check here, while the handshake condition further down does
		 * test it, and drbd_disconnected() resets it to NULL.
		 * Presumably a P_UUIDS packet always precedes a P_STATE with
		 * D_NEGOTIATING -- TODO confirm. */
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	/* entered with req_lock held, both initially and via "goto retry" */
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (asender thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	/* receiving the peer's state completes the parameter exchange */
	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	/* the peer being "ahead" means we are "behind" */
	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		/* ns.conn == C_MASK is treated as "no usable result" below */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	/* our state changed while the lock was dropped: evaluate again */
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	/* CS_HARD is used except for the transition into a connected state */
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	/* re-read: the state actually in effect after the change attempt */
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}
4207
4208static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4209{
4210	struct drbd_peer_device *peer_device;
4211	struct drbd_device *device;
4212	struct p_rs_uuid *p = pi->data;
4213
4214	peer_device = conn_peer_device(connection, pi->vnr);
4215	if (!peer_device)
4216		return -EIO;
4217	device = peer_device->device;
4218
4219	wait_event(device->misc_wait,
4220		   device->state.conn == C_WF_SYNC_UUID ||
4221		   device->state.conn == C_BEHIND ||
4222		   device->state.conn < C_CONNECTED ||
4223		   device->state.disk < D_NEGOTIATING);
4224
4225	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4226
4227	/* Here the _drbd_uuid_ functions are right, current should
4228	   _not_ be rotated into the history */
4229	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4230		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4231		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4232
4233		drbd_print_uuids(device, "updated sync uuid");
4234		drbd_start_resync(device, C_SYNC_TARGET);
4235
4236		put_ldev(device);
4237	} else
4238		drbd_err(device, "Ignoring SyncUUID packet!\n");
4239
4240	return 0;
4241}
4242
4243/**
4244 * receive_bitmap_plain
4245 *
4246 * Return 0 when done, 1 when another iteration is needed, and a negative error
4247 * code upon failure.
4248 */
4249static int
4250receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4251		     unsigned long *p, struct bm_xfer_ctx *c)
4252{
4253	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4254				 drbd_header_size(peer_device->connection);
4255	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4256				       c->bm_words - c->word_offset);
4257	unsigned int want = num_words * sizeof(*p);
4258	int err;
4259
4260	if (want != size) {
4261		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4262		return -EIO;
4263	}
4264	if (want == 0)
4265		return 0;
4266	err = drbd_recv_all(peer_device->connection, p, want);
4267	if (err)
4268		return err;
4269
4270	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4271
4272	c->word_offset += num_words;
4273	c->bit_offset = c->word_offset * BITS_PER_LONG;
4274	if (c->bit_offset > c->bm_bits)
4275		c->bit_offset = c->bm_bits;
4276
4277	return 1;
4278}
4279
4280static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4281{
4282	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4283}
4284
4285static int dcbp_get_start(struct p_compressed_bm *p)
4286{
4287	return (p->encoding & 0x80) != 0;
4288}
4289
4290static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4291{
4292	return (p->encoding >> 4) & 0x7;
4293}
4294
/**
 * recv_bm_rle_bits() - Decode one run-length encoded bitmap packet
 * @peer_device:	Peer device whose bitmap is updated.
 * @p:			Compressed bitmap packet; p->code holds the bit stream.
 * @c:			Transfer context; c->bit_offset is where decoding continues.
 * @len:		Length of the bit stream in p->code, in bytes.
 *
 * The stream is a sequence of VLI encoded run lengths.  Runs alternate between
 * "leave as is" and "set"; dcbp_get_start() tells which kind comes first.
 * Every "set" run is applied to the bitmap via _drbd_bm_set_bits().
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;		/* not yet decoded bits, next code in the low bits */
	u64 rl;			/* run length decoded in the current iteration */
	u64 tmp;
	unsigned long s = c->bit_offset;	/* first bit of the current run */
	unsigned long e;			/* last bit of the current run */
	int toggle = dcbp_get_start(p);		/* non-zero: current run is a run of set bits */
	int have;				/* number of valid bits in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	/* prime the look-ahead window with up to 64 bits */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	/* one run per iteration; the kind of run flips every time */
	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(peer_device->device, s, e);
		}

		/* the code just decoded claims more bits than we had available */
		if (have < bits) {
			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
		have -= bits;

		/* refill the window above the bits that are still unconsumed */
		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	/* 1 ("more to come") unless we arrived exactly at the end of the bitmap */
	return (s != c->bm_bits);
}
4363
4364/**
4365 * decode_bitmap_c
4366 *
4367 * Return 0 when done, 1 when another iteration is needed, and a negative error
4368 * code upon failure.
4369 */
4370static int
4371decode_bitmap_c(struct drbd_peer_device *peer_device,
4372		struct p_compressed_bm *p,
4373		struct bm_xfer_ctx *c,
4374		unsigned int len)
4375{
4376	if (dcbp_get_code(p) == RLE_VLI_Bits)
4377		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4378
4379	/* other variants had been implemented for evaluation,
4380	 * but have been dropped as this one turned out to be "best"
4381	 * during all our tests. */
4382
4383	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4384	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4385	return -EIO;
4386}
4387
4388void INFO_bm_xfer_stats(struct drbd_device *device,
4389		const char *direction, struct bm_xfer_ctx *c)
4390{
4391	/* what would it take to transfer it "plaintext" */
4392	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4393	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4394	unsigned int plain =
4395		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4396		c->bm_words * sizeof(unsigned long);
4397	unsigned int total = c->bytes[0] + c->bytes[1];
4398	unsigned int r;
4399
4400	/* total can not be zero. but just in case: */
4401	if (total == 0)
4402		return;
4403
4404	/* don't report if not compressed */
4405	if (total >= plain)
4406		return;
4407
4408	/* total < plain. check for overflow, still */
4409	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4410		                    : (1000 * total / plain);
4411
4412	if (r > 1000)
4413		r = 1000;
4414
4415	r = 1000 - r;
4416	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4417	     "total %u; compression: %u.%u%%\n",
4418			direction,
4419			c->bytes[1], c->packets[1],
4420			c->bytes[0], c->packets[0],
4421			total, r/10, r % 10);
4422}
4423
4424/* Since we are processing the bitfield from lower addresses to higher,
4425   it does not matter if the process it in 32 bit chunks or 64 bit
4426   chunks as long as it is little endian. (Understand it as byte stream,
4427   beginning with the lowest byte...) If we would use big endian
4428   we would need to process it from the highest address to the lowest,
4429   in order to be agnostic to the 32 vs 64 bits issue.
4430
4431   returns 0 on failure, 1 if we successfully received it. */
4432static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4433{
4434	struct drbd_peer_device *peer_device;
4435	struct drbd_device *device;
4436	struct bm_xfer_ctx c;
4437	int err;
4438
4439	peer_device = conn_peer_device(connection, pi->vnr);
4440	if (!peer_device)
4441		return -EIO;
4442	device = peer_device->device;
4443
4444	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4445	/* you are supposed to send additional out-of-sync information
4446	 * if you actually set bits during this phase */
4447
4448	c = (struct bm_xfer_ctx) {
4449		.bm_bits = drbd_bm_bits(device),
4450		.bm_words = drbd_bm_words(device),
4451	};
4452
4453	for(;;) {
4454		if (pi->cmd == P_BITMAP)
4455			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4456		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4457			/* MAYBE: sanity check that we speak proto >= 90,
4458			 * and the feature is enabled! */
4459			struct p_compressed_bm *p = pi->data;
4460
4461			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4462				drbd_err(device, "ReportCBitmap packet too large\n");
4463				err = -EIO;
4464				goto out;
4465			}
4466			if (pi->size <= sizeof(*p)) {
4467				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4468				err = -EIO;
4469				goto out;
4470			}
4471			err = drbd_recv_all(peer_device->connection, p, pi->size);
4472			if (err)
4473			       goto out;
4474			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4475		} else {
4476			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4477			err = -EIO;
4478			goto out;
4479		}
4480
4481		c.packets[pi->cmd == P_BITMAP]++;
4482		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4483
4484		if (err <= 0) {
4485			if (err < 0)
4486				goto out;
4487			break;
4488		}
4489		err = drbd_recv_header(peer_device->connection, pi);
4490		if (err)
4491			goto out;
4492	}
4493
4494	INFO_bm_xfer_stats(device, "receive", &c);
4495
4496	if (device->state.conn == C_WF_BITMAP_T) {
4497		enum drbd_state_rv rv;
4498
4499		err = drbd_send_bitmap(device);
4500		if (err)
4501			goto out;
4502		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4503		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4504		D_ASSERT(device, rv == SS_SUCCESS);
4505	} else if (device->state.conn != C_WF_BITMAP_S) {
4506		/* admin may have requested C_DISCONNECTING,
4507		 * other threads may have noticed network errors */
4508		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4509		    drbd_conn_str(device->state.conn));
4510	}
4511	err = 0;
4512
4513 out:
4514	drbd_bm_unlock(device);
4515	if (!err && device->state.conn == C_WF_BITMAP_S)
4516		drbd_start_resync(device, C_SYNC_SOURCE);
4517	return err;
4518}
4519
4520static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4521{
4522	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4523		 pi->cmd, pi->size);
4524
4525	return ignore_remaining_packet(connection, pi);
4526}
4527
4528static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4529{
4530	/* Make sure we've acked all the TCP data associated
4531	 * with the data requests being unplugged */
4532	drbd_tcp_quickack(connection->data.socket);
4533
4534	return 0;
4535}
4536
4537static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4538{
4539	struct drbd_peer_device *peer_device;
4540	struct drbd_device *device;
4541	struct p_block_desc *p = pi->data;
4542
4543	peer_device = conn_peer_device(connection, pi->vnr);
4544	if (!peer_device)
4545		return -EIO;
4546	device = peer_device->device;
4547
4548	switch (device->state.conn) {
4549	case C_WF_SYNC_UUID:
4550	case C_WF_BITMAP_T:
4551	case C_BEHIND:
4552			break;
4553	default:
4554		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4555				drbd_conn_str(device->state.conn));
4556	}
4557
4558	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4559
4560	return 0;
4561}
4562
/* One entry of the dispatch table drbdd() uses for packets on the data socket. */
struct data_cmd {
	/* zero: drbdd() rejects packets that are larger than pkt_size */
	int expect_payload;
	/* size of the sub header drbdd() receives into pi->data before calling fn */
	size_t pkt_size;
	/* handler; a non-zero return value makes drbdd() abort with a protocol error */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};
4568
/*
 * Dispatch table for drbdd(), indexed by packet command.
 * Each entry is { expect_payload, pkt_size, fn }.  Commands without an entry
 * have fn == NULL and are rejected by drbdd() as unexpected.
 */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
};
4596
4597static void drbdd(struct drbd_connection *connection)
4598{
4599	struct packet_info pi;
4600	size_t shs; /* sub header size */
4601	int err;
4602
4603	while (get_t_state(&connection->receiver) == RUNNING) {
4604		struct data_cmd *cmd;
4605
4606		drbd_thread_current_set_cpu(&connection->receiver);
4607		update_receiver_timing_details(connection, drbd_recv_header);
4608		if (drbd_recv_header(connection, &pi))
4609			goto err_out;
4610
4611		cmd = &drbd_cmd_handler[pi.cmd];
4612		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4613			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4614				 cmdname(pi.cmd), pi.cmd);
4615			goto err_out;
4616		}
4617
4618		shs = cmd->pkt_size;
4619		if (pi.size > shs && !cmd->expect_payload) {
4620			drbd_err(connection, "No payload expected %s l:%d\n",
4621				 cmdname(pi.cmd), pi.size);
4622			goto err_out;
4623		}
4624
4625		if (shs) {
4626			update_receiver_timing_details(connection, drbd_recv_all_warn);
4627			err = drbd_recv_all_warn(connection, pi.data, shs);
4628			if (err)
4629				goto err_out;
4630			pi.size -= shs;
4631		}
4632
4633		update_receiver_timing_details(connection, cmd->fn);
4634		err = cmd->fn(connection, &pi);
4635		if (err) {
4636			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4637				 cmdname(pi.cmd), err, pi.size);
4638			goto err_out;
4639		}
4640	}
4641	return;
4642
4643    err_out:
4644	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4645}
4646
/*
 * Clean up after the connection to the peer has been lost or closed.
 *
 * Does nothing for a connection that is C_STANDALONE.  Otherwise: force a
 * network failure state, stop the asender thread, free the sockets, run
 * drbd_disconnected() for every peer device, reset the current epoch, and
 * finally move the connection to C_UNCONNECTED -- or on to C_STANDALONE if
 * the connection state found at that point was C_DISCONNECTING.
 */
static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->asender);
	drbd_free_sock(connection);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		/* Hold a reference on the device while the RCU read lock is
		 * dropped around the call to drbd_disconnected(). */
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	/* sample the connection state and change it under req_lock */
	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->resource->req_lock);

	/* a disconnect was requested: do not stay unconnected, go standalone */
	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
4699
/*
 * Per-device part of the connection teardown, called from conn_disconnect().
 *
 * Waits for the peer requests on active_ee, sync_ee and read_ee to drain,
 * cancels resync bookkeeping, flushes the sender work queue, drops the
 * peer's UUIDs and releases remaining peer requests on net_ee.
 * The order of the steps matters, see the comments below.
 *
 * Always returns 0.
 */
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	/* stop the resync timer, then run its handler once by hand */
	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	/* the peer's UUIDs are only meaningful while connected */
	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	/* the remaining checks only report leftovers, they do not fix anything */
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}
4783
4784/*
4785 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4786 * we can agree on is stored in agreed_pro_version.
4787 *
4788 * feature flags and the reserved array should be enough room for future
4789 * enhancements of the handshake protocol, and possible plugins...
4790 *
4791 * for now, they are expected to be zero, but ignored.
4792 */
4793static int drbd_send_features(struct drbd_connection *connection)
4794{
4795	struct drbd_socket *sock;
4796	struct p_connection_features *p;
4797
4798	sock = &connection->data;
4799	p = conn_prepare_command(connection, sock);
4800	if (!p)
4801		return -EIO;
4802	memset(p, 0, sizeof(*p));
4803	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4804	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4805	p->feature_flags = cpu_to_be32(PRO_FEATURES);
4806	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4807}
4808
4809/*
4810 * return values:
4811 *   1 yes, we have a valid connection
4812 *   0 oops, did not work out, please try again
4813 *  -1 peer talks different language,
4814 *     no point in trying again, please go standalone.
4815 */
4816static int drbd_do_features(struct drbd_connection *connection)
4817{
4818	/* ASSERT current == connection->receiver ... */
4819	struct p_connection_features *p;
4820	const int expect = sizeof(struct p_connection_features);
4821	struct packet_info pi;
4822	int err;
4823
4824	err = drbd_send_features(connection);
4825	if (err)
4826		return 0;
4827
4828	err = drbd_recv_header(connection, &pi);
4829	if (err)
4830		return 0;
4831
4832	if (pi.cmd != P_CONNECTION_FEATURES) {
4833		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4834			 cmdname(pi.cmd), pi.cmd);
4835		return -1;
4836	}
4837
4838	if (pi.size != expect) {
4839		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4840		     expect, pi.size);
4841		return -1;
4842	}
4843
4844	p = pi.data;
4845	err = drbd_recv_all_warn(connection, p, expect);
4846	if (err)
4847		return 0;
4848
4849	p->protocol_min = be32_to_cpu(p->protocol_min);
4850	p->protocol_max = be32_to_cpu(p->protocol_max);
4851	if (p->protocol_max == 0)
4852		p->protocol_max = p->protocol_min;
4853
4854	if (PRO_VERSION_MAX < p->protocol_min ||
4855	    PRO_VERSION_MIN > p->protocol_max)
4856		goto incompat;
4857
4858	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4859	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4860
4861	drbd_info(connection, "Handshake successful: "
4862	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
4863
4864	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4865		  connection->agreed_features & FF_TRIM ? " " : " not ");
4866
4867	return 1;
4868
4869 incompat:
4870	drbd_err(connection, "incompatible DRBD dialects: "
4871	    "I support %d-%d, peer supports %d-%d\n",
4872	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4873	    p->protocol_min, p->protocol_max);
4874	return -1;
4875}
4876
4877#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant compiled when neither CONFIG_CRYPTO_HMAC nor
 * CONFIG_CRYPTO_HMAC_MODULE is defined: authentication cannot be performed,
 * so log why and always return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

	return -1;
}
4884#else
4885#define CHALLENGE_LEN 64
4886
4887/* Return value:
4888	1 - auth succeeded,
4889	0 - failed, try again (network error),
4890	-1 - auth failed, don't try again.
4891*/
4892
4893static int drbd_do_auth(struct drbd_connection *connection)
4894{
4895	struct drbd_socket *sock;
4896	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4897	struct scatterlist sg;
4898	char *response = NULL;
4899	char *right_response = NULL;
4900	char *peers_ch = NULL;
4901	unsigned int key_len;
4902	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4903	unsigned int resp_size;
4904	struct hash_desc desc;
4905	struct packet_info pi;
4906	struct net_conf *nc;
4907	int err, rv;
4908
4909	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4910
4911	rcu_read_lock();
4912	nc = rcu_dereference(connection->net_conf);
4913	key_len = strlen(nc->shared_secret);
4914	memcpy(secret, nc->shared_secret, key_len);
4915	rcu_read_unlock();
4916
4917	desc.tfm = connection->cram_hmac_tfm;
4918	desc.flags = 0;
4919
4920	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4921	if (rv) {
4922		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4923		rv = -1;
4924		goto fail;
4925	}
4926
4927	get_random_bytes(my_challenge, CHALLENGE_LEN);
4928
4929	sock = &connection->data;
4930	if (!conn_prepare_command(connection, sock)) {
4931		rv = 0;
4932		goto fail;
4933	}
4934	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4935				my_challenge, CHALLENGE_LEN);
4936	if (!rv)
4937		goto fail;
4938
4939	err = drbd_recv_header(connection, &pi);
4940	if (err) {
4941		rv = 0;
4942		goto fail;
4943	}
4944
4945	if (pi.cmd != P_AUTH_CHALLENGE) {
4946		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4947			 cmdname(pi.cmd), pi.cmd);
4948		rv = 0;
4949		goto fail;
4950	}
4951
4952	if (pi.size > CHALLENGE_LEN * 2) {
4953		drbd_err(connection, "expected AuthChallenge payload too big.\n");
4954		rv = -1;
4955		goto fail;
4956	}
4957
4958	if (pi.size < CHALLENGE_LEN) {
4959		drbd_err(connection, "AuthChallenge payload too small.\n");
4960		rv = -1;
4961		goto fail;
4962	}
4963
4964	peers_ch = kmalloc(pi.size, GFP_NOIO);
4965	if (peers_ch == NULL) {
4966		drbd_err(connection, "kmalloc of peers_ch failed\n");
4967		rv = -1;
4968		goto fail;
4969	}
4970
4971	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4972	if (err) {
4973		rv = 0;
4974		goto fail;
4975	}
4976
4977	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4978		drbd_err(connection, "Peer presented the same challenge!\n");
4979		rv = -1;
4980		goto fail;
4981	}
4982
4983	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4984	response = kmalloc(resp_size, GFP_NOIO);
4985	if (response == NULL) {
4986		drbd_err(connection, "kmalloc of response failed\n");
4987		rv = -1;
4988		goto fail;
4989	}
4990
4991	sg_init_table(&sg, 1);
4992	sg_set_buf(&sg, peers_ch, pi.size);
4993
4994	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4995	if (rv) {
4996		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4997		rv = -1;
4998		goto fail;
4999	}
5000
5001	if (!conn_prepare_command(connection, sock)) {
5002		rv = 0;
5003		goto fail;
5004	}
5005	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5006				response, resp_size);
5007	if (!rv)
5008		goto fail;
5009
5010	err = drbd_recv_header(connection, &pi);
5011	if (err) {
5012		rv = 0;
5013		goto fail;
5014	}
5015
5016	if (pi.cmd != P_AUTH_RESPONSE) {
5017		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5018			 cmdname(pi.cmd), pi.cmd);
5019		rv = 0;
5020		goto fail;
5021	}
5022
5023	if (pi.size != resp_size) {
5024		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5025		rv = 0;
5026		goto fail;
5027	}
5028
5029	err = drbd_recv_all_warn(connection, response , resp_size);
5030	if (err) {
5031		rv = 0;
5032		goto fail;
5033	}
5034
5035	right_response = kmalloc(resp_size, GFP_NOIO);
5036	if (right_response == NULL) {
5037		drbd_err(connection, "kmalloc of right_response failed\n");
5038		rv = -1;
5039		goto fail;
5040	}
5041
5042	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5043
5044	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5045	if (rv) {
5046		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5047		rv = -1;
5048		goto fail;
5049	}
5050
5051	rv = !memcmp(response, right_response, resp_size);
5052
5053	if (rv)
5054		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5055		     resp_size);
5056	else
5057		rv = -1;
5058
5059 fail:
5060	kfree(peers_ch);
5061	kfree(response);
5062	kfree(right_response);
5063
5064	return rv;
5065}
5066#endif
5067
5068int drbd_receiver(struct drbd_thread *thi)
5069{
5070	struct drbd_connection *connection = thi->connection;
5071	int h;
5072
5073	drbd_info(connection, "receiver (re)started\n");
5074
5075	do {
5076		h = conn_connect(connection);
5077		if (h == 0) {
5078			conn_disconnect(connection);
5079			schedule_timeout_interruptible(HZ);
5080		}
5081		if (h == -1) {
5082			drbd_warn(connection, "Discarding network configuration.\n");
5083			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5084		}
5085	} while (h == 0);
5086
5087	if (h > 0)
5088		drbdd(connection);
5089
5090	conn_disconnect(connection);
5091
5092	drbd_info(connection, "receiver terminated\n");
5093	return 0;
5094}
5095
5096/* ********* acknowledge sender ******** */
5097
5098static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5099{
5100	struct p_req_state_reply *p = pi->data;
5101	int retcode = be32_to_cpu(p->retcode);
5102
5103	if (retcode >= SS_SUCCESS) {
5104		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5105	} else {
5106		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5107		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5108			 drbd_set_st_err_str(retcode), retcode);
5109	}
5110	wake_up(&connection->ping_wait);
5111
5112	return 0;
5113}
5114
5115static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5116{
5117	struct drbd_peer_device *peer_device;
5118	struct drbd_device *device;
5119	struct p_req_state_reply *p = pi->data;
5120	int retcode = be32_to_cpu(p->retcode);
5121
5122	peer_device = conn_peer_device(connection, pi->vnr);
5123	if (!peer_device)
5124		return -EIO;
5125	device = peer_device->device;
5126
5127	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5128		D_ASSERT(device, connection->agreed_pro_version < 100);
5129		return got_conn_RqSReply(connection, pi);
5130	}
5131
5132	if (retcode >= SS_SUCCESS) {
5133		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5134	} else {
5135		set_bit(CL_ST_CHG_FAIL, &device->flags);
5136		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5137			drbd_set_st_err_str(retcode), retcode);
5138	}
5139	wake_up(&device->state_wait);
5140
5141	return 0;
5142}
5143
/* Answer a ping from the peer; returns the result of sending the ack. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}
5149
5150static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5151{
5152	/* restore idle timeout */
5153	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5154	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5155		wake_up(&connection->ping_wait);
5156
5157	return 0;
5158}
5159
5160static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5161{
5162	struct drbd_peer_device *peer_device;
5163	struct drbd_device *device;
5164	struct p_block_ack *p = pi->data;
5165	sector_t sector = be64_to_cpu(p->sector);
5166	int blksize = be32_to_cpu(p->blksize);
5167
5168	peer_device = conn_peer_device(connection, pi->vnr);
5169	if (!peer_device)
5170		return -EIO;
5171	device = peer_device->device;
5172
5173	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5174
5175	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5176
5177	if (get_ldev(device)) {
5178		drbd_rs_complete_io(device, sector);
5179		drbd_set_in_sync(device, sector, blksize);
5180		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5181		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5182		put_ldev(device);
5183	}
5184	dec_rs_pending(device);
5185	atomic_add(blksize >> 9, &device->rs_sect_in);
5186
5187	return 0;
5188}
5189
5190static int
5191validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5192			      struct rb_root *root, const char *func,
5193			      enum drbd_req_event what, bool missing_ok)
5194{
5195	struct drbd_request *req;
5196	struct bio_and_error m;
5197
5198	spin_lock_irq(&device->resource->req_lock);
5199	req = find_request(device, root, id, sector, missing_ok, func);
5200	if (unlikely(!req)) {
5201		spin_unlock_irq(&device->resource->req_lock);
5202		return -EIO;
5203	}
5204	__req_mod(req, what, &m);
5205	spin_unlock_irq(&device->resource->req_lock);
5206
5207	if (m.bio)
5208		complete_master_bio(device, &m);
5209	return 0;
5210}
5211
5212static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5213{
5214	struct drbd_peer_device *peer_device;
5215	struct drbd_device *device;
5216	struct p_block_ack *p = pi->data;
5217	sector_t sector = be64_to_cpu(p->sector);
5218	int blksize = be32_to_cpu(p->blksize);
5219	enum drbd_req_event what;
5220
5221	peer_device = conn_peer_device(connection, pi->vnr);
5222	if (!peer_device)
5223		return -EIO;
5224	device = peer_device->device;
5225
5226	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227
5228	if (p->block_id == ID_SYNCER) {
5229		drbd_set_in_sync(device, sector, blksize);
5230		dec_rs_pending(device);
5231		return 0;
5232	}
5233	switch (pi->cmd) {
5234	case P_RS_WRITE_ACK:
5235		what = WRITE_ACKED_BY_PEER_AND_SIS;
5236		break;
5237	case P_WRITE_ACK:
5238		what = WRITE_ACKED_BY_PEER;
5239		break;
5240	case P_RECV_ACK:
5241		what = RECV_ACKED_BY_PEER;
5242		break;
5243	case P_SUPERSEDED:
5244		what = CONFLICT_RESOLVED;
5245		break;
5246	case P_RETRY_WRITE:
5247		what = POSTPONE_WRITE;
5248		break;
5249	default:
5250		BUG();
5251	}
5252
5253	return validate_req_change_req_state(device, p->block_id, sector,
5254					     &device->write_requests, __func__,
5255					     what, false);
5256}
5257
5258static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5259{
5260	struct drbd_peer_device *peer_device;
5261	struct drbd_device *device;
5262	struct p_block_ack *p = pi->data;
5263	sector_t sector = be64_to_cpu(p->sector);
5264	int size = be32_to_cpu(p->blksize);
5265	int err;
5266
5267	peer_device = conn_peer_device(connection, pi->vnr);
5268	if (!peer_device)
5269		return -EIO;
5270	device = peer_device->device;
5271
5272	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5273
5274	if (p->block_id == ID_SYNCER) {
5275		dec_rs_pending(device);
5276		drbd_rs_failed_io(device, sector, size);
5277		return 0;
5278	}
5279
5280	err = validate_req_change_req_state(device, p->block_id, sector,
5281					    &device->write_requests, __func__,
5282					    NEG_ACKED, true);
5283	if (err) {
5284		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5285		   The master bio might already be completed, therefore the
5286		   request is no longer in the collision hash. */
5287		/* In Protocol B we might already have got a P_RECV_ACK
5288		   but then get a P_NEG_ACK afterwards. */
5289		drbd_set_out_of_sync(device, sector, size);
5290	}
5291	return 0;
5292}
5293
5294static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5295{
5296	struct drbd_peer_device *peer_device;
5297	struct drbd_device *device;
5298	struct p_block_ack *p = pi->data;
5299	sector_t sector = be64_to_cpu(p->sector);
5300
5301	peer_device = conn_peer_device(connection, pi->vnr);
5302	if (!peer_device)
5303		return -EIO;
5304	device = peer_device->device;
5305
5306	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5307
5308	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5309	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5310
5311	return validate_req_change_req_state(device, p->block_id, sector,
5312					     &device->read_requests, __func__,
5313					     NEG_ACKED, false);
5314}
5315
5316static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5317{
5318	struct drbd_peer_device *peer_device;
5319	struct drbd_device *device;
5320	sector_t sector;
5321	int size;
5322	struct p_block_ack *p = pi->data;
5323
5324	peer_device = conn_peer_device(connection, pi->vnr);
5325	if (!peer_device)
5326		return -EIO;
5327	device = peer_device->device;
5328
5329	sector = be64_to_cpu(p->sector);
5330	size = be32_to_cpu(p->blksize);
5331
5332	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5333
5334	dec_rs_pending(device);
5335
5336	if (get_ldev_if_state(device, D_FAILED)) {
5337		drbd_rs_complete_io(device, sector);
5338		switch (pi->cmd) {
5339		case P_NEG_RS_DREPLY:
5340			drbd_rs_failed_io(device, sector, size);
5341		case P_RS_CANCEL:
5342			break;
5343		default:
5344			BUG();
5345		}
5346		put_ldev(device);
5347	}
5348
5349	return 0;
5350}
5351
5352static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5353{
5354	struct p_barrier_ack *p = pi->data;
5355	struct drbd_peer_device *peer_device;
5356	int vnr;
5357
5358	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5359
5360	rcu_read_lock();
5361	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5362		struct drbd_device *device = peer_device->device;
5363
5364		if (device->state.conn == C_AHEAD &&
5365		    atomic_read(&device->ap_in_flight) == 0 &&
5366		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5367			device->start_resync_timer.expires = jiffies + HZ;
5368			add_timer(&device->start_resync_timer);
5369		}
5370	}
5371	rcu_read_unlock();
5372
5373	return 0;
5374}
5375
5376static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5377{
5378	struct drbd_peer_device *peer_device;
5379	struct drbd_device *device;
5380	struct p_block_ack *p = pi->data;
5381	struct drbd_device_work *dw;
5382	sector_t sector;
5383	int size;
5384
5385	peer_device = conn_peer_device(connection, pi->vnr);
5386	if (!peer_device)
5387		return -EIO;
5388	device = peer_device->device;
5389
5390	sector = be64_to_cpu(p->sector);
5391	size = be32_to_cpu(p->blksize);
5392
5393	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5394
5395	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5396		drbd_ov_out_of_sync_found(device, sector, size);
5397	else
5398		ov_out_of_sync_print(device);
5399
5400	if (!get_ldev(device))
5401		return 0;
5402
5403	drbd_rs_complete_io(device, sector);
5404	dec_rs_pending(device);
5405
5406	--device->ov_left;
5407
5408	/* let's advance progress step marks only for every other megabyte */
5409	if ((device->ov_left & 0x200) == 0x200)
5410		drbd_advance_rs_marks(device, device->ov_left);
5411
5412	if (device->ov_left == 0) {
5413		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5414		if (dw) {
5415			dw->w.cb = w_ov_finished;
5416			dw->device = device;
5417			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5418		} else {
5419			drbd_err(device, "kmalloc(dw) failed.");
5420			ov_out_of_sync_print(device);
5421			drbd_resync_finished(device);
5422		}
5423	}
5424	put_ldev(device);
5425	return 0;
5426}
5427
/* Handler for meta packets that are received but deliberately ignored. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}
5432
5433static int connection_finish_peer_reqs(struct drbd_connection *connection)
5434{
5435	struct drbd_peer_device *peer_device;
5436	int vnr, not_empty = 0;
5437
5438	do {
5439		clear_bit(SIGNAL_ASENDER, &connection->flags);
5440		flush_signals(current);
5441
5442		rcu_read_lock();
5443		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5444			struct drbd_device *device = peer_device->device;
5445			kref_get(&device->kref);
5446			rcu_read_unlock();
5447			if (drbd_finish_peer_reqs(device)) {
5448				kref_put(&device->kref, drbd_destroy_device);
5449				return 1;
5450			}
5451			kref_put(&device->kref, drbd_destroy_device);
5452			rcu_read_lock();
5453		}
5454		set_bit(SIGNAL_ASENDER, &connection->flags);
5455
5456		spin_lock_irq(&connection->resource->req_lock);
5457		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5458			struct drbd_device *device = peer_device->device;
5459			not_empty = !list_empty(&device->done_ee);
5460			if (not_empty)
5461				break;
5462		}
5463		spin_unlock_irq(&connection->resource->req_lock);
5464		rcu_read_unlock();
5465	} while (not_empty);
5466
5467	return 0;
5468}
5469
5470struct asender_cmd {
5471	size_t pkt_size;
5472	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5473};
5474
5475static struct asender_cmd asender_tbl[] = {
5476	[P_PING]	    = { 0, got_Ping },
5477	[P_PING_ACK]	    = { 0, got_PingAck },
5478	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5479	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5480	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5481	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5482	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5483	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5484	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5485	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5486	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5487	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5488	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5489	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5490	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5491	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5492	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5493};
5494
5495int drbd_asender(struct drbd_thread *thi)
5496{
5497	struct drbd_connection *connection = thi->connection;
5498	struct asender_cmd *cmd = NULL;
5499	struct packet_info pi;
5500	int rv;
5501	void *buf    = connection->meta.rbuf;
5502	int received = 0;
5503	unsigned int header_size = drbd_header_size(connection);
5504	int expect   = header_size;
5505	bool ping_timeout_active = false;
5506	struct net_conf *nc;
5507	int ping_timeo, tcp_cork, ping_int;
5508	struct sched_param param = { .sched_priority = 2 };
5509
5510	rv = sched_setscheduler(current, SCHED_RR, &param);
5511	if (rv < 0)
5512		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5513
5514	while (get_t_state(thi) == RUNNING) {
5515		drbd_thread_current_set_cpu(thi);
5516
5517		rcu_read_lock();
5518		nc = rcu_dereference(connection->net_conf);
5519		ping_timeo = nc->ping_timeo;
5520		tcp_cork = nc->tcp_cork;
5521		ping_int = nc->ping_int;
5522		rcu_read_unlock();
5523
5524		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5525			if (drbd_send_ping(connection)) {
5526				drbd_err(connection, "drbd_send_ping has failed\n");
5527				goto reconnect;
5528			}
5529			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5530			ping_timeout_active = true;
5531		}
5532
5533		/* TODO: conditionally cork; it may hurt latency if we cork without
5534		   much to send */
5535		if (tcp_cork)
5536			drbd_tcp_cork(connection->meta.socket);
5537		if (connection_finish_peer_reqs(connection)) {
5538			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5539			goto reconnect;
5540		}
5541		/* but unconditionally uncork unless disabled */
5542		if (tcp_cork)
5543			drbd_tcp_uncork(connection->meta.socket);
5544
5545		/* short circuit, recv_msg would return EINTR anyways. */
5546		if (signal_pending(current))
5547			continue;
5548
5549		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550		clear_bit(SIGNAL_ASENDER, &connection->flags);
5551
5552		flush_signals(current);
5553
5554		/* Note:
5555		 * -EINTR	 (on meta) we got a signal
5556		 * -EAGAIN	 (on meta) rcvtimeo expired
5557		 * -ECONNRESET	 other side closed the connection
5558		 * -ERESTARTSYS  (on data) we got a signal
5559		 * rv <  0	 other than above: unexpected error!
5560		 * rv == expected: full header or command
5561		 * rv <  expected: "woken" by signal during receive
5562		 * rv == 0	 : "connection shut down by peer"
5563		 */
5564received_more:
5565		if (likely(rv > 0)) {
5566			received += rv;
5567			buf	 += rv;
5568		} else if (rv == 0) {
5569			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5570				long t;
5571				rcu_read_lock();
5572				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5573				rcu_read_unlock();
5574
5575				t = wait_event_timeout(connection->ping_wait,
5576						       connection->cstate < C_WF_REPORT_PARAMS,
5577						       t);
5578				if (t)
5579					break;
5580			}
5581			drbd_err(connection, "meta connection shut down by peer.\n");
5582			goto reconnect;
5583		} else if (rv == -EAGAIN) {
5584			/* If the data socket received something meanwhile,
5585			 * that is good enough: peer is still alive. */
5586			if (time_after(connection->last_received,
5587				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5588				continue;
5589			if (ping_timeout_active) {
5590				drbd_err(connection, "PingAck did not arrive in time.\n");
5591				goto reconnect;
5592			}
5593			set_bit(SEND_PING, &connection->flags);
5594			continue;
5595		} else if (rv == -EINTR) {
5596			continue;
5597		} else {
5598			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5599			goto reconnect;
5600		}
5601
5602		if (received == expect && cmd == NULL) {
5603			if (decode_header(connection, connection->meta.rbuf, &pi))
5604				goto reconnect;
5605			cmd = &asender_tbl[pi.cmd];
5606			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5607				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5608					 cmdname(pi.cmd), pi.cmd);
5609				goto disconnect;
5610			}
5611			expect = header_size + cmd->pkt_size;
5612			if (pi.size != expect - header_size) {
5613				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5614					pi.cmd, pi.size);
5615				goto reconnect;
5616			}
5617		}
5618		if (received == expect) {
5619			bool err;
5620
5621			err = cmd->fn(connection, &pi);
5622			if (err) {
5623				drbd_err(connection, "%pf failed\n", cmd->fn);
5624				goto reconnect;
5625			}
5626
5627			connection->last_received = jiffies;
5628
5629			if (cmd == &asender_tbl[P_PING_ACK]) {
5630				/* restore idle timeout */
5631				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5632				ping_timeout_active = false;
5633			}
5634
5635			buf	 = connection->meta.rbuf;
5636			received = 0;
5637			expect	 = header_size;
5638			cmd	 = NULL;
5639		}
5640		if (test_bit(SEND_PING, &connection->flags))
5641			continue;
5642		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5643		if (rv > 0)
5644			goto received_more;
5645	}
5646
5647	if (0) {
5648reconnect:
5649		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5650		conn_md_sync(connection);
5651	}
5652	if (0) {
5653disconnect:
5654		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5655	}
5656	clear_bit(SIGNAL_ASENDER, &connection->flags);
5657
5658	drbd_info(connection, "asender terminated\n");
5659
5660	return 0;
5661}
5662