drbd_worker.c revision 10f6d9926cd17afff9dc03c967706419798b4929
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_conf *mdev,
43				 struct drbd_work *w, int cancel);
44
45
46
47/* defined here:
48   drbd_md_io_complete
49   drbd_endio_sec
50   drbd_endio_pri
51
52 * more endio handlers:
53   atodb_endio in drbd_actlog.c
54   drbd_bm_async_io_complete in drbd_bitmap.c
55
56 * For all these callbacks, note the following:
57 * The callbacks will be called in irq context by the IDE drivers,
58 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
59 * Try to get the locking right :)
60 *
61 */
62
63
64/* About the global_state_lock
65   Each state transition on a device holds a read lock. In case we have
66   to evaluate the sync-after dependencies, we grab a write lock, because
67   we need stable states on all devices for that.  */
68rwlock_t global_state_lock;
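/* The write side is visible further down in this file: drbd_alter_sa(),
 * resume_next_sg(), suspend_other_sg() and drbd_start_resync() all take
 * write_lock_irq(&global_state_lock) while they (re)evaluate the
 * sync-after dependencies, roughly:
 *
 *	write_lock_irq(&global_state_lock);
 *	changes  = _drbd_pause_after(mdev);
 *	changes |= _drbd_resume_next(mdev);
 *	write_unlock_irq(&global_state_lock);
 */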
69
70/* used for synchronous meta data and bitmap IO
71 * submitted by drbd_md_sync_page_io()
72 */
73void drbd_md_io_complete(struct bio *bio, int error)
74{
75	struct drbd_md_io *md_io;
76
77	md_io = (struct drbd_md_io *)bio->bi_private;
78	md_io->error = error;
79
80	complete(&md_io->event);
81}
82
83/* reads on behalf of the partner,
84 * "submitted" by the receiver
85 */
86void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
87{
88	unsigned long flags = 0;
89	struct drbd_conf *mdev = e->mdev;
90
91	D_ASSERT(e->block_id != ID_VACANT);
92
93	spin_lock_irqsave(&mdev->req_lock, flags);
94	mdev->read_cnt += e->size >> 9;
95	list_del(&e->w.list);
96	if (list_empty(&mdev->read_ee))
97		wake_up(&mdev->ee_wait);
98	if (test_bit(__EE_WAS_ERROR, &e->flags))
99		__drbd_chk_io_error(mdev, false);
100	spin_unlock_irqrestore(&mdev->req_lock, flags);
101
102	drbd_queue_work(&mdev->data.work, &e->w);
103	put_ldev(mdev);
104}
105
106/* writes on behalf of the partner, or resync writes,
107 * "submitted" by the receiver, final stage.  */
108static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
109{
110	unsigned long flags = 0;
111	struct drbd_conf *mdev = e->mdev;
112	sector_t e_sector;
113	int do_wake;
114	int is_syncer_req;
115	int do_al_complete_io;
116
117	D_ASSERT(e->block_id != ID_VACANT);
118
119	/* after we moved e to done_ee,
120	 * we may no longer access it,
121	 * it may be freed/reused already!
122	 * (as soon as we release the req_lock) */
123	e_sector = e->sector;
124	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
125	is_syncer_req = is_syncer_block_id(e->block_id);
126
127	spin_lock_irqsave(&mdev->req_lock, flags);
128	mdev->writ_cnt += e->size >> 9;
129	list_del(&e->w.list); /* has been on active_ee or sync_ee */
130	list_add_tail(&e->w.list, &mdev->done_ee);
131
132	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
133	 * neither did we wake possibly waiting conflicting requests.
134	 * done from "drbd_process_done_ee" within the appropriate w.cb
135	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
136
137	do_wake = is_syncer_req
138		? list_empty(&mdev->sync_ee)
139		: list_empty(&mdev->active_ee);
140
141	if (test_bit(__EE_WAS_ERROR, &e->flags))
142		__drbd_chk_io_error(mdev, false);
143	spin_unlock_irqrestore(&mdev->req_lock, flags);
144
145	if (is_syncer_req)
146		drbd_rs_complete_io(mdev, e_sector);
147
148	if (do_wake)
149		wake_up(&mdev->ee_wait);
150
151	if (do_al_complete_io)
152		drbd_al_complete_io(mdev, e_sector);
153
154	wake_asender(mdev);
155	put_ldev(mdev);
156}
157
158/* writes on behalf of the partner, or resync writes,
159 * "submitted" by the receiver.
160 */
161void drbd_endio_sec(struct bio *bio, int error)
162{
163	struct drbd_epoch_entry *e = bio->bi_private;
164	struct drbd_conf *mdev = e->mdev;
165	int uptodate = bio_flagged(bio, BIO_UPTODATE);
166	int is_write = bio_data_dir(bio) == WRITE;
167
168	if (error && __ratelimit(&drbd_ratelimit_state))
169		dev_warn(DEV, "%s: error=%d s=%llus\n",
170				is_write ? "write" : "read", error,
171				(unsigned long long)e->sector);
172	if (!error && !uptodate) {
173		if (__ratelimit(&drbd_ratelimit_state))
174			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
175					is_write ? "write" : "read",
176					(unsigned long long)e->sector);
177		/* strange behavior of some lower level drivers...
178		 * fail the request by clearing the uptodate flag,
179		 * but do not return any error?! */
180		error = -EIO;
181	}
182
183	if (error)
184		set_bit(__EE_WAS_ERROR, &e->flags);
185
186	bio_put(bio); /* no need for the bio anymore */
187	if (atomic_dec_and_test(&e->pending_bios)) {
188		if (is_write)
189			drbd_endio_write_sec_final(e);
190		else
191			drbd_endio_read_sec_final(e);
192	}
193}
194
195/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
196 */
197void drbd_endio_pri(struct bio *bio, int error)
198{
199	unsigned long flags;
200	struct drbd_request *req = bio->bi_private;
201	struct drbd_conf *mdev = req->mdev;
202	struct bio_and_error m;
203	enum drbd_req_event what;
204	int uptodate = bio_flagged(bio, BIO_UPTODATE);
205
206	if (!error && !uptodate) {
207		dev_warn(DEV, "p %s: setting error to -EIO\n",
208			 bio_data_dir(bio) == WRITE ? "write" : "read");
209		/* strange behavior of some lower level drivers...
210		 * fail the request by clearing the uptodate flag,
211		 * but do not return any error?! */
212		error = -EIO;
213	}
214
215	/* to avoid recursion in __req_mod */
216	if (unlikely(error)) {
217		what = (bio_data_dir(bio) == WRITE)
218			? write_completed_with_error
219			: (bio_rw(bio) == READ)
220			  ? read_completed_with_error
221			  : read_ahead_completed_with_error;
222	} else
223		what = completed_ok;
224
225	bio_put(req->private_bio);
226	req->private_bio = ERR_PTR(error);
227
228	/* not req_mod(), we need irqsave here! */
229	spin_lock_irqsave(&mdev->req_lock, flags);
230	__req_mod(req, what, &m);
231	spin_unlock_irqrestore(&mdev->req_lock, flags);
232
233	if (m.bio)
234		complete_master_bio(mdev, &m);
235}
236
237int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
238{
239	struct drbd_request *req = container_of(w, struct drbd_request, w);
240
241	/* We should not detach for read io-error,
242	 * but try to WRITE the P_DATA_REPLY to the failed location,
243	 * to give the disk the chance to relocate that block */
244
245	spin_lock_irq(&mdev->req_lock);
246	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
247		_req_mod(req, read_retry_remote_canceled);
248		spin_unlock_irq(&mdev->req_lock);
249		return 1;
250	}
251	spin_unlock_irq(&mdev->req_lock);
252
253	return w_send_read_req(mdev, w, 0);
254}
255
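/* Hash the payload of an epoch entry with the given transform: every page
 * of the page chain is fully used except possibly the last one, whose
 * length is e->size & (PAGE_SIZE - 1), or PAGE_SIZE if e->size happens to
 * be page aligned. */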
256void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
257{
258	struct hash_desc desc;
259	struct scatterlist sg;
260	struct page *page = e->pages;
261	struct page *tmp;
262	unsigned len;
263
264	desc.tfm = tfm;
265	desc.flags = 0;
266
267	sg_init_table(&sg, 1);
268	crypto_hash_init(&desc);
269
270	while ((tmp = page_chain_next(page))) {
271		/* all but the last page will be fully used */
272		sg_set_page(&sg, page, PAGE_SIZE, 0);
273		crypto_hash_update(&desc, &sg, sg.length);
274		page = tmp;
275	}
276	/* and now the last, possibly only partially used page */
277	len = e->size & (PAGE_SIZE - 1);
278	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
279	crypto_hash_update(&desc, &sg, sg.length);
280	crypto_hash_final(&desc, digest);
281}
282
283void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
284{
285	struct hash_desc desc;
286	struct scatterlist sg;
287	struct bio_vec *bvec;
288	int i;
289
290	desc.tfm = tfm;
291	desc.flags = 0;
292
293	sg_init_table(&sg, 1);
294	crypto_hash_init(&desc);
295
296	__bio_for_each_segment(bvec, bio, i, 0) {
297		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
298		crypto_hash_update(&desc, &sg, sg.length);
299	}
300	crypto_hash_final(&desc, digest);
301}
302
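/* Worker callback queued by read_for_csum(): once the local read of the
 * resync block has completed, hash the data with csums_tfm and send the
 * digest to the peer as a P_CSUM_RS_REQUEST. */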
303static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
304{
305	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
306	int digest_size;
307	void *digest;
308	int ok;
309
310	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
311
312	if (unlikely(cancel)) {
313		drbd_free_ee(mdev, e);
314		return 1;
315	}
316
317	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
318		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
319		digest = kmalloc(digest_size, GFP_NOIO);
320		if (digest) {
321			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
322
323			inc_rs_pending(mdev);
324			ok = drbd_send_drequest_csum(mdev,
325						     e->sector,
326						     e->size,
327						     digest,
328						     digest_size,
329						     P_CSUM_RS_REQUEST);
330			kfree(digest);
331		} else {
332			dev_err(DEV, "kmalloc() of digest failed.\n");
333			ok = 0;
334		}
335	} else
336		ok = 1;
337
338	drbd_free_ee(mdev, e);
339
340	if (unlikely(!ok))
341		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342	return ok;
343}
344
345#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
346
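/* Submit a local read of the given resync block so that its checksum can
 * be sent to the peer (see w_e_send_csum() above).  Returns 0 on success,
 * -EAGAIN if the caller should retry later (throttled, out of memory, or
 * submission failed), and -EIO if the local disk is gone. */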
347static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348{
349	struct drbd_epoch_entry *e;
350
351	if (!get_ldev(mdev))
352		return -EIO;
353
354	if (drbd_rs_should_slow_down(mdev, sector))
355		goto defer;
356
357	/* GFP_TRY, because if there is no memory available right now, this may
358	 * be rescheduled for later. It is "only" background resync, after all. */
359	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
360	if (!e)
361		goto defer;
362
363	e->w.cb = w_e_send_csum;
364	spin_lock_irq(&mdev->req_lock);
365	list_add(&e->w.list, &mdev->read_ee);
366	spin_unlock_irq(&mdev->req_lock);
367
368	atomic_add(size >> 9, &mdev->rs_sect_ev);
369	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
370		return 0;
371
372	/* If it failed because of ENOMEM, retry should help.  If it failed
373	 * because bio_add_page failed (probably broken lower level driver),
374	 * retry may or may not help.
375	 * If it does not, you may need to force disconnect. */
376	spin_lock_irq(&mdev->req_lock);
377	list_del(&e->w.list);
378	spin_unlock_irq(&mdev->req_lock);
379
380	drbd_free_ee(mdev, e);
381defer:
382	put_ldev(mdev);
383	return -EAGAIN;
384}
385
386int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
387{
388	switch (mdev->state.conn) {
389	case C_VERIFY_S:
390		w_make_ov_request(mdev, w, cancel);
391		break;
392	case C_SYNC_TARGET:
393		w_make_resync_request(mdev, w, cancel);
394		break;
395	}
396
397	return 1;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402	struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404	if (list_empty(&mdev->resync_work.list))
405		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
406}
407
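/* The following helpers operate on the resync plan FIFO (mdev->rs_plan_s),
 * a ring buffer of per-step corrections used by drbd_rs_controller():
 * fifo_set() initializes all slots, fifo_push() returns the oldest value
 * while storing a new one at the head (with wrap-around), and
 * fifo_add_val() adds a constant to every slot, i.e. spreads a correction
 * over all planned steps. */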
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410	int i;
411
412	for (i = 0; i < fb->size; i++)
413		fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418	int ov;
419
420	ov = fb->values[fb->head_index];
421	fb->values[fb->head_index++] = value;
422
423	if (fb->head_index >= fb->size)
424		fb->head_index = 0;
425
426	return ov;
427}
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431	int i;
432
433	for (i = 0; i < fb->size; i++)
434		fb->values[i] += value;
435}
436
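/* Dynamic resync speed controller.  Roughly, per SLEEP_TIME tick (leaving
 * the start-of-resync special case aside):
 *
 *	want       = c_fill_target ?:
 *		     sect_in * c_delay_target * HZ / (SLEEP_TIME * 10);
 *	correction = want - rs_in_flight - rs_planed;
 *	cps        = correction / steps;	(spread over the plan FIFO)
 *	req_sect   = sect_in + fifo_push(&rs_plan_s, 0);
 *	req_sect   = clamp(req_sect, 0, c_max_rate * 2 * SLEEP_TIME / HZ);
 *
 * The return value is the number of sectors to request in this turn;
 * drbd_rs_number_requests() converts it into bitmap-block sized requests.
 */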
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440	unsigned int want;     /* The number of sectors we want in the proxy */
441	int req_sect; /* Number of sectors to request in this turn */
442	int correction; /* Number of sectors more we need in the proxy*/
443	int cps; /* correction per invocation of drbd_rs_controller() */
444	int steps; /* Number of time steps to plan ahead */
445	int curr_corr;
446	int max_sect;
447
448	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449	mdev->rs_in_flight -= sect_in;
450
451	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
457	} else { /* normal path */
458		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
459			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
460	}
461
462	correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464	/* Plan ahead */
465	cps = correction / steps;
466	fifo_add_val(&mdev->rs_plan_s, cps);
467	mdev->rs_planed += cps * steps;
468
469	/* What we do in this step */
470	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471	spin_unlock(&mdev->peer_seq_lock);
472	mdev->rs_planed -= curr_corr;
473
474	req_sect = sect_in + curr_corr;
475	if (req_sect < 0)
476		req_sect = 0;
477
478	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
479	if (req_sect > max_sect)
480		req_sect = max_sect;
481
482	/*
483	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484		 sect_in, mdev->rs_in_flight, want, correction,
485		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
486	*/
487
488	return req_sect;
489}
490
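/* Translate the controller output (or the configured sync rate, if no plan
 * is configured) into a number of BM_BLOCK_SIZE sized requests for one
 * SLEEP_TIME interval; mdev->c_sync_rate records the currently effective
 * rate in KiB/s. */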
491static int drbd_rs_number_requests(struct drbd_conf *mdev)
492{
493	int number;
494	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
495		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497	} else {
498		mdev->c_sync_rate = mdev->sync_conf.rate;
499		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500	}
501
502	/* Ignore the amount of pending requests; the resync controller should
503	 * throttle down to the incoming reply rate soon enough anyway. */
504	return number;
505}
506
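/**
 * w_make_resync_request() - issue the next batch of resync requests
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyway
 *
 * Walks the bitmap from mdev->bm_resync_fo, merging adjacent dirty bits
 * into larger aligned requests where possible (up to max_bio_size), and
 * either sends P_RS_DATA_REQUEST packets or, for checksum based resync
 * (csums_tfm set and protocol >= 89), submits local reads whose digests
 * are sent by w_e_send_csum().  The batch size comes from
 * drbd_rs_number_requests(); the loop also backs off while the send
 * buffer is more than half full or drbd_rs_should_slow_down() asks for
 * throttling, and normally re-arms the resync timer before returning.
 */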
507static int w_make_resync_request(struct drbd_conf *mdev,
508				 struct drbd_work *w, int cancel)
509{
510	unsigned long bit;
511	sector_t sector;
512	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513	int max_bio_size;
514	int number, rollback_i, size;
515	int align, queued, sndbuf;
516	int i = 0;
517
518	if (unlikely(cancel))
519		return 1;
520
521	if (mdev->rs_total == 0) {
522		/* empty resync? */
523		drbd_resync_finished(mdev);
524		return 1;
525	}
526
527	if (!get_ldev(mdev)) {
528		/* Since we only need to access mdev->rsync, a
529		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
530		   continuing resync with a broken disk makes no sense at
531		   all. */
532		dev_err(DEV, "Disk broke down during resync!\n");
533		return 1;
534	}
535
536	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
537	 * if it should be necessary */
538	max_bio_size =
539		mdev->agreed_pro_version < 94 ? queue_max_hw_sectors(mdev->rq_queue) << 9 :
540		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_BIO_SIZE;
541
542	number = drbd_rs_number_requests(mdev);
543	if (number == 0)
544		goto requeue;
545
546	for (i = 0; i < number; i++) {
547		/* Stop generating RS requests when half of the send buffer is filled */
548		mutex_lock(&mdev->data.mutex);
549		if (mdev->data.socket) {
550			queued = mdev->data.socket->sk->sk_wmem_queued;
551			sndbuf = mdev->data.socket->sk->sk_sndbuf;
552		} else {
553			queued = 1;
554			sndbuf = 0;
555		}
556		mutex_unlock(&mdev->data.mutex);
557		if (queued > sndbuf / 2)
558			goto requeue;
559
560next_sector:
561		size = BM_BLOCK_SIZE;
562		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
563
564		if (bit == DRBD_END_OF_BITMAP) {
565			mdev->bm_resync_fo = drbd_bm_bits(mdev);
566			put_ldev(mdev);
567			return 1;
568		}
569
570		sector = BM_BIT_TO_SECT(bit);
571
572		if (drbd_rs_should_slow_down(mdev, sector) ||
573		    drbd_try_rs_begin_io(mdev, sector)) {
574			mdev->bm_resync_fo = bit;
575			goto requeue;
576		}
577		mdev->bm_resync_fo = bit + 1;
578
579		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
580			drbd_rs_complete_io(mdev, sector);
581			goto next_sector;
582		}
583
584#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
585		/* try to find some adjacent bits.
586		 * we stop if we already have the maximum req size.
587		 *
588		 * Additionally always align bigger requests, in order to
589		 * be prepared for all stripe sizes of software RAIDs.
590		 */
591		align = 1;
592		rollback_i = i;
593		for (;;) {
594			if (size + BM_BLOCK_SIZE > max_bio_size)
595				break;
596
597			/* Be always aligned */
598			if (sector & ((1<<(align+3))-1))
599				break;
600
601			/* do not cross extent boundaries */
602			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
603				break;
604			/* now, is it actually dirty, after all?
605			 * caution, drbd_bm_test_bit is tri-state for some
606			 * obscure reason; ( b == 0 ) would get the out-of-band
607			 * only accidentally right because of the "oddly sized"
608			 * adjustment below */
609			if (drbd_bm_test_bit(mdev, bit+1) != 1)
610				break;
611			bit++;
612			size += BM_BLOCK_SIZE;
613			if ((BM_BLOCK_SIZE << align) <= size)
614				align++;
615			i++;
616		}
617		/* if we merged some,
618		 * reset the offset to start the next drbd_bm_find_next from */
619		if (size > BM_BLOCK_SIZE)
620			mdev->bm_resync_fo = bit + 1;
621#endif
622
623		/* adjust very last sectors, in case we are oddly sized */
624		if (sector + (size>>9) > capacity)
625			size = (capacity-sector)<<9;
626		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
627			switch (read_for_csum(mdev, sector, size)) {
628			case -EIO: /* Disk failure */
629				put_ldev(mdev);
630				return 0;
631			case -EAGAIN: /* allocation failed, or ldev busy */
632				drbd_rs_complete_io(mdev, sector);
633				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
634				i = rollback_i;
635				goto requeue;
636			case 0:
637				/* everything ok */
638				break;
639			default:
640				BUG();
641			}
642		} else {
643			inc_rs_pending(mdev);
644			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
645					       sector, size, ID_SYNCER)) {
646				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
647				dec_rs_pending(mdev);
648				put_ldev(mdev);
649				return 0;
650			}
651		}
652	}
653
654	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
655		/* last syncer _request_ was sent,
656		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
657		 * next sync group will resume), as soon as we receive the last
658		 * resync data block, and the last bit is cleared.
659		 * until then resync "work" is "inactive" ...
660		 */
661		put_ldev(mdev);
662		return 1;
663	}
664
665 requeue:
666	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
667	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
668	put_ldev(mdev);
669	return 1;
670}
671
672static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
673{
674	int number, i, size;
675	sector_t sector;
676	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
677
678	if (unlikely(cancel))
679		return 1;
680
681	number = drbd_rs_number_requests(mdev);
682
683	sector = mdev->ov_position;
684	for (i = 0; i < number; i++) {
685		if (sector >= capacity) {
686			return 1;
687		}
688
689		size = BM_BLOCK_SIZE;
690
691		if (drbd_rs_should_slow_down(mdev, sector) ||
692		    drbd_try_rs_begin_io(mdev, sector)) {
693			mdev->ov_position = sector;
694			goto requeue;
695		}
696
697		if (sector + (size>>9) > capacity)
698			size = (capacity-sector)<<9;
699
700		inc_rs_pending(mdev);
701		if (!drbd_send_ov_request(mdev, sector, size)) {
702			dec_rs_pending(mdev);
703			return 0;
704		}
705		sector += BM_SECT_PER_BIT;
706	}
707	mdev->ov_position = sector;
708
709 requeue:
710	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
711	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
712	return 1;
713}
714
715
716void start_resync_timer_fn(unsigned long data)
717{
718	struct drbd_conf *mdev = (struct drbd_conf *) data;
719
720	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
721}
722
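/* Queued (possibly via start_resync_timer_fn()) when this node is about to
 * become SyncSource: as long as acks or resync replies are still pending,
 * re-arm the timer and retry in 100ms; otherwise start the resync and
 * clear AHEAD_TO_SYNC_SOURCE. */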
723int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
724{
725	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
726		dev_warn(DEV, "w_start_resync later...\n");
727		mdev->start_resync_timer.expires = jiffies + HZ/10;
728		add_timer(&mdev->start_resync_timer);
729		return 1;
730	}
731
732	drbd_start_resync(mdev, C_SYNC_SOURCE);
733	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
734	return 1;
735}
736
737int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
738{
739	kfree(w);
740	ov_oos_print(mdev);
741	drbd_resync_finished(mdev);
742
743	return 1;
744}
745
746static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
747{
748	kfree(w);
749
750	drbd_resync_finished(mdev);
751
752	return 1;
753}
754
755static void ping_peer(struct drbd_conf *mdev)
756{
757	clear_bit(GOT_PING_ACK, &mdev->flags);
758	request_ping(mdev);
759	wait_event(mdev->misc_wait,
760		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
761}
762
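/* Called when a resync or online verify run ends: drain the resync LRU
 * (re-queueing itself via w_resync_finished if that is not yet possible),
 * report the achieved throughput, update disk/pdsk state and the UUIDs
 * depending on whether we were SyncTarget or SyncSource and whether any
 * blocks failed, and finally run the "out-of-sync" or
 * "after-resync-target" helper if one applies. */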
763int drbd_resync_finished(struct drbd_conf *mdev)
764{
765	unsigned long db, dt, dbdt;
766	unsigned long n_oos;
767	union drbd_state os, ns;
768	struct drbd_work *w;
769	char *khelper_cmd = NULL;
770	int verify_done = 0;
771
772	/* Remove all elements from the resync LRU. If future actions
773	 * set bits in the (main) bitmap, the entries in the
774	 * resync LRU would otherwise be wrong. */
775	if (drbd_rs_del_all(mdev)) {
776		/* In case this is not possible now, most probably because
777		 * there are P_RS_DATA_REPLY packets lingering on the worker's
778		 * queue (or even the read operations for those packets
779		 * are not finished yet).  Retry in 100ms. */
780
781		schedule_timeout_interruptible(HZ / 10);
782		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
783		if (w) {
784			w->cb = w_resync_finished;
785			drbd_queue_work(&mdev->data.work, w);
786			return 1;
787		}
788		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
789	}
790
791	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
792	if (dt <= 0)
793		dt = 1;
794	db = mdev->rs_total;
795	dbdt = Bit2KB(db/dt);
796	mdev->rs_paused /= HZ;
797
798	if (!get_ldev(mdev))
799		goto out;
800
801	ping_peer(mdev);
802
803	spin_lock_irq(&mdev->req_lock);
804	os = mdev->state;
805
806	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
807
808	/* This protects us against multiple calls (that can happen in the presence
809	   of application IO), and against connectivity loss just before we arrive here. */
810	if (os.conn <= C_CONNECTED)
811		goto out_unlock;
812
813	ns = os;
814	ns.conn = C_CONNECTED;
815
816	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
817	     verify_done ? "Online verify " : "Resync",
818	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
819
820	n_oos = drbd_bm_total_weight(mdev);
821
822	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
823		if (n_oos) {
824			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
825			      n_oos, Bit2KB(1));
826			khelper_cmd = "out-of-sync";
827		}
828	} else {
829		D_ASSERT((n_oos - mdev->rs_failed) == 0);
830
831		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
832			khelper_cmd = "after-resync-target";
833
834		if (mdev->csums_tfm && mdev->rs_total) {
835			const unsigned long s = mdev->rs_same_csum;
836			const unsigned long t = mdev->rs_total;
837			const int ratio =
838				(t == 0)     ? 0 :
839				(t < 100000) ? ((s*100)/t) : (s/(t/100));
840			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
841			     "transferred %luK total %luK\n",
842			     ratio,
843			     Bit2KB(mdev->rs_same_csum),
844			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
845			     Bit2KB(mdev->rs_total));
846		}
847	}
848
849	if (mdev->rs_failed) {
850		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
851
852		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
853			ns.disk = D_INCONSISTENT;
854			ns.pdsk = D_UP_TO_DATE;
855		} else {
856			ns.disk = D_UP_TO_DATE;
857			ns.pdsk = D_INCONSISTENT;
858		}
859	} else {
860		ns.disk = D_UP_TO_DATE;
861		ns.pdsk = D_UP_TO_DATE;
862
863		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
864			if (mdev->p_uuid) {
865				int i;
866				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
867					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
868				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
869				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
870			} else {
871				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
872			}
873		}
874
875		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
876			/* for verify runs, we don't update uuids here,
877			 * so there would be nothing to report. */
878			drbd_uuid_set_bm(mdev, 0UL);
879			drbd_print_uuids(mdev, "updated UUIDs");
880			if (mdev->p_uuid) {
881				/* Now the two UUID sets are equal, update what we
882				 * know of the peer. */
883				int i;
884				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
885					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
886			}
887		}
888	}
889
890	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
891out_unlock:
892	spin_unlock_irq(&mdev->req_lock);
893	put_ldev(mdev);
894out:
895	mdev->rs_total  = 0;
896	mdev->rs_failed = 0;
897	mdev->rs_paused = 0;
898	if (verify_done)
899		mdev->ov_start_sector = 0;
900
901	drbd_md_sync(mdev);
902
903	if (khelper_cmd)
904		drbd_khelper(mdev, khelper_cmd);
905
906	return 1;
907}
908
909/* helper: park e on net_ee while sendpage() may still use its pages, else free it */
910static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
911{
912	if (drbd_ee_has_active_page(e)) {
913		/* This might happen if sendpage() has not finished */
914		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
915		atomic_add(i, &mdev->pp_in_use_by_net);
916		atomic_sub(i, &mdev->pp_in_use);
917		spin_lock_irq(&mdev->req_lock);
918		list_add_tail(&e->w.list, &mdev->net_ee);
919		spin_unlock_irq(&mdev->req_lock);
920		wake_up(&drbd_pp_wait);
921	} else
922		drbd_free_ee(mdev, e);
923}
924
925/**
926 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
927 * @mdev:	DRBD device.
928 * @w:		work object.
929 * @cancel:	The connection will be closed anyway
930 */
931int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
932{
933	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
934	int ok;
935
936	if (unlikely(cancel)) {
937		drbd_free_ee(mdev, e);
938		dec_unacked(mdev);
939		return 1;
940	}
941
942	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
943		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
944	} else {
945		if (__ratelimit(&drbd_ratelimit_state))
946			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
947			    (unsigned long long)e->sector);
948
949		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
950	}
951
952	dec_unacked(mdev);
953
954	move_to_net_ee_or_free(mdev, e);
955
956	if (unlikely(!ok))
957		dev_err(DEV, "drbd_send_block() failed\n");
958	return ok;
959}
960
961/**
962 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
963 * @mdev:	DRBD device.
964 * @w:		work object.
965 * @cancel:	The connection will be closed anyway
966 */
967int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
968{
969	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
970	int ok;
971
972	if (unlikely(cancel)) {
973		drbd_free_ee(mdev, e);
974		dec_unacked(mdev);
975		return 1;
976	}
977
978	if (get_ldev_if_state(mdev, D_FAILED)) {
979		drbd_rs_complete_io(mdev, e->sector);
980		put_ldev(mdev);
981	}
982
983	if (mdev->state.conn == C_AHEAD) {
984		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
985	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
986		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
987			inc_rs_pending(mdev);
988			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
989		} else {
990			if (__ratelimit(&drbd_ratelimit_state))
991				dev_err(DEV, "Not sending RSDataReply, "
992				    "partner DISKLESS!\n");
993			ok = 1;
994		}
995	} else {
996		if (__ratelimit(&drbd_ratelimit_state))
997			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
998			    (unsigned long long)e->sector);
999
1000		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1001
1002		/* update resync data with failure */
1003		drbd_rs_failed_io(mdev, e->sector, e->size);
1004	}
1005
1006	dec_unacked(mdev);
1007
1008	move_to_net_ee_or_free(mdev, e);
1009
1010	if (unlikely(!ok))
1011		dev_err(DEV, "drbd_send_block() failed\n");
1012	return ok;
1013}
1014
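/* Worker callback for checksum based resync: compare the locally computed
 * digest with the one that came with the P_CSUM_RS_REQUEST.  If they
 * match, only acknowledge with P_RS_IS_IN_SYNC; otherwise send the full
 * block as P_RS_DATA_REPLY. */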
1015int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1016{
1017	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1018	struct digest_info *di;
1019	int digest_size;
1020	void *digest = NULL;
1021	int ok, eq = 0;
1022
1023	if (unlikely(cancel)) {
1024		drbd_free_ee(mdev, e);
1025		dec_unacked(mdev);
1026		return 1;
1027	}
1028
1029	if (get_ldev(mdev)) {
1030		drbd_rs_complete_io(mdev, e->sector);
1031		put_ldev(mdev);
1032	}
1033
1034	di = e->digest;
1035
1036	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1037		/* quick hack to try to avoid a race against reconfiguration.
1038		 * a real fix would be much more involved,
1039		 * introducing more locking mechanisms */
1040		if (mdev->csums_tfm) {
1041			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1042			D_ASSERT(digest_size == di->digest_size);
1043			digest = kmalloc(digest_size, GFP_NOIO);
1044		}
1045		if (digest) {
1046			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1047			eq = !memcmp(digest, di->digest, digest_size);
1048			kfree(digest);
1049		}
1050
1051		if (eq) {
1052			drbd_set_in_sync(mdev, e->sector, e->size);
1053			/* rs_same_csums unit is BM_BLOCK_SIZE */
1054			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1055			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1056		} else {
1057			inc_rs_pending(mdev);
1058			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1059			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1060			kfree(di);
1061			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1062		}
1063	} else {
1064		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1065		if (__ratelimit(&drbd_ratelimit_state))
1066			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1067	}
1068
1069	dec_unacked(mdev);
1070	move_to_net_ee_or_free(mdev, e);
1071
1072	if (unlikely(!ok))
1073		dev_err(DEV, "drbd_send_block/ack() failed\n");
1074	return ok;
1075}
1076
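/* Online verify: hash the local block with verify_tfm and send the digest
 * to the peer via drbd_send_drequest_csum(..., P_OV_REPLY).  Note the
 * FIXME below: if the digest allocation fails, no reply is sent. */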
1077int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1078{
1079	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1080	int digest_size;
1081	void *digest;
1082	int ok = 1;
1083
1084	if (unlikely(cancel))
1085		goto out;
1086
1087	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1088		goto out;
1089
1090	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1091	/* FIXME if this allocation fails, online verify will not terminate! */
1092	digest = kmalloc(digest_size, GFP_NOIO);
1093	if (digest) {
1094		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1095		inc_rs_pending(mdev);
1096		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1097					     digest, digest_size, P_OV_REPLY);
1098		if (!ok)
1099			dec_rs_pending(mdev);
1100		kfree(digest);
1101	}
1102
1103out:
1104	drbd_free_ee(mdev, e);
1105
1106	dec_unacked(mdev);
1107
1108	return ok;
1109}
1110
1111void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1112{
1113	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1114		mdev->ov_last_oos_size += size>>9;
1115	} else {
1116		mdev->ov_last_oos_start = sector;
1117		mdev->ov_last_oos_size = size>>9;
1118	}
1119	drbd_set_out_of_sync(mdev, sector, size);
1120}
1121
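/* Online verify: recompute the digest over the local data, compare it with
 * the peer's digest, record any mismatch via drbd_ov_oos_found(), answer
 * with P_OV_RESULT (ID_IN_SYNC / ID_OUT_OF_SYNC), and finish the verify
 * run once ov_left reaches zero. */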
1122int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1123{
1124	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1125	struct digest_info *di;
1126	int digest_size;
1127	void *digest;
1128	int ok, eq = 0;
1129
1130	if (unlikely(cancel)) {
1131		drbd_free_ee(mdev, e);
1132		dec_unacked(mdev);
1133		return 1;
1134	}
1135
1136	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1137	 * the resync lru has been cleaned up already */
1138	if (get_ldev(mdev)) {
1139		drbd_rs_complete_io(mdev, e->sector);
1140		put_ldev(mdev);
1141	}
1142
1143	di = e->digest;
1144
1145	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1146		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1147		digest = kmalloc(digest_size, GFP_NOIO);
1148		if (digest) {
1149			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1150
1151			D_ASSERT(digest_size == di->digest_size);
1152			eq = !memcmp(digest, di->digest, digest_size);
1153			kfree(digest);
1154		}
1155	} else {
1156		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1157		if (__ratelimit(&drbd_ratelimit_state))
1158			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1159	}
1160
1161	dec_unacked(mdev);
1162	if (!eq)
1163		drbd_ov_oos_found(mdev, e->sector, e->size);
1164	else
1165		ov_oos_print(mdev);
1166
1167	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1168			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1169
1170	drbd_free_ee(mdev, e);
1171
1172	--mdev->ov_left;
1173
1174	/* let's advance progress step marks only for every other megabyte */
1175	if ((mdev->ov_left & 0x200) == 0x200)
1176		drbd_advance_rs_marks(mdev, mdev->ov_left);
1177
1178	if (mdev->ov_left == 0) {
1179		ov_oos_print(mdev);
1180		drbd_resync_finished(mdev);
1181	}
1182
1183	return ok;
1184}
1185
1186int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1187{
1188	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1189	complete(&b->done);
1190	return 1;
1191}
1192
1193int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1194{
1195	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1196	struct p_barrier *p = &mdev->data.sbuf.barrier;
1197	int ok = 1;
1198
1199	/* really avoid racing with tl_clear.  w.cb may have been referenced
1200	 * just before it was reassigned and re-queued, so double check that.
1201	 * actually, this race was harmless, since we only try to send the
1202	 * barrier packet here, and otherwise do nothing with the object.
1203	 * but compare with the head of w_clear_epoch */
1204	spin_lock_irq(&mdev->req_lock);
1205	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1206		cancel = 1;
1207	spin_unlock_irq(&mdev->req_lock);
1208	if (cancel)
1209		return 1;
1210
1211	if (!drbd_get_data_sock(mdev))
1212		return 0;
1213	p->barrier = b->br_number;
1214	/* inc_ap_pending was done where this was queued.
1215	 * dec_ap_pending will be done in got_BarrierAck
1216	 * or (on connection loss) in w_clear_epoch.  */
1217	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1218				(struct p_header80 *)p, sizeof(*p), 0);
1219	drbd_put_data_sock(mdev);
1220
1221	return ok;
1222}
1223
1224int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1225{
1226	if (cancel)
1227		return 1;
1228	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1229}
1230
1231int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1232{
1233	struct drbd_request *req = container_of(w, struct drbd_request, w);
1234	int ok;
1235
1236	if (unlikely(cancel)) {
1237		req_mod(req, send_canceled);
1238		return 1;
1239	}
1240
1241	ok = drbd_send_oos(mdev, req);
1242	req_mod(req, oos_handed_to_network);
1243
1244	return ok;
1245}
1246
1247/**
1248 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1249 * @mdev:	DRBD device.
1250 * @w:		work object.
1251 * @cancel:	The connection will be closed anyway
1252 */
1253int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1254{
1255	struct drbd_request *req = container_of(w, struct drbd_request, w);
1256	int ok;
1257
1258	if (unlikely(cancel)) {
1259		req_mod(req, send_canceled);
1260		return 1;
1261	}
1262
1263	ok = drbd_send_dblock(mdev, req);
1264	req_mod(req, ok ? handed_over_to_network : send_failed);
1265
1266	return ok;
1267}
1268
1269/**
1270 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1271 * @mdev:	DRBD device.
1272 * @w:		work object.
1273 * @cancel:	The connection will be closed anyway
1274 */
1275int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1276{
1277	struct drbd_request *req = container_of(w, struct drbd_request, w);
1278	int ok;
1279
1280	if (unlikely(cancel)) {
1281		req_mod(req, send_canceled);
1282		return 1;
1283	}
1284
1285	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1286				(unsigned long)req);
1287
1288	if (!ok) {
1289		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1290		 * so this is probably redundant */
1291		if (mdev->state.conn >= C_CONNECTED)
1292			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1293	}
1294	req_mod(req, ok ? handed_over_to_network : send_failed);
1295
1296	return ok;
1297}
1298
1299int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1300{
1301	struct drbd_request *req = container_of(w, struct drbd_request, w);
1302
1303	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304		drbd_al_begin_io(mdev, req->sector);
1305	/* Calling drbd_al_begin_io() out of the worker might deadlock
1306	   theoretically. Practically it cannot deadlock, since this is
1307	   only used when unfreezing IOs. All the extents of the requests
1308	   that made it into the TL are already active. */
1309
1310	drbd_req_make_private_bio(req, req->master_bio);
1311	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312	generic_make_request(req->private_bio);
1313
1314	return 1;
1315}
1316
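/* Follow the sync-after dependency chain (sync_conf.after) and return 1
 * only if no device we depend on is currently resyncing or has any of the
 * aftr_isp/peer_isp/user_isp flags set; otherwise this device has to wait,
 * see _drbd_pause_after() and _drbd_resume_next() below. */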
1317static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318{
1319	struct drbd_conf *odev = mdev;
1320
1321	while (1) {
1322		if (odev->sync_conf.after == -1)
1323			return 1;
1324		odev = minor_to_mdev(odev->sync_conf.after);
1325		ERR_IF(!odev) return 1;
1326		if ((odev->state.conn >= C_SYNC_SOURCE &&
1327		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1328		    odev->state.aftr_isp || odev->state.peer_isp ||
1329		    odev->state.user_isp)
1330			return 0;
1331	}
1332}
1333
1334/**
1335 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1336 * @mdev:	DRBD device.
1337 *
1338 * Called from process context only (admin command and after_state_ch).
1339 */
1340static int _drbd_pause_after(struct drbd_conf *mdev)
1341{
1342	struct drbd_conf *odev;
1343	int i, rv = 0;
1344
1345	for (i = 0; i < minor_count; i++) {
1346		odev = minor_to_mdev(i);
1347		if (!odev)
1348			continue;
1349		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1350			continue;
1351		if (!_drbd_may_sync_now(odev))
1352			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1353			       != SS_NOTHING_TO_DO);
1354	}
1355
1356	return rv;
1357}
1358
1359/**
1360 * _drbd_resume_next() - Resume resync on all devices that may resync now
1361 * @mdev:	DRBD device.
1362 *
1363 * Called from process context only (admin command and worker).
1364 */
1365static int _drbd_resume_next(struct drbd_conf *mdev)
1366{
1367	struct drbd_conf *odev;
1368	int i, rv = 0;
1369
1370	for (i = 0; i < minor_count; i++) {
1371		odev = minor_to_mdev(i);
1372		if (!odev)
1373			continue;
1374		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1375			continue;
1376		if (odev->state.aftr_isp) {
1377			if (_drbd_may_sync_now(odev))
1378				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1379							CS_HARD, NULL)
1380				       != SS_NOTHING_TO_DO);
1381		}
1382	}
1383	return rv;
1384}
1385
1386void resume_next_sg(struct drbd_conf *mdev)
1387{
1388	write_lock_irq(&global_state_lock);
1389	_drbd_resume_next(mdev);
1390	write_unlock_irq(&global_state_lock);
1391}
1392
1393void suspend_other_sg(struct drbd_conf *mdev)
1394{
1395	write_lock_irq(&global_state_lock);
1396	_drbd_pause_after(mdev);
1397	write_unlock_irq(&global_state_lock);
1398}
1399
1400static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1401{
1402	struct drbd_conf *odev;
1403
1404	if (o_minor == -1)
1405		return NO_ERROR;
1406	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1407		return ERR_SYNC_AFTER;
1408
1409	/* check for loops */
1410	odev = minor_to_mdev(o_minor);
1411	while (1) {
1412		if (odev == mdev)
1413			return ERR_SYNC_AFTER_CYCLE;
1414
1415		/* dependency chain ends here, no cycles. */
1416		if (odev->sync_conf.after == -1)
1417			return NO_ERROR;
1418
1419		/* follow the dependency chain */
1420		odev = minor_to_mdev(odev->sync_conf.after);
1421	}
1422}
1423
1424int drbd_alter_sa(struct drbd_conf *mdev, int na)
1425{
1426	int changes;
1427	int retcode;
1428
1429	write_lock_irq(&global_state_lock);
1430	retcode = sync_after_error(mdev, na);
1431	if (retcode == NO_ERROR) {
1432		mdev->sync_conf.after = na;
1433		do {
1434			changes  = _drbd_pause_after(mdev);
1435			changes |= _drbd_resume_next(mdev);
1436		} while (changes);
1437	}
1438	write_unlock_irq(&global_state_lock);
1439	return retcode;
1440}
1441
1442void drbd_rs_controller_reset(struct drbd_conf *mdev)
1443{
1444	atomic_set(&mdev->rs_sect_in, 0);
1445	atomic_set(&mdev->rs_sect_ev, 0);
1446	mdev->rs_in_flight = 0;
1447	mdev->rs_planed = 0;
1448	spin_lock(&mdev->peer_seq_lock);
1449	fifo_set(&mdev->rs_plan_s, 0);
1450	spin_unlock(&mdev->peer_seq_lock);
1451}
1452
1453/**
1454 * drbd_start_resync() - Start the resync process
1455 * @mdev:	DRBD device.
1456 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1457 *
1458 * This function might bring you directly into one of the
1459 * C_PAUSED_SYNC_* states.
1460 */
1461void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1462{
1463	union drbd_state ns;
1464	int r;
1465
1466	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1467		dev_err(DEV, "Resync already running!\n");
1468		return;
1469	}
1470
1471	if (mdev->state.conn < C_AHEAD) {
1472		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1473		drbd_rs_cancel_all(mdev);
1474		/* This should be done when we abort the resync. We definitely do not
1475		   want to have this for connections going back and forth between
1476		   Ahead/Behind and SyncSource/SyncTarget */
1477	}
1478
1479	if (side == C_SYNC_TARGET) {
1480		/* Since application IO was locked out during C_WF_BITMAP_T and
1481		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1482		   ask the before-resync-target handler whether we may make the data inconsistent. */
1483		r = drbd_khelper(mdev, "before-resync-target");
1484		r = (r >> 8) & 0xff;
1485		if (r > 0) {
1486			dev_info(DEV, "before-resync-target handler returned %d, "
1487			     "dropping connection.\n", r);
1488			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1489			return;
1490		}
1491	} else /* C_SYNC_SOURCE */ {
1492		r = drbd_khelper(mdev, "before-resync-source");
1493		r = (r >> 8) & 0xff;
1494		if (r > 0) {
1495			if (r == 3) {
1496				dev_info(DEV, "before-resync-source handler returned %d, "
1497					 "ignoring. Old userland tools?\n", r);
1498			} else {
1499				dev_info(DEV, "before-resync-source handler returned %d, "
1500					 "dropping connection.\n", r);
1501				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1502				return;
1503			}
1504		}
1505	}
1506
1507	drbd_state_lock(mdev);
1508
1509	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1510		drbd_state_unlock(mdev);
1511		return;
1512	}
1513
1514	write_lock_irq(&global_state_lock);
1515	ns = mdev->state;
1516
1517	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1518
1519	ns.conn = side;
1520
1521	if (side == C_SYNC_TARGET)
1522		ns.disk = D_INCONSISTENT;
1523	else /* side == C_SYNC_SOURCE */
1524		ns.pdsk = D_INCONSISTENT;
1525
1526	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1527	ns = mdev->state;
1528
1529	if (ns.conn < C_CONNECTED)
1530		r = SS_UNKNOWN_ERROR;
1531
1532	if (r == SS_SUCCESS) {
1533		unsigned long tw = drbd_bm_total_weight(mdev);
1534		unsigned long now = jiffies;
1535		int i;
1536
1537		mdev->rs_failed    = 0;
1538		mdev->rs_paused    = 0;
1539		mdev->rs_same_csum = 0;
1540		mdev->rs_last_events = 0;
1541		mdev->rs_last_sect_ev = 0;
1542		mdev->rs_total     = tw;
1543		mdev->rs_start     = now;
1544		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1545			mdev->rs_mark_left[i] = tw;
1546			mdev->rs_mark_time[i] = now;
1547		}
1548		_drbd_pause_after(mdev);
1549	}
1550	write_unlock_irq(&global_state_lock);
1551
1552	if (r == SS_SUCCESS) {
1553		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1554		     drbd_conn_str(ns.conn),
1555		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1556		     (unsigned long) mdev->rs_total);
1557		if (side == C_SYNC_TARGET)
1558			mdev->bm_resync_fo = 0;
1559
1560		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1561		 * with w_send_oos, or the sync target will get confused as to
1562		 * how many bits to resync.  We cannot always do that, because for an
1563		 * empty resync and protocol < 95, we need to do it here, as we call
1564		 * drbd_resync_finished from here in that case.
1565		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1566		 * and from after_state_ch otherwise. */
1567		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1568			drbd_gen_and_send_sync_uuid(mdev);
1569
1570		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1571			/* This still has a race (about when exactly the peers
1572			 * detect connection loss) that can lead to a full sync
1573			 * on next handshake. In 8.3.9 we fixed this with explicit
1574			 * resync-finished notifications, but the fix
1575			 * introduces a protocol change.  Sleeping for some
1576			 * time longer than the ping interval + timeout on the
1577			 * SyncSource, to give the SyncTarget the chance to
1578			 * detect connection loss, then waiting for a ping
1579			 * response (implicit in drbd_resync_finished) reduces
1580			 * the race considerably, but does not solve it. */
1581			if (side == C_SYNC_SOURCE)
1582				schedule_timeout_interruptible(
1583					mdev->net_conf->ping_int * HZ +
1584					mdev->net_conf->ping_timeo*HZ/9);
1585			drbd_resync_finished(mdev);
1586		}
1587
1588		drbd_rs_controller_reset(mdev);
1589		/* ns.conn may already be != mdev->state.conn,
1590		 * we may have been paused in between, or become paused until
1591		 * the timer triggers.
1592		 * No matter, that is handled in resync_timer_fn() */
1593		if (ns.conn == C_SYNC_TARGET)
1594			mod_timer(&mdev->resync_timer, jiffies);
1595
1596		drbd_md_sync(mdev);
1597	}
1598	put_ldev(mdev);
1599	drbd_state_unlock(mdev);
1600}
1601
1602int drbd_worker(struct drbd_thread *thi)
1603{
1604	struct drbd_conf *mdev = thi->mdev;
1605	struct drbd_work *w = NULL;
1606	LIST_HEAD(work_list);
1607	int intr = 0, i;
1608
1609	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1610
1611	while (get_t_state(thi) == Running) {
1612		drbd_thread_current_set_cpu(mdev);
1613
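		/* No work immediately available (down_trylock() failed to get
		 * the semaphore): uncork the data socket so anything still
		 * buffered goes out now, sleep until work is queued, then
		 * cork it again so subsequent packets may be batched. */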
1614		if (down_trylock(&mdev->data.work.s)) {
1615			mutex_lock(&mdev->data.mutex);
1616			if (mdev->data.socket && !mdev->net_conf->no_cork)
1617				drbd_tcp_uncork(mdev->data.socket);
1618			mutex_unlock(&mdev->data.mutex);
1619
1620			intr = down_interruptible(&mdev->data.work.s);
1621
1622			mutex_lock(&mdev->data.mutex);
1623			if (mdev->data.socket && !mdev->net_conf->no_cork)
1624				drbd_tcp_cork(mdev->data.socket);
1625			mutex_unlock(&mdev->data.mutex);
1626		}
1627
1628		if (intr) {
1629			D_ASSERT(intr == -EINTR);
1630			flush_signals(current);
1631			ERR_IF (get_t_state(thi) == Running)
1632				continue;
1633			break;
1634		}
1635
1636		if (get_t_state(thi) != Running)
1637			break;
1638		/* With this break, we have done a down() but not consumed
1639		   the entry from the list. The cleanup code takes care of
1640		   this...   */
1641
1642		w = NULL;
1643		spin_lock_irq(&mdev->data.work.q_lock);
1644		ERR_IF(list_empty(&mdev->data.work.q)) {
1645			/* something terribly wrong in our logic.
1646			 * we were able to down() the semaphore,
1647			 * but the list is empty... doh.
1648			 *
1649			 * what is the best thing to do now?
1650			 * try again from scratch, restarting the receiver,
1651			 * asender, whatnot? could break even more ugly,
1652			 * e.g. when we are primary, but no good local data.
1653			 *
1654			 * I'll try to get away just starting over this loop.
1655			 */
1656			spin_unlock_irq(&mdev->data.work.q_lock);
1657			continue;
1658		}
1659		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1660		list_del_init(&w->list);
1661		spin_unlock_irq(&mdev->data.work.q_lock);
1662
1663		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1664			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1665			if (mdev->state.conn >= C_CONNECTED)
1666				drbd_force_state(mdev,
1667						NS(conn, C_NETWORK_FAILURE));
1668		}
1669	}
1670	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1671	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1672
1673	spin_lock_irq(&mdev->data.work.q_lock);
1674	i = 0;
1675	while (!list_empty(&mdev->data.work.q)) {
1676		list_splice_init(&mdev->data.work.q, &work_list);
1677		spin_unlock_irq(&mdev->data.work.q_lock);
1678
1679		while (!list_empty(&work_list)) {
1680			w = list_entry(work_list.next, struct drbd_work, list);
1681			list_del_init(&w->list);
1682			w->cb(mdev, w, 1);
1683			i++; /* dead debugging code */
1684		}
1685
1686		spin_lock_irq(&mdev->data.work.q_lock);
1687	}
1688	sema_init(&mdev->data.work.s, 0);
1689	/* DANGEROUS race: if someone queued their work within the spinlock,
1690	 * but up()ed outside the spinlock, we could get an up() on the
1691	 * semaphore without a corresponding list entry.
1692	 * So don't do that.
1693	 */
1694	spin_unlock_irq(&mdev->data.work.q_lock);
1695
1696	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1697	/* _drbd_set_state only uses stop_nowait.
1698	 * wait here for the Exiting receiver. */
1699	drbd_thread_stop(&mdev->receiver);
1700	drbd_mdev_cleanup(mdev);
1701
1702	dev_info(DEV, "worker terminated\n");
1703
1704	clear_bit(DEVICE_DYING, &mdev->flags);
1705	clear_bit(CONFIG_PENDING, &mdev->flags);
1706	wake_up(&mdev->state_wait);
1707
1708	return 0;
1709}
1710