drbd_worker.c revision 5a75cc7cfbb98e896232902214432dae30653dfe
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/smp_lock.h>
30#include <linux/wait.h>
31#include <linux/mm.h>
32#include <linux/memcontrol.h>
33#include <linux/mm_inline.h>
34#include <linux/slab.h>
35#include <linux/random.h>
36#include <linux/string.h>
37#include <linux/scatterlist.h>
38
39#include "drbd_int.h"
40#include "drbd_req.h"
41
42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46/* defined here:
47   drbd_md_io_complete
48   drbd_endio_sec
49   drbd_endio_pri
50
51 * more endio handlers:
52   atodb_endio in drbd_actlog.c
53   drbd_bm_async_io_complete in drbd_bitmap.c
54
55 * For all these callbacks, note the following:
56 * The callbacks will be called in irq context by the IDE drivers,
57 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58 * Try to get the locking right :)
59 *
60 */
61
62
63/* About the global_state_lock
64   Each state transition on a device holds a read lock. In case we have
65   to evaluate the sync-after dependencies, we grab a write lock, because
66   we need stable states on all devices for that.  */
67rwlock_t global_state_lock;
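
/* For illustration, the locking convention as used by the helpers further
 * down in this file (resume_next_sg, suspend_other_sg, drbd_alter_sa,
 * drbd_start_resync):
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_pause_after(mdev);	-- needs stable states on all devices
 *	write_unlock_irq(&global_state_lock);
 *
 * A plain state transition on a single device only takes the read side. */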
68
69/* used for synchronous meta data and bitmap IO
70 * submitted by drbd_md_sync_page_io()
71 */
72void drbd_md_io_complete(struct bio *bio, int error)
73{
74	struct drbd_md_io *md_io;
75
76	md_io = (struct drbd_md_io *)bio->bi_private;
77	md_io->error = error;
78
79	complete(&md_io->event);
80}
81
82/* reads on behalf of the partner,
83 * "submitted" by the receiver
84 */
85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86{
87	unsigned long flags = 0;
88	struct drbd_conf *mdev = e->mdev;
89
90	D_ASSERT(e->block_id != ID_VACANT);
91
92	spin_lock_irqsave(&mdev->req_lock, flags);
93	mdev->read_cnt += e->size >> 9;
94	list_del(&e->w.list);
95	if (list_empty(&mdev->read_ee))
96		wake_up(&mdev->ee_wait);
97	if (test_bit(__EE_WAS_ERROR, &e->flags))
98		__drbd_chk_io_error(mdev, FALSE);
99	spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101	drbd_queue_work(&mdev->data.work, &e->w);
102	put_ldev(mdev);
103}
104
105static int is_failed_barrier(int ee_flags)
106{
107	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108			== (EE_IS_BARRIER|EE_WAS_ERROR);
109}
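
/* Reading the flag check above: an entry counts as a failed barrier only if
 * EE_IS_BARRIER and EE_WAS_ERROR are both set while EE_RESUBMITTED is still
 * clear, so a barrier that already failed once and was resubmitted
 * (EE_RESUBMITTED set) is not routed through the resubmission path again. */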
110
111/* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage.  */
113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114{
115	unsigned long flags = 0;
116	struct drbd_conf *mdev = e->mdev;
117	sector_t e_sector;
118	int do_wake;
119	int is_syncer_req;
120	int do_al_complete_io;
121
122	/* if this is a failed barrier request, disable use of barriers,
123	 * and schedule for resubmission */
124	if (is_failed_barrier(e->flags)) {
125		drbd_bump_write_ordering(mdev, WO_bdev_flush);
126		spin_lock_irqsave(&mdev->req_lock, flags);
127		list_del(&e->w.list);
128		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129		e->w.cb = w_e_reissue;
130		/* put_ldev actually happens below, once we come here again. */
131		__release(local);
132		spin_unlock_irqrestore(&mdev->req_lock, flags);
133		drbd_queue_work(&mdev->data.work, &e->w);
134		return;
135	}
136
137	D_ASSERT(e->block_id != ID_VACANT);
138
139	/* after we moved e to done_ee,
140	 * we may no longer access it,
141	 * it may be freed/reused already!
142	 * (as soon as we release the req_lock) */
143	e_sector = e->sector;
144	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145	is_syncer_req = is_syncer_block_id(e->block_id);
146
147	spin_lock_irqsave(&mdev->req_lock, flags);
148	mdev->writ_cnt += e->size >> 9;
149	list_del(&e->w.list); /* has been on active_ee or sync_ee */
150	list_add_tail(&e->w.list, &mdev->done_ee);
151
152	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153	 * neither did we wake possibly waiting conflicting requests.
154	 * done from "drbd_process_done_ee" within the appropriate w.cb
155	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157	do_wake = is_syncer_req
158		? list_empty(&mdev->sync_ee)
159		: list_empty(&mdev->active_ee);
160
161	if (test_bit(__EE_WAS_ERROR, &e->flags))
162		__drbd_chk_io_error(mdev, FALSE);
163	spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165	if (is_syncer_req)
166		drbd_rs_complete_io(mdev, e_sector);
167
168	if (do_wake)
169		wake_up(&mdev->ee_wait);
170
171	if (do_al_complete_io)
172		drbd_al_complete_io(mdev, e_sector);
173
174	wake_asender(mdev);
175	put_ldev(mdev);
176}
177
178/* writes on behalf of the partner, or resync writes,
179 * "submitted" by the receiver.
180 */
181void drbd_endio_sec(struct bio *bio, int error)
182{
183	struct drbd_epoch_entry *e = bio->bi_private;
184	struct drbd_conf *mdev = e->mdev;
185	int uptodate = bio_flagged(bio, BIO_UPTODATE);
186	int is_write = bio_data_dir(bio) == WRITE;
187
188	if (error)
189		dev_warn(DEV, "%s: error=%d s=%llus\n",
190				is_write ? "write" : "read", error,
191				(unsigned long long)e->sector);
192	if (!error && !uptodate) {
193		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194				is_write ? "write" : "read",
195				(unsigned long long)e->sector);
196		/* strange behavior of some lower level drivers...
197		 * fail the request by clearing the uptodate flag,
198		 * but do not return any error?! */
199		error = -EIO;
200	}
201
202	if (error)
203		set_bit(__EE_WAS_ERROR, &e->flags);
204
205	bio_put(bio); /* no need for the bio anymore */
206	if (atomic_dec_and_test(&e->pending_bios)) {
207		if (is_write)
208			drbd_endio_write_sec_final(e);
209		else
210			drbd_endio_read_sec_final(e);
211	}
212}
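
/* e->pending_bios holds the number of bios submitted for this epoch entry
 * (one entry may have been split into several bios by drbd_submit_ee).
 * Each completion drops the count, and only the bio that brings it to zero
 * runs the *_final handler above, so the entry is processed exactly once,
 * after all of its bios have completed.  For example, an entry split into
 * three bios completes 3 -> 2 -> 1 -> 0, and only the last completion
 * queues the work or wakes the waiters. */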
213
214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215 */
216void drbd_endio_pri(struct bio *bio, int error)
217{
218	struct drbd_request *req = bio->bi_private;
219	struct drbd_conf *mdev = req->mdev;
220	enum drbd_req_event what;
221	int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223	if (!error && !uptodate) {
224		dev_warn(DEV, "p %s: setting error to -EIO\n",
225			 bio_data_dir(bio) == WRITE ? "write" : "read");
226		/* strange behavior of some lower level drivers...
227		 * fail the request by clearing the uptodate flag,
228		 * but do not return any error?! */
229		error = -EIO;
230	}
231
232	/* to avoid recursion in __req_mod */
233	if (unlikely(error)) {
234		what = (bio_data_dir(bio) == WRITE)
235			? write_completed_with_error
236			: (bio_rw(bio) == READ)
237			  ? read_completed_with_error
238			  : read_ahead_completed_with_error;
239	} else
240		what = completed_ok;
241
242	bio_put(req->private_bio);
243	req->private_bio = ERR_PTR(error);
244
245	req_mod(req, what);
246}
247
248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249{
250	struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252	/* We should not detach for read io-error,
253	 * but try to WRITE the P_DATA_REPLY to the failed location,
254	 * to give the disk the chance to relocate that block */
255
256	spin_lock_irq(&mdev->req_lock);
257	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258		_req_mod(req, read_retry_remote_canceled);
259		spin_unlock_irq(&mdev->req_lock);
260		return 1;
261	}
262	spin_unlock_irq(&mdev->req_lock);
263
264	return w_send_read_req(mdev, w, 0);
265}
266
267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268{
269	ERR_IF(cancel) return 1;
270	dev_err(DEV, "resync inactive, but callback triggered??\n");
271	return 1; /* Simply ignore this! */
272}
273
274void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275{
276	struct hash_desc desc;
277	struct scatterlist sg;
278	struct page *page = e->pages;
279	struct page *tmp;
280	unsigned len;
281
282	desc.tfm = tfm;
283	desc.flags = 0;
284
285	sg_init_table(&sg, 1);
286	crypto_hash_init(&desc);
287
288	while ((tmp = page_chain_next(page))) {
289		/* all but the last page will be fully used */
290		sg_set_page(&sg, page, PAGE_SIZE, 0);
291		crypto_hash_update(&desc, &sg, sg.length);
292		page = tmp;
293	}
294	/* and now the last, possibly only partially used page */
295	len = e->size & (PAGE_SIZE - 1);
296	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297	crypto_hash_update(&desc, &sg, sg.length);
298	crypto_hash_final(&desc, digest);
299}
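
/* Tail handling above, assuming PAGE_SIZE is 4096: for e->size == 6144 the
 * last page contributes len = 6144 & 4095 = 2048 bytes, while for
 * e->size == 8192 the AND yields 0 and "len ?: PAGE_SIZE" counts the last
 * page in full -- either way the digest covers exactly e->size bytes. */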
300
301void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302{
303	struct hash_desc desc;
304	struct scatterlist sg;
305	struct bio_vec *bvec;
306	int i;
307
308	desc.tfm = tfm;
309	desc.flags = 0;
310
311	sg_init_table(&sg, 1);
312	crypto_hash_init(&desc);
313
314	__bio_for_each_segment(bvec, bio, i, 0) {
315		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316		crypto_hash_update(&desc, &sg, sg.length);
317	}
318	crypto_hash_final(&desc, digest);
319}
320
321static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322{
323	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324	int digest_size;
325	void *digest;
326	int ok;
327
328	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330	if (unlikely(cancel)) {
331		drbd_free_ee(mdev, e);
332		return 1;
333	}
334
335	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337		digest = kmalloc(digest_size, GFP_NOIO);
338		if (digest) {
339			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341			inc_rs_pending(mdev);
342			ok = drbd_send_drequest_csum(mdev,
343						     e->sector,
344						     e->size,
345						     digest,
346						     digest_size,
347						     P_CSUM_RS_REQUEST);
348			kfree(digest);
349		} else {
350			dev_err(DEV, "kmalloc() of digest failed.\n");
351			ok = 0;
352		}
353	} else
354		ok = 1;
355
356	drbd_free_ee(mdev, e);
357
358	if (unlikely(!ok))
359		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360	return ok;
361}
362
363#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
364
365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366{
367	struct drbd_epoch_entry *e;
368
369	if (!get_ldev(mdev))
370		return -EIO;
371
372	if (drbd_rs_should_slow_down(mdev))
373		goto defer;
374
375	/* GFP_TRY, because if there is no memory available right now, this may
376	 * be rescheduled for later. It is "only" background resync, after all. */
377	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378	if (!e)
379		goto defer;
380
381	e->w.cb = w_e_send_csum;
382	spin_lock_irq(&mdev->req_lock);
383	list_add(&e->w.list, &mdev->read_ee);
384	spin_unlock_irq(&mdev->req_lock);
385
386	atomic_add(size >> 9, &mdev->rs_sect_ev);
387	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388		return 0;
389
390	drbd_free_ee(mdev, e);
391defer:
392	put_ldev(mdev);
393	return -EAGAIN;
394}
395
396void resync_timer_fn(unsigned long data)
397{
398	struct drbd_conf *mdev = (struct drbd_conf *) data;
399	int queue;
400
401	queue = 1;
402	switch (mdev->state.conn) {
403	case C_VERIFY_S:
404		mdev->resync_work.cb = w_make_ov_request;
405		break;
406	case C_SYNC_TARGET:
407		mdev->resync_work.cb = w_make_resync_request;
408		break;
409	default:
410		queue = 0;
411		mdev->resync_work.cb = w_resync_inactive;
412	}
413
414	/* harmless race: list_empty outside data.work.q_lock */
415	if (list_empty(&mdev->resync_work.list) && queue)
416		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
417}
418
419static void fifo_set(struct fifo_buffer *fb, int value)
420{
421	int i;
422
423	for (i = 0; i < fb->size; i++)
424		fb->values[i] = value;
425}
426
427static int fifo_push(struct fifo_buffer *fb, int value)
428{
429	int ov;
430
431	ov = fb->values[fb->head_index];
432	fb->values[fb->head_index++] = value;
433
434	if (fb->head_index >= fb->size)
435		fb->head_index = 0;
436
437	return ov;
438}
439
440static void fifo_add_val(struct fifo_buffer *fb, int value)
441{
442	int i;
443
444	for (i = 0; i < fb->size; i++)
445		fb->values[i] += value;
446}
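
/* Tiny example of how the plan fifo behaves, with fb->size == 3,
 * values == { 5, 7, 9 } and head_index == 0:
 *
 *	fifo_add_val(fb, 2);	-- values become { 7, 9, 11 }
 *	ov = fifo_push(fb, 0);	-- ov == 7, values become { 0, 9, 11 },
 *				   head_index advances to 1
 *
 * drbd_rs_controller() below uses exactly this pattern: spread a correction
 * over all planned steps, then pop the amount scheduled for the current
 * step while queueing an empty slot for the future. */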
447
448int drbd_rs_controller(struct drbd_conf *mdev)
449{
450	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
451	unsigned int want;     /* The number of sectors we want in the proxy */
452	int req_sect; /* Number of sectors to request in this turn */
453	int correction; /* Number of sectors more we need in the proxy*/
454	int cps; /* correction per invocation of drbd_rs_controller() */
455	int steps; /* Number of time steps to plan ahead */
456	int curr_corr;
457	int max_sect;
458
459	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
460	mdev->rs_in_flight -= sect_in;
461
462	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
463
464	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
465
466	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
467		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
468	} else { /* normal path */
469		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
470			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
471	}
472
473	correction = want - mdev->rs_in_flight - mdev->rs_planed;
474
475	/* Plan ahead */
476	cps = correction / steps;
477	fifo_add_val(&mdev->rs_plan_s, cps);
478	mdev->rs_planed += cps * steps;
479
480	/* What we do in this step */
481	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
482	spin_unlock(&mdev->peer_seq_lock);
483	mdev->rs_planed -= curr_corr;
484
485	req_sect = sect_in + curr_corr;
486	if (req_sect < 0)
487		req_sect = 0;
488
489	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
490	if (req_sect > max_sect)
491		req_sect = max_sect;
492
493	/*
494	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
495		 sect_in, mdev->rs_in_flight, want, correction,
496		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
497	*/
498
499	return req_sect;
500}
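
/* A rough worked example of the controller math, all values in 512-byte
 * sectors and the numbers picked purely for illustration: with want == 2000,
 * rs_in_flight == 1200 and rs_planed == 300 the correction is 500 sectors.
 * With steps == 10 that is cps == 50 per step; fifo_add_val() spreads it
 * over the plan and fifo_push() returns what is due now, say
 * curr_corr == 50.  With sect_in == 400 this turn then requests 450
 * sectors, subject to the c_max_rate clamp at the end. */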
501
502int w_make_resync_request(struct drbd_conf *mdev,
503		struct drbd_work *w, int cancel)
504{
505	unsigned long bit;
506	sector_t sector;
507	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
508	int max_segment_size;
509	int number, rollback_i, size, pe, mx;
510	int align, queued, sndbuf;
511	int i = 0;
512
513	if (unlikely(cancel))
514		return 1;
515
516	if (unlikely(mdev->state.conn < C_CONNECTED)) {
517		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
518		return 0;
519	}
520
521	if (mdev->state.conn != C_SYNC_TARGET)
522		dev_err(DEV, "%s in w_make_resync_request\n",
523			drbd_conn_str(mdev->state.conn));
524
525	if (!get_ldev(mdev)) {
526		/* Since we only need to access mdev->resync, a
527		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
528		   continuing resync with a broken disk makes no sense at
529		   all */
530		dev_err(DEV, "Disk broke down during resync!\n");
531		mdev->resync_work.cb = w_resync_inactive;
532		return 1;
533	}
534
535	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
536	 * if it should be necessary */
537	max_segment_size =
538		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
539		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
540
541	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
542		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
543		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
544	} else {
545		mdev->c_sync_rate = mdev->sync_conf.rate;
546		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
547	}
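
	/* Assuming the usual 4 KiB BM_BLOCK_SIZE and SLEEP_TIME of HZ/10
	 * (100 ms), the fixed-rate path above boils down to
	 * number = rate / 40: e.g. a configured rate of 10240 KiB/s yields
	 * 256 requests of one bitmap block each per 100 ms, i.e. 10 MiB/s
	 * worth of resync traffic. */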
548
549	/* Throttle resync on lower level disk activity, which may also be
550	 * caused by application IO on Primary/SyncTarget.
551	 * Keep this after the call to drbd_rs_controller, as that assumes
552	 * it is called as precisely as possible every SLEEP_TIME,
553	 * and would be confused otherwise. */
554	if (drbd_rs_should_slow_down(mdev))
555		goto requeue;
556
557	mutex_lock(&mdev->data.mutex);
558	if (mdev->data.socket)
559		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
560	else
561		mx = 1;
562	mutex_unlock(&mdev->data.mutex);
563
564	/* For resync rates >160MB/sec, allow more pending RS requests */
565	if (number > mx)
566		mx = number;
567
568	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
569	pe = atomic_read(&mdev->rs_pending_cnt);
570	if ((pe + number) > mx) {
571		number = mx - pe;
572	}
573
574	for (i = 0; i < number; i++) {
575		/* Stop generating RS requests, when half of the send buffer is filled */
576		mutex_lock(&mdev->data.mutex);
577		if (mdev->data.socket) {
578			queued = mdev->data.socket->sk->sk_wmem_queued;
579			sndbuf = mdev->data.socket->sk->sk_sndbuf;
580		} else {
581			queued = 1;
582			sndbuf = 0;
583		}
584		mutex_unlock(&mdev->data.mutex);
585		if (queued > sndbuf / 2)
586			goto requeue;
587
588next_sector:
589		size = BM_BLOCK_SIZE;
590		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
591
592		if (bit == -1UL) {
593			mdev->bm_resync_fo = drbd_bm_bits(mdev);
594			mdev->resync_work.cb = w_resync_inactive;
595			put_ldev(mdev);
596			return 1;
597		}
598
599		sector = BM_BIT_TO_SECT(bit);
600
601		if (drbd_try_rs_begin_io(mdev, sector)) {
602			mdev->bm_resync_fo = bit;
603			goto requeue;
604		}
605		mdev->bm_resync_fo = bit + 1;
606
607		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
608			drbd_rs_complete_io(mdev, sector);
609			goto next_sector;
610		}
611
612#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
613		 * we stop if we already have the maximum req size.
614		 * we stop if we have already the maximum req size.
615		 *
616		 * Additionally always align bigger requests, in order to
617		 * be prepared for all stripe sizes of software RAIDs.
618		 */
619		align = 1;
620		rollback_i = i;
621		for (;;) {
622			if (size + BM_BLOCK_SIZE > max_segment_size)
623				break;
624
625			/* Be always aligned */
626			if (sector & ((1<<(align+3))-1))
627				break;
628
629			/* do not cross extent boundaries */
630			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
631				break;
632			/* now, is it actually dirty, after all?
633			 * caution, drbd_bm_test_bit is tri-state for some
634			 * obscure reason; ( b == 0 ) would get the out-of-band
635			 * only accidentally right because of the "oddly sized"
636			 * adjustment below */
637			if (drbd_bm_test_bit(mdev, bit+1) != 1)
638				break;
639			bit++;
640			size += BM_BLOCK_SIZE;
641			if ((BM_BLOCK_SIZE << align) <= size)
642				align++;
643			i++;
644		}
645		/* if we merged some,
646		 * reset the offset to start the next drbd_bm_find_next from */
647		if (size > BM_BLOCK_SIZE)
648			mdev->bm_resync_fo = bit + 1;
649#endif
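
		/* Net effect of the merge loop above: a request is only grown
		 * while its start sector stays aligned to (roughly) the size
		 * it has reached, so e.g. a 32 KiB request starts on a 32 KiB
		 * boundary and never straddles typical software-RAID stripe
		 * boundaries of that order. */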
650
651		/* adjust very last sectors, in case we are oddly sized */
652		if (sector + (size>>9) > capacity)
653			size = (capacity-sector)<<9;
654		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
655			switch (read_for_csum(mdev, sector, size)) {
656			case -EIO: /* Disk failure */
657				put_ldev(mdev);
658				return 0;
659			case -EAGAIN: /* allocation failed, or ldev busy */
660				drbd_rs_complete_io(mdev, sector);
661				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
662				i = rollback_i;
663				goto requeue;
664			case 0:
665				/* everything ok */
666				break;
667			default:
668				BUG();
669			}
670		} else {
671			inc_rs_pending(mdev);
672			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
673					       sector, size, ID_SYNCER)) {
674				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
675				dec_rs_pending(mdev);
676				put_ldev(mdev);
677				return 0;
678			}
679		}
680	}
681
682	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
683		/* last syncer _request_ was sent,
684		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
685		 * next sync group will resume), as soon as we receive the last
686		 * resync data block, and the last bit is cleared.
687		 * until then resync "work" is "inactive" ...
688		 */
689		mdev->resync_work.cb = w_resync_inactive;
690		put_ldev(mdev);
691		return 1;
692	}
693
694 requeue:
695	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
696	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
697	put_ldev(mdev);
698	return 1;
699}
700
701static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
702{
703	int number, i, size;
704	sector_t sector;
705	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
706
707	if (unlikely(cancel))
708		return 1;
709
710	if (unlikely(mdev->state.conn < C_CONNECTED)) {
711		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
712		return 0;
713	}
714
715	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
716	if (atomic_read(&mdev->rs_pending_cnt) > number)
717		goto requeue;
718
719	number -= atomic_read(&mdev->rs_pending_cnt);
720
721	sector = mdev->ov_position;
722	for (i = 0; i < number; i++) {
723		if (sector >= capacity) {
724			mdev->resync_work.cb = w_resync_inactive;
725			return 1;
726		}
727
728		size = BM_BLOCK_SIZE;
729
730		if (drbd_try_rs_begin_io(mdev, sector)) {
731			mdev->ov_position = sector;
732			goto requeue;
733		}
734
735		if (sector + (size>>9) > capacity)
736			size = (capacity-sector)<<9;
737
738		inc_rs_pending(mdev);
739		if (!drbd_send_ov_request(mdev, sector, size)) {
740			dec_rs_pending(mdev);
741			return 0;
742		}
743		sector += BM_SECT_PER_BIT;
744	}
745	mdev->ov_position = sector;
746
747 requeue:
748	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
749	return 1;
750}
751
752
753int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
754{
755	kfree(w);
756	ov_oos_print(mdev);
757	drbd_resync_finished(mdev);
758
759	return 1;
760}
761
762static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
763{
764	kfree(w);
765
766	drbd_resync_finished(mdev);
767
768	return 1;
769}
770
771int drbd_resync_finished(struct drbd_conf *mdev)
772{
773	unsigned long db, dt, dbdt;
774	unsigned long n_oos;
775	union drbd_state os, ns;
776	struct drbd_work *w;
777	char *khelper_cmd = NULL;
778
779	/* Remove all elements from the resync LRU. Future actions
780	 * might set bits in the (main) bitmap, which would make the
781	 * entries in the resync LRU wrong. */
782	if (drbd_rs_del_all(mdev)) {
783		/* In case this is not possible now, most probably because
784		 * there are P_RS_DATA_REPLY packets lingering on the worker's
785		 * queue (or even the read operations for those packets
786		 * have not finished by now).   Retry in 100ms. */
787
788		drbd_kick_lo(mdev);
789		__set_current_state(TASK_INTERRUPTIBLE);
790		schedule_timeout(HZ / 10);
791		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
792		if (w) {
793			w->cb = w_resync_finished;
794			drbd_queue_work(&mdev->data.work, w);
795			return 1;
796		}
797		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
798	}
799
800	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
801	if (dt <= 0)
802		dt = 1;
803	db = mdev->rs_total;
804	dbdt = Bit2KB(db/dt);
805	mdev->rs_paused /= HZ;
806
807	if (!get_ldev(mdev))
808		goto out;
809
810	spin_lock_irq(&mdev->req_lock);
811	os = mdev->state;
812
813	/* This protects us against multiple calls (that can happen in the presence
814	   of application IO), and against connectivity loss just before we arrive here. */
815	if (os.conn <= C_CONNECTED)
816		goto out_unlock;
817
818	ns = os;
819	ns.conn = C_CONNECTED;
820
821	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
822	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
823	     "Online verify " : "Resync",
824	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
825
826	n_oos = drbd_bm_total_weight(mdev);
827
828	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
829		if (n_oos) {
830			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
831			      n_oos, Bit2KB(1));
832			khelper_cmd = "out-of-sync";
833		}
834	} else {
835		D_ASSERT((n_oos - mdev->rs_failed) == 0);
836
837		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
838			khelper_cmd = "after-resync-target";
839
840		if (mdev->csums_tfm && mdev->rs_total) {
841			const unsigned long s = mdev->rs_same_csum;
842			const unsigned long t = mdev->rs_total;
843			const int ratio =
844				(t == 0)     ? 0 :
845				(t < 100000) ? ((s*100)/t) : (s/(t/100));
846			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
847			     "transferred %luK total %luK\n",
848			     ratio,
849			     Bit2KB(mdev->rs_same_csum),
850			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
851			     Bit2KB(mdev->rs_total));
852		}
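
		/* The two-branch ratio above just avoids overflow in the
		 * percentage: for small totals (s*100)/t is exact, while for
		 * t >= 100000 bits s/(t/100) trades a little precision for
		 * staying within unsigned long, e.g. s = 3000000 and
		 * t = 4000000 gives 3000000/40000 = 75 (%). */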
853	}
854
855	if (mdev->rs_failed) {
856		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
857
858		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
859			ns.disk = D_INCONSISTENT;
860			ns.pdsk = D_UP_TO_DATE;
861		} else {
862			ns.disk = D_UP_TO_DATE;
863			ns.pdsk = D_INCONSISTENT;
864		}
865	} else {
866		ns.disk = D_UP_TO_DATE;
867		ns.pdsk = D_UP_TO_DATE;
868
869		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
870			if (mdev->p_uuid) {
871				int i;
872				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
873					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
874				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
875				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
876			} else {
877				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
878			}
879		}
880
881		drbd_uuid_set_bm(mdev, 0UL);
882
883		if (mdev->p_uuid) {
884			/* Now the two UUID sets are equal, update what we
885			 * know of the peer. */
886			int i;
887			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
888				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
889		}
890	}
891
892	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
893out_unlock:
894	spin_unlock_irq(&mdev->req_lock);
895	put_ldev(mdev);
896out:
897	mdev->rs_total  = 0;
898	mdev->rs_failed = 0;
899	mdev->rs_paused = 0;
900	mdev->ov_start_sector = 0;
901
902	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
903		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
904		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
905	}
906
907	if (khelper_cmd)
908		drbd_khelper(mdev, khelper_cmd);
909
910	return 1;
911}
912
913/* helper */
914static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
915{
916	if (drbd_ee_has_active_page(e)) {
917		/* This might happen if sendpage() has not finished */
918		int i = DIV_ROUND_UP(e->size, PAGE_SIZE);
919		atomic_add(i, &mdev->pp_in_use_by_net);
920		atomic_sub(i, &mdev->pp_in_use);
921		spin_lock_irq(&mdev->req_lock);
922		list_add_tail(&e->w.list, &mdev->net_ee);
923		spin_unlock_irq(&mdev->req_lock);
924		wake_up(&drbd_pp_wait);
925	} else
926		drbd_free_ee(mdev, e);
927}
928
929/**
930 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
931 * @mdev:	DRBD device.
932 * @w:		work object.
933 * @cancel:	The connection will be closed anyways
934 */
935int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
936{
937	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
938	int ok;
939
940	if (unlikely(cancel)) {
941		drbd_free_ee(mdev, e);
942		dec_unacked(mdev);
943		return 1;
944	}
945
946	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
947		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
948	} else {
949		if (__ratelimit(&drbd_ratelimit_state))
950			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
951			    (unsigned long long)e->sector);
952
953		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
954	}
955
956	dec_unacked(mdev);
957
958	move_to_net_ee_or_free(mdev, e);
959
960	if (unlikely(!ok))
961		dev_err(DEV, "drbd_send_block() failed\n");
962	return ok;
963}
964
965/**
966 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
967 * @mdev:	DRBD device.
968 * @w:		work object.
969 * @cancel:	The connection will be closed anyways
970 */
971int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
972{
973	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
974	int ok;
975
976	if (unlikely(cancel)) {
977		drbd_free_ee(mdev, e);
978		dec_unacked(mdev);
979		return 1;
980	}
981
982	if (get_ldev_if_state(mdev, D_FAILED)) {
983		drbd_rs_complete_io(mdev, e->sector);
984		put_ldev(mdev);
985	}
986
987	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
988		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
989			inc_rs_pending(mdev);
990			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
991		} else {
992			if (__ratelimit(&drbd_ratelimit_state))
993				dev_err(DEV, "Not sending RSDataReply, "
994				    "partner DISKLESS!\n");
995			ok = 1;
996		}
997	} else {
998		if (__ratelimit(&drbd_ratelimit_state))
999			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1000			    (unsigned long long)e->sector);
1001
1002		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1003
1004		/* update resync data with failure */
1005		drbd_rs_failed_io(mdev, e->sector, e->size);
1006	}
1007
1008	dec_unacked(mdev);
1009
1010	move_to_net_ee_or_free(mdev, e);
1011
1012	if (unlikely(!ok))
1013		dev_err(DEV, "drbd_send_block() failed\n");
1014	return ok;
1015}
1016
1017int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1018{
1019	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1020	struct digest_info *di;
1021	int digest_size;
1022	void *digest = NULL;
1023	int ok, eq = 0;
1024
1025	if (unlikely(cancel)) {
1026		drbd_free_ee(mdev, e);
1027		dec_unacked(mdev);
1028		return 1;
1029	}
1030
1031	if (get_ldev(mdev)) {
1032		drbd_rs_complete_io(mdev, e->sector);
1033		put_ldev(mdev);
1034	}
1035
1036	di = e->digest;
1037
1038	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1039		/* quick hack to try to avoid a race against reconfiguration.
1040		 * a real fix would be much more involved,
1041		 * introducing more locking mechanisms */
1042		if (mdev->csums_tfm) {
1043			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1044			D_ASSERT(digest_size == di->digest_size);
1045			digest = kmalloc(digest_size, GFP_NOIO);
1046		}
1047		if (digest) {
1048			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1049			eq = !memcmp(digest, di->digest, digest_size);
1050			kfree(digest);
1051		}
1052
1053		if (eq) {
1054			drbd_set_in_sync(mdev, e->sector, e->size);
1055			/* rs_same_csums unit is BM_BLOCK_SIZE */
1056			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1057			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1058		} else {
1059			inc_rs_pending(mdev);
1060			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1061			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1062			kfree(di);
1063			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1064		}
1065	} else {
1066		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1067		if (__ratelimit(&drbd_ratelimit_state))
1068			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1069	}
1070
1071	dec_unacked(mdev);
1072	move_to_net_ee_or_free(mdev, e);
1073
1074	if (unlikely(!ok))
1075		dev_err(DEV, "drbd_send_block/ack() failed\n");
1076	return ok;
1077}
1078
1079int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1080{
1081	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1082	int digest_size;
1083	void *digest;
1084	int ok = 1;
1085
1086	if (unlikely(cancel))
1087		goto out;
1088
1089	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1090		goto out;
1091
1092	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1093	/* FIXME if this allocation fails, online verify will not terminate! */
1094	digest = kmalloc(digest_size, GFP_NOIO);
1095	if (digest) {
1096		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1097		inc_rs_pending(mdev);
1098		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1099					     digest, digest_size, P_OV_REPLY);
1100		if (!ok)
1101			dec_rs_pending(mdev);
1102		kfree(digest);
1103	}
1104
1105out:
1106	drbd_free_ee(mdev, e);
1107
1108	dec_unacked(mdev);
1109
1110	return ok;
1111}
1112
1113void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1114{
1115	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1116		mdev->ov_last_oos_size += size>>9;
1117	} else {
1118		mdev->ov_last_oos_start = sector;
1119		mdev->ov_last_oos_size = size>>9;
1120	}
1121	drbd_set_out_of_sync(mdev, sector, size);
1122	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1123}
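
/* Consecutive out-of-sync blocks found by online verify are coalesced into
 * one range: e.g. a 4 KiB mismatch at sector 2048 followed by one at sector
 * 2056 extends ov_last_oos_size to 16 sectors instead of starting a new
 * range, which lets ov_oos_print() report one contiguous range at a time. */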
1124
1125int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1126{
1127	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1128	struct digest_info *di;
1129	int digest_size;
1130	void *digest;
1131	int ok, eq = 0;
1132
1133	if (unlikely(cancel)) {
1134		drbd_free_ee(mdev, e);
1135		dec_unacked(mdev);
1136		return 1;
1137	}
1138
1139	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1140	 * the resync lru has been cleaned up already */
1141	if (get_ldev(mdev)) {
1142		drbd_rs_complete_io(mdev, e->sector);
1143		put_ldev(mdev);
1144	}
1145
1146	di = e->digest;
1147
1148	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1149		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1150		digest = kmalloc(digest_size, GFP_NOIO);
1151		if (digest) {
1152			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1153
1154			D_ASSERT(digest_size == di->digest_size);
1155			eq = !memcmp(digest, di->digest, digest_size);
1156			kfree(digest);
1157		}
1158	} else {
1159		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1160		if (__ratelimit(&drbd_ratelimit_state))
1161			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1162	}
1163
1164	dec_unacked(mdev);
1165	if (!eq)
1166		drbd_ov_oos_found(mdev, e->sector, e->size);
1167	else
1168		ov_oos_print(mdev);
1169
1170	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1171			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1172
1173	drbd_free_ee(mdev, e);
1174
1175	if (--mdev->ov_left == 0) {
1176		ov_oos_print(mdev);
1177		drbd_resync_finished(mdev);
1178	}
1179
1180	return ok;
1181}
1182
1183int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1184{
1185	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1186	complete(&b->done);
1187	return 1;
1188}
1189
1190int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1191{
1192	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1193	struct p_barrier *p = &mdev->data.sbuf.barrier;
1194	int ok = 1;
1195
1196	/* really avoid racing with tl_clear.  w.cb may have been referenced
1197	 * just before it was reassigned and re-queued, so double check that.
1198	 * actually, this race was harmless, since we only try to send the
1199	 * barrier packet here, and otherwise do nothing with the object.
1200	 * but compare with the head of w_clear_epoch */
1201	spin_lock_irq(&mdev->req_lock);
1202	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1203		cancel = 1;
1204	spin_unlock_irq(&mdev->req_lock);
1205	if (cancel)
1206		return 1;
1207
1208	if (!drbd_get_data_sock(mdev))
1209		return 0;
1210	p->barrier = b->br_number;
1211	/* inc_ap_pending was done where this was queued.
1212	 * dec_ap_pending will be done in got_BarrierAck
1213	 * or (on connection loss) in w_clear_epoch.  */
1214	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1215				(struct p_header80 *)p, sizeof(*p), 0);
1216	drbd_put_data_sock(mdev);
1217
1218	return ok;
1219}
1220
1221int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1222{
1223	if (cancel)
1224		return 1;
1225	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1226}
1227
1228/**
1229 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1230 * @mdev:	DRBD device.
1231 * @w:		work object.
1232 * @cancel:	The connection will be closed anyways
1233 */
1234int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1235{
1236	struct drbd_request *req = container_of(w, struct drbd_request, w);
1237	int ok;
1238
1239	if (unlikely(cancel)) {
1240		req_mod(req, send_canceled);
1241		return 1;
1242	}
1243
1244	ok = drbd_send_dblock(mdev, req);
1245	req_mod(req, ok ? handed_over_to_network : send_failed);
1246
1247	return ok;
1248}
1249
1250/**
1251 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1252 * @mdev:	DRBD device.
1253 * @w:		work object.
1254 * @cancel:	The connection will be closed anyways
1255 */
1256int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1257{
1258	struct drbd_request *req = container_of(w, struct drbd_request, w);
1259	int ok;
1260
1261	if (unlikely(cancel)) {
1262		req_mod(req, send_canceled);
1263		return 1;
1264	}
1265
1266	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1267				(unsigned long)req);
1268
1269	if (!ok) {
1270		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1271		 * so this is probably redundant */
1272		if (mdev->state.conn >= C_CONNECTED)
1273			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1274	}
1275	req_mod(req, ok ? handed_over_to_network : send_failed);
1276
1277	return ok;
1278}
1279
1280int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1281{
1282	struct drbd_request *req = container_of(w, struct drbd_request, w);
1283
1284	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1285		drbd_al_begin_io(mdev, req->sector);
1286	/* Calling drbd_al_begin_io() out of the worker might deadlock
1287	   theoretically. In practice it cannot deadlock, since this is
1288	   only used when unfreezing IOs. All the extents of the requests
1289	   that made it into the TL are already active */
1290
1291	drbd_req_make_private_bio(req, req->master_bio);
1292	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1293	generic_make_request(req->private_bio);
1294
1295	return 1;
1296}
1297
1298static int _drbd_may_sync_now(struct drbd_conf *mdev)
1299{
1300	struct drbd_conf *odev = mdev;
1301
1302	while (1) {
1303		if (odev->sync_conf.after == -1)
1304			return 1;
1305		odev = minor_to_mdev(odev->sync_conf.after);
1306		ERR_IF(!odev) return 1;
1307		if ((odev->state.conn >= C_SYNC_SOURCE &&
1308		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1309		    odev->state.aftr_isp || odev->state.peer_isp ||
1310		    odev->state.user_isp)
1311			return 0;
1312	}
1313}
1314
1315/**
1316 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1317 * @mdev:	DRBD device.
1318 *
1319 * Called from process context only (admin command and after_state_ch).
1320 */
1321static int _drbd_pause_after(struct drbd_conf *mdev)
1322{
1323	struct drbd_conf *odev;
1324	int i, rv = 0;
1325
1326	for (i = 0; i < minor_count; i++) {
1327		odev = minor_to_mdev(i);
1328		if (!odev)
1329			continue;
1330		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1331			continue;
1332		if (!_drbd_may_sync_now(odev))
1333			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1334			       != SS_NOTHING_TO_DO);
1335	}
1336
1337	return rv;
1338}
1339
1340/**
1341 * _drbd_resume_next() - Resume resync on all devices that may resync now
1342 * @mdev:	DRBD device.
1343 *
1344 * Called from process context only (admin command and worker).
1345 */
1346static int _drbd_resume_next(struct drbd_conf *mdev)
1347{
1348	struct drbd_conf *odev;
1349	int i, rv = 0;
1350
1351	for (i = 0; i < minor_count; i++) {
1352		odev = minor_to_mdev(i);
1353		if (!odev)
1354			continue;
1355		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356			continue;
1357		if (odev->state.aftr_isp) {
1358			if (_drbd_may_sync_now(odev))
1359				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1360							CS_HARD, NULL)
1361				       != SS_NOTHING_TO_DO) ;
1362		}
1363	}
1364	return rv;
1365}
1366
1367void resume_next_sg(struct drbd_conf *mdev)
1368{
1369	write_lock_irq(&global_state_lock);
1370	_drbd_resume_next(mdev);
1371	write_unlock_irq(&global_state_lock);
1372}
1373
1374void suspend_other_sg(struct drbd_conf *mdev)
1375{
1376	write_lock_irq(&global_state_lock);
1377	_drbd_pause_after(mdev);
1378	write_unlock_irq(&global_state_lock);
1379}
1380
1381static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1382{
1383	struct drbd_conf *odev;
1384
1385	if (o_minor == -1)
1386		return NO_ERROR;
1387	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1388		return ERR_SYNC_AFTER;
1389
1390	/* check for loops */
1391	odev = minor_to_mdev(o_minor);
1392	while (1) {
1393		if (odev == mdev)
1394			return ERR_SYNC_AFTER_CYCLE;
1395
1396		/* dependency chain ends here, no cycles. */
1397		if (odev->sync_conf.after == -1)
1398			return NO_ERROR;
1399
1400		/* follow the dependency chain */
1401		odev = minor_to_mdev(odev->sync_conf.after);
1402	}
1403}
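
/* Note on termination of the loop above: a new dependency is only accepted
 * after this check, and the edge being added always starts at mdev.  Any
 * cycle the new edge could create therefore has to pass through mdev, which
 * is exactly what the odev == mdev test catches; cycles among other devices
 * cannot have been configured in the first place, so following the chain
 * always ends at "after == -1" or at mdev. */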
1404
1405int drbd_alter_sa(struct drbd_conf *mdev, int na)
1406{
1407	int changes;
1408	int retcode;
1409
1410	write_lock_irq(&global_state_lock);
1411	retcode = sync_after_error(mdev, na);
1412	if (retcode == NO_ERROR) {
1413		mdev->sync_conf.after = na;
1414		do {
1415			changes  = _drbd_pause_after(mdev);
1416			changes |= _drbd_resume_next(mdev);
1417		} while (changes);
1418	}
1419	write_unlock_irq(&global_state_lock);
1420	return retcode;
1421}
1422
1423static void ping_peer(struct drbd_conf *mdev)
1424{
1425	clear_bit(GOT_PING_ACK, &mdev->flags);
1426	request_ping(mdev);
1427	wait_event(mdev->misc_wait,
1428		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1429}
1430
1431/**
1432 * drbd_start_resync() - Start the resync process
1433 * @mdev:	DRBD device.
1434 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1435 *
1436 * This function might bring you directly into one of the
1437 * C_PAUSED_SYNC_* states.
1438 */
1439void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1440{
1441	union drbd_state ns;
1442	int r;
1443
1444	if (mdev->state.conn >= C_SYNC_SOURCE) {
1445		dev_err(DEV, "Resync already running!\n");
1446		return;
1447	}
1448
1449	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1450	drbd_rs_cancel_all(mdev);
1451
1452	if (side == C_SYNC_TARGET) {
1453		/* Since application IO was locked out during C_WF_BITMAP_T and
1454		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1455		   we ask the before-resync-target handler whether we may make the data inconsistent. */
1456		r = drbd_khelper(mdev, "before-resync-target");
1457		r = (r >> 8) & 0xff;
1458		if (r > 0) {
1459			dev_info(DEV, "before-resync-target handler returned %d, "
1460			     "dropping connection.\n", r);
1461			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1462			return;
1463		}
1464	}
1465
1466	drbd_state_lock(mdev);
1467
1468	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1469		drbd_state_unlock(mdev);
1470		return;
1471	}
1472
1473	if (side == C_SYNC_TARGET) {
1474		mdev->bm_resync_fo = 0;
1475	} else /* side == C_SYNC_SOURCE */ {
1476		u64 uuid;
1477
1478		get_random_bytes(&uuid, sizeof(u64));
1479		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1480		drbd_send_sync_uuid(mdev, uuid);
1481
1482		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1483	}
1484
1485	write_lock_irq(&global_state_lock);
1486	ns = mdev->state;
1487
1488	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1489
1490	ns.conn = side;
1491
1492	if (side == C_SYNC_TARGET)
1493		ns.disk = D_INCONSISTENT;
1494	else /* side == C_SYNC_SOURCE */
1495		ns.pdsk = D_INCONSISTENT;
1496
1497	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1498	ns = mdev->state;
1499
1500	if (ns.conn < C_CONNECTED)
1501		r = SS_UNKNOWN_ERROR;
1502
1503	if (r == SS_SUCCESS) {
1504		unsigned long tw = drbd_bm_total_weight(mdev);
1505		unsigned long now = jiffies;
1506		int i;
1507
1508		mdev->rs_failed    = 0;
1509		mdev->rs_paused    = 0;
1510		mdev->rs_same_csum = 0;
1511		mdev->rs_last_events = 0;
1512		mdev->rs_last_sect_ev = 0;
1513		mdev->rs_total     = tw;
1514		mdev->rs_start     = now;
1515		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1516			mdev->rs_mark_left[i] = tw;
1517			mdev->rs_mark_time[i] = now;
1518		}
1519		_drbd_pause_after(mdev);
1520	}
1521	write_unlock_irq(&global_state_lock);
1522	put_ldev(mdev);
1523
1524	if (r == SS_SUCCESS) {
1525		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1526		     drbd_conn_str(ns.conn),
1527		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1528		     (unsigned long) mdev->rs_total);
1529
1530		if (mdev->rs_total == 0) {
1531			/* Peer still reachable? Beware of failing before-resync-target handlers! */
1532			ping_peer(mdev);
1533			drbd_resync_finished(mdev);
1534		}
1535
1536		atomic_set(&mdev->rs_sect_in, 0);
1537		atomic_set(&mdev->rs_sect_ev, 0);
1538		mdev->rs_in_flight = 0;
1539		mdev->rs_planed = 0;
1540		spin_lock(&mdev->peer_seq_lock);
1541		fifo_set(&mdev->rs_plan_s, 0);
1542		spin_unlock(&mdev->peer_seq_lock);
1543		/* ns.conn may already be != mdev->state.conn,
1544		 * we may have been paused in between, or become paused until
1545		 * the timer triggers.
1546		 * No matter, that is handled in resync_timer_fn() */
1547		if (ns.conn == C_SYNC_TARGET)
1548			mod_timer(&mdev->resync_timer, jiffies);
1549
1550		drbd_md_sync(mdev);
1551	}
1552	drbd_state_unlock(mdev);
1553}
1554
1555int drbd_worker(struct drbd_thread *thi)
1556{
1557	struct drbd_conf *mdev = thi->mdev;
1558	struct drbd_work *w = NULL;
1559	LIST_HEAD(work_list);
1560	int intr = 0, i;
1561
1562	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1563
1564	while (get_t_state(thi) == Running) {
1565		drbd_thread_current_set_cpu(mdev);
1566
1567		if (down_trylock(&mdev->data.work.s)) {
1568			mutex_lock(&mdev->data.mutex);
1569			if (mdev->data.socket && !mdev->net_conf->no_cork)
1570				drbd_tcp_uncork(mdev->data.socket);
1571			mutex_unlock(&mdev->data.mutex);
1572
1573			intr = down_interruptible(&mdev->data.work.s);
1574
1575			mutex_lock(&mdev->data.mutex);
1576			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1577				drbd_tcp_cork(mdev->data.socket);
1578			mutex_unlock(&mdev->data.mutex);
1579		}
1580
1581		if (intr) {
1582			D_ASSERT(intr == -EINTR);
1583			flush_signals(current);
1584			ERR_IF (get_t_state(thi) == Running)
1585				continue;
1586			break;
1587		}
1588
1589		if (get_t_state(thi) != Running)
1590			break;
1591		/* With this break, we have done a down() but not consumed
1592		   the entry from the list. The cleanup code takes care of
1593		   this...   */
1594
1595		w = NULL;
1596		spin_lock_irq(&mdev->data.work.q_lock);
1597		ERR_IF(list_empty(&mdev->data.work.q)) {
1598			/* something terribly wrong in our logic.
1599			 * we were able to down() the semaphore,
1600			 * but the list is empty... doh.
1601			 *
1602			 * what is the best thing to do now?
1603			 * try again from scratch, restarting the receiver,
1604			 * asender, whatnot? could break even more ugly,
1605			 * e.g. when we are primary, but no good local data.
1606			 *
1607			 * I'll try to get away just starting over this loop.
1608			 */
1609			spin_unlock_irq(&mdev->data.work.q_lock);
1610			continue;
1611		}
1612		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1613		list_del_init(&w->list);
1614		spin_unlock_irq(&mdev->data.work.q_lock);
1615
1616		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1617			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1618			if (mdev->state.conn >= C_CONNECTED)
1619				drbd_force_state(mdev,
1620						NS(conn, C_NETWORK_FAILURE));
1621		}
1622	}
1623	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1624	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1625
1626	spin_lock_irq(&mdev->data.work.q_lock);
1627	i = 0;
1628	while (!list_empty(&mdev->data.work.q)) {
1629		list_splice_init(&mdev->data.work.q, &work_list);
1630		spin_unlock_irq(&mdev->data.work.q_lock);
1631
1632		while (!list_empty(&work_list)) {
1633			w = list_entry(work_list.next, struct drbd_work, list);
1634			list_del_init(&w->list);
1635			w->cb(mdev, w, 1);
1636			i++; /* dead debugging code */
1637		}
1638
1639		spin_lock_irq(&mdev->data.work.q_lock);
1640	}
1641	sema_init(&mdev->data.work.s, 0);
1642	/* DANGEROUS race: if someone did queue his work within the spinlock,
1643	 * but up() ed outside the spinlock, we could get an up() on the
1644	 * semaphore without corresponding list entry.
1645	 * So don't do that.
1646	 */
1647	spin_unlock_irq(&mdev->data.work.q_lock);
1648
1649	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1650	/* _drbd_set_state only uses stop_nowait.
1651	 * wait here for the Exiting receiver. */
1652	drbd_thread_stop(&mdev->receiver);
1653	drbd_mdev_cleanup(mdev);
1654
1655	dev_info(DEV, "worker terminated\n");
1656
1657	clear_bit(DEVICE_DYING, &mdev->flags);
1658	clear_bit(CONFIG_PENDING, &mdev->flags);
1659	wake_up(&mdev->state_wait);
1660
1661	return 0;
1662}
1663