drbd_worker.c revision a80ca1ae81fc52e304e753f6de4ef248df364f9e
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24*/
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_protocol.h"
40#include "drbd_req.h"
41
42static int make_ov_request(struct drbd_device *, int);
43static int make_resync_request(struct drbd_device *, int);
44
45/* endio handlers:
46 *   drbd_md_io_complete (defined here)
47 *   drbd_request_endio (defined here)
48 *   drbd_peer_request_endio (defined here)
49 *   bm_async_io_complete (defined in drbd_bitmap.c)
50 *
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58
59/* About the global_state_lock
60   Each state transition on a device holds a read lock. In case we have
61   to evaluate the resync-after dependencies, we grab a write lock, because
62   we need stable states on all devices for that.  */
63rwlock_t global_state_lock;
64
65/* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
67 */
68void drbd_md_io_complete(struct bio *bio, int error)
69{
70	struct drbd_md_io *md_io;
71	struct drbd_device *device;
72
73	md_io = (struct drbd_md_io *)bio->bi_private;
74	device = container_of(md_io, struct drbd_device, md_io);
75
76	md_io->error = error;
77
78	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
79	 * to timeout on the lower level device, and eventually detach from it.
80	 * If this io completion runs after that timeout expired, this
81	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
82	 * During normal operation, this only puts that extra reference
83	 * down to 1 again.
84	 * Make sure we first drop the reference, and only then signal
85	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
86	 * next drbd_md_sync_page_io(), that we trigger the
87	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
88	 */
89	drbd_md_put_buffer(device);
90	md_io->done = 1;
91	wake_up(&device->misc_wait);
92	bio_put(bio);
93	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
94		put_ldev(device);
95}
96
97/* reads on behalf of the partner,
98 * "submitted" by the receiver
99 */
100static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101{
102	unsigned long flags = 0;
103	struct drbd_peer_device *peer_device = peer_req->peer_device;
104	struct drbd_device *device = peer_device->device;
105
106	spin_lock_irqsave(&device->resource->req_lock, flags);
107	device->read_cnt += peer_req->i.size >> 9;
108	list_del(&peer_req->w.list);
109	if (list_empty(&device->read_ee))
110		wake_up(&device->ee_wait);
111	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
112		__drbd_chk_io_error(device, DRBD_READ_ERROR);
113	spin_unlock_irqrestore(&device->resource->req_lock, flags);
114
115	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
116	put_ldev(device);
117}
118
119/* writes on behalf of the partner, or resync writes,
120 * "submitted" by the receiver, final stage.  */
121void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
122{
123	unsigned long flags = 0;
124	struct drbd_peer_device *peer_device = peer_req->peer_device;
125	struct drbd_device *device = peer_device->device;
126	struct drbd_interval i;
127	int do_wake;
128	u64 block_id;
129	int do_al_complete_io;
130
131	/* after we moved peer_req to done_ee,
132	 * we may no longer access it,
133	 * it may be freed/reused already!
134	 * (as soon as we release the req_lock) */
135	i = peer_req->i;
136	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
137	block_id = peer_req->block_id;
138
139	spin_lock_irqsave(&device->resource->req_lock, flags);
140	device->writ_cnt += peer_req->i.size >> 9;
141	list_move_tail(&peer_req->w.list, &device->done_ee);
142
143	/*
144	 * Do not remove from the write_requests tree here: we did not send the
145	 * Ack yet and did not wake possibly waiting conflicting requests.
146	 * Removal from the tree happens in "drbd_process_done_ee", within the
147	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
148	 * _drbd_clear_done_ee.
149	 */
150
151	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
152
153	/* FIXME do we want to detach for failed REQ_DISCARD?
154	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
155	if (peer_req->flags & EE_WAS_ERROR)
156		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
157	spin_unlock_irqrestore(&device->resource->req_lock, flags);
158
159	if (block_id == ID_SYNCER)
160		drbd_rs_complete_io(device, i.sector);
161
162	if (do_wake)
163		wake_up(&device->ee_wait);
164
165	if (do_al_complete_io)
166		drbd_al_complete_io(device, &i);
167
168	wake_asender(peer_device->connection);
169	put_ldev(device);
170}
171
172/* writes on behalf of the partner, or resync writes,
173 * "submitted" by the receiver.
174 */
175void drbd_peer_request_endio(struct bio *bio, int error)
176{
177	struct drbd_peer_request *peer_req = bio->bi_private;
178	struct drbd_device *device = peer_req->peer_device->device;
179	int uptodate = bio_flagged(bio, BIO_UPTODATE);
180	int is_write = bio_data_dir(bio) == WRITE;
181	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
182
183	if (error && __ratelimit(&drbd_ratelimit_state))
184		drbd_warn(device, "%s: error=%d s=%llus\n",
185				is_write ? (is_discard ? "discard" : "write")
186					: "read", error,
187				(unsigned long long)peer_req->i.sector);
188	if (!error && !uptodate) {
189		if (__ratelimit(&drbd_ratelimit_state))
190			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
191					is_write ? "write" : "read",
192					(unsigned long long)peer_req->i.sector);
193		/* strange behavior of some lower level drivers...
194		 * fail the request by clearing the uptodate flag,
195		 * but do not return any error?! */
196		error = -EIO;
197	}
198
199	if (error)
200		set_bit(__EE_WAS_ERROR, &peer_req->flags);
201
202	bio_put(bio); /* no need for the bio anymore */
203	if (atomic_dec_and_test(&peer_req->pending_bios)) {
204		if (is_write)
205			drbd_endio_write_sec_final(peer_req);
206		else
207			drbd_endio_read_sec_final(peer_req);
208	}
209}
210
211/* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
212 */
213void drbd_request_endio(struct bio *bio, int error)
214{
215	unsigned long flags;
216	struct drbd_request *req = bio->bi_private;
217	struct drbd_device *device = req->device;
218	struct bio_and_error m;
219	enum drbd_req_event what;
220	int uptodate = bio_flagged(bio, BIO_UPTODATE);
221
222	if (!error && !uptodate) {
223		drbd_warn(device, "p %s: setting error to -EIO\n",
224			 bio_data_dir(bio) == WRITE ? "write" : "read");
225		/* strange behavior of some lower level drivers...
226		 * fail the request by clearing the uptodate flag,
227		 * but do not return any error?! */
228		error = -EIO;
229	}
230
231
232	/* If this request was aborted locally before,
233	 * but now was completed "successfully",
234	 * chances are that this caused arbitrary data corruption.
235	 *
236	 * "aborting" requests, or force-detaching the disk, is intended for
237	 * completely blocked/hung local backing devices which no longer
238	 * complete requests at all, not even error completions.  In this
239	 * situation, usually a hard-reset and failover is the only way out.
240	 *
241	 * By "aborting", basically faking a local error-completion,
242	 * we allow for a more graceful switchover by cleanly migrating services.
243	 * Still the affected node has to be rebooted "soon".
244	 *
245	 * By completing these requests, we allow the upper layers to re-use
246	 * the associated data pages.
247	 *
248	 * If later the local backing device "recovers", and now DMAs some data
249	 * from disk into the original request pages, in the best case it will
250	 * just put random data into unused pages; but typically it will corrupt
251	 * data that is by now completely unrelated, causing all sorts of damage.
252	 *
253	 * Which means delayed successful completion,
254	 * especially for READ requests,
255	 * is a reason to panic().
256	 *
257	 * We assume that a delayed *error* completion is OK,
258	 * though we still will complain noisily about it.
259	 */
260	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
261		if (__ratelimit(&drbd_ratelimit_state))
262			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
263
264		if (!error)
265			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
266	}
267
268	/* to avoid recursion in __req_mod */
269	if (unlikely(error)) {
270		if (bio->bi_rw & REQ_DISCARD)
271			what = (error == -EOPNOTSUPP)
272				? DISCARD_COMPLETED_NOTSUPP
273				: DISCARD_COMPLETED_WITH_ERROR;
274		else
275			what = (bio_data_dir(bio) == WRITE)
276			? WRITE_COMPLETED_WITH_ERROR
277			: (bio_rw(bio) == READ)
278			  ? READ_COMPLETED_WITH_ERROR
279			  : READ_AHEAD_COMPLETED_WITH_ERROR;
280	} else
281		what = COMPLETED_OK;
282
283	bio_put(req->private_bio);
284	req->private_bio = ERR_PTR(error);
285
286	/* not req_mod(), we need irqsave here! */
287	spin_lock_irqsave(&device->resource->req_lock, flags);
288	__req_mod(req, what, &m);
289	spin_unlock_irqrestore(&device->resource->req_lock, flags);
290	put_ldev(device);
291
292	if (m.bio)
293		complete_master_bio(device, &m);
294}
295
296void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
297{
298	struct hash_desc desc;
299	struct scatterlist sg;
300	struct page *page = peer_req->pages;
301	struct page *tmp;
302	unsigned len;
303
304	desc.tfm = tfm;
305	desc.flags = 0;
306
307	sg_init_table(&sg, 1);
308	crypto_hash_init(&desc);
309
310	while ((tmp = page_chain_next(page))) {
311		/* all but the last page will be fully used */
312		sg_set_page(&sg, page, PAGE_SIZE, 0);
313		crypto_hash_update(&desc, &sg, sg.length);
314		page = tmp;
315	}
316	/* and now the last, possibly only partially used page */
317	len = peer_req->i.size & (PAGE_SIZE - 1);
318	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
319	crypto_hash_update(&desc, &sg, sg.length);
320	crypto_hash_final(&desc, digest);
321}
322
323void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
324{
325	struct hash_desc desc;
326	struct scatterlist sg;
327	struct bio_vec bvec;
328	struct bvec_iter iter;
329
330	desc.tfm = tfm;
331	desc.flags = 0;
332
333	sg_init_table(&sg, 1);
334	crypto_hash_init(&desc);
335
336	bio_for_each_segment(bvec, bio, iter) {
337		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
338		crypto_hash_update(&desc, &sg, sg.length);
339	}
340	crypto_hash_final(&desc, digest);
341}
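/*
 * Both helpers above feed the data page by page into a synchronous hash
 * transform and store the digest in a caller-provided buffer.  A minimal
 * usage sketch, mirroring what w_e_send_csum() below does:
 *
 *	digest_size = crypto_hash_digestsize(connection->csums_tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest)
 *		drbd_csum_ee(connection->csums_tfm, peer_req, digest);
 *
 * The caller owns the digest buffer and frees it after sending it out.
 */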
342
343/* MAYBE merge common code with w_e_end_ov_req */
344static int w_e_send_csum(struct drbd_work *w, int cancel)
345{
346	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
347	struct drbd_peer_device *peer_device = peer_req->peer_device;
348	struct drbd_device *device = peer_device->device;
349	int digest_size;
350	void *digest;
351	int err = 0;
352
353	if (unlikely(cancel))
354		goto out;
355
356	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
357		goto out;
358
359	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
360	digest = kmalloc(digest_size, GFP_NOIO);
361	if (digest) {
362		sector_t sector = peer_req->i.sector;
363		unsigned int size = peer_req->i.size;
364		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
365		/* Free peer_req and pages before send.
366		 * In case we block on congestion, we could otherwise run into
367		 * some distributed deadlock, if the other side blocks on
368		 * congestion as well, because our receiver blocks in
369		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
370		drbd_free_peer_req(device, peer_req);
371		peer_req = NULL;
372		inc_rs_pending(device);
373		err = drbd_send_drequest_csum(peer_device, sector, size,
374					      digest, digest_size,
375					      P_CSUM_RS_REQUEST);
376		kfree(digest);
377	} else {
378		drbd_err(device, "kmalloc() of digest failed.\n");
379		err = -ENOMEM;
380	}
381
382out:
383	if (peer_req)
384		drbd_free_peer_req(device, peer_req);
385
386	if (unlikely(err))
387		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
388	return err;
389}
390
391#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
392
393static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
394{
395	struct drbd_device *device = peer_device->device;
396	struct drbd_peer_request *peer_req;
397
398	if (!get_ldev(device))
399		return -EIO;
400
401	if (drbd_rs_should_slow_down(device, sector))
402		goto defer;
403
404	/* GFP_TRY, because if there is no memory available right now, this may
405	 * be rescheduled for later. It is "only" background resync, after all. */
406	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
407				       size, true /* has real payload */, GFP_TRY);
408	if (!peer_req)
409		goto defer;
410
411	peer_req->w.cb = w_e_send_csum;
412	spin_lock_irq(&device->resource->req_lock);
413	list_add(&peer_req->w.list, &device->read_ee);
414	spin_unlock_irq(&device->resource->req_lock);
415
416	atomic_add(size >> 9, &device->rs_sect_ev);
417	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
418		return 0;
419
420	/* If it failed because of ENOMEM, retry should help.  If it failed
421	 * because bio_add_page failed (probably broken lower level driver),
422	 * retry may or may not help.
423	 * If it does not, you may need to force disconnect. */
424	spin_lock_irq(&device->resource->req_lock);
425	list_del(&peer_req->w.list);
426	spin_unlock_irq(&device->resource->req_lock);
427
428	drbd_free_peer_req(device, peer_req);
429defer:
430	put_ldev(device);
431	return -EAGAIN;
432}
433
434int w_resync_timer(struct drbd_work *w, int cancel)
435{
436	struct drbd_device *device =
437		container_of(w, struct drbd_device, resync_work);
438
439	switch (device->state.conn) {
440	case C_VERIFY_S:
441		make_ov_request(device, cancel);
442		break;
443	case C_SYNC_TARGET:
444		make_resync_request(device, cancel);
445		break;
446	}
447
448	return 0;
449}
450
451void resync_timer_fn(unsigned long data)
452{
453	struct drbd_device *device = (struct drbd_device *) data;
454
455	if (list_empty(&device->resync_work.list))
456		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
457				&device->resync_work);
458}
459
460static void fifo_set(struct fifo_buffer *fb, int value)
461{
462	int i;
463
464	for (i = 0; i < fb->size; i++)
465		fb->values[i] = value;
466}
467
468static int fifo_push(struct fifo_buffer *fb, int value)
469{
470	int ov;
471
472	ov = fb->values[fb->head_index];
473	fb->values[fb->head_index++] = value;
474
475	if (fb->head_index >= fb->size)
476		fb->head_index = 0;
477
478	return ov;
479}
480
481static void fifo_add_val(struct fifo_buffer *fb, int value)
482{
483	int i;
484
485	for (i = 0; i < fb->size; i++)
486		fb->values[i] += value;
487}
488
489struct fifo_buffer *fifo_alloc(int fifo_size)
490{
491	struct fifo_buffer *fb;
492
493	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
494	if (!fb)
495		return NULL;
496
497	fb->head_index = 0;
498	fb->size = fifo_size;
499	fb->total = 0;
500
501	return fb;
502}
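/*
 * The fifo_buffer above is a small ring of per-step plan values used by the
 * resync rate controller below: fifo_push() stores the newest value and
 * returns the value that drops out of the ring, fifo_add_val() spreads a
 * correction evenly over all planned steps, and fifo_set() resets every
 * entry.  Illustrative sequence for a three-entry plan:
 *
 *	fifo_set(plan, 0);            // plan = {0, 0, 0}
 *	fifo_add_val(plan, 4);        // plan = {4, 4, 4}
 *	old = fifo_push(plan, 0);     // old == 4, that slot overwritten with 0
 */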
503
504static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
505{
506	struct disk_conf *dc;
507	unsigned int want;     /* The number of sectors we want in the proxy */
508	int req_sect; /* Number of sectors to request in this turn */
509	int correction; /* Number of additional sectors we need in the proxy */
510	int cps; /* correction per invocation of drbd_rs_controller() */
511	int steps; /* Number of time steps to plan ahead */
512	int curr_corr;
513	int max_sect;
514	struct fifo_buffer *plan;
515
516	dc = rcu_dereference(device->ldev->disk_conf);
517	plan = rcu_dereference(device->rs_plan_s);
518
519	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
520
521	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
522		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
523	} else { /* normal path */
524		want = dc->c_fill_target ? dc->c_fill_target :
525			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
526	}
527
528	correction = want - device->rs_in_flight - plan->total;
529
530	/* Plan ahead */
531	cps = correction / steps;
532	fifo_add_val(plan, cps);
533	plan->total += cps * steps;
534
535	/* What we do in this step */
536	curr_corr = fifo_push(plan, 0);
537	plan->total -= curr_corr;
538
539	req_sect = sect_in + curr_corr;
540	if (req_sect < 0)
541		req_sect = 0;
542
543	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
544	if (req_sect > max_sect)
545		req_sect = max_sect;
546
547	/*
548	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
549		 sect_in, device->rs_in_flight, want, correction,
550		 steps, cps, device->rs_planed, curr_corr, req_sect);
551	*/
552
553	return req_sect;
554}
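/*
 * Worked example of the controller arithmetic (illustrative only; SLEEP_TIME
 * and BM_BLOCK_SIZE are defined elsewhere, here assumed to be HZ/10, i.e.
 * 100ms, and 4KiB): with c_fill_target unset, c_delay_target == 10 (in 0.1s
 * units, i.e. one second) and sect_in == 800 sectors (400KiB) received since
 * the last step, want == 800 * 10 * HZ / ((HZ/10) * 10) == 8000 sectors.
 * The difference to what is in flight plus what is already planned is spread
 * evenly over the plan, and this step requests sect_in plus the portion that
 * just fell out of the plan, clamped to the per-step equivalent of c_max_rate.
 */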
555
556static int drbd_rs_number_requests(struct drbd_device *device)
557{
558	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
559	int number, mxb;
560
561	sect_in = atomic_xchg(&device->rs_sect_in, 0);
562	device->rs_in_flight -= sect_in;
563
564	rcu_read_lock();
565	mxb = drbd_get_max_buffers(device) / 2;
566	if (rcu_dereference(device->rs_plan_s)->size) {
567		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
568		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
569	} else {
570		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
571		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
572	}
573	rcu_read_unlock();
574
575	/* Don't have more than "max-buffers"/2 in-flight.
576	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
577	 * potentially causing a distributed deadlock on congestion during
578	 * online-verify or (checksum-based) resync, if max-buffers,
579	 * socket buffer sizes and resync rate settings are mis-configured. */
580	if (mxb - device->rs_in_flight < number)
581		number = mxb - device->rs_in_flight;
582
583	return number;
584}
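/*
 * Unit bookkeeping: the controller thinks in 512-byte sectors, while resync
 * requests are issued per bitmap block, so ">> (BM_BLOCK_SHIFT - 9)" above
 * converts sectors into bitmap-block-sized requests (8 sectors per block for
 * a 4KiB BM_BLOCK_SIZE), and c_sync_rate is kept in KiB/s for reporting.
 */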
585
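/* Queue up the next batch of resync requests: pick dirty bits from the
 * bitmap, merge adjacent ones into larger aligned requests, and send either
 * checksum-based requests (P_CSUM_RS_REQUEST, if csums_tfm is configured and
 * the peer is recent enough) or plain P_RS_DATA_REQUESTs; finally re-arm the
 * resync timer. */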
586static int make_resync_request(struct drbd_device *const device, int cancel)
587{
588	struct drbd_peer_device *const peer_device = first_peer_device(device);
589	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
590	unsigned long bit;
591	sector_t sector;
592	const sector_t capacity = drbd_get_capacity(device->this_bdev);
593	int max_bio_size;
594	int number, rollback_i, size;
595	int align, queued, sndbuf;
596	int i = 0;
597
598	if (unlikely(cancel))
599		return 0;
600
601	if (device->rs_total == 0) {
602		/* empty resync? */
603		drbd_resync_finished(device);
604		return 0;
605	}
606
607	if (!get_ldev(device)) {
608		/* Since we only need to access device->resync,
609		   get_ldev_if_state(device, D_FAILED) would be sufficient, but
610		   continuing resync with a broken disk makes no sense at
611		   all */
612		drbd_err(device, "Disk broke down during resync!\n");
613		return 0;
614	}
615
616	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617	number = drbd_rs_number_requests(device);
618	if (number <= 0)
619		goto requeue;
620
621	for (i = 0; i < number; i++) {
622		/* Stop generating RS requests when half of the send buffer is filled */
623		mutex_lock(&connection->data.mutex);
624		if (connection->data.socket) {
625			queued = connection->data.socket->sk->sk_wmem_queued;
626			sndbuf = connection->data.socket->sk->sk_sndbuf;
627		} else {
628			queued = 1;
629			sndbuf = 0;
630		}
631		mutex_unlock(&connection->data.mutex);
632		if (queued > sndbuf / 2)
633			goto requeue;
634
635next_sector:
636		size = BM_BLOCK_SIZE;
637		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
638
639		if (bit == DRBD_END_OF_BITMAP) {
640			device->bm_resync_fo = drbd_bm_bits(device);
641			put_ldev(device);
642			return 0;
643		}
644
645		sector = BM_BIT_TO_SECT(bit);
646
647		if (drbd_rs_should_slow_down(device, sector) ||
648		    drbd_try_rs_begin_io(device, sector)) {
649			device->bm_resync_fo = bit;
650			goto requeue;
651		}
652		device->bm_resync_fo = bit + 1;
653
654		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
655			drbd_rs_complete_io(device, sector);
656			goto next_sector;
657		}
658
659#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
660		/* try to find some adjacent bits.
661		 * we stop once we have reached the maximum request size.
662		 *
663		 * Additionally always align bigger requests, in order to
664		 * be prepared for all stripe sizes of software RAIDs.
665		 */
666		align = 1;
667		rollback_i = i;
668		while (i < number) {
669			if (size + BM_BLOCK_SIZE > max_bio_size)
670				break;
671
672			/* stay aligned: only extend while the start sector is aligned to 1<<(align+3) sectors (4 KiB << align) */
673			if (sector & ((1<<(align+3))-1))
674				break;
675
676			/* do not cross extent boundaries */
677			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
678				break;
679			/* now, is it actually dirty, after all?
680			 * caution, drbd_bm_test_bit is tri-state for some
681			 * obscure reason; ( b == 0 ) would get the out-of-band
682			 * only accidentally right because of the "oddly sized"
683			 * adjustment below */
684			if (drbd_bm_test_bit(device, bit+1) != 1)
685				break;
686			bit++;
687			size += BM_BLOCK_SIZE;
688			if ((BM_BLOCK_SIZE << align) <= size)
689				align++;
690			i++;
691		}
692		/* if we merged some,
693		 * reset the offset to start the next drbd_bm_find_next from */
694		if (size > BM_BLOCK_SIZE)
695			device->bm_resync_fo = bit + 1;
696#endif
697
698		/* adjust very last sectors, in case we are oddly sized */
699		if (sector + (size>>9) > capacity)
700			size = (capacity-sector)<<9;
701		if (connection->agreed_pro_version >= 89 &&
702		    connection->csums_tfm) {
703			switch (read_for_csum(peer_device, sector, size)) {
704			case -EIO: /* Disk failure */
705				put_ldev(device);
706				return -EIO;
707			case -EAGAIN: /* allocation failed, or ldev busy */
708				drbd_rs_complete_io(device, sector);
709				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
710				i = rollback_i;
711				goto requeue;
712			case 0:
713				/* everything ok */
714				break;
715			default:
716				BUG();
717			}
718		} else {
719			int err;
720
721			inc_rs_pending(device);
722			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
723						 sector, size, ID_SYNCER);
724			if (err) {
725				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
726				dec_rs_pending(device);
727				put_ldev(device);
728				return err;
729			}
730		}
731	}
732
733	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
734		/* last syncer _request_ was sent,
735		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
736		 * next sync group will resume), as soon as we receive the last
737		 * resync data block, and the last bit is cleared.
738		 * until then resync "work" is "inactive" ...
739		 */
740		put_ldev(device);
741		return 0;
742	}
743
744 requeue:
745	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
746	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
747	put_ldev(device);
748	return 0;
749}
750
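/* Queue up the next batch of online-verify requests (P_OV_REQUEST), starting
 * at device->ov_position and honoring throttling and an optional stop
 * sector. */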
751static int make_ov_request(struct drbd_device *device, int cancel)
752{
753	int number, i, size;
754	sector_t sector;
755	const sector_t capacity = drbd_get_capacity(device->this_bdev);
756	bool stop_sector_reached = false;
757
758	if (unlikely(cancel))
759		return 1;
760
761	number = drbd_rs_number_requests(device);
762
763	sector = device->ov_position;
764	for (i = 0; i < number; i++) {
765		if (sector >= capacity)
766			return 1;
767
768		/* We check for "finished" only in the reply path:
769		 * w_e_end_ov_reply().
770		 * We need to send at least one request out. */
771		stop_sector_reached = i > 0
772			&& verify_can_do_stop_sector(device)
773			&& sector >= device->ov_stop_sector;
774		if (stop_sector_reached)
775			break;
776
777		size = BM_BLOCK_SIZE;
778
779		if (drbd_rs_should_slow_down(device, sector) ||
780		    drbd_try_rs_begin_io(device, sector)) {
781			device->ov_position = sector;
782			goto requeue;
783		}
784
785		if (sector + (size>>9) > capacity)
786			size = (capacity-sector)<<9;
787
788		inc_rs_pending(device);
789		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
790			dec_rs_pending(device);
791			return 0;
792		}
793		sector += BM_SECT_PER_BIT;
794	}
795	device->ov_position = sector;
796
797 requeue:
798	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
799	if (i == 0 || !stop_sector_reached)
800		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
801	return 1;
802}
803
804int w_ov_finished(struct drbd_work *w, int cancel)
805{
806	struct drbd_device_work *dw =
807		container_of(w, struct drbd_device_work, w);
808	struct drbd_device *device = dw->device;
809	kfree(dw);
810	ov_out_of_sync_print(device);
811	drbd_resync_finished(device);
812
813	return 0;
814}
815
816static int w_resync_finished(struct drbd_work *w, int cancel)
817{
818	struct drbd_device_work *dw =
819		container_of(w, struct drbd_device_work, w);
820	struct drbd_device *device = dw->device;
821	kfree(dw);
822
823	drbd_resync_finished(device);
824
825	return 0;
826}
827
828static void ping_peer(struct drbd_device *device)
829{
830	struct drbd_connection *connection = first_peer_device(device)->connection;
831
832	clear_bit(GOT_PING_ACK, &connection->flags);
833	request_ping(connection);
834	wait_event(connection->ping_wait,
835		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
836}
837
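/* Wrap up a finished resync or online verify: log the statistics, transition
 * back to C_CONNECTED, update disk/peer-disk states and UUIDs as appropriate,
 * and possibly invoke a userspace helper (out-of-sync, after-resync-target). */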
838int drbd_resync_finished(struct drbd_device *device)
839{
840	unsigned long db, dt, dbdt;
841	unsigned long n_oos;
842	union drbd_state os, ns;
843	struct drbd_device_work *dw;
844	char *khelper_cmd = NULL;
845	int verify_done = 0;
846
847	/* Remove all elements from the resync LRU. Since future actions
848	 * might set bits in the (main) bitmap, the entries in the
849	 * resync LRU would otherwise be wrong. */
850	if (drbd_rs_del_all(device)) {
851		/* This is not possible right now, most probably because
852		 * there are P_RS_DATA_REPLY packets lingering on the worker's
853		 * queue (or the read operations for those packets have
854		 * not finished yet).   Retry in 100ms. */
855
856		schedule_timeout_interruptible(HZ / 10);
857		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
858		if (dw) {
859			dw->w.cb = w_resync_finished;
860			dw->device = device;
861			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
862					&dw->w);
863			return 1;
864		}
865		drbd_err(device, "Both drbd_rs_del_all() and kmalloc(dw) failed.\n");
866	}
867
868	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
869	if (dt <= 0)
870		dt = 1;
871
872	db = device->rs_total;
873	/* adjust for verify start and stop sectors, and the position reached */
874	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
875		db -= device->ov_left;
876
877	dbdt = Bit2KB(db/dt);
878	device->rs_paused /= HZ;
879
880	if (!get_ldev(device))
881		goto out;
882
883	ping_peer(device);
884
885	spin_lock_irq(&device->resource->req_lock);
886	os = drbd_read_state(device);
887
888	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
889
890	/* This protects us against multiple calls (that can happen in the presence
891	   of application IO), and against connectivity loss just before we arrive here. */
892	if (os.conn <= C_CONNECTED)
893		goto out_unlock;
894
895	ns = os;
896	ns.conn = C_CONNECTED;
897
898	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
899	     verify_done ? "Online verify" : "Resync",
900	     dt + device->rs_paused, device->rs_paused, dbdt);
901
902	n_oos = drbd_bm_total_weight(device);
903
904	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
905		if (n_oos) {
906			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
907			      n_oos, Bit2KB(1));
908			khelper_cmd = "out-of-sync";
909		}
910	} else {
911		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
912
913		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
914			khelper_cmd = "after-resync-target";
915
916		if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
917			const unsigned long s = device->rs_same_csum;
918			const unsigned long t = device->rs_total;
919			const int ratio =
920				(t == 0)     ? 0 :
921			(t < 100000) ? ((s*100)/t) : (s/(t/100));
922			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
923			     "transferred %luK total %luK\n",
924			     ratio,
925			     Bit2KB(device->rs_same_csum),
926			     Bit2KB(device->rs_total - device->rs_same_csum),
927			     Bit2KB(device->rs_total));
928		}
929	}
930
931	if (device->rs_failed) {
932		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
933
934		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
935			ns.disk = D_INCONSISTENT;
936			ns.pdsk = D_UP_TO_DATE;
937		} else {
938			ns.disk = D_UP_TO_DATE;
939			ns.pdsk = D_INCONSISTENT;
940		}
941	} else {
942		ns.disk = D_UP_TO_DATE;
943		ns.pdsk = D_UP_TO_DATE;
944
945		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
946			if (device->p_uuid) {
947				int i;
948				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
949					_drbd_uuid_set(device, i, device->p_uuid[i]);
950				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
951				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
952			} else {
953				drbd_err(device, "device->p_uuid is NULL! BUG\n");
954			}
955		}
956
957		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
958			/* for verify runs, we don't update uuids here,
959			 * so there would be nothing to report. */
960			drbd_uuid_set_bm(device, 0UL);
961			drbd_print_uuids(device, "updated UUIDs");
962			if (device->p_uuid) {
963				/* Now the two UUID sets are equal, update what we
964				 * know of the peer. */
965				int i;
966				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
967					device->p_uuid[i] = device->ldev->md.uuid[i];
968			}
969		}
970	}
971
972	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
973out_unlock:
974	spin_unlock_irq(&device->resource->req_lock);
975	put_ldev(device);
976out:
977	device->rs_total  = 0;
978	device->rs_failed = 0;
979	device->rs_paused = 0;
980
981	/* reset start sector, if we reached end of device */
982	if (verify_done && device->ov_left == 0)
983		device->ov_start_sector = 0;
984
985	drbd_md_sync(device);
986
987	if (khelper_cmd)
988		drbd_khelper(device, khelper_cmd);
989
990	return 1;
991}
992
993/* helper */
994static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
995{
996	if (drbd_peer_req_has_active_page(peer_req)) {
997		/* This might happen if sendpage() has not finished */
998		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
999		atomic_add(i, &device->pp_in_use_by_net);
1000		atomic_sub(i, &device->pp_in_use);
1001		spin_lock_irq(&device->resource->req_lock);
1002		list_add_tail(&peer_req->w.list, &device->net_ee);
1003		spin_unlock_irq(&device->resource->req_lock);
1004		wake_up(&drbd_pp_wait);
1005	} else
1006		drbd_free_peer_req(device, peer_req);
1007}
1008
1009/**
1010 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1012 * @w:		work object.
1013 * @cancel:	The connection will be closed anyways
1014 */
1015int w_e_end_data_req(struct drbd_work *w, int cancel)
1016{
1017	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1018	struct drbd_peer_device *peer_device = peer_req->peer_device;
1019	struct drbd_device *device = peer_device->device;
1020	int err;
1021
1022	if (unlikely(cancel)) {
1023		drbd_free_peer_req(device, peer_req);
1024		dec_unacked(device);
1025		return 0;
1026	}
1027
1028	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1029		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1030	} else {
1031		if (__ratelimit(&drbd_ratelimit_state))
1032			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1033			    (unsigned long long)peer_req->i.sector);
1034
1035		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1036	}
1037
1038	dec_unacked(device);
1039
1040	move_to_net_ee_or_free(device, peer_req);
1041
1042	if (unlikely(err))
1043		drbd_err(device, "drbd_send_block() failed\n");
1044	return err;
1045}
1046
1047/**
1048 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1049 * @w:		work object.
1050 * @cancel:	The connection will be closed anyways
1051 */
1052int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1053{
1054	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1055	struct drbd_peer_device *peer_device = peer_req->peer_device;
1056	struct drbd_device *device = peer_device->device;
1057	int err;
1058
1059	if (unlikely(cancel)) {
1060		drbd_free_peer_req(device, peer_req);
1061		dec_unacked(device);
1062		return 0;
1063	}
1064
1065	if (get_ldev_if_state(device, D_FAILED)) {
1066		drbd_rs_complete_io(device, peer_req->i.sector);
1067		put_ldev(device);
1068	}
1069
1070	if (device->state.conn == C_AHEAD) {
1071		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1072	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1073		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1074			inc_rs_pending(device);
1075			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1076		} else {
1077			if (__ratelimit(&drbd_ratelimit_state))
1078				drbd_err(device, "Not sending RSDataReply, "
1079				    "partner DISKLESS!\n");
1080			err = 0;
1081		}
1082	} else {
1083		if (__ratelimit(&drbd_ratelimit_state))
1084			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1085			    (unsigned long long)peer_req->i.sector);
1086
1087		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1088
1089		/* update resync data with failure */
1090		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1091	}
1092
1093	dec_unacked(device);
1094
1095	move_to_net_ee_or_free(device, peer_req);
1096
1097	if (unlikely(err))
1098		drbd_err(device, "drbd_send_block() failed\n");
1099	return err;
1100}
1101
1102int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1103{
1104	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1105	struct drbd_peer_device *peer_device = peer_req->peer_device;
1106	struct drbd_device *device = peer_device->device;
1107	struct digest_info *di;
1108	int digest_size;
1109	void *digest = NULL;
1110	int err, eq = 0;
1111
1112	if (unlikely(cancel)) {
1113		drbd_free_peer_req(device, peer_req);
1114		dec_unacked(device);
1115		return 0;
1116	}
1117
1118	if (get_ldev(device)) {
1119		drbd_rs_complete_io(device, peer_req->i.sector);
1120		put_ldev(device);
1121	}
1122
1123	di = peer_req->digest;
1124
1125	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1126		/* quick hack to try to avoid a race against reconfiguration.
1127		 * a real fix would be much more involved,
1128		 * introducing more locking mechanisms */
1129		if (peer_device->connection->csums_tfm) {
1130			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1131			D_ASSERT(device, digest_size == di->digest_size);
1132			digest = kmalloc(digest_size, GFP_NOIO);
1133		}
1134		if (digest) {
1135			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1136			eq = !memcmp(digest, di->digest, digest_size);
1137			kfree(digest);
1138		}
1139
1140		if (eq) {
1141			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1142			/* rs_same_csums unit is BM_BLOCK_SIZE */
1143			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1144			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1145		} else {
1146			inc_rs_pending(device);
1147			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1148			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1149			kfree(di);
1150			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1151		}
1152	} else {
1153		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154		if (__ratelimit(&drbd_ratelimit_state))
1155			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1156	}
1157
1158	dec_unacked(device);
1159	move_to_net_ee_or_free(device, peer_req);
1160
1161	if (unlikely(err))
1162		drbd_err(device, "drbd_send_block/ack() failed\n");
1163	return err;
1164}
1165
1166int w_e_end_ov_req(struct drbd_work *w, int cancel)
1167{
1168	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1169	struct drbd_peer_device *peer_device = peer_req->peer_device;
1170	struct drbd_device *device = peer_device->device;
1171	sector_t sector = peer_req->i.sector;
1172	unsigned int size = peer_req->i.size;
1173	int digest_size;
1174	void *digest;
1175	int err = 0;
1176
1177	if (unlikely(cancel))
1178		goto out;
1179
1180	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1181	digest = kmalloc(digest_size, GFP_NOIO);
1182	if (!digest) {
1183		err = 1;	/* terminate the connection in case the allocation failed */
1184		goto out;
1185	}
1186
1187	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1188		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1189	else
1190		memset(digest, 0, digest_size);
1191
1192	/* Free peer_req and pages before send.
1193	 * In case we block on congestion, we could otherwise run into
1194	 * some distributed deadlock, if the other side blocks on
1195	 * congestion as well, because our receiver blocks in
1196	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1197	drbd_free_peer_req(device, peer_req);
1198	peer_req = NULL;
1199	inc_rs_pending(device);
1200	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1201	if (err)
1202		dec_rs_pending(device);
1203	kfree(digest);
1204
1205out:
1206	if (peer_req)
1207		drbd_free_peer_req(device, peer_req);
1208	dec_unacked(device);
1209	return err;
1210}
1211
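/* Record an out-of-sync range found by online verify: extend the currently
 * tracked contiguous range if possible, otherwise start a new one, and mark
 * the affected blocks out of sync in the bitmap. */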
1212void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1213{
1214	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1215		device->ov_last_oos_size += size>>9;
1216	} else {
1217		device->ov_last_oos_start = sector;
1218		device->ov_last_oos_size = size>>9;
1219	}
1220	drbd_set_out_of_sync(device, sector, size);
1221}
1222
1223int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1224{
1225	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1226	struct drbd_peer_device *peer_device = peer_req->peer_device;
1227	struct drbd_device *device = peer_device->device;
1228	struct digest_info *di;
1229	void *digest;
1230	sector_t sector = peer_req->i.sector;
1231	unsigned int size = peer_req->i.size;
1232	int digest_size;
1233	int err, eq = 0;
1234	bool stop_sector_reached = false;
1235
1236	if (unlikely(cancel)) {
1237		drbd_free_peer_req(device, peer_req);
1238		dec_unacked(device);
1239		return 0;
1240	}
1241
1242	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1243	 * the resync lru has been cleaned up already */
1244	if (get_ldev(device)) {
1245		drbd_rs_complete_io(device, peer_req->i.sector);
1246		put_ldev(device);
1247	}
1248
1249	di = peer_req->digest;
1250
1251	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1252		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1253		digest = kmalloc(digest_size, GFP_NOIO);
1254		if (digest) {
1255			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1256
1257			D_ASSERT(device, digest_size == di->digest_size);
1258			eq = !memcmp(digest, di->digest, digest_size);
1259			kfree(digest);
1260		}
1261	}
1262
1263	/* Free peer_req and pages before send.
1264	 * In case we block on congestion, we could otherwise run into
1265	 * some distributed deadlock, if the other side blocks on
1266	 * congestion as well, because our receiver blocks in
1267	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1268	drbd_free_peer_req(device, peer_req);
1269	if (!eq)
1270		drbd_ov_out_of_sync_found(device, sector, size);
1271	else
1272		ov_out_of_sync_print(device);
1273
1274	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1275			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1276
1277	dec_unacked(device);
1278
1279	--device->ov_left;
1280
1281	/* let's advance progress step marks only for every other megabyte */
1282	if ((device->ov_left & 0x200) == 0x200)
1283		drbd_advance_rs_marks(device, device->ov_left);
1284
1285	stop_sector_reached = verify_can_do_stop_sector(device) &&
1286		(sector + (size>>9)) >= device->ov_stop_sector;
1287
1288	if (device->ov_left == 0 || stop_sector_reached) {
1289		ov_out_of_sync_print(device);
1290		drbd_resync_finished(device);
1291	}
1292
1293	return err;
1294}
1295
1296/* FIXME
1297 * We need to track the number of pending barrier acks,
1298 * and to be able to wait for them.
1299 * See also comment in drbd_adm_attach before drbd_suspend_io.
1300 */
1301static int drbd_send_barrier(struct drbd_connection *connection)
1302{
1303	struct p_barrier *p;
1304	struct drbd_socket *sock;
1305
1306	sock = &connection->data;
1307	p = conn_prepare_command(connection, sock);
1308	if (!p)
1309		return -EIO;
1310	p->barrier = connection->send.current_epoch_nr;
1311	p->pad = 0;
1312	connection->send.current_epoch_writes = 0;
1313
1314	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1315}
1316
1317int w_send_write_hint(struct drbd_work *w, int cancel)
1318{
1319	struct drbd_device *device =
1320		container_of(w, struct drbd_device, unplug_work);
1321	struct drbd_socket *sock;
1322
1323	if (cancel)
1324		return 0;
1325	sock = &first_peer_device(device)->connection->data;
1326	if (!drbd_prepare_command(first_peer_device(device), sock))
1327		return -EIO;
1328	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1329}
1330
1331static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1332{
1333	if (!connection->send.seen_any_write_yet) {
1334		connection->send.seen_any_write_yet = true;
1335		connection->send.current_epoch_nr = epoch;
1336		connection->send.current_epoch_writes = 0;
1337	}
1338}
1339
1340static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1341{
1342	/* nothing to do if there was no write on this connection yet */
1343	if (!connection->send.seen_any_write_yet)
1344		return;
1345	if (connection->send.current_epoch_nr != epoch) {
1346		if (connection->send.current_epoch_writes)
1347			drbd_send_barrier(connection);
1348		connection->send.current_epoch_nr = epoch;
1349	}
1350}
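/*
 * Epoch/barrier bookkeeping for the send workers below: the first write on a
 * connection initializes the current epoch (re_init_if_first_write); when a
 * request belongs to a newer epoch than the one currently being sent for, a
 * P_BARRIER is emitted first, but only if the old epoch actually contained
 * writes (maybe_send_barrier).
 */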
1351
1352int w_send_out_of_sync(struct drbd_work *w, int cancel)
1353{
1354	struct drbd_request *req = container_of(w, struct drbd_request, w);
1355	struct drbd_device *device = req->device;
1356	struct drbd_peer_device *const peer_device = first_peer_device(device);
1357	struct drbd_connection *const connection = peer_device->connection;
1358	int err;
1359
1360	if (unlikely(cancel)) {
1361		req_mod(req, SEND_CANCELED);
1362		return 0;
1363	}
1364
1365	/* this time, no connection->send.current_epoch_writes++;
1366	 * If it was sent, it was the closing barrier for the last
1367	 * replicated epoch, before we went into AHEAD mode.
1368	 * No more barriers will be sent, until we leave AHEAD mode again. */
1369	maybe_send_barrier(connection, req->epoch);
1370
1371	err = drbd_send_out_of_sync(peer_device, req);
1372	req_mod(req, OOS_HANDED_TO_NETWORK);
1373
1374	return err;
1375}
1376
1377/**
1378 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1379 * @w:		work object.
1380 * @cancel:	The connection will be closed anyways
1381 */
1382int w_send_dblock(struct drbd_work *w, int cancel)
1383{
1384	struct drbd_request *req = container_of(w, struct drbd_request, w);
1385	struct drbd_device *device = req->device;
1386	struct drbd_peer_device *const peer_device = first_peer_device(device);
1387	struct drbd_connection *connection = peer_device->connection;
1388	int err;
1389
1390	if (unlikely(cancel)) {
1391		req_mod(req, SEND_CANCELED);
1392		return 0;
1393	}
1394
1395	re_init_if_first_write(connection, req->epoch);
1396	maybe_send_barrier(connection, req->epoch);
1397	connection->send.current_epoch_writes++;
1398
1399	err = drbd_send_dblock(peer_device, req);
1400	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1401
1402	return err;
1403}
1404
1405/**
1406 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1407 * @w:		work object.
1408 * @cancel:	The connection will be closed anyways
1409 */
1410int w_send_read_req(struct drbd_work *w, int cancel)
1411{
1412	struct drbd_request *req = container_of(w, struct drbd_request, w);
1413	struct drbd_device *device = req->device;
1414	struct drbd_peer_device *const peer_device = first_peer_device(device);
1415	struct drbd_connection *connection = peer_device->connection;
1416	int err;
1417
1418	if (unlikely(cancel)) {
1419		req_mod(req, SEND_CANCELED);
1420		return 0;
1421	}
1422
1423	/* Even read requests may close a write epoch,
1424	 * if there has been one at all. */
1425	maybe_send_barrier(connection, req->epoch);
1426
1427	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1428				 (unsigned long)req);
1429
1430	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1431
1432	return err;
1433}
1434
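/* Re-issue a request to the local backing device, e.g. after previously
 * suspended IO is resumed; writes that are accounted in the activity log
 * re-acquire their AL extent first. */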
1435int w_restart_disk_io(struct drbd_work *w, int cancel)
1436{
1437	struct drbd_request *req = container_of(w, struct drbd_request, w);
1438	struct drbd_device *device = req->device;
1439
1440	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1441		drbd_al_begin_io(device, &req->i, false);
1442
1443	drbd_req_make_private_bio(req, req->master_bio);
1444	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1445	generic_make_request(req->private_bio);
1446
1447	return 0;
1448}
1449
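/* Walk the resync-after dependency chain of the given device and return 1 if
 * nothing in that chain is currently syncing or paused (i.e. this device may
 * resync now), 0 otherwise. */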
1450static int _drbd_may_sync_now(struct drbd_device *device)
1451{
1452	struct drbd_device *odev = device;
1453	int resync_after;
1454
1455	while (1) {
1456		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1457			return 1;
1458		rcu_read_lock();
1459		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1460		rcu_read_unlock();
1461		if (resync_after == -1)
1462			return 1;
1463		odev = minor_to_device(resync_after);
1464		if (!odev)
1465			return 1;
1466		if ((odev->state.conn >= C_SYNC_SOURCE &&
1467		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1468		    odev->state.aftr_isp || odev->state.peer_isp ||
1469		    odev->state.user_isp)
1470			return 0;
1471	}
1472}
1473
1474/**
1475 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1476 * @device:	DRBD device.
1477 *
1478 * Called from process context only (admin command and after_state_ch).
1479 */
1480static int _drbd_pause_after(struct drbd_device *device)
1481{
1482	struct drbd_device *odev;
1483	int i, rv = 0;
1484
1485	rcu_read_lock();
1486	idr_for_each_entry(&drbd_devices, odev, i) {
1487		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1488			continue;
1489		if (!_drbd_may_sync_now(odev))
1490			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1491			       != SS_NOTHING_TO_DO);
1492	}
1493	rcu_read_unlock();
1494
1495	return rv;
1496}
1497
1498/**
1499 * _drbd_resume_next() - Resume resync on all devices that may resync now
1500 * @device:	DRBD device.
1501 *
1502 * Called from process context only (admin command and worker).
1503 */
1504static int _drbd_resume_next(struct drbd_device *device)
1505{
1506	struct drbd_device *odev;
1507	int i, rv = 0;
1508
1509	rcu_read_lock();
1510	idr_for_each_entry(&drbd_devices, odev, i) {
1511		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1512			continue;
1513		if (odev->state.aftr_isp) {
1514			if (_drbd_may_sync_now(odev))
1515				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1516							CS_HARD, NULL)
1517				       != SS_NOTHING_TO_DO) ;
1518		}
1519	}
1520	rcu_read_unlock();
1521	return rv;
1522}
1523
1524void resume_next_sg(struct drbd_device *device)
1525{
1526	write_lock_irq(&global_state_lock);
1527	_drbd_resume_next(device);
1528	write_unlock_irq(&global_state_lock);
1529}
1530
1531void suspend_other_sg(struct drbd_device *device)
1532{
1533	write_lock_irq(&global_state_lock);
1534	_drbd_pause_after(device);
1535	write_unlock_irq(&global_state_lock);
1536}
1537
1538/* caller must hold global_state_lock */
1539enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1540{
1541	struct drbd_device *odev;
1542	int resync_after;
1543
1544	if (o_minor == -1)
1545		return NO_ERROR;
1546	if (o_minor < -1 || o_minor > MINORMASK)
1547		return ERR_RESYNC_AFTER;
1548
1549	/* check for loops */
1550	odev = minor_to_device(o_minor);
1551	while (1) {
1552		if (odev == device)
1553			return ERR_RESYNC_AFTER_CYCLE;
1554
1555		/* You are free to depend on diskless, non-existing,
1556		 * or not yet/no longer existing minors.
1557		 * We only reject dependency loops.
1558		 * We cannot follow the dependency chain beyond a detached or
1559		 * missing minor.
1560		 */
1561		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1562			return NO_ERROR;
1563
1564		rcu_read_lock();
1565		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1566		rcu_read_unlock();
1567		/* dependency chain ends here, no cycles. */
1568		if (resync_after == -1)
1569			return NO_ERROR;
1570
1571		/* follow the dependency chain */
1572		odev = minor_to_device(resync_after);
1573	}
1574}
1575
1576/* caller must hold global_state_lock */
1577void drbd_resync_after_changed(struct drbd_device *device)
1578{
1579	int changes;
1580
1581	do {
1582		changes  = _drbd_pause_after(device);
1583		changes |= _drbd_resume_next(device);
1584	} while (changes);
1585}
1586
1587void drbd_rs_controller_reset(struct drbd_device *device)
1588{
1589	struct fifo_buffer *plan;
1590
1591	atomic_set(&device->rs_sect_in, 0);
1592	atomic_set(&device->rs_sect_ev, 0);
1593	device->rs_in_flight = 0;
1594
1595	/* Updating the RCU protected object in place is necessary since
1596	   this function gets called from atomic context.
1597	   It is valid since all other updates also lead to a completely
1598	   empty fifo */
1599	rcu_read_lock();
1600	plan = rcu_dereference(device->rs_plan_s);
1601	plan->total = 0;
1602	fifo_set(plan, 0);
1603	rcu_read_unlock();
1604}
1605
1606void start_resync_timer_fn(unsigned long data)
1607{
1608	struct drbd_device *device = (struct drbd_device *) data;
1609
1610	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1611			&device->start_resync_work);
1612}
1613
1614int w_start_resync(struct drbd_work *w, int cancel)
1615{
1616	struct drbd_device *device =
1617		container_of(w, struct drbd_device, start_resync_work);
1618
1619	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1620		drbd_warn(device, "w_start_resync later...\n");
1621		device->start_resync_timer.expires = jiffies + HZ/10;
1622		add_timer(&device->start_resync_timer);
1623		return 0;
1624	}
1625
1626	drbd_start_resync(device, C_SYNC_SOURCE);
1627	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1628	return 0;
1629}
1630
1631/**
1632 * drbd_start_resync() - Start the resync process
1633 * @device:	DRBD device.
1634 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1635 *
1636 * This function might bring you directly into one of the
1637 * C_PAUSED_SYNC_* states.
1638 */
1639void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1640{
1641	struct drbd_peer_device *peer_device = first_peer_device(device);
1642	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1643	union drbd_state ns;
1644	int r;
1645
1646	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1647		drbd_err(device, "Resync already running!\n");
1648		return;
1649	}
1650
1651	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1652		if (side == C_SYNC_TARGET) {
1653			/* Since application IO was locked out during C_WF_BITMAP_T and
1654			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1655			   we ask the handler whether we may make the data inconsistent. */
1656			r = drbd_khelper(device, "before-resync-target");
1657			r = (r >> 8) & 0xff;
1658			if (r > 0) {
1659				drbd_info(device, "before-resync-target handler returned %d, "
1660					 "dropping connection.\n", r);
1661				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1662				return;
1663			}
1664		} else /* C_SYNC_SOURCE */ {
1665			r = drbd_khelper(device, "before-resync-source");
1666			r = (r >> 8) & 0xff;
1667			if (r > 0) {
1668				if (r == 3) {
1669					drbd_info(device, "before-resync-source handler returned %d, "
1670						 "ignoring. Old userland tools?\n", r);
1671				} else {
1672					drbd_info(device, "before-resync-source handler returned %d, "
1673						 "dropping connection.\n", r);
1674					conn_request_state(connection,
1675							   NS(conn, C_DISCONNECTING), CS_HARD);
1676					return;
1677				}
1678			}
1679		}
1680	}
1681
1682	if (current == connection->worker.task) {
1683		/* The worker should not sleep waiting for state_mutex,
1684		   because that can take a long time */
1685		if (!mutex_trylock(device->state_mutex)) {
1686			set_bit(B_RS_H_DONE, &device->flags);
1687			device->start_resync_timer.expires = jiffies + HZ/5;
1688			add_timer(&device->start_resync_timer);
1689			return;
1690		}
1691	} else {
1692		mutex_lock(device->state_mutex);
1693	}
1694	clear_bit(B_RS_H_DONE, &device->flags);
1695
1696	/* req_lock: serialize with drbd_send_and_submit() and others
1697	 * global_state_lock: for stable sync-after dependencies */
1698	spin_lock_irq(&device->resource->req_lock);
1699	write_lock(&global_state_lock);
1700	/* Did some connection breakage or IO error race with us? */
1701	if (device->state.conn < C_CONNECTED
1702	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1703		write_unlock(&global_state_lock);
1704		spin_unlock_irq(&device->resource->req_lock);
1705		mutex_unlock(device->state_mutex);
1706		return;
1707	}
1708
1709	ns = drbd_read_state(device);
1710
1711	ns.aftr_isp = !_drbd_may_sync_now(device);
1712
1713	ns.conn = side;
1714
1715	if (side == C_SYNC_TARGET)
1716		ns.disk = D_INCONSISTENT;
1717	else /* side == C_SYNC_SOURCE */
1718		ns.pdsk = D_INCONSISTENT;
1719
1720	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1721	ns = drbd_read_state(device);
1722
1723	if (ns.conn < C_CONNECTED)
1724		r = SS_UNKNOWN_ERROR;
1725
1726	if (r == SS_SUCCESS) {
1727		unsigned long tw = drbd_bm_total_weight(device);
1728		unsigned long now = jiffies;
1729		int i;
1730
1731		device->rs_failed    = 0;
1732		device->rs_paused    = 0;
1733		device->rs_same_csum = 0;
1734		device->rs_last_events = 0;
1735		device->rs_last_sect_ev = 0;
1736		device->rs_total     = tw;
1737		device->rs_start     = now;
1738		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1739			device->rs_mark_left[i] = tw;
1740			device->rs_mark_time[i] = now;
1741		}
1742		_drbd_pause_after(device);
1743	}
1744	write_unlock(&global_state_lock);
1745	spin_unlock_irq(&device->resource->req_lock);
1746
1747	if (r == SS_SUCCESS) {
1748		/* reset rs_last_bcast when a resync or verify is started,
1749		 * to deal with potential jiffies wrap. */
1750		device->rs_last_bcast = jiffies - HZ;
1751
1752		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1753		     drbd_conn_str(ns.conn),
1754		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1755		     (unsigned long) device->rs_total);
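		/* Each bitmap bit covers one 4 KiB block (BM_BLOCK_SHIFT,
		 * 12 at the time of writing), so the KB figure printed above
		 * is simply rs_total << 2.  Worked example, assuming 4 GiB
		 * are out of sync:
		 *
		 *	rs_total = 1048576 bits
		 *	1048576 << (BM_BLOCK_SHIFT - 10) == 1048576 * 4
		 *					  == 4194304 KB
		 */
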
1756		if (side == C_SYNC_TARGET)
1757			device->bm_resync_fo = 0;
1758
1759		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1760		 * with w_send_oos, or the sync target will get confused as to
1761	 * how many bits to resync.  We cannot do that always, because for an
1762		 * empty resync and protocol < 95, we need to do it here, as we call
1763		 * drbd_resync_finished from here in that case.
1764		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1765		 * and from after_state_ch otherwise. */
1766		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1767			drbd_gen_and_send_sync_uuid(peer_device);
1768
1769		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1770			/* This still has a race (about when exactly the peers
1771			 * detect connection loss) that can lead to a full sync
1772			 * on next handshake. In 8.3.9 we fixed this with explicit
1773			 * resync-finished notifications, but the fix
1774			 * introduces a protocol change.  Sleeping for some
1775			 * time longer than the ping interval + timeout on the
1776			 * SyncSource, to give the SyncTarget the chance to
1777			 * detect connection loss, then waiting for a ping
1778			 * response (implicit in drbd_resync_finished) reduces
1779			 * the race considerably, but does not solve it. */
1780			if (side == C_SYNC_SOURCE) {
1781				struct net_conf *nc;
1782				int timeo;
1783
1784				rcu_read_lock();
1785				nc = rcu_dereference(connection->net_conf);
1786				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1787				rcu_read_unlock();
1788				schedule_timeout_interruptible(timeo);
1789			}
1790			drbd_resync_finished(device);
1791		}
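		/* Worked example for the sleep above, assuming the usual
		 * defaults of ping-int = 10 (seconds) and ping-timeo = 5
		 * (tenths of a second):
		 *
		 *	timeo = 10 * HZ + 5 * HZ / 9
		 *	      == roughly 10.56 seconds worth of jiffies
		 *
		 * That is slightly more than ping interval plus ping timeout
		 * (10.5 s), i.e. the "some time longer" asked for above.
		 */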
1792
1793		drbd_rs_controller_reset(device);
1794		/* ns.conn may already be != device->state.conn,
1795		 * we may have been paused in between, or become paused until
1796		 * the timer triggers.
1797		 * No matter, that is handled in resync_timer_fn() */
1798		if (ns.conn == C_SYNC_TARGET)
1799			mod_timer(&device->resync_timer, jiffies);
1800
1801		drbd_md_sync(device);
1802	}
1803	put_ldev(device);
1804	mutex_unlock(device->state_mutex);
1805}
1806
1807static void update_on_disk_bitmap(struct drbd_device *device)
1808{
1809	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1810	device->rs_last_bcast = jiffies;
1811
1812	if (!get_ldev(device))
1813		return;
1814
1815	drbd_bm_write_lazy(device, 0);
1816	if (drbd_bm_total_weight(device) <= device->rs_failed)
1817		drbd_resync_finished(device);
1818	drbd_bcast_event(device, &sib);
1819	/* update timestamp, in case it took a while to write out stuff */
1820	device->rs_last_bcast = jiffies;
1821	put_ldev(device);
1822}
1823
1824static bool wants_lazy_bitmap_update(struct drbd_device *device)
1825{
1826	enum drbd_conns connection_state = device->state.conn;
1827	return
1828	/* only do a lazy writeout, if device is in some resync state */
1829	   (connection_state == C_SYNC_SOURCE
1830	||  connection_state == C_SYNC_TARGET
1831	||  connection_state == C_PAUSED_SYNC_S
1832	||  connection_state == C_PAUSED_SYNC_T) &&
1833	/* AND
1834	 * either we just finished, or the last lazy update
1835	 * was some time ago already. */
1836	   (drbd_bm_total_weight(device) <= device->rs_failed
1837	||  time_after(jiffies, device->rs_last_bcast + 2*HZ));
1838}
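/* time_after() keeps the 2 second rate limit above correct across a jiffies
 * wrap.  Minimal sketch of the same throttling pattern, with a hypothetical
 * do_periodic_work() standing in for the lazy bitmap writeout:
 *
 *	static unsigned long last;
 *
 *	if (time_after(jiffies, last + 2*HZ)) {
 *		last = jiffies;
 *		do_periodic_work();
 *	}
 */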
1839
1840static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
1841{
1842	struct drbd_peer_device *peer_device;
1843	int vnr;
1844
1845	rcu_read_lock();
1846	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1847		struct drbd_device *device = peer_device->device;
1848		if (!wants_lazy_bitmap_update(device))
1849			continue;
1850		kref_get(&device->kref);
1851		rcu_read_unlock();
1852		update_on_disk_bitmap(device);
1853		kref_put(&device->kref, drbd_destroy_device);
1854		rcu_read_lock();
1855	}
1856	rcu_read_unlock();
1857}
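/* Pattern worth noting above (and repeated in the cleanup loop at the end of
 * drbd_worker() below): an object found under rcu_read_lock() must not be
 * used across a sleep, so take a kref, drop the RCU read lock, do the
 * potentially blocking work, then drop the kref and re-acquire the RCU read
 * lock before continuing the idr walk.  Minimal sketch with a hypothetical
 * object type and release function:
 *
 *	rcu_read_lock();
 *	idr_for_each_entry(&some_idr, obj, id) {
 *		kref_get(&obj->kref);
 *		rcu_read_unlock();
 *		do_blocking_work(obj);
 *		kref_put(&obj->kref, obj_destroy);
 *		rcu_read_lock();
 *	}
 *	rcu_read_unlock();
 */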
1858
1859static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1860{
1861	spin_lock_irq(&queue->q_lock);
1862	list_splice_init(&queue->q, work_list);
1863	spin_unlock_irq(&queue->q_lock);
1864	return !list_empty(work_list);
1865}
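/* list_splice_init() moves the entire queue contents onto work_list in one
 * step and leaves queue->q empty again, so only that splice runs under
 * q_lock.  Sketch of how a caller drains such a batch outside the lock
 * (cf. the shutdown loop at the end of drbd_worker() below):
 *
 *	LIST_HEAD(work_list);
 *	struct drbd_work *w, *t;
 *
 *	if (dequeue_work_batch(&connection->sender_work, &work_list))
 *		list_for_each_entry_safe(w, t, &work_list, list) {
 *			list_del_init(&w->list);
 *			w->cb(w, 0);
 *		}
 */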
1866
1867static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1868{
1869	spin_lock_irq(&queue->q_lock);
1870	if (!list_empty(&queue->q))
1871		list_move(queue->q.next, work_list);
1872	spin_unlock_irq(&queue->q_lock);
1873	return !list_empty(work_list);
1874}
1875
1876static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1877{
1878	DEFINE_WAIT(wait);
1879	struct net_conf *nc;
1880	int uncork, cork;
1881
1882	dequeue_work_item(&connection->sender_work, work_list);
1883	if (!list_empty(work_list))
1884		return;
1885
1886	/* Still nothing to do?
1887	 * Maybe we still need to close the current epoch,
1888	 * even if no new requests are queued yet.
1889	 *
1890	 * Also, poke TCP, just in case.
1891	 * Then wait for new work (or signal). */
1892	rcu_read_lock();
1893	nc = rcu_dereference(connection->net_conf);
1894	uncork = nc ? nc->tcp_cork : 0;
1895	rcu_read_unlock();
1896	if (uncork) {
1897		mutex_lock(&connection->data.mutex);
1898		if (connection->data.socket)
1899			drbd_tcp_uncork(connection->data.socket);
1900		mutex_unlock(&connection->data.mutex);
1901	}
1902
1903	for (;;) {
1904		int send_barrier;
1905		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1906		spin_lock_irq(&connection->resource->req_lock);
1907		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1908		/* dequeue single item only,
1909		 * we still use drbd_queue_work_front() in some places */
1910		if (!list_empty(&connection->sender_work.q))
1911			list_move(connection->sender_work.q.next, work_list);
1912		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1913		if (!list_empty(work_list) || signal_pending(current)) {
1914			spin_unlock_irq(&connection->resource->req_lock);
1915			break;
1916		}
1917
1918		/* We found nothing new to do, no to-be-communicated request,
1919		 * no other work item.  We may still need to close the last
1920		 * epoch.  Next incoming request epoch will be connection ->
1921		 * current transfer log epoch number.  If that is different
1922		 * from the epoch of the last request we communicated, it is
1923		 * safe to send the epoch separating barrier now.
1924		 */
1925		send_barrier =
1926			atomic_read(&connection->current_tle_nr) !=
1927			connection->send.current_epoch_nr;
1928		spin_unlock_irq(&connection->resource->req_lock);
1929
1930		if (send_barrier)
1931			maybe_send_barrier(connection,
1932					connection->send.current_epoch_nr + 1);
1933		/* drbd_send() may have called flush_signals() */
1934		if (get_t_state(&connection->worker) != RUNNING)
1935			break;
1936		schedule();
1937		/* may be woken up for things other than new work, too,
1938		 * e.g. if the current epoch got closed;
1939		 * in that case we send the barrier above. */
1940
1941		try_update_all_on_disk_bitmaps(connection);
1942	}
1943	finish_wait(&connection->sender_work.q_wait, &wait);
1944
1945	/* someone may have changed the config while we have been waiting above. */
1946	rcu_read_lock();
1947	nc = rcu_dereference(connection->net_conf);
1948	cork = nc ? nc->tcp_cork : 0;
1949	rcu_read_unlock();
1950	mutex_lock(&connection->data.mutex);
1951	if (connection->data.socket) {
1952		if (cork)
1953			drbd_tcp_cork(connection->data.socket);
1954		else if (!uncork)
1955			drbd_tcp_uncork(connection->data.socket);
1956	}
1957	mutex_unlock(&connection->data.mutex);
1958}
1959
1960int drbd_worker(struct drbd_thread *thi)
1961{
1962	struct drbd_connection *connection = thi->connection;
1963	struct drbd_work *w = NULL;
1964	struct drbd_peer_device *peer_device;
1965	LIST_HEAD(work_list);
1966	int vnr;
1967
1968	while (get_t_state(thi) == RUNNING) {
1969		drbd_thread_current_set_cpu(thi);
1970
1971		/* as long as we use drbd_queue_work_front(),
1972		 * we may only dequeue single work items here, not batches. */
1973		if (list_empty(&work_list))
1974			wait_for_work(connection, &work_list);
1975
1976		if (signal_pending(current)) {
1977			flush_signals(current);
1978			if (get_t_state(thi) == RUNNING) {
1979				drbd_warn(connection, "Worker got an unexpected signal\n");
1980				continue;
1981			}
1982			break;
1983		}
1984
1985		if (get_t_state(thi) != RUNNING)
1986			break;
1987
1988		while (!list_empty(&work_list)) {
1989			w = list_first_entry(&work_list, struct drbd_work, list);
1990			list_del_init(&w->list);
1991			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1992				continue;
1993			if (connection->cstate >= C_WF_REPORT_PARAMS)
1994				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1995		}
1996	}
1997
1998	do {
1999		while (!list_empty(&work_list)) {
2000			w = list_first_entry(&work_list, struct drbd_work, list);
2001			list_del_init(&w->list);
2002			w->cb(w, 1);
2003		}
2004		dequeue_work_batch(&connection->sender_work, &work_list);
2005	} while (!list_empty(&work_list));
2006
2007	rcu_read_lock();
2008	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2009		struct drbd_device *device = peer_device->device;
2010		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2011		kref_get(&device->kref);
2012		rcu_read_unlock();
2013		drbd_device_cleanup(device);
2014		kref_put(&device->kref, drbd_destroy_device);
2015		rcu_read_lock();
2016	}
2017	rcu_read_unlock();
2018
2019	return 0;
2020}
2021