drbd_worker.c revision a8cd15ba7919eaf1f416857f983a502cc261af26
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24*/
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_protocol.h"
40#include "drbd_req.h"
41
42static int w_make_ov_request(struct drbd_work *, int);
43
44
45/* endio handlers:
46 *   drbd_md_io_complete (defined here)
47 *   drbd_request_endio (defined here)
48 *   drbd_peer_request_endio (defined here)
49 *   bm_async_io_complete (defined in drbd_bitmap.c)
50 *
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
55 *
56 */
57
58
59/* About the global_state_lock
60   Each state transition on a device holds a read lock. In case we have
61   to evaluate the resync-after dependencies, we grab a write lock, because
62   we need stable states on all devices for that.  */
63rwlock_t global_state_lock;
64
65/* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
67 */
68void drbd_md_io_complete(struct bio *bio, int error)
69{
70	struct drbd_md_io *md_io;
71	struct drbd_device *device;
72
73	md_io = (struct drbd_md_io *)bio->bi_private;
74	device = container_of(md_io, struct drbd_device, md_io);
75
76	md_io->error = error;
77
78	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
79	 * to timeout on the lower level device, and eventually detach from it.
80	 * If this io completion runs after that timeout expired, this
81	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
82	 * During normal operation, this only puts that extra reference
83	 * down to 1 again.
84	 * Make sure we first drop the reference, and only then signal
85	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
86	 * next drbd_md_sync_page_io(), that we trigger the
87	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
88	 */
89	drbd_md_put_buffer(device);
90	md_io->done = 1;
91	wake_up(&device->misc_wait);
92	bio_put(bio);
93	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
94		put_ldev(device);
95}
96
97/* reads on behalf of the partner,
98 * "submitted" by the receiver
99 */
100static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101{
102	unsigned long flags = 0;
103	struct drbd_device *device = peer_req->peer_device->device;
104
105	spin_lock_irqsave(&device->resource->req_lock, flags);
106	device->read_cnt += peer_req->i.size >> 9;
107	list_del(&peer_req->w.list);
108	if (list_empty(&device->read_ee))
109		wake_up(&device->ee_wait);
110	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
111		__drbd_chk_io_error(device, DRBD_READ_ERROR);
112	spin_unlock_irqrestore(&device->resource->req_lock, flags);
113
114	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
115			&peer_req->w);
116	put_ldev(device);
117}
118
119/* writes on behalf of the partner, or resync writes,
120 * "submitted" by the receiver, final stage.  */
121static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
122{
123	unsigned long flags = 0;
124	struct drbd_device *device = peer_req->peer_device->device;
125	struct drbd_interval i;
126	int do_wake;
127	u64 block_id;
128	int do_al_complete_io;
129
130	/* after we moved peer_req to done_ee,
131	 * we may no longer access it,
132	 * it may be freed/reused already!
133	 * (as soon as we release the req_lock) */
134	i = peer_req->i;
135	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
136	block_id = peer_req->block_id;
137
138	spin_lock_irqsave(&device->resource->req_lock, flags);
139	device->writ_cnt += peer_req->i.size >> 9;
140	list_move_tail(&peer_req->w.list, &device->done_ee);
141
142	/*
143	 * Do not remove from the write_requests tree here: we did not send the
144	 * Ack yet and did not wake possibly waiting conflicting requests.
145	 * Removal from the tree happens in "drbd_process_done_ee", within the
146	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
147	 * _drbd_clear_done_ee.
148	 */
149
150	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
151
152	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
153		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
154	spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156	if (block_id == ID_SYNCER)
157		drbd_rs_complete_io(device, i.sector);
158
159	if (do_wake)
160		wake_up(&device->ee_wait);
161
162	if (do_al_complete_io)
163		drbd_al_complete_io(device, &i);
164
165	wake_asender(first_peer_device(device)->connection);
166	put_ldev(device);
167}
168
169/* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
171 */
172void drbd_peer_request_endio(struct bio *bio, int error)
173{
174	struct drbd_peer_request *peer_req = bio->bi_private;
175	struct drbd_device *device = peer_req->peer_device->device;
176	int uptodate = bio_flagged(bio, BIO_UPTODATE);
177	int is_write = bio_data_dir(bio) == WRITE;
178
179	if (error && __ratelimit(&drbd_ratelimit_state))
180		drbd_warn(device, "%s: error=%d s=%llus\n",
181				is_write ? "write" : "read", error,
182				(unsigned long long)peer_req->i.sector);
183	if (!error && !uptodate) {
184		if (__ratelimit(&drbd_ratelimit_state))
185			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
186					is_write ? "write" : "read",
187					(unsigned long long)peer_req->i.sector);
188		/* strange behavior of some lower level drivers...
189		 * fail the request by clearing the uptodate flag,
190		 * but do not return any error?! */
191		error = -EIO;
192	}
193
194	if (error)
195		set_bit(__EE_WAS_ERROR, &peer_req->flags);
196
197	bio_put(bio); /* no need for the bio anymore */
198	if (atomic_dec_and_test(&peer_req->pending_bios)) {
199		if (is_write)
200			drbd_endio_write_sec_final(peer_req);
201		else
202			drbd_endio_read_sec_final(peer_req);
203	}
204}
205
206/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
207 */
208void drbd_request_endio(struct bio *bio, int error)
209{
210	unsigned long flags;
211	struct drbd_request *req = bio->bi_private;
212	struct drbd_device *device = req->device;
213	struct bio_and_error m;
214	enum drbd_req_event what;
215	int uptodate = bio_flagged(bio, BIO_UPTODATE);
216
217	if (!error && !uptodate) {
218		drbd_warn(device, "p %s: setting error to -EIO\n",
219			 bio_data_dir(bio) == WRITE ? "write" : "read");
220		/* strange behavior of some lower level drivers...
221		 * fail the request by clearing the uptodate flag,
222		 * but do not return any error?! */
223		error = -EIO;
224	}
225
226
227	/* If this request was aborted locally before,
228	 * but now was completed "successfully",
229	 * chances are that this caused arbitrary data corruption.
230	 *
231	 * "aborting" requests, or force-detaching the disk, is intended for
232	 * completely blocked/hung local backing devices which do no longer
233	 * complete requests at all, not even do error completions.  In this
234	 * situation, usually a hard-reset and failover is the only way out.
235	 *
236	 * By "aborting", basically faking a local error-completion,
237	 * we allow for a more graceful switchover by cleanly migrating services.
238	 * Still the affected node has to be rebooted "soon".
239	 *
240	 * By completing these requests, we allow the upper layers to re-use
241	 * the associated data pages.
242	 *
243	 * If later the local backing device "recovers", and now DMAs some data
244	 * from disk into the original request pages, in the best case it will
245	 * just put random data into unused pages; but typically it will corrupt
246	 * meanwhile completely unrelated data, causing all sorts of damage.
247	 *
248	 * Which means delayed successful completion,
249	 * especially for READ requests,
250	 * is a reason to panic().
251	 *
252	 * We assume that a delayed *error* completion is OK,
253	 * though we still will complain noisily about it.
254	 */
255	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
256		if (__ratelimit(&drbd_ratelimit_state))
257			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
258
259		if (!error)
260			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
261	}
262
263	/* to avoid recursion in __req_mod */
264	if (unlikely(error)) {
265		what = (bio_data_dir(bio) == WRITE)
266			? WRITE_COMPLETED_WITH_ERROR
267			: (bio_rw(bio) == READ)
268			  ? READ_COMPLETED_WITH_ERROR
269			  : READ_AHEAD_COMPLETED_WITH_ERROR;
270	} else
271		what = COMPLETED_OK;
272
273	bio_put(req->private_bio);
274	req->private_bio = ERR_PTR(error);
275
276	/* not req_mod(), we need irqsave here! */
277	spin_lock_irqsave(&device->resource->req_lock, flags);
278	__req_mod(req, what, &m);
279	spin_unlock_irqrestore(&device->resource->req_lock, flags);
280	put_ldev(device);
281
282	if (m.bio)
283		complete_master_bio(device, &m);
284}
285
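/* Compute a digest over the data of a peer request: hash every page of its
 * page chain in full, except for the last page, which may be only partially
 * used (peer_req->i.size is not necessarily a multiple of PAGE_SIZE). */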
286void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
287{
288	struct hash_desc desc;
289	struct scatterlist sg;
290	struct page *page = peer_req->pages;
291	struct page *tmp;
292	unsigned len;
293
294	desc.tfm = tfm;
295	desc.flags = 0;
296
297	sg_init_table(&sg, 1);
298	crypto_hash_init(&desc);
299
300	while ((tmp = page_chain_next(page))) {
301		/* all but the last page will be fully used */
302		sg_set_page(&sg, page, PAGE_SIZE, 0);
303		crypto_hash_update(&desc, &sg, sg.length);
304		page = tmp;
305	}
306	/* and now the last, possibly only partially used page */
307	len = peer_req->i.size & (PAGE_SIZE - 1);
308	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
309	crypto_hash_update(&desc, &sg, sg.length);
310	crypto_hash_final(&desc, digest);
311}
312
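/* Compute a digest over the data of a bio, hashing it segment by segment. */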
313void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
314{
315	struct hash_desc desc;
316	struct scatterlist sg;
317	struct bio_vec bvec;
318	struct bvec_iter iter;
319
320	desc.tfm = tfm;
321	desc.flags = 0;
322
323	sg_init_table(&sg, 1);
324	crypto_hash_init(&desc);
325
326	bio_for_each_segment(bvec, bio, iter) {
327		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
328		crypto_hash_update(&desc, &sg, sg.length);
329	}
330	crypto_hash_final(&desc, digest);
331}
332
333/* MAYBE merge common code with w_e_end_ov_req */
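/* Worker callback for checksum based resync: hash the block that was just
 * read locally and send the digest to the peer as a P_CSUM_RS_REQUEST, so
 * the peer can decide whether the block needs to be transferred at all. */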
334static int w_e_send_csum(struct drbd_work *w, int cancel)
335{
336	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
337	struct drbd_device *device = peer_req->peer_device->device;
338	int digest_size;
339	void *digest;
340	int err = 0;
341
342	if (unlikely(cancel))
343		goto out;
344
345	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
346		goto out;
347
348	digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm);
349	digest = kmalloc(digest_size, GFP_NOIO);
350	if (digest) {
351		sector_t sector = peer_req->i.sector;
352		unsigned int size = peer_req->i.size;
353		drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest);
354		/* Free peer_req and pages before send.
355		 * In case we block on congestion, we could otherwise run into
356		 * some distributed deadlock, if the other side blocks on
357		 * congestion as well, because our receiver blocks in
358		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
359		drbd_free_peer_req(device, peer_req);
360		peer_req = NULL;
361		inc_rs_pending(device);
362		err = drbd_send_drequest_csum(first_peer_device(device), sector, size,
363					      digest, digest_size,
364					      P_CSUM_RS_REQUEST);
365		kfree(digest);
366	} else {
367		drbd_err(device, "kmalloc() of digest failed.\n");
368		err = -ENOMEM;
369	}
370
371out:
372	if (peer_req)
373		drbd_free_peer_req(device, peer_req);
374
375	if (unlikely(err))
376		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
377	return err;
378}
379
380#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
381
382static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
383{
384	struct drbd_device *device = peer_device->device;
385	struct drbd_peer_request *peer_req;
386
387	if (!get_ldev(device))
388		return -EIO;
389
390	if (drbd_rs_should_slow_down(device, sector))
391		goto defer;
392
393	/* GFP_TRY, because if there is no memory available right now, this may
394	 * be rescheduled for later. It is "only" background resync, after all. */
395	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
396				       size, GFP_TRY);
397	if (!peer_req)
398		goto defer;
399
400	peer_req->w.cb = w_e_send_csum;
401	spin_lock_irq(&device->resource->req_lock);
402	list_add(&peer_req->w.list, &device->read_ee);
403	spin_unlock_irq(&device->resource->req_lock);
404
405	atomic_add(size >> 9, &device->rs_sect_ev);
406	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
407		return 0;
408
409	/* If it failed because of ENOMEM, retry should help.  If it failed
410	 * because bio_add_page failed (probably broken lower level driver),
411	 * retry may or may not help.
412	 * If it does not, you may need to force disconnect. */
413	spin_lock_irq(&device->resource->req_lock);
414	list_del(&peer_req->w.list);
415	spin_unlock_irq(&device->resource->req_lock);
416
417	drbd_free_peer_req(device, peer_req);
418defer:
419	put_ldev(device);
420	return -EAGAIN;
421}
422
423int w_resync_timer(struct drbd_work *w, int cancel)
424{
425	struct drbd_device *device =
426		container_of(w, struct drbd_device, resync_work);
427
428	switch (device->state.conn) {
429	case C_VERIFY_S:
430		w_make_ov_request(w, cancel);
431		break;
432	case C_SYNC_TARGET:
433		w_make_resync_request(w, cancel);
434		break;
435	}
436
437	return 0;
438}
439
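/* Timer callback: (re)queue the resync work on the sender work queue,
 * unless it is still queued from a previous run. */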
440void resync_timer_fn(unsigned long data)
441{
442	struct drbd_device *device = (struct drbd_device *) data;
443
444	if (list_empty(&device->resync_work.list))
445		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
446				&device->resync_work);
447}
448
449static void fifo_set(struct fifo_buffer *fb, int value)
450{
451	int i;
452
453	for (i = 0; i < fb->size; i++)
454		fb->values[i] = value;
455}
456
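/* Push value into the ring buffer and return the entry it replaces,
 * i.e. the value that was planned fb->size pushes ago. */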
457static int fifo_push(struct fifo_buffer *fb, int value)
458{
459	int ov;
460
461	ov = fb->values[fb->head_index];
462	fb->values[fb->head_index++] = value;
463
464	if (fb->head_index >= fb->size)
465		fb->head_index = 0;
466
467	return ov;
468}
469
470static void fifo_add_val(struct fifo_buffer *fb, int value)
471{
472	int i;
473
474	for (i = 0; i < fb->size; i++)
475		fb->values[i] += value;
476}
477
478struct fifo_buffer *fifo_alloc(int fifo_size)
479{
480	struct fifo_buffer *fb;
481
482	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
483	if (!fb)
484		return NULL;
485
486	fb->head_index = 0;
487	fb->size = fifo_size;
488	fb->total = 0;
489
490	return fb;
491}
492
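/* Dynamic resync speed controller, called once per SLEEP_TIME interval
 * (under rcu_read_lock, see drbd_rs_number_requests).  Based on the number
 * of sectors that came back since the last turn, the configured fill target
 * (resp. delay target) and the corrections still queued in the plan fifo,
 * it returns how many sectors to request during the next interval,
 * limited by c_max_rate. */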
493static int drbd_rs_controller(struct drbd_device *device)
494{
495	struct disk_conf *dc;
496	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
497	unsigned int want;     /* The number of sectors we want in the proxy */
498	int req_sect; /* Number of sectors to request in this turn */
499	int correction; /* Number of sectors more we need in the proxy*/
500	int cps; /* correction per invocation of drbd_rs_controller() */
501	int steps; /* Number of time steps to plan ahead */
502	int curr_corr;
503	int max_sect;
504	struct fifo_buffer *plan;
505
506	sect_in = atomic_xchg(&device->rs_sect_in, 0); /* Number of sectors that came in */
507	device->rs_in_flight -= sect_in;
508
509	dc = rcu_dereference(device->ldev->disk_conf);
510	plan = rcu_dereference(device->rs_plan_s);
511
512	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
513
514	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
515		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
516	} else { /* normal path */
517		want = dc->c_fill_target ? dc->c_fill_target :
518			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
519	}
520
521	correction = want - device->rs_in_flight - plan->total;
522
523	/* Plan ahead */
524	cps = correction / steps;
525	fifo_add_val(plan, cps);
526	plan->total += cps * steps;
527
528	/* What we do in this step */
529	curr_corr = fifo_push(plan, 0);
530	plan->total -= curr_corr;
531
532	req_sect = sect_in + curr_corr;
533	if (req_sect < 0)
534		req_sect = 0;
535
536	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
537	if (req_sect > max_sect)
538		req_sect = max_sect;
539
540	/*
541	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
542		 sect_in, device->rs_in_flight, want, correction,
543		 steps, cps, device->rs_planed, curr_corr, req_sect);
544	*/
545
546	return req_sect;
547}
548
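/* How many resync requests (in units of BM_BLOCK_SIZE) should be issued in
 * the next SLEEP_TIME interval?  Either asks the dynamic controller above
 * (if a plan fifo is configured) or derives the number from the static
 * resync_rate; the effective rate is mirrored into device->c_sync_rate. */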
549static int drbd_rs_number_requests(struct drbd_device *device)
550{
551	int number;
552
553	rcu_read_lock();
554	if (rcu_dereference(device->rs_plan_s)->size) {
555		number = drbd_rs_controller(device) >> (BM_BLOCK_SHIFT - 9);
556		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557	} else {
558		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
559		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560	}
561	rcu_read_unlock();
562
563	/* ignore the amount of pending requests, the resync controller should
564	 * throttle down to incoming reply rate soon enough anyways. */
565	return number;
566}
567
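/* Generate the next batch of resync requests: walk the bitmap for dirty
 * bits, possibly merging adjacent bits up to max_bio_size, and for each
 * range either read it locally first (checksum based resync) or send a
 * plain P_RS_DATA_REQUEST.  The resync timer is re-armed unless the end
 * of the bitmap has been reached. */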
568int w_make_resync_request(struct drbd_work *w, int cancel)
569{
570	struct drbd_device_work *dw = device_work(w);
571	struct drbd_device *device = dw->device;
572	unsigned long bit;
573	sector_t sector;
574	const sector_t capacity = drbd_get_capacity(device->this_bdev);
575	int max_bio_size;
576	int number, rollback_i, size;
577	int align, queued, sndbuf;
578	int i = 0;
579
580	if (unlikely(cancel))
581		return 0;
582
583	if (device->rs_total == 0) {
584		/* empty resync? */
585		drbd_resync_finished(device);
586		return 0;
587	}
588
589	if (!get_ldev(device)) {
590		/* Since we only need to access device->rsync a
591		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
592		   to continue resync with a broken disk makes no sense at
593		   all */
594		drbd_err(device, "Disk broke down during resync!\n");
595		return 0;
596	}
597
598	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
599	number = drbd_rs_number_requests(device);
600	if (number == 0)
601		goto requeue;
602
603	for (i = 0; i < number; i++) {
604		/* Stop generating RS requests, when half of the send buffer is filled */
605		mutex_lock(&first_peer_device(device)->connection->data.mutex);
606		if (first_peer_device(device)->connection->data.socket) {
607			queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
608			sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
609		} else {
610			queued = 1;
611			sndbuf = 0;
612		}
613		mutex_unlock(&first_peer_device(device)->connection->data.mutex);
614		if (queued > sndbuf / 2)
615			goto requeue;
616
617next_sector:
618		size = BM_BLOCK_SIZE;
619		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
620
621		if (bit == DRBD_END_OF_BITMAP) {
622			device->bm_resync_fo = drbd_bm_bits(device);
623			put_ldev(device);
624			return 0;
625		}
626
627		sector = BM_BIT_TO_SECT(bit);
628
629		if (drbd_rs_should_slow_down(device, sector) ||
630		    drbd_try_rs_begin_io(device, sector)) {
631			device->bm_resync_fo = bit;
632			goto requeue;
633		}
634		device->bm_resync_fo = bit + 1;
635
636		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
637			drbd_rs_complete_io(device, sector);
638			goto next_sector;
639		}
640
641#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642		/* try to find some adjacent bits.
643		 * we stop if we have already the maximum req size.
644		 *
645		 * Additionally always align bigger requests, in order to
646		 * be prepared for all stripe sizes of software RAIDs.
647		 */
648		align = 1;
649		rollback_i = i;
650		for (;;) {
651			if (size + BM_BLOCK_SIZE > max_bio_size)
652				break;
653
654			/* Be always aligned */
655			if (sector & ((1<<(align+3))-1))
656				break;
657
658			/* do not cross extent boundaries */
659			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
660				break;
661			/* now, is it actually dirty, after all?
662			 * caution, drbd_bm_test_bit is tri-state for some
663			 * obscure reason; ( b == 0 ) would get the out-of-band
664			 * only accidentally right because of the "oddly sized"
665			 * adjustment below */
666			if (drbd_bm_test_bit(device, bit+1) != 1)
667				break;
668			bit++;
669			size += BM_BLOCK_SIZE;
670			if ((BM_BLOCK_SIZE << align) <= size)
671				align++;
672			i++;
673		}
674		/* if we merged some,
675		 * reset the offset to start the next drbd_bm_find_next from */
676		if (size > BM_BLOCK_SIZE)
677			device->bm_resync_fo = bit + 1;
678#endif
679
680		/* adjust very last sectors, in case we are oddly sized */
681		if (sector + (size>>9) > capacity)
682			size = (capacity-sector)<<9;
683		if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
684		    first_peer_device(device)->connection->csums_tfm) {
685			switch (read_for_csum(first_peer_device(device), sector, size)) {
686			case -EIO: /* Disk failure */
687				put_ldev(device);
688				return -EIO;
689			case -EAGAIN: /* allocation failed, or ldev busy */
690				drbd_rs_complete_io(device, sector);
691				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
692				i = rollback_i;
693				goto requeue;
694			case 0:
695				/* everything ok */
696				break;
697			default:
698				BUG();
699			}
700		} else {
701			int err;
702
703			inc_rs_pending(device);
704			err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
705						 sector, size, ID_SYNCER);
706			if (err) {
707				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
708				dec_rs_pending(device);
709				put_ldev(device);
710				return err;
711			}
712		}
713	}
714
715	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
716		/* last syncer _request_ was sent,
717		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
718		 * next sync group will resume), as soon as we receive the last
719		 * resync data block, and the last bit is cleared.
720		 * until then resync "work" is "inactive" ...
721		 */
722		put_ldev(device);
723		return 0;
724	}
725
726 requeue:
727	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
729	put_ldev(device);
730	return 0;
731}
732
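/* Generate the next batch of online verify requests (P_OV_REQUEST),
 * starting at device->ov_position, and re-arm the resync timer unless the
 * configured stop sector has been reached. */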
733static int w_make_ov_request(struct drbd_work *w, int cancel)
734{
735	struct drbd_device *device = device_work(w)->device;
736	int number, i, size;
737	sector_t sector;
738	const sector_t capacity = drbd_get_capacity(device->this_bdev);
739	bool stop_sector_reached = false;
740
741	if (unlikely(cancel))
742		return 1;
743
744	number = drbd_rs_number_requests(device);
745
746	sector = device->ov_position;
747	for (i = 0; i < number; i++) {
748		if (sector >= capacity)
749			return 1;
750
751		/* We check for "finished" only in the reply path:
752		 * w_e_end_ov_reply().
753		 * We need to send at least one request out. */
754		stop_sector_reached = i > 0
755			&& verify_can_do_stop_sector(device)
756			&& sector >= device->ov_stop_sector;
757		if (stop_sector_reached)
758			break;
759
760		size = BM_BLOCK_SIZE;
761
762		if (drbd_rs_should_slow_down(device, sector) ||
763		    drbd_try_rs_begin_io(device, sector)) {
764			device->ov_position = sector;
765			goto requeue;
766		}
767
768		if (sector + (size>>9) > capacity)
769			size = (capacity-sector)<<9;
770
771		inc_rs_pending(device);
772		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
773			dec_rs_pending(device);
774			return 0;
775		}
776		sector += BM_SECT_PER_BIT;
777	}
778	device->ov_position = sector;
779
780 requeue:
781	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
782	if (i == 0 || !stop_sector_reached)
783		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
784	return 1;
785}
786
787int w_ov_finished(struct drbd_work *w, int cancel)
788{
789	struct drbd_device_work *dw =
790		container_of(w, struct drbd_device_work, w);
791	struct drbd_device *device = dw->device;
792	kfree(dw);
793	ov_out_of_sync_print(device);
794	drbd_resync_finished(device);
795
796	return 0;
797}
798
799static int w_resync_finished(struct drbd_work *w, int cancel)
800{
801	struct drbd_device_work *dw =
802		container_of(w, struct drbd_device_work, w);
803	struct drbd_device *device = dw->device;
804	kfree(dw);
805
806	drbd_resync_finished(device);
807
808	return 0;
809}
810
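/* Ask the peer for a ping and wait until the ping ack arrives,
 * or until the connection is lost. */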
811static void ping_peer(struct drbd_device *device)
812{
813	struct drbd_connection *connection = first_peer_device(device)->connection;
814
815	clear_bit(GOT_PING_ACK, &connection->flags);
816	request_ping(connection);
817	wait_event(connection->ping_wait,
818		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
819}
820
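/* Resync or online verify finished: compute and log the statistics, move
 * back to C_CONNECTED with updated disk/peer-disk states and UUIDs, and
 * possibly trigger the out-of-sync resp. after-resync-target helpers. */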
821int drbd_resync_finished(struct drbd_device *device)
822{
823	unsigned long db, dt, dbdt;
824	unsigned long n_oos;
825	union drbd_state os, ns;
826	struct drbd_device_work *dw;
827	char *khelper_cmd = NULL;
828	int verify_done = 0;
829
830	/* Remove all elements from the resync LRU. Since future actions
831	 * might set bits in the (main) bitmap, then the entries in the
832	 * resync LRU would be wrong. */
833	if (drbd_rs_del_all(device)) {
834		/* In case this is not possible now, most probably because
835		 * there are P_RS_DATA_REPLY packets lingering on the worker's
836		 * queue (or even the read operations for those packets
837		 * are not finished by now). Retry in 100ms. */
838
839		schedule_timeout_interruptible(HZ / 10);
840		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
841		if (dw) {
842			dw->w.cb = w_resync_finished;
843			dw->device = device;
844			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
845					&dw->w);
846			return 1;
847		}
848		drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
849	}
850
851	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
852	if (dt <= 0)
853		dt = 1;
854
855	db = device->rs_total;
856	/* adjust for verify start and stop sectors, respectively the reached position */
857	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
858		db -= device->ov_left;
859
860	dbdt = Bit2KB(db/dt);
861	device->rs_paused /= HZ;
862
863	if (!get_ldev(device))
864		goto out;
865
866	ping_peer(device);
867
868	spin_lock_irq(&device->resource->req_lock);
869	os = drbd_read_state(device);
870
871	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
872
873	/* This protects us against multiple calls (that can happen in the presence
874	   of application IO), and against connectivity loss just before we arrive here. */
875	if (os.conn <= C_CONNECTED)
876		goto out_unlock;
877
878	ns = os;
879	ns.conn = C_CONNECTED;
880
881	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
882	     verify_done ? "Online verify" : "Resync",
883	     dt + device->rs_paused, device->rs_paused, dbdt);
884
885	n_oos = drbd_bm_total_weight(device);
886
887	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
888		if (n_oos) {
889			drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
890			      n_oos, Bit2KB(1));
891			khelper_cmd = "out-of-sync";
892		}
893	} else {
894		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
895
896		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
897			khelper_cmd = "after-resync-target";
898
899		if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
900			const unsigned long s = device->rs_same_csum;
901			const unsigned long t = device->rs_total;
902			const int ratio =
903				(t == 0)     ? 0 :
904			(t < 100000) ? ((s*100)/t) : (s/(t/100));
905			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
906			     "transferred %luK total %luK\n",
907			     ratio,
908			     Bit2KB(device->rs_same_csum),
909			     Bit2KB(device->rs_total - device->rs_same_csum),
910			     Bit2KB(device->rs_total));
911		}
912	}
913
914	if (device->rs_failed) {
915		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
916
917		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
918			ns.disk = D_INCONSISTENT;
919			ns.pdsk = D_UP_TO_DATE;
920		} else {
921			ns.disk = D_UP_TO_DATE;
922			ns.pdsk = D_INCONSISTENT;
923		}
924	} else {
925		ns.disk = D_UP_TO_DATE;
926		ns.pdsk = D_UP_TO_DATE;
927
928		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
929			if (device->p_uuid) {
930				int i;
931				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
932					_drbd_uuid_set(device, i, device->p_uuid[i]);
933				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
934				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
935			} else {
936				drbd_err(device, "device->p_uuid is NULL! BUG\n");
937			}
938		}
939
940		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
941			/* for verify runs, we don't update uuids here,
942			 * so there would be nothing to report. */
943			drbd_uuid_set_bm(device, 0UL);
944			drbd_print_uuids(device, "updated UUIDs");
945			if (device->p_uuid) {
946				/* Now the two UUID sets are equal, update what we
947				 * know of the peer. */
948				int i;
949				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
950					device->p_uuid[i] = device->ldev->md.uuid[i];
951			}
952		}
953	}
954
955	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
956out_unlock:
957	spin_unlock_irq(&device->resource->req_lock);
958	put_ldev(device);
959out:
960	device->rs_total  = 0;
961	device->rs_failed = 0;
962	device->rs_paused = 0;
963
964	/* reset start sector, if we reached end of device */
965	if (verify_done && device->ov_left == 0)
966		device->ov_start_sector = 0;
967
968	drbd_md_sync(device);
969
970	if (khelper_cmd)
971		drbd_khelper(device, khelper_cmd);
972
973	return 1;
974}
975
976/* helper: park peer_req on net_ee while its pages may still be in flight (sendpage), otherwise free it */
977static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
978{
979	if (drbd_peer_req_has_active_page(peer_req)) {
980		/* This might happen if sendpage() has not finished */
981		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
982		atomic_add(i, &device->pp_in_use_by_net);
983		atomic_sub(i, &device->pp_in_use);
984		spin_lock_irq(&device->resource->req_lock);
985		list_add_tail(&peer_req->w.list, &device->net_ee);
986		spin_unlock_irq(&device->resource->req_lock);
987		wake_up(&drbd_pp_wait);
988	} else
989		drbd_free_peer_req(device, peer_req);
990}
991
992/**
993 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
995 * @w:		work object.
996 * @cancel:	The connection will be closed anyways
997 */
998int w_e_end_data_req(struct drbd_work *w, int cancel)
999{
1000	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1001	struct drbd_device *device = peer_req->peer_device->device;
1002	int err;
1003
1004	if (unlikely(cancel)) {
1005		drbd_free_peer_req(device, peer_req);
1006		dec_unacked(device);
1007		return 0;
1008	}
1009
1010	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1011		err = drbd_send_block(first_peer_device(device), P_DATA_REPLY, peer_req);
1012	} else {
1013		if (__ratelimit(&drbd_ratelimit_state))
1014			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1015			    (unsigned long long)peer_req->i.sector);
1016
1017		err = drbd_send_ack(first_peer_device(device), P_NEG_DREPLY, peer_req);
1018	}
1019
1020	dec_unacked(device);
1021
1022	move_to_net_ee_or_free(device, peer_req);
1023
1024	if (unlikely(err))
1025		drbd_err(device, "drbd_send_block() failed\n");
1026	return err;
1027}
1028
1029/**
1030 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1031 * @w:		work object.
1032 * @cancel:	The connection will be closed anyways
1033 */
1034int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1035{
1036	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1037	struct drbd_device *device = peer_req->peer_device->device;
1038	int err;
1039
1040	if (unlikely(cancel)) {
1041		drbd_free_peer_req(device, peer_req);
1042		dec_unacked(device);
1043		return 0;
1044	}
1045
1046	if (get_ldev_if_state(device, D_FAILED)) {
1047		drbd_rs_complete_io(device, peer_req->i.sector);
1048		put_ldev(device);
1049	}
1050
1051	if (device->state.conn == C_AHEAD) {
1052		err = drbd_send_ack(first_peer_device(device), P_RS_CANCEL, peer_req);
1053	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1054		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1055			inc_rs_pending(device);
1056			err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req);
1057		} else {
1058			if (__ratelimit(&drbd_ratelimit_state))
1059				drbd_err(device, "Not sending RSDataReply, "
1060				    "partner DISKLESS!\n");
1061			err = 0;
1062		}
1063	} else {
1064		if (__ratelimit(&drbd_ratelimit_state))
1065			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1066			    (unsigned long long)peer_req->i.sector);
1067
1068		err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req);
1069
1070		/* update resync data with failure */
1071		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1072	}
1073
1074	dec_unacked(device);
1075
1076	move_to_net_ee_or_free(device, peer_req);
1077
1078	if (unlikely(err))
1079		drbd_err(device, "drbd_send_block() failed\n");
1080	return err;
1081}
1082
1083int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1084{
1085	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1086	struct drbd_device *device = peer_req->peer_device->device;
1087	struct digest_info *di;
1088	int digest_size;
1089	void *digest = NULL;
1090	int err, eq = 0;
1091
1092	if (unlikely(cancel)) {
1093		drbd_free_peer_req(device, peer_req);
1094		dec_unacked(device);
1095		return 0;
1096	}
1097
1098	if (get_ldev(device)) {
1099		drbd_rs_complete_io(device, peer_req->i.sector);
1100		put_ldev(device);
1101	}
1102
1103	di = peer_req->digest;
1104
1105	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1106		/* quick hack to try to avoid a race against reconfiguration.
1107		 * a real fix would be much more involved,
1108		 * introducing more locking mechanisms */
1109		if (first_peer_device(device)->connection->csums_tfm) {
1110			digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm);
1111			D_ASSERT(device, digest_size == di->digest_size);
1112			digest = kmalloc(digest_size, GFP_NOIO);
1113		}
1114		if (digest) {
1115			drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest);
1116			eq = !memcmp(digest, di->digest, digest_size);
1117			kfree(digest);
1118		}
1119
1120		if (eq) {
1121			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1122			/* rs_same_csums unit is BM_BLOCK_SIZE */
1123			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1124			err = drbd_send_ack(first_peer_device(device), P_RS_IS_IN_SYNC, peer_req);
1125		} else {
1126			inc_rs_pending(device);
1127			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1128			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1129			kfree(di);
1130			err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req);
1131		}
1132	} else {
1133		err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req);
1134		if (__ratelimit(&drbd_ratelimit_state))
1135			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1136	}
1137
1138	dec_unacked(device);
1139	move_to_net_ee_or_free(device, peer_req);
1140
1141	if (unlikely(err))
1142		drbd_err(device, "drbd_send_block/ack() failed\n");
1143	return err;
1144}
1145
1146int w_e_end_ov_req(struct drbd_work *w, int cancel)
1147{
1148	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1149	struct drbd_device *device = peer_req->peer_device->device;
1150	sector_t sector = peer_req->i.sector;
1151	unsigned int size = peer_req->i.size;
1152	int digest_size;
1153	void *digest;
1154	int err = 0;
1155
1156	if (unlikely(cancel))
1157		goto out;
1158
1159	digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm);
1160	digest = kmalloc(digest_size, GFP_NOIO);
1161	if (!digest) {
1162		err = 1;	/* terminate the connection in case the allocation failed */
1163		goto out;
1164	}
1165
1166	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1167		drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest);
1168	else
1169		memset(digest, 0, digest_size);
1170
1171	/* Free e and pages before send.
1172	 * In case we block on congestion, we could otherwise run into
1173	 * some distributed deadlock, if the other side blocks on
1174	 * congestion as well, because our receiver blocks in
1175	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1176	drbd_free_peer_req(device, peer_req);
1177	peer_req = NULL;
1178	inc_rs_pending(device);
1179	err = drbd_send_drequest_csum(first_peer_device(device), sector, size, digest, digest_size, P_OV_REPLY);
1180	if (err)
1181		dec_rs_pending(device);
1182	kfree(digest);
1183
1184out:
1185	if (peer_req)
1186		drbd_free_peer_req(device, peer_req);
1187	dec_unacked(device);
1188	return err;
1189}
1190
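/* Record an out-of-sync block found by online verify: extend the current
 * run if it is adjacent to the previous one, otherwise start a new run,
 * and mark the range out of sync in the bitmap. */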
1191void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1192{
1193	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1194		device->ov_last_oos_size += size>>9;
1195	} else {
1196		device->ov_last_oos_start = sector;
1197		device->ov_last_oos_size = size>>9;
1198	}
1199	drbd_set_out_of_sync(device, sector, size);
1200}
1201
1202int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1203{
1204	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1205	struct drbd_device *device = peer_req->peer_device->device;
1206	struct digest_info *di;
1207	void *digest;
1208	sector_t sector = peer_req->i.sector;
1209	unsigned int size = peer_req->i.size;
1210	int digest_size;
1211	int err, eq = 0;
1212	bool stop_sector_reached = false;
1213
1214	if (unlikely(cancel)) {
1215		drbd_free_peer_req(device, peer_req);
1216		dec_unacked(device);
1217		return 0;
1218	}
1219
1220	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1221	 * the resync lru has been cleaned up already */
1222	if (get_ldev(device)) {
1223		drbd_rs_complete_io(device, peer_req->i.sector);
1224		put_ldev(device);
1225	}
1226
1227	di = peer_req->digest;
1228
1229	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1230		digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm);
1231		digest = kmalloc(digest_size, GFP_NOIO);
1232		if (digest) {
1233			drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest);
1234
1235			D_ASSERT(device, digest_size == di->digest_size);
1236			eq = !memcmp(digest, di->digest, digest_size);
1237			kfree(digest);
1238		}
1239	}
1240
1241	/* Free peer_req and pages before send.
1242	 * In case we block on congestion, we could otherwise run into
1243	 * some distributed deadlock, if the other side blocks on
1244	 * congestion as well, because our receiver blocks in
1245	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1246	drbd_free_peer_req(device, peer_req);
1247	if (!eq)
1248		drbd_ov_out_of_sync_found(device, sector, size);
1249	else
1250		ov_out_of_sync_print(device);
1251
1252	err = drbd_send_ack_ex(first_peer_device(device), P_OV_RESULT, sector, size,
1253			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1254
1255	dec_unacked(device);
1256
1257	--device->ov_left;
1258
1259	/* let's advance progress step marks only for every other megabyte */
1260	if ((device->ov_left & 0x200) == 0x200)
1261		drbd_advance_rs_marks(device, device->ov_left);
1262
1263	stop_sector_reached = verify_can_do_stop_sector(device) &&
1264		(sector + (size>>9)) >= device->ov_stop_sector;
1265
1266	if (device->ov_left == 0 || stop_sector_reached) {
1267		ov_out_of_sync_print(device);
1268		drbd_resync_finished(device);
1269	}
1270
1271	return err;
1272}
1273
1274/* FIXME
1275 * We need to track the number of pending barrier acks,
1276 * and to be able to wait for them.
1277 * See also comment in drbd_adm_attach before drbd_suspend_io.
1278 */
1279static int drbd_send_barrier(struct drbd_connection *connection)
1280{
1281	struct p_barrier *p;
1282	struct drbd_socket *sock;
1283
1284	sock = &connection->data;
1285	p = conn_prepare_command(connection, sock);
1286	if (!p)
1287		return -EIO;
1288	p->barrier = connection->send.current_epoch_nr;
1289	p->pad = 0;
1290	connection->send.current_epoch_writes = 0;
1291
1292	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1293}
1294
1295int w_send_write_hint(struct drbd_work *w, int cancel)
1296{
1297	struct drbd_device *device =
1298		container_of(w, struct drbd_device, unplug_work);
1299	struct drbd_socket *sock;
1300
1301	if (cancel)
1302		return 0;
1303	sock = &first_peer_device(device)->connection->data;
1304	if (!drbd_prepare_command(first_peer_device(device), sock))
1305		return -EIO;
1306	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1307}
1308
1309static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1310{
1311	if (!connection->send.seen_any_write_yet) {
1312		connection->send.seen_any_write_yet = true;
1313		connection->send.current_epoch_nr = epoch;
1314		connection->send.current_epoch_writes = 0;
1315	}
1316}
1317
1318static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1319{
1320	/* nothing to do before the first write on this connection */
1321	if (!connection->send.seen_any_write_yet)
1322		return;
1323	if (connection->send.current_epoch_nr != epoch) {
1324		if (connection->send.current_epoch_writes)
1325			drbd_send_barrier(connection);
1326		connection->send.current_epoch_nr = epoch;
1327	}
1328}
1329
1330int w_send_out_of_sync(struct drbd_work *w, int cancel)
1331{
1332	struct drbd_request *req = container_of(w, struct drbd_request, w);
1333	struct drbd_device *device = req->device;
1334	struct drbd_connection *connection = first_peer_device(device)->connection;
1335	int err;
1336
1337	if (unlikely(cancel)) {
1338		req_mod(req, SEND_CANCELED);
1339		return 0;
1340	}
1341
1342	/* this time, no connection->send.current_epoch_writes++;
1343	 * If it was sent, it was the closing barrier for the last
1344	 * replicated epoch, before we went into AHEAD mode.
1345	 * No more barriers will be sent, until we leave AHEAD mode again. */
1346	maybe_send_barrier(connection, req->epoch);
1347
1348	err = drbd_send_out_of_sync(first_peer_device(device), req);
1349	req_mod(req, OOS_HANDED_TO_NETWORK);
1350
1351	return err;
1352}
1353
1354/**
1355 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1356 * @w:		work object.
1357 * @cancel:	The connection will be closed anyways
1358 */
1359int w_send_dblock(struct drbd_work *w, int cancel)
1360{
1361	struct drbd_request *req = container_of(w, struct drbd_request, w);
1362	struct drbd_device *device = req->device;
1363	struct drbd_connection *connection = first_peer_device(device)->connection;
1364	int err;
1365
1366	if (unlikely(cancel)) {
1367		req_mod(req, SEND_CANCELED);
1368		return 0;
1369	}
1370
1371	re_init_if_first_write(connection, req->epoch);
1372	maybe_send_barrier(connection, req->epoch);
1373	connection->send.current_epoch_writes++;
1374
1375	err = drbd_send_dblock(first_peer_device(device), req);
1376	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1377
1378	return err;
1379}
1380
1381/**
1382 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1383 * @w:		work object.
1384 * @cancel:	The connection will be closed anyways
1385 */
1386int w_send_read_req(struct drbd_work *w, int cancel)
1387{
1388	struct drbd_request *req = container_of(w, struct drbd_request, w);
1389	struct drbd_device *device = req->device;
1390	struct drbd_connection *connection = first_peer_device(device)->connection;
1391	int err;
1392
1393	if (unlikely(cancel)) {
1394		req_mod(req, SEND_CANCELED);
1395		return 0;
1396	}
1397
1398	/* Even read requests may close a write epoch,
1399	 * if there was any yet. */
1400	maybe_send_barrier(connection, req->epoch);
1401
1402	err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
1403				 (unsigned long)req);
1404
1405	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1406
1407	return err;
1408}
1409
1410int w_restart_disk_io(struct drbd_work *w, int cancel)
1411{
1412	struct drbd_request *req = container_of(w, struct drbd_request, w);
1413	struct drbd_device *device = req->device;
1414
1415	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1416		drbd_al_begin_io(device, &req->i, false);
1417
1418	drbd_req_make_private_bio(req, req->master_bio);
1419	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1420	generic_make_request(req->private_bio);
1421
1422	return 0;
1423}
1424
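/* Follow the resync-after dependency chain: return 1 if this device may
 * resync now, 0 if a device it (transitively) depends on is currently
 * resyncing or has its sync paused. */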
1425static int _drbd_may_sync_now(struct drbd_device *device)
1426{
1427	struct drbd_device *odev = device;
1428	int resync_after;
1429
1430	while (1) {
1431		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1432			return 1;
1433		rcu_read_lock();
1434		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1435		rcu_read_unlock();
1436		if (resync_after == -1)
1437			return 1;
1438		odev = minor_to_device(resync_after);
1439		if (!odev)
1440			return 1;
1441		if ((odev->state.conn >= C_SYNC_SOURCE &&
1442		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1443		    odev->state.aftr_isp || odev->state.peer_isp ||
1444		    odev->state.user_isp)
1445			return 0;
1446	}
1447}
1448
1449/**
1450 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1451 * @device:	DRBD device.
1452 *
1453 * Called from process context only (admin command and after_state_ch).
1454 */
1455static int _drbd_pause_after(struct drbd_device *device)
1456{
1457	struct drbd_device *odev;
1458	int i, rv = 0;
1459
1460	rcu_read_lock();
1461	idr_for_each_entry(&drbd_devices, odev, i) {
1462		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1463			continue;
1464		if (!_drbd_may_sync_now(odev))
1465			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1466			       != SS_NOTHING_TO_DO);
1467	}
1468	rcu_read_unlock();
1469
1470	return rv;
1471}
1472
1473/**
1474 * _drbd_resume_next() - Resume resync on all devices that may resync now
1475 * @device:	DRBD device.
1476 *
1477 * Called from process context only (admin command and worker).
1478 */
1479static int _drbd_resume_next(struct drbd_device *device)
1480{
1481	struct drbd_device *odev;
1482	int i, rv = 0;
1483
1484	rcu_read_lock();
1485	idr_for_each_entry(&drbd_devices, odev, i) {
1486		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1487			continue;
1488		if (odev->state.aftr_isp) {
1489			if (_drbd_may_sync_now(odev))
1490				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1491							CS_HARD, NULL)
1492				       != SS_NOTHING_TO_DO) ;
1493		}
1494	}
1495	rcu_read_unlock();
1496	return rv;
1497}
1498
1499void resume_next_sg(struct drbd_device *device)
1500{
1501	write_lock_irq(&global_state_lock);
1502	_drbd_resume_next(device);
1503	write_unlock_irq(&global_state_lock);
1504}
1505
1506void suspend_other_sg(struct drbd_device *device)
1507{
1508	write_lock_irq(&global_state_lock);
1509	_drbd_pause_after(device);
1510	write_unlock_irq(&global_state_lock);
1511}
1512
1513/* caller must hold global_state_lock */
1514enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1515{
1516	struct drbd_device *odev;
1517	int resync_after;
1518
1519	if (o_minor == -1)
1520		return NO_ERROR;
1521	if (o_minor < -1 || o_minor > MINORMASK)
1522		return ERR_RESYNC_AFTER;
1523
1524	/* check for loops */
1525	odev = minor_to_device(o_minor);
1526	while (1) {
1527		if (odev == device)
1528			return ERR_RESYNC_AFTER_CYCLE;
1529
1530		/* You are free to depend on diskless, non-existing,
1531		 * or not yet/no longer existing minors.
1532		 * We only reject dependency loops.
1533		 * We cannot follow the dependency chain beyond a detached or
1534		 * missing minor.
1535		 */
1536		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1537			return NO_ERROR;
1538
1539		rcu_read_lock();
1540		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1541		rcu_read_unlock();
1542		/* dependency chain ends here, no cycles. */
1543		if (resync_after == -1)
1544			return NO_ERROR;
1545
1546		/* follow the dependency chain */
1547		odev = minor_to_device(resync_after);
1548	}
1549}
1550
1551/* caller must hold global_state_lock */
1552void drbd_resync_after_changed(struct drbd_device *device)
1553{
1554	int changes;
1555
1556	do {
1557		changes  = _drbd_pause_after(device);
1558		changes |= _drbd_resume_next(device);
1559	} while (changes);
1560}
1561
1562void drbd_rs_controller_reset(struct drbd_device *device)
1563{
1564	struct fifo_buffer *plan;
1565
1566	atomic_set(&device->rs_sect_in, 0);
1567	atomic_set(&device->rs_sect_ev, 0);
1568	device->rs_in_flight = 0;
1569
1570	/* Updating the RCU protected object in place is necessary since
1571	   this function gets called from atomic context.
1572	   It is valid since all other updates also lead to a completely
1573	   empty fifo */
1574	rcu_read_lock();
1575	plan = rcu_dereference(device->rs_plan_s);
1576	plan->total = 0;
1577	fifo_set(plan, 0);
1578	rcu_read_unlock();
1579}
1580
1581void start_resync_timer_fn(unsigned long data)
1582{
1583	struct drbd_device *device = (struct drbd_device *) data;
1584
1585	drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1586			&device->start_resync_work);
1587}
1588
1589int w_start_resync(struct drbd_work *w, int cancel)
1590{
1591	struct drbd_device *device =
1592		container_of(w, struct drbd_device, start_resync_work);
1593
1594	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1595		drbd_warn(device, "w_start_resync later...\n");
1596		device->start_resync_timer.expires = jiffies + HZ/10;
1597		add_timer(&device->start_resync_timer);
1598		return 0;
1599	}
1600
1601	drbd_start_resync(device, C_SYNC_SOURCE);
1602	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1603	return 0;
1604}
1605
1606/**
1607 * drbd_start_resync() - Start the resync process
1608 * @device:	DRBD device.
1609 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1610 *
1611 * This function might bring you directly into one of the
1612 * C_PAUSED_SYNC_* states.
1613 */
1614void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1615{
1616	union drbd_state ns;
1617	int r;
1618
1619	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1620		drbd_err(device, "Resync already running!\n");
1621		return;
1622	}
1623
1624	if (!test_bit(B_RS_H_DONE, &device->flags)) {
1625		if (side == C_SYNC_TARGET) {
1626			/* Since application IO was locked out during C_WF_BITMAP_T and
1627			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1628			   we check whether we may make the data inconsistent. */
1629			r = drbd_khelper(device, "before-resync-target");
1630			r = (r >> 8) & 0xff;
1631			if (r > 0) {
1632				drbd_info(device, "before-resync-target handler returned %d, "
1633					 "dropping connection.\n", r);
1634				conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
1635				return;
1636			}
1637		} else /* C_SYNC_SOURCE */ {
1638			r = drbd_khelper(device, "before-resync-source");
1639			r = (r >> 8) & 0xff;
1640			if (r > 0) {
1641				if (r == 3) {
1642					drbd_info(device, "before-resync-source handler returned %d, "
1643						 "ignoring. Old userland tools?", r);
1644				} else {
1645					drbd_info(device, "before-resync-source handler returned %d, "
1646						 "dropping connection.\n", r);
1647					conn_request_state(first_peer_device(device)->connection,
1648							   NS(conn, C_DISCONNECTING), CS_HARD);
1649					return;
1650				}
1651			}
1652		}
1653	}
1654
1655	if (current == first_peer_device(device)->connection->worker.task) {
1656		/* The worker should not sleep waiting for state_mutex,
1657		   that can take long */
1658		if (!mutex_trylock(device->state_mutex)) {
1659			set_bit(B_RS_H_DONE, &device->flags);
1660			device->start_resync_timer.expires = jiffies + HZ/5;
1661			add_timer(&device->start_resync_timer);
1662			return;
1663		}
1664	} else {
1665		mutex_lock(device->state_mutex);
1666	}
1667	clear_bit(B_RS_H_DONE, &device->flags);
1668
1669	write_lock_irq(&global_state_lock);
1670	/* Did some connection breakage or IO error race with us? */
1671	if (device->state.conn < C_CONNECTED
1672	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
1673		write_unlock_irq(&global_state_lock);
1674		mutex_unlock(device->state_mutex);
1675		return;
1676	}
1677
1678	ns = drbd_read_state(device);
1679
1680	ns.aftr_isp = !_drbd_may_sync_now(device);
1681
1682	ns.conn = side;
1683
1684	if (side == C_SYNC_TARGET)
1685		ns.disk = D_INCONSISTENT;
1686	else /* side == C_SYNC_SOURCE */
1687		ns.pdsk = D_INCONSISTENT;
1688
1689	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1690	ns = drbd_read_state(device);
1691
1692	if (ns.conn < C_CONNECTED)
1693		r = SS_UNKNOWN_ERROR;
1694
1695	if (r == SS_SUCCESS) {
1696		unsigned long tw = drbd_bm_total_weight(device);
1697		unsigned long now = jiffies;
1698		int i;
1699
1700		device->rs_failed    = 0;
1701		device->rs_paused    = 0;
1702		device->rs_same_csum = 0;
1703		device->rs_last_events = 0;
1704		device->rs_last_sect_ev = 0;
1705		device->rs_total     = tw;
1706		device->rs_start     = now;
1707		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1708			device->rs_mark_left[i] = tw;
1709			device->rs_mark_time[i] = now;
1710		}
1711		_drbd_pause_after(device);
1712	}
1713	write_unlock_irq(&global_state_lock);
1714
1715	if (r == SS_SUCCESS) {
1716		/* reset rs_last_bcast when a resync or verify is started,
1717		 * to deal with potential jiffies wrap. */
1718		device->rs_last_bcast = jiffies - HZ;
1719
1720		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1721		     drbd_conn_str(ns.conn),
1722		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1723		     (unsigned long) device->rs_total);
1724		if (side == C_SYNC_TARGET)
1725			device->bm_resync_fo = 0;
1726
1727		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1728		 * with w_send_oos, or the sync target will get confused as to
1729		 * how many bits to resync.  We cannot do that always, because for an
1730		 * empty resync and protocol < 95, we need to do it here, as we call
1731		 * drbd_resync_finished from here in that case.
1732		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1733		 * and from after_state_ch otherwise. */
1734		if (side == C_SYNC_SOURCE &&
1735		    first_peer_device(device)->connection->agreed_pro_version < 96)
1736			drbd_gen_and_send_sync_uuid(first_peer_device(device));
1737
1738		if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
1739		    device->rs_total == 0) {
1740			/* This still has a race (about when exactly the peers
1741			 * detect connection loss) that can lead to a full sync
1742			 * on next handshake. In 8.3.9 we fixed this with explicit
1743			 * resync-finished notifications, but the fix
1744			 * introduces a protocol change.  Sleeping for some
1745			 * time longer than the ping interval + timeout on the
1746			 * SyncSource, to give the SyncTarget the chance to
1747			 * detect connection loss, then waiting for a ping
1748			 * response (implicit in drbd_resync_finished) reduces
1749			 * the race considerably, but does not solve it. */
1750			if (side == C_SYNC_SOURCE) {
1751				struct net_conf *nc;
1752				int timeo;
1753
1754				rcu_read_lock();
1755				nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
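				/* ping_int is in seconds, ping_timeo in tenths of a second;
				 * dividing by 9 instead of 10 sleeps slightly longer than
				 * the configured ping-int + ping-timeout. */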
1756				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1757				rcu_read_unlock();
1758				schedule_timeout_interruptible(timeo);
1759			}
1760			drbd_resync_finished(device);
1761		}
1762
1763		drbd_rs_controller_reset(device);
1764		/* ns.conn may already be != device->state.conn,
1765		 * we may have been paused in between, or become paused until
1766		 * the timer triggers.
1767		 * No matter, that is handled in resync_timer_fn() */
1768		if (ns.conn == C_SYNC_TARGET)
1769			mod_timer(&device->resync_timer, jiffies);
1770
1771		drbd_md_sync(device);
1772	}
1773	put_ldev(device);
1774	mutex_unlock(device->state_mutex);
1775}
1776
1777/* If the resource already closed the current epoch, but we did not
1778 * (because we have not yet seen new requests), we should send the
1779 * corresponding barrier now.  Must be checked within the same spinlock
1780 * that is used to check for new requests. */
1781static bool need_to_send_barrier(struct drbd_connection *connection)
1782{
1783	if (!connection->send.seen_any_write_yet)
1784		return false;
1785
1786	/* Skip barriers that do not contain any writes.
1787	 * This may happen during AHEAD mode. */
1788	if (!connection->send.current_epoch_writes)
1789		return false;
1790
1791	/* ->req_lock is held when requests are queued on
1792	 * connection->sender_work, and put into ->transfer_log.
1793	 * It is also held when ->current_tle_nr is increased.
1794	 * So either there are already new requests queued,
1795	 * and corresponding barriers will be sent there.
1796	 * Or nothing new is queued yet, so the difference will be 1.
1797	 */
1798	if (atomic_read(&connection->current_tle_nr) !=
1799	    connection->send.current_epoch_nr + 1)
1800		return false;
1801
1802	return true;
1803}
1804
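/* Splice everything currently queued onto work_list;
 * returns true if work_list is non-empty afterwards. */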
1805static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1806{
1807	spin_lock_irq(&queue->q_lock);
1808	list_splice_init(&queue->q, work_list);
1809	spin_unlock_irq(&queue->q_lock);
1810	return !list_empty(work_list);
1811}
1812
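/* Move at most one queued item onto work_list; returns true if work_list is
 * non-empty afterwards.  See drbd_worker(): while drbd_queue_work_front() is
 * in use, normal operation must dequeue single items, not whole batches. */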
1813static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1814{
1815	spin_lock_irq(&queue->q_lock);
1816	if (!list_empty(&queue->q))
1817		list_move(queue->q.next, work_list);
1818	spin_unlock_irq(&queue->q_lock);
1819	return !list_empty(work_list);
1820}
1821
1822static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1823{
1824	DEFINE_WAIT(wait);
1825	struct net_conf *nc;
1826	int uncork, cork;
1827
1828	dequeue_work_item(&connection->sender_work, work_list);
1829	if (!list_empty(work_list))
1830		return;
1831
1832	/* Still nothing to do?
1833	 * Maybe we still need to close the current epoch,
1834	 * even if no new requests are queued yet.
1835	 *
1836	 * Also, poke TCP, just in case.
1837	 * Then wait for new work (or signal). */
1838	rcu_read_lock();
1839	nc = rcu_dereference(connection->net_conf);
1840	uncork = nc ? nc->tcp_cork : 0;
1841	rcu_read_unlock();
1842	if (uncork) {
1843		mutex_lock(&connection->data.mutex);
1844		if (connection->data.socket)
1845			drbd_tcp_uncork(connection->data.socket);
1846		mutex_unlock(&connection->data.mutex);
1847	}
1848
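	/* Wait until new work is queued or a signal arrives.  While idle,
	 * close the current epoch by sending a barrier if required. */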
1849	for (;;) {
1850		int send_barrier;
1851		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1852		spin_lock_irq(&connection->resource->req_lock);
1853		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1854		/* dequeue a single item only,
1855		 * since we still use drbd_queue_work_front() in some places */
1856		if (!list_empty(&connection->sender_work.q))
1857			list_move(connection->sender_work.q.next, work_list);
1858		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
1859		if (!list_empty(work_list) || signal_pending(current)) {
1860			spin_unlock_irq(&connection->resource->req_lock);
1861			break;
1862		}
1863		send_barrier = need_to_send_barrier(connection);
1864		spin_unlock_irq(&connection->resource->req_lock);
1865		if (send_barrier) {
1866			drbd_send_barrier(connection);
1867			connection->send.current_epoch_nr++;
1868		}
1869		schedule();
1870		/* We may be woken up for things other than new work, too,
1871		 * e.g. if the current epoch got closed.
1872		 * In that case we send the barrier above. */
1873	}
1874	finish_wait(&connection->sender_work.q_wait, &wait);
1875
1876	/* someone may have changed the config while we were waiting above. */
1877	rcu_read_lock();
1878	nc = rcu_dereference(connection->net_conf);
1879	cork = nc ? nc->tcp_cork : 0;
1880	rcu_read_unlock();
1881	mutex_lock(&connection->data.mutex);
1882	if (connection->data.socket) {
1883		if (cork)
1884			drbd_tcp_cork(connection->data.socket);
1885		else if (!uncork)
1886			drbd_tcp_uncork(connection->data.socket);
1887	}
1888	mutex_unlock(&connection->data.mutex);
1889}
1890
1891int drbd_worker(struct drbd_thread *thi)
1892{
1893	struct drbd_connection *connection = thi->connection;
1894	struct drbd_device_work *dw = NULL;
1895	struct drbd_peer_device *peer_device;
1896	LIST_HEAD(work_list);
1897	int vnr;
1898
1899	while (get_t_state(thi) == RUNNING) {
1900		drbd_thread_current_set_cpu(thi);
1901
1902		/* as long as we use drbd_queue_work_front(),
1903		 * we may only dequeue single work items here, not batches. */
1904		if (list_empty(&work_list))
1905			wait_for_work(connection, &work_list);
1906
1907		if (signal_pending(current)) {
1908			flush_signals(current);
1909			if (get_t_state(thi) == RUNNING) {
1910				drbd_warn(connection, "Worker got an unexpected signal\n");
1911				continue;
1912			}
1913			break;
1914		}
1915
1916		if (get_t_state(thi) != RUNNING)
1917			break;
1918
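		/* Process what we dequeued.  The callback's second argument is the
		 * "cancel" flag: non-zero when the connection is not (or no longer)
		 * established.  If a callback fails while we still appear to be
		 * connected, force the connection into C_NETWORK_FAILURE. */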
1919		while (!list_empty(&work_list)) {
1920			dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
1921			list_del_init(&dw->w.list);
1922			if (dw->w.cb(&dw->w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1923				continue;
1924			if (connection->cstate >= C_WF_REPORT_PARAMS)
1925				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1926		}
1927	}
1928
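	/* The thread was asked to stop: drain whatever work is left, calling
	 * each callback with the cancel flag set, until the sender queue
	 * stays empty. */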
1929	do {
1930		while (!list_empty(&work_list)) {
1931			dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
1932			list_del_init(&dw->w.list);
1933			dw->w.cb(&dw->w, 1);
1934		}
1935		dequeue_work_batch(&connection->sender_work, &work_list);
1936	} while (!list_empty(&work_list));
1937
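	/* All work has been flushed; every device should be diskless and
	 * standalone by now.  Clean them up, dropping the RCU read lock around
	 * drbd_device_cleanup() since it may sleep. */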
1938	rcu_read_lock();
1939	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1940		struct drbd_device *device = peer_device->device;
1941		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
1942		kref_get(&device->kref);
1943		rcu_read_unlock();
1944		drbd_device_cleanup(device);
1945		kref_put(&device->kref, drbd_destroy_device);
1946		rcu_read_lock();
1947	}
1948	rcu_read_unlock();
1949
1950	return 0;
1951}
1952