drbd_worker.c revision 13d42685bec1f012dcbc5d187490eb1d15ec8219
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/smp_lock.h>
30#include <linux/wait.h>
31#include <linux/mm.h>
32#include <linux/memcontrol.h>
33#include <linux/mm_inline.h>
34#include <linux/slab.h>
35#include <linux/random.h>
36#include <linux/string.h>
37#include <linux/scatterlist.h>
38
39#include "drbd_int.h"
40#include "drbd_req.h"
41
42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
43
44
45
46/* defined here:
47   drbd_md_io_complete
48   drbd_endio_sec
49   drbd_endio_pri
50
51 * more endio handlers:
52   atodb_endio in drbd_actlog.c
53   drbd_bm_async_io_complete in drbd_bitmap.c
54
55 * For all these callbacks, note the following:
56 * The callbacks will be called in irq context by the IDE drivers,
57 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
58 * Try to get the locking right :)
59 *
60 */
61
62
63/* About the global_state_lock
64   Each state transition on a device holds a read lock. In case we have
65   to evaluate the sync after dependencies, we grab a write lock, because
66   we need stable states on all devices for that.  */
67rwlock_t global_state_lock;
68
69/* used for synchronous meta data and bitmap IO
70 * submitted by drbd_md_sync_page_io()
71 */
72void drbd_md_io_complete(struct bio *bio, int error)
73{
74	struct drbd_md_io *md_io;
75
76	md_io = (struct drbd_md_io *)bio->bi_private;
77	md_io->error = error;
78
79	complete(&md_io->event);
80}
81
82/* reads on behalf of the partner,
83 * "submitted" by the receiver
84 */
85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
86{
87	unsigned long flags = 0;
88	struct drbd_conf *mdev = e->mdev;
89
90	D_ASSERT(e->block_id != ID_VACANT);
91
92	spin_lock_irqsave(&mdev->req_lock, flags);
93	mdev->read_cnt += e->size >> 9;
94	list_del(&e->w.list);
95	if (list_empty(&mdev->read_ee))
96		wake_up(&mdev->ee_wait);
97	if (test_bit(__EE_WAS_ERROR, &e->flags))
98		__drbd_chk_io_error(mdev, FALSE);
99	spin_unlock_irqrestore(&mdev->req_lock, flags);
100
101	drbd_queue_work(&mdev->data.work, &e->w);
102	put_ldev(mdev);
103}
104
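/* A "failed barrier" is an epoch entry that was submitted with a barrier,
 * came back with an error, and has not been resubmitted yet.  Matching on
 * all three flags makes sure we only reschedule the first failure and not
 * an entry that already carries EE_RESUBMITTED. */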
105static int is_failed_barrier(int ee_flags)
106{
107	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
108			== (EE_IS_BARRIER|EE_WAS_ERROR);
109}
110
111/* writes on behalf of the partner, or resync writes,
112 * "submitted" by the receiver, final stage.  */
113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
114{
115	unsigned long flags = 0;
116	struct drbd_conf *mdev = e->mdev;
117	sector_t e_sector;
118	int do_wake;
119	int is_syncer_req;
120	int do_al_complete_io;
121
122	/* if this is a failed barrier request, disable use of barriers,
123	 * and schedule for resubmission */
124	if (is_failed_barrier(e->flags)) {
125		drbd_bump_write_ordering(mdev, WO_bdev_flush);
126		spin_lock_irqsave(&mdev->req_lock, flags);
127		list_del(&e->w.list);
128		e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
129		e->w.cb = w_e_reissue;
130		/* put_ldev actually happens below, once we come here again. */
131		__release(local);
132		spin_unlock_irqrestore(&mdev->req_lock, flags);
133		drbd_queue_work(&mdev->data.work, &e->w);
134		return;
135	}
136
137	D_ASSERT(e->block_id != ID_VACANT);
138
139	/* after we moved e to done_ee,
140	 * we may no longer access it,
141	 * it may be freed/reused already!
142	 * (as soon as we release the req_lock) */
143	e_sector = e->sector;
144	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
145	is_syncer_req = is_syncer_block_id(e->block_id);
146
147	spin_lock_irqsave(&mdev->req_lock, flags);
148	mdev->writ_cnt += e->size >> 9;
149	list_del(&e->w.list); /* has been on active_ee or sync_ee */
150	list_add_tail(&e->w.list, &mdev->done_ee);
151
152	/* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
153	 * nor did we wake possibly waiting conflicting requests.
154	 * That is done from "drbd_process_done_ee" within the appropriate w.cb
155	 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
156
157	do_wake = is_syncer_req
158		? list_empty(&mdev->sync_ee)
159		: list_empty(&mdev->active_ee);
160
161	if (test_bit(__EE_WAS_ERROR, &e->flags))
162		__drbd_chk_io_error(mdev, FALSE);
163	spin_unlock_irqrestore(&mdev->req_lock, flags);
164
165	if (is_syncer_req)
166		drbd_rs_complete_io(mdev, e_sector);
167
168	if (do_wake)
169		wake_up(&mdev->ee_wait);
170
171	if (do_al_complete_io)
172		drbd_al_complete_io(mdev, e_sector);
173
174	wake_asender(mdev);
175	put_ldev(mdev);
176}
177
178/* writes on behalf of the partner, or resync writes,
179 * "submitted" by the receiver.
180 */
181void drbd_endio_sec(struct bio *bio, int error)
182{
183	struct drbd_epoch_entry *e = bio->bi_private;
184	struct drbd_conf *mdev = e->mdev;
185	int uptodate = bio_flagged(bio, BIO_UPTODATE);
186	int is_write = bio_data_dir(bio) == WRITE;
187
188	if (error)
189		dev_warn(DEV, "%s: error=%d s=%llus\n",
190				is_write ? "write" : "read", error,
191				(unsigned long long)e->sector);
192	if (!error && !uptodate) {
193		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
194				is_write ? "write" : "read",
195				(unsigned long long)e->sector);
196		/* strange behavior of some lower level drivers...
197		 * fail the request by clearing the uptodate flag,
198		 * but do not return any error?! */
199		error = -EIO;
200	}
201
202	if (error)
203		set_bit(__EE_WAS_ERROR, &e->flags);
204
205	bio_put(bio); /* no need for the bio anymore */
206	if (atomic_dec_and_test(&e->pending_bios)) {
207		if (is_write)
208			drbd_endio_write_sec_final(e);
209		else
210			drbd_endio_read_sec_final(e);
211	}
212}
213
214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
215 */
216void drbd_endio_pri(struct bio *bio, int error)
217{
218	struct drbd_request *req = bio->bi_private;
219	struct drbd_conf *mdev = req->mdev;
220	enum drbd_req_event what;
221	int uptodate = bio_flagged(bio, BIO_UPTODATE);
222
223	if (!error && !uptodate) {
224		dev_warn(DEV, "p %s: setting error to -EIO\n",
225			 bio_data_dir(bio) == WRITE ? "write" : "read");
226		/* strange behavior of some lower level drivers...
227		 * fail the request by clearing the uptodate flag,
228		 * but do not return any error?! */
229		error = -EIO;
230	}
231
232	/* to avoid recursion in __req_mod */
233	if (unlikely(error)) {
234		what = (bio_data_dir(bio) == WRITE)
235			? write_completed_with_error
236			: (bio_rw(bio) == READ)
237			  ? read_completed_with_error
238			  : read_ahead_completed_with_error;
239	} else
240		what = completed_ok;
241
242	bio_put(req->private_bio);
243	req->private_bio = ERR_PTR(error);
244
245	req_mod(req, what);
246}
247
248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
249{
250	struct drbd_request *req = container_of(w, struct drbd_request, w);
251
252	/* We should not detach for read io-error,
253	 * but try to WRITE the P_DATA_REPLY to the failed location,
254	 * to give the disk the chance to relocate that block */
255
256	spin_lock_irq(&mdev->req_lock);
257	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
258		_req_mod(req, read_retry_remote_canceled);
259		spin_unlock_irq(&mdev->req_lock);
260		return 1;
261	}
262	spin_unlock_irq(&mdev->req_lock);
263
264	return w_send_read_req(mdev, w, 0);
265}
266
267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
268{
269	ERR_IF(cancel) return 1;
270	dev_err(DEV, "resync inactive, but callback triggered??\n");
271	return 1; /* Simply ignore this! */
272}
273
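/* Compute a digest over the page chain of an epoch entry.  All pages except
 * the last one are fully used; the last page may be only partially filled,
 * since e->size need not be a multiple of PAGE_SIZE. */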
274void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
275{
276	struct hash_desc desc;
277	struct scatterlist sg;
278	struct page *page = e->pages;
279	struct page *tmp;
280	unsigned len;
281
282	desc.tfm = tfm;
283	desc.flags = 0;
284
285	sg_init_table(&sg, 1);
286	crypto_hash_init(&desc);
287
288	while ((tmp = page_chain_next(page))) {
289		/* all but the last page will be fully used */
290		sg_set_page(&sg, page, PAGE_SIZE, 0);
291		crypto_hash_update(&desc, &sg, sg.length);
292		page = tmp;
293	}
294	/* and now the last, possibly only partially used page */
295	len = e->size & (PAGE_SIZE - 1);
296	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
297	crypto_hash_update(&desc, &sg, sg.length);
298	crypto_hash_final(&desc, digest);
299}
300
301void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
302{
303	struct hash_desc desc;
304	struct scatterlist sg;
305	struct bio_vec *bvec;
306	int i;
307
308	desc.tfm = tfm;
309	desc.flags = 0;
310
311	sg_init_table(&sg, 1);
312	crypto_hash_init(&desc);
313
314	__bio_for_each_segment(bvec, bio, i, 0) {
315		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
316		crypto_hash_update(&desc, &sg, sg.length);
317	}
318	crypto_hash_final(&desc, digest);
319}
320
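/* Worker callback for checksum based resync: hash the locally read block and
 * send the digest to the peer as a P_CSUM_RS_REQUEST, so the full block data
 * only has to travel over the wire if the checksums differ. */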
321static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
322{
323	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
324	int digest_size;
325	void *digest;
326	int ok;
327
328	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
329
330	if (unlikely(cancel)) {
331		drbd_free_ee(mdev, e);
332		return 1;
333	}
334
335	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
336		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
337		digest = kmalloc(digest_size, GFP_NOIO);
338		if (digest) {
339			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
340
341			inc_rs_pending(mdev);
342			ok = drbd_send_drequest_csum(mdev,
343						     e->sector,
344						     e->size,
345						     digest,
346						     digest_size,
347						     P_CSUM_RS_REQUEST);
348			kfree(digest);
349		} else {
350			dev_err(DEV, "kmalloc() of digest failed.\n");
351			ok = 0;
352		}
353	} else
354		ok = 1;
355
356	drbd_free_ee(mdev, e);
357
358	if (unlikely(!ok))
359		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
360	return ok;
361}
362
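/* "Best effort" allocation flags for the resync path: allow highmem pages
 * and suppress the usual allocation failure warning.  If the allocation
 * fails anyway, the request is simply deferred and retried later. */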
363#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
364
365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
366{
367	struct drbd_epoch_entry *e;
368
369	if (!get_ldev(mdev))
370		return -EIO;
371
372	if (drbd_rs_should_slow_down(mdev))
373		goto defer;
374
375	/* GFP_TRY, because if there is no memory available right now, this may
376	 * be rescheduled for later. It is "only" background resync, after all. */
377	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
378	if (!e)
379		goto defer;
380
381	e->w.cb = w_e_send_csum;
382	spin_lock_irq(&mdev->req_lock);
383	list_add(&e->w.list, &mdev->read_ee);
384	spin_unlock_irq(&mdev->req_lock);
385
386	atomic_add(size >> 9, &mdev->rs_sect_ev);
387	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
388		return 0;
389
390	/* drbd_submit_ee currently fails for one reason only:
391	 * not being able to allocate enough bios.
392	 * Is dropping the connection going to help? */
393	spin_lock_irq(&mdev->req_lock);
394	list_del(&e->w.list);
395	spin_unlock_irq(&mdev->req_lock);
396
397	drbd_free_ee(mdev, e);
398defer:
399	put_ldev(mdev);
400	return -EAGAIN;
401}
402
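/* Timer callback: depending on the connection state, (re)queue either the
 * online verify or the resync request generator on the worker queue.  The
 * actual requests are then built in process context by w_make_ov_request()
 * or w_make_resync_request(). */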
403void resync_timer_fn(unsigned long data)
404{
405	struct drbd_conf *mdev = (struct drbd_conf *) data;
406	int queue;
407
408	queue = 1;
409	switch (mdev->state.conn) {
410	case C_VERIFY_S:
411		mdev->resync_work.cb = w_make_ov_request;
412		break;
413	case C_SYNC_TARGET:
414		mdev->resync_work.cb = w_make_resync_request;
415		break;
416	default:
417		queue = 0;
418		mdev->resync_work.cb = w_resync_inactive;
419	}
420
421	/* harmless race: list_empty outside data.work.q_lock */
422	if (list_empty(&mdev->resync_work.list) && queue)
423		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
424}
425
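/* Helpers for the fixed size ring buffer (struct fifo_buffer) that holds the
 * "plan ahead" values of the dynamic resync rate controller below. */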
426static void fifo_set(struct fifo_buffer *fb, int value)
427{
428	int i;
429
430	for (i = 0; i < fb->size; i++)
431		fb->values[i] = value;
432}
433
434static int fifo_push(struct fifo_buffer *fb, int value)
435{
436	int ov;
437
438	ov = fb->values[fb->head_index];
439	fb->values[fb->head_index++] = value;
440
441	if (fb->head_index >= fb->size)
442		fb->head_index = 0;
443
444	return ov;
445}
446
447static void fifo_add_val(struct fifo_buffer *fb, int value)
448{
449	int i;
450
451	for (i = 0; i < fb->size; i++)
452		fb->values[i] += value;
453}
454
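/* The dynamic resync rate controller, in short: from the number of resync
 * sectors that came back since the last call (sect_in), derive how much data
 * we would like to have "in flight" (want), spread the difference between
 * that target and what is currently in flight or already planned over the
 * plan-ahead fifo, and return the number of sectors to request during this
 * SLEEP_TIME slice, capped at c_max_rate. */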
455int drbd_rs_controller(struct drbd_conf *mdev)
456{
457	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
458	unsigned int want;     /* The number of sectors we want in the proxy */
459	int req_sect; /* Number of sectors to request in this turn */
460	int correction; /* Number of sectors more we need in the proxy */
461	int cps; /* correction per invocation of drbd_rs_controller() */
462	int steps; /* Number of time steps to plan ahead */
463	int curr_corr;
464	int max_sect;
465
466	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
467	mdev->rs_in_flight -= sect_in;
468
469	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
470
471	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
472
473	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
474		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
475	} else { /* normal path */
476		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
477			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
478	}
479
480	correction = want - mdev->rs_in_flight - mdev->rs_planed;
481
482	/* Plan ahead */
483	cps = correction / steps;
484	fifo_add_val(&mdev->rs_plan_s, cps);
485	mdev->rs_planed += cps * steps;
486
487	/* What we do in this step */
488	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
489	spin_unlock(&mdev->peer_seq_lock);
490	mdev->rs_planed -= curr_corr;
491
492	req_sect = sect_in + curr_corr;
493	if (req_sect < 0)
494		req_sect = 0;
495
496	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
497	if (req_sect > max_sect)
498		req_sect = max_sect;
499
500	/*
501	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
502		 sect_in, mdev->rs_in_flight, want, correction,
503		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
504	*/
505
506	return req_sect;
507}
508
509int w_make_resync_request(struct drbd_conf *mdev,
510		struct drbd_work *w, int cancel)
511{
512	unsigned long bit;
513	sector_t sector;
514	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
515	int max_segment_size;
516	int number, rollback_i, size, pe, mx;
517	int align, queued, sndbuf;
518	int i = 0;
519
520	if (unlikely(cancel))
521		return 1;
522
523	if (unlikely(mdev->state.conn < C_CONNECTED)) {
524		dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
525		return 0;
526	}
527
528	if (mdev->state.conn != C_SYNC_TARGET)
529		dev_err(DEV, "%s in w_make_resync_request\n",
530			drbd_conn_str(mdev->state.conn));
531
532	if (mdev->rs_total == 0) {
533		/* empty resync? */
534		drbd_resync_finished(mdev);
535		return 1;
536	}
537
538	if (!get_ldev(mdev)) {
539		/* Since we only need to access resync-related members, a
540		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
541		   continuing resync with a broken disk makes no sense at
542		   all */
543		dev_err(DEV, "Disk broke down during resync!\n");
544		mdev->resync_work.cb = w_resync_inactive;
545		return 1;
546	}
547
548	/* starting with drbd 8.3.8, we can handle multi-bio EEs,
549	 * if it should be necessary */
550	max_segment_size =
551		mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
552		mdev->agreed_pro_version < 95 ?	DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
553
554	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
555		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
556		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557	} else {
558		mdev->c_sync_rate = mdev->sync_conf.rate;
559		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560	}
561
562	/* Throttle resync on lower level disk activity, which may also be
563	 * caused by application IO on Primary/SyncTarget.
564	 * Keep this after the call to drbd_rs_controller, as that assumes
565	 * it is called as precisely as possible every SLEEP_TIME,
566	 * and would be confused otherwise. */
567	if (drbd_rs_should_slow_down(mdev))
568		goto requeue;
569
570	mutex_lock(&mdev->data.mutex);
571	if (mdev->data.socket)
572		mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
573	else
574		mx = 1;
575	mutex_unlock(&mdev->data.mutex);
576
577	/* For resync rates >160MB/sec, allow more pending RS requests */
578	if (number > mx)
579		mx = number;
580
581	/* Limit the number of pending RS requests to no more than the peer's receive buffer */
582	pe = atomic_read(&mdev->rs_pending_cnt);
583	if ((pe + number) > mx) {
584		number = mx - pe;
585	}
586
587	for (i = 0; i < number; i++) {
588		/* Stop generating RS requests when half of the send buffer is filled */
589		mutex_lock(&mdev->data.mutex);
590		if (mdev->data.socket) {
591			queued = mdev->data.socket->sk->sk_wmem_queued;
592			sndbuf = mdev->data.socket->sk->sk_sndbuf;
593		} else {
594			queued = 1;
595			sndbuf = 0;
596		}
597		mutex_unlock(&mdev->data.mutex);
598		if (queued > sndbuf / 2)
599			goto requeue;
600
601next_sector:
602		size = BM_BLOCK_SIZE;
603		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
604
605		if (bit == -1UL) {
606			mdev->bm_resync_fo = drbd_bm_bits(mdev);
607			mdev->resync_work.cb = w_resync_inactive;
608			put_ldev(mdev);
609			return 1;
610		}
611
612		sector = BM_BIT_TO_SECT(bit);
613
614		if (drbd_try_rs_begin_io(mdev, sector)) {
615			mdev->bm_resync_fo = bit;
616			goto requeue;
617		}
618		mdev->bm_resync_fo = bit + 1;
619
620		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
621			drbd_rs_complete_io(mdev, sector);
622			goto next_sector;
623		}
624
625#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
626		/* try to find some adjacent bits.
627		 * we stop if we already have the maximum req size.
628		 *
629		 * Additionally always align bigger requests, in order to
630		 * be prepared for all stripe sizes of software RAIDs.
631		 */
632		align = 1;
633		rollback_i = i;
634		for (;;) {
635			if (size + BM_BLOCK_SIZE > max_segment_size)
636				break;
637
638			/* Always be aligned */
639			if (sector & ((1<<(align+3))-1))
640				break;
641
642			/* do not cross extent boundaries */
643			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
644				break;
645			/* now, is it actually dirty, after all?
646			 * caution, drbd_bm_test_bit is tri-state for some
647			 * obscure reason; ( b == 0 ) would get the out-of-band
648			 * only accidentally right because of the "oddly sized"
649			 * adjustment below */
650			if (drbd_bm_test_bit(mdev, bit+1) != 1)
651				break;
652			bit++;
653			size += BM_BLOCK_SIZE;
654			if ((BM_BLOCK_SIZE << align) <= size)
655				align++;
656			i++;
657		}
658		/* if we merged some,
659		 * reset the offset to start the next drbd_bm_find_next from */
660		if (size > BM_BLOCK_SIZE)
661			mdev->bm_resync_fo = bit + 1;
662#endif
663
664		/* adjust very last sectors, in case we are oddly sized */
665		if (sector + (size>>9) > capacity)
666			size = (capacity-sector)<<9;
667		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
668			switch (read_for_csum(mdev, sector, size)) {
669			case -EIO: /* Disk failure */
670				put_ldev(mdev);
671				return 0;
672			case -EAGAIN: /* allocation failed, or ldev busy */
673				drbd_rs_complete_io(mdev, sector);
674				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
675				i = rollback_i;
676				goto requeue;
677			case 0:
678				/* everything ok */
679				break;
680			default:
681				BUG();
682			}
683		} else {
684			inc_rs_pending(mdev);
685			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
686					       sector, size, ID_SYNCER)) {
687				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
688				dec_rs_pending(mdev);
689				put_ldev(mdev);
690				return 0;
691			}
692		}
693	}
694
695	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
696		/* last syncer _request_ was sent,
697		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
698		 * next sync group will resume), as soon as we receive the last
699		 * resync data block, and the last bit is cleared.
700		 * until then resync "work" is "inactive" ...
701		 */
702		mdev->resync_work.cb = w_resync_inactive;
703		put_ldev(mdev);
704		return 1;
705	}
706
707 requeue:
708	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
709	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
710	put_ldev(mdev);
711	return 1;
712}
713
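/* Generate online verify requests (P_OV_REQUEST).  Like the resync request
 * generator above, this paces itself by the configured sync rate per
 * SLEEP_TIME slice and counts already pending requests against that budget. */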
714static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
715{
716	int number, i, size;
717	sector_t sector;
718	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
719
720	if (unlikely(cancel))
721		return 1;
722
723	if (unlikely(mdev->state.conn < C_CONNECTED)) {
724		dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
725		return 0;
726	}
727
728	number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
729	if (atomic_read(&mdev->rs_pending_cnt) > number)
730		goto requeue;
731
732	number -= atomic_read(&mdev->rs_pending_cnt);
733
734	sector = mdev->ov_position;
735	for (i = 0; i < number; i++) {
736		if (sector >= capacity) {
737			mdev->resync_work.cb = w_resync_inactive;
738			return 1;
739		}
740
741		size = BM_BLOCK_SIZE;
742
743		if (drbd_try_rs_begin_io(mdev, sector)) {
744			mdev->ov_position = sector;
745			goto requeue;
746		}
747
748		if (sector + (size>>9) > capacity)
749			size = (capacity-sector)<<9;
750
751		inc_rs_pending(mdev);
752		if (!drbd_send_ov_request(mdev, sector, size)) {
753			dec_rs_pending(mdev);
754			return 0;
755		}
756		sector += BM_SECT_PER_BIT;
757	}
758	mdev->ov_position = sector;
759
760 requeue:
761	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
762	return 1;
763}
764
765
766int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
767{
768	kfree(w);
769	ov_oos_print(mdev);
770	drbd_resync_finished(mdev);
771
772	return 1;
773}
774
775static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
776{
777	kfree(w);
778
779	drbd_resync_finished(mdev);
780
781	return 1;
782}
783
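/* Wait until the peer has answered a ping, or the connection is lost.  Used
 * by drbd_resync_finished(), apparently to give the peer a chance to catch up
 * before the final resync state is evaluated. */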
784static void ping_peer(struct drbd_conf *mdev)
785{
786	clear_bit(GOT_PING_ACK, &mdev->flags);
787	request_ping(mdev);
788	wait_event(mdev->misc_wait,
789		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
790}
791
792int drbd_resync_finished(struct drbd_conf *mdev)
793{
794	unsigned long db, dt, dbdt;
795	unsigned long n_oos;
796	union drbd_state os, ns;
797	struct drbd_work *w;
798	char *khelper_cmd = NULL;
799
800	/* Remove all elements from the resync LRU. Future actions
801	 * might set bits in the (main) bitmap, and the entries in the
802	 * resync LRU would then be wrong. */
803	if (drbd_rs_del_all(mdev)) {
804		/* In case this is not possible now, most probably because
805		 * there are P_RS_DATA_REPLY packets lingering on the worker's
806		 * queue (or even the read operations for those packets
807		 * are not finished by now).  Retry in 100ms. */
808
809		drbd_kick_lo(mdev);
810		__set_current_state(TASK_INTERRUPTIBLE);
811		schedule_timeout(HZ / 10);
812		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
813		if (w) {
814			w->cb = w_resync_finished;
815			drbd_queue_work(&mdev->data.work, w);
816			return 1;
817		}
818		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
819	}
820
821	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
822	if (dt <= 0)
823		dt = 1;
824	db = mdev->rs_total;
825	dbdt = Bit2KB(db/dt);
826	mdev->rs_paused /= HZ;
827
828	if (!get_ldev(mdev))
829		goto out;
830
831	ping_peer(mdev);
832
833	spin_lock_irq(&mdev->req_lock);
834	os = mdev->state;
835
836	/* This protects us against multiple calls (that can happen in the presence
837	   of application IO), and against connectivity loss just before we arrive here. */
838	if (os.conn <= C_CONNECTED)
839		goto out_unlock;
840
841	ns = os;
842	ns.conn = C_CONNECTED;
843
844	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
845	     (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
846	     "Online verify " : "Resync",
847	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
848
849	n_oos = drbd_bm_total_weight(mdev);
850
851	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
852		if (n_oos) {
853			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
854			      n_oos, Bit2KB(1));
855			khelper_cmd = "out-of-sync";
856		}
857	} else {
858		D_ASSERT((n_oos - mdev->rs_failed) == 0);
859
860		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
861			khelper_cmd = "after-resync-target";
862
863		if (mdev->csums_tfm && mdev->rs_total) {
864			const unsigned long s = mdev->rs_same_csum;
865			const unsigned long t = mdev->rs_total;
866			const int ratio =
867				(t == 0)     ? 0 :
868			(t < 100000) ? ((s*100)/t) : (s/(t/100));
869			dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
870			     "transferred %luK total %luK\n",
871			     ratio,
872			     Bit2KB(mdev->rs_same_csum),
873			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
874			     Bit2KB(mdev->rs_total));
875		}
876	}
877
878	if (mdev->rs_failed) {
879		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
880
881		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
882			ns.disk = D_INCONSISTENT;
883			ns.pdsk = D_UP_TO_DATE;
884		} else {
885			ns.disk = D_UP_TO_DATE;
886			ns.pdsk = D_INCONSISTENT;
887		}
888	} else {
889		ns.disk = D_UP_TO_DATE;
890		ns.pdsk = D_UP_TO_DATE;
891
892		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
893			if (mdev->p_uuid) {
894				int i;
895				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
896					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
897				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
898				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
899			} else {
900				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
901			}
902		}
903
904		drbd_uuid_set_bm(mdev, 0UL);
905
906		if (mdev->p_uuid) {
907			/* Now the two UUID sets are equal, update what we
908			 * know of the peer. */
909			int i;
910			for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
911				mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
912		}
913	}
914
915	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
916out_unlock:
917	spin_unlock_irq(&mdev->req_lock);
918	put_ldev(mdev);
919out:
920	mdev->rs_total  = 0;
921	mdev->rs_failed = 0;
922	mdev->rs_paused = 0;
923	mdev->ov_start_sector = 0;
924
925	drbd_md_sync(mdev);
926
927	if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
928		dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
929		drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
930	}
931
932	if (khelper_cmd)
933		drbd_khelper(mdev, khelper_cmd);
934
935	return 1;
936}
937
938/* helper */
939static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
940{
941	if (drbd_ee_has_active_page(e)) {
942		/* This might happen if sendpage() has not finished */
943		int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
944		atomic_add(i, &mdev->pp_in_use_by_net);
945		atomic_sub(i, &mdev->pp_in_use);
946		spin_lock_irq(&mdev->req_lock);
947		list_add_tail(&e->w.list, &mdev->net_ee);
948		spin_unlock_irq(&mdev->req_lock);
949		wake_up(&drbd_pp_wait);
950	} else
951		drbd_free_ee(mdev, e);
952}
953
954/**
955 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
956 * @mdev:	DRBD device.
957 * @w:		work object.
958 * @cancel:	The connection will be closed anyways
959 */
960int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
961{
962	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963	int ok;
964
965	if (unlikely(cancel)) {
966		drbd_free_ee(mdev, e);
967		dec_unacked(mdev);
968		return 1;
969	}
970
971	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
972		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
973	} else {
974		if (__ratelimit(&drbd_ratelimit_state))
975			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
976			    (unsigned long long)e->sector);
977
978		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
979	}
980
981	dec_unacked(mdev);
982
983	move_to_net_ee_or_free(mdev, e);
984
985	if (unlikely(!ok))
986		dev_err(DEV, "drbd_send_block() failed\n");
987	return ok;
988}
989
990/**
991 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
992 * @mdev:	DRBD device.
993 * @w:		work object.
994 * @cancel:	The connection will be closed anyways
995 */
996int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
997{
998	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
999	int ok;
1000
1001	if (unlikely(cancel)) {
1002		drbd_free_ee(mdev, e);
1003		dec_unacked(mdev);
1004		return 1;
1005	}
1006
1007	if (get_ldev_if_state(mdev, D_FAILED)) {
1008		drbd_rs_complete_io(mdev, e->sector);
1009		put_ldev(mdev);
1010	}
1011
1012	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1013		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1014			inc_rs_pending(mdev);
1015			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1016		} else {
1017			if (__ratelimit(&drbd_ratelimit_state))
1018				dev_err(DEV, "Not sending RSDataReply, "
1019				    "partner DISKLESS!\n");
1020			ok = 1;
1021		}
1022	} else {
1023		if (__ratelimit(&drbd_ratelimit_state))
1024			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1025			    (unsigned long long)e->sector);
1026
1027		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1028
1029		/* update resync data with failure */
1030		drbd_rs_failed_io(mdev, e->sector, e->size);
1031	}
1032
1033	dec_unacked(mdev);
1034
1035	move_to_net_ee_or_free(mdev, e);
1036
1037	if (unlikely(!ok))
1038		dev_err(DEV, "drbd_send_block() failed\n");
1039	return ok;
1040}
1041
1042int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1043{
1044	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1045	struct digest_info *di;
1046	int digest_size;
1047	void *digest = NULL;
1048	int ok, eq = 0;
1049
1050	if (unlikely(cancel)) {
1051		drbd_free_ee(mdev, e);
1052		dec_unacked(mdev);
1053		return 1;
1054	}
1055
1056	if (get_ldev(mdev)) {
1057		drbd_rs_complete_io(mdev, e->sector);
1058		put_ldev(mdev);
1059	}
1060
1061	di = e->digest;
1062
1063	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1064		/* quick hack to try to avoid a race against reconfiguration.
1065		 * a real fix would be much more involved,
1066		 * introducing more locking mechanisms */
1067		if (mdev->csums_tfm) {
1068			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1069			D_ASSERT(digest_size == di->digest_size);
1070			digest = kmalloc(digest_size, GFP_NOIO);
1071		}
1072		if (digest) {
1073			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1074			eq = !memcmp(digest, di->digest, digest_size);
1075			kfree(digest);
1076		}
1077
1078		if (eq) {
1079			drbd_set_in_sync(mdev, e->sector, e->size);
1080			/* rs_same_csums unit is BM_BLOCK_SIZE */
1081			mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1082			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1083		} else {
1084			inc_rs_pending(mdev);
1085			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1086			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1087			kfree(di);
1088			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1089		}
1090	} else {
1091		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1092		if (__ratelimit(&drbd_ratelimit_state))
1093			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1094	}
1095
1096	dec_unacked(mdev);
1097	move_to_net_ee_or_free(mdev, e);
1098
1099	if (unlikely(!ok))
1100		dev_err(DEV, "drbd_send_block/ack() failed\n");
1101	return ok;
1102}
1103
1104int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1105{
1106	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1107	int digest_size;
1108	void *digest;
1109	int ok = 1;
1110
1111	if (unlikely(cancel))
1112		goto out;
1113
1114	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1115		goto out;
1116
1117	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1118	/* FIXME if this allocation fails, online verify will not terminate! */
1119	digest = kmalloc(digest_size, GFP_NOIO);
1120	if (digest) {
1121		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1122		inc_rs_pending(mdev);
1123		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1124					     digest, digest_size, P_OV_REPLY);
1125		if (!ok)
1126			dec_rs_pending(mdev);
1127		kfree(digest);
1128	}
1129
1130out:
1131	drbd_free_ee(mdev, e);
1132
1133	dec_unacked(mdev);
1134
1135	return ok;
1136}
1137
1138void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1139{
1140	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1141		mdev->ov_last_oos_size += size>>9;
1142	} else {
1143		mdev->ov_last_oos_start = sector;
1144		mdev->ov_last_oos_size = size>>9;
1145	}
1146	drbd_set_out_of_sync(mdev, sector, size);
1147	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1148}
1149
1150int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1151{
1152	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1153	struct digest_info *di;
1154	int digest_size;
1155	void *digest;
1156	int ok, eq = 0;
1157
1158	if (unlikely(cancel)) {
1159		drbd_free_ee(mdev, e);
1160		dec_unacked(mdev);
1161		return 1;
1162	}
1163
1164	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1165	 * the resync lru has been cleaned up already */
1166	if (get_ldev(mdev)) {
1167		drbd_rs_complete_io(mdev, e->sector);
1168		put_ldev(mdev);
1169	}
1170
1171	di = e->digest;
1172
1173	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1174		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1175		digest = kmalloc(digest_size, GFP_NOIO);
1176		if (digest) {
1177			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1178
1179			D_ASSERT(digest_size == di->digest_size);
1180			eq = !memcmp(digest, di->digest, digest_size);
1181			kfree(digest);
1182		}
1183	} else {
1184		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1185		if (__ratelimit(&drbd_ratelimit_state))
1186			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1187	}
1188
1189	dec_unacked(mdev);
1190	if (!eq)
1191		drbd_ov_oos_found(mdev, e->sector, e->size);
1192	else
1193		ov_oos_print(mdev);
1194
1195	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1196			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1197
1198	drbd_free_ee(mdev, e);
1199
1200	if (--mdev->ov_left == 0) {
1201		ov_oos_print(mdev);
1202		drbd_resync_finished(mdev);
1203	}
1204
1205	return ok;
1206}
1207
1208int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1209{
1210	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1211	complete(&b->done);
1212	return 1;
1213}
1214
1215int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1216{
1217	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1218	struct p_barrier *p = &mdev->data.sbuf.barrier;
1219	int ok = 1;
1220
1221	/* really avoid racing with tl_clear.  w.cb may have been referenced
1222	 * just before it was reassigned and re-queued, so double check that.
1223	 * actually, this race was harmless, since we only try to send the
1224	 * barrier packet here, and otherwise do nothing with the object.
1225	 * but compare with the head of w_clear_epoch */
1226	spin_lock_irq(&mdev->req_lock);
1227	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1228		cancel = 1;
1229	spin_unlock_irq(&mdev->req_lock);
1230	if (cancel)
1231		return 1;
1232
1233	if (!drbd_get_data_sock(mdev))
1234		return 0;
1235	p->barrier = b->br_number;
1236	/* inc_ap_pending was done where this was queued.
1237	 * dec_ap_pending will be done in got_BarrierAck
1238	 * or (on connection loss) in w_clear_epoch.  */
1239	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1240				(struct p_header80 *)p, sizeof(*p), 0);
1241	drbd_put_data_sock(mdev);
1242
1243	return ok;
1244}
1245
1246int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1247{
1248	if (cancel)
1249		return 1;
1250	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1251}
1252
1253/**
1254 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255 * @mdev:	DRBD device.
1256 * @w:		work object.
1257 * @cancel:	The connection will be closed anyways
1258 */
1259int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1260{
1261	struct drbd_request *req = container_of(w, struct drbd_request, w);
1262	int ok;
1263
1264	if (unlikely(cancel)) {
1265		req_mod(req, send_canceled);
1266		return 1;
1267	}
1268
1269	ok = drbd_send_dblock(mdev, req);
1270	req_mod(req, ok ? handed_over_to_network : send_failed);
1271
1272	return ok;
1273}
1274
1275/**
1276 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1277 * @mdev:	DRBD device.
1278 * @w:		work object.
1279 * @cancel:	The connection will be closed anyways
1280 */
1281int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1282{
1283	struct drbd_request *req = container_of(w, struct drbd_request, w);
1284	int ok;
1285
1286	if (unlikely(cancel)) {
1287		req_mod(req, send_canceled);
1288		return 1;
1289	}
1290
1291	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1292				(unsigned long)req);
1293
1294	if (!ok) {
1295		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1296		 * so this is probably redundant */
1297		if (mdev->state.conn >= C_CONNECTED)
1298			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1299	}
1300	req_mod(req, ok ? handed_over_to_network : send_failed);
1301
1302	return ok;
1303}
1304
1305int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1306{
1307	struct drbd_request *req = container_of(w, struct drbd_request, w);
1308
1309	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1310		drbd_al_begin_io(mdev, req->sector);
1311	/* Calling drbd_al_begin_io() out of the worker might deadlock
1312	   theoretically. In practice it cannot deadlock, since this is
1313	   only used when unfreezing IOs. All the extents of the requests
1314	   that made it into the TL are already active */
1315
1316	drbd_req_make_private_bio(req, req->master_bio);
1317	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1318	generic_make_request(req->private_bio);
1319
1320	return 1;
1321}
1322
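/* Walk the sync-after dependency chain (sync_conf.after names the minor this
 * device syncs after).  Resync on mdev may only proceed if no device in that
 * chain is currently resyncing or has one of its pause flags set.  Called
 * with the global_state_lock held. */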
1323static int _drbd_may_sync_now(struct drbd_conf *mdev)
1324{
1325	struct drbd_conf *odev = mdev;
1326
1327	while (1) {
1328		if (odev->sync_conf.after == -1)
1329			return 1;
1330		odev = minor_to_mdev(odev->sync_conf.after);
1331		ERR_IF(!odev) return 1;
1332		if ((odev->state.conn >= C_SYNC_SOURCE &&
1333		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1334		    odev->state.aftr_isp || odev->state.peer_isp ||
1335		    odev->state.user_isp)
1336			return 0;
1337	}
1338}
1339
1340/**
1341 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342 * @mdev:	DRBD device.
1343 *
1344 * Called from process context only (admin command and after_state_ch).
1345 */
1346static int _drbd_pause_after(struct drbd_conf *mdev)
1347{
1348	struct drbd_conf *odev;
1349	int i, rv = 0;
1350
1351	for (i = 0; i < minor_count; i++) {
1352		odev = minor_to_mdev(i);
1353		if (!odev)
1354			continue;
1355		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1356			continue;
1357		if (!_drbd_may_sync_now(odev))
1358			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1359			       != SS_NOTHING_TO_DO);
1360	}
1361
1362	return rv;
1363}
1364
1365/**
1366 * _drbd_resume_next() - Resume resync on all devices that may resync now
1367 * @mdev:	DRBD device.
1368 *
1369 * Called from process context only (admin command and worker).
1370 */
1371static int _drbd_resume_next(struct drbd_conf *mdev)
1372{
1373	struct drbd_conf *odev;
1374	int i, rv = 0;
1375
1376	for (i = 0; i < minor_count; i++) {
1377		odev = minor_to_mdev(i);
1378		if (!odev)
1379			continue;
1380		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1381			continue;
1382		if (odev->state.aftr_isp) {
1383			if (_drbd_may_sync_now(odev))
1384				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1385							CS_HARD, NULL)
1386				       != SS_NOTHING_TO_DO) ;
1387		}
1388	}
1389	return rv;
1390}
1391
1392void resume_next_sg(struct drbd_conf *mdev)
1393{
1394	write_lock_irq(&global_state_lock);
1395	_drbd_resume_next(mdev);
1396	write_unlock_irq(&global_state_lock);
1397}
1398
1399void suspend_other_sg(struct drbd_conf *mdev)
1400{
1401	write_lock_irq(&global_state_lock);
1402	_drbd_pause_after(mdev);
1403	write_unlock_irq(&global_state_lock);
1404}
1405
1406static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1407{
1408	struct drbd_conf *odev;
1409
1410	if (o_minor == -1)
1411		return NO_ERROR;
1412	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1413		return ERR_SYNC_AFTER;
1414
1415	/* check for loops */
1416	odev = minor_to_mdev(o_minor);
1417	while (1) {
1418		if (odev == mdev)
1419			return ERR_SYNC_AFTER_CYCLE;
1420
1421		/* dependency chain ends here, no cycles. */
1422		if (odev->sync_conf.after == -1)
1423			return NO_ERROR;
1424
1425		/* follow the dependency chain */
1426		odev = minor_to_mdev(odev->sync_conf.after);
1427	}
1428}
1429
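/* Change the sync-after dependency of this device to minor 'na'.  After
 * checking that this does not introduce a dependency cycle, pause/resume
 * the affected devices repeatedly until no further state changes result. */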
1430int drbd_alter_sa(struct drbd_conf *mdev, int na)
1431{
1432	int changes;
1433	int retcode;
1434
1435	write_lock_irq(&global_state_lock);
1436	retcode = sync_after_error(mdev, na);
1437	if (retcode == NO_ERROR) {
1438		mdev->sync_conf.after = na;
1439		do {
1440			changes  = _drbd_pause_after(mdev);
1441			changes |= _drbd_resume_next(mdev);
1442		} while (changes);
1443	}
1444	write_unlock_irq(&global_state_lock);
1445	return retcode;
1446}
1447
1448/**
1449 * drbd_start_resync() - Start the resync process
1450 * @mdev:	DRBD device.
1451 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1452 *
1453 * This function might bring you directly into one of the
1454 * C_PAUSED_SYNC_* states.
1455 */
1456void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1457{
1458	union drbd_state ns;
1459	int r;
1460
1461	if (mdev->state.conn >= C_SYNC_SOURCE) {
1462		dev_err(DEV, "Resync already running!\n");
1463		return;
1464	}
1465
1466	/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1467	drbd_rs_cancel_all(mdev);
1468
1469	if (side == C_SYNC_TARGET) {
1470		/* Since application IO was locked out during C_WF_BITMAP_T and
1471		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1472		   we check whether we may make the data inconsistent. */
1473		r = drbd_khelper(mdev, "before-resync-target");
1474		r = (r >> 8) & 0xff;
1475		if (r > 0) {
1476			dev_info(DEV, "before-resync-target handler returned %d, "
1477			     "dropping connection.\n", r);
1478			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1479			return;
1480		}
1481	}
1482
1483	drbd_state_lock(mdev);
1484
1485	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1486		drbd_state_unlock(mdev);
1487		return;
1488	}
1489
1490	if (side == C_SYNC_TARGET) {
1491		mdev->bm_resync_fo = 0;
1492	} else /* side == C_SYNC_SOURCE */ {
1493		u64 uuid;
1494
1495		get_random_bytes(&uuid, sizeof(u64));
1496		drbd_uuid_set(mdev, UI_BITMAP, uuid);
1497		drbd_send_sync_uuid(mdev, uuid);
1498
1499		D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1500	}
1501
1502	write_lock_irq(&global_state_lock);
1503	ns = mdev->state;
1504
1505	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1506
1507	ns.conn = side;
1508
1509	if (side == C_SYNC_TARGET)
1510		ns.disk = D_INCONSISTENT;
1511	else /* side == C_SYNC_SOURCE */
1512		ns.pdsk = D_INCONSISTENT;
1513
1514	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1515	ns = mdev->state;
1516
1517	if (ns.conn < C_CONNECTED)
1518		r = SS_UNKNOWN_ERROR;
1519
1520	if (r == SS_SUCCESS) {
1521		unsigned long tw = drbd_bm_total_weight(mdev);
1522		unsigned long now = jiffies;
1523		int i;
1524
1525		mdev->rs_failed    = 0;
1526		mdev->rs_paused    = 0;
1527		mdev->rs_same_csum = 0;
1528		mdev->rs_last_events = 0;
1529		mdev->rs_last_sect_ev = 0;
1530		mdev->rs_total     = tw;
1531		mdev->rs_start     = now;
1532		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1533			mdev->rs_mark_left[i] = tw;
1534			mdev->rs_mark_time[i] = now;
1535		}
1536		_drbd_pause_after(mdev);
1537	}
1538	write_unlock_irq(&global_state_lock);
1539	put_ldev(mdev);
1540
1541	if (r == SS_SUCCESS) {
1542		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1543		     drbd_conn_str(ns.conn),
1544		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1545		     (unsigned long) mdev->rs_total);
1546
1547		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1548			/* This still has a race (about when exactly the peers
1549			 * detect connection loss) that can lead to a full sync
1550			 * on next handshake. In 8.3.9 we fixed this with explicit
1551			 * resync-finished notifications, but the fix
1552			 * introduces a protocol change.  Sleeping for some
1553			 * time longer than the ping interval + timeout on the
1554			 * SyncSource, to give the SyncTarget the chance to
1555			 * detect connection loss, then waiting for a ping
1556			 * response (implicit in drbd_resync_finished) reduces
1557			 * the race considerably, but does not solve it. */
1558			if (side == C_SYNC_SOURCE)
1559				schedule_timeout_interruptible(
1560					mdev->net_conf->ping_int * HZ +
1561					mdev->net_conf->ping_timeo*HZ/9);
1562			drbd_resync_finished(mdev);
1563		}
1564
1565		atomic_set(&mdev->rs_sect_in, 0);
1566		atomic_set(&mdev->rs_sect_ev, 0);
1567		mdev->rs_in_flight = 0;
1568		mdev->rs_planed = 0;
1569		spin_lock(&mdev->peer_seq_lock);
1570		fifo_set(&mdev->rs_plan_s, 0);
1571		spin_unlock(&mdev->peer_seq_lock);
1572		/* ns.conn may already be != mdev->state.conn,
1573		 * we may have been paused in between, or become paused until
1574		 * the timer triggers.
1575		 * No matter, that is handled in resync_timer_fn() */
1576		if (ns.conn == C_SYNC_TARGET)
1577			mod_timer(&mdev->resync_timer, jiffies);
1578
1579		drbd_md_sync(mdev);
1580	}
1581	drbd_state_unlock(mdev);
1582}
1583
1584int drbd_worker(struct drbd_thread *thi)
1585{
1586	struct drbd_conf *mdev = thi->mdev;
1587	struct drbd_work *w = NULL;
1588	LIST_HEAD(work_list);
1589	int intr = 0, i;
1590
1591	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1592
1593	while (get_t_state(thi) == Running) {
1594		drbd_thread_current_set_cpu(mdev);
1595
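		/* down_trylock() failing means no work is queued right now:
		 * uncork the data socket so whatever has been batched goes out
		 * on the wire, then sleep until new work arrives and cork the
		 * socket again before processing it, so that small packets can
		 * be aggregated (unless "no-cork" is configured). */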
1596		if (down_trylock(&mdev->data.work.s)) {
1597			mutex_lock(&mdev->data.mutex);
1598			if (mdev->data.socket && !mdev->net_conf->no_cork)
1599				drbd_tcp_uncork(mdev->data.socket);
1600			mutex_unlock(&mdev->data.mutex);
1601
1602			intr = down_interruptible(&mdev->data.work.s);
1603
1604			mutex_lock(&mdev->data.mutex);
1605			if (mdev->data.socket  && !mdev->net_conf->no_cork)
1606				drbd_tcp_cork(mdev->data.socket);
1607			mutex_unlock(&mdev->data.mutex);
1608		}
1609
1610		if (intr) {
1611			D_ASSERT(intr == -EINTR);
1612			flush_signals(current);
1613			ERR_IF (get_t_state(thi) == Running)
1614				continue;
1615			break;
1616		}
1617
1618		if (get_t_state(thi) != Running)
1619			break;
1620		/* With this break, we have done a down() but not consumed
1621		   the entry from the list. The cleanup code takes care of
1622		   this...   */
1623
1624		w = NULL;
1625		spin_lock_irq(&mdev->data.work.q_lock);
1626		ERR_IF(list_empty(&mdev->data.work.q)) {
1627			/* something terribly wrong in our logic.
1628			 * we were able to down() the semaphore,
1629			 * but the list is empty... doh.
1630			 *
1631			 * what is the best thing to do now?
1632			 * try again from scratch, restarting the receiver,
1633			 * asender, whatnot? could break even more ugly,
1634			 * e.g. when we are primary, but no good local data.
1635			 *
1636			 * I'll try to get away just starting over this loop.
1637			 */
1638			spin_unlock_irq(&mdev->data.work.q_lock);
1639			continue;
1640		}
1641		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1642		list_del_init(&w->list);
1643		spin_unlock_irq(&mdev->data.work.q_lock);
1644
1645		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1646			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1647			if (mdev->state.conn >= C_CONNECTED)
1648				drbd_force_state(mdev,
1649						NS(conn, C_NETWORK_FAILURE));
1650		}
1651	}
1652	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1653	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1654
1655	spin_lock_irq(&mdev->data.work.q_lock);
1656	i = 0;
1657	while (!list_empty(&mdev->data.work.q)) {
1658		list_splice_init(&mdev->data.work.q, &work_list);
1659		spin_unlock_irq(&mdev->data.work.q_lock);
1660
1661		while (!list_empty(&work_list)) {
1662			w = list_entry(work_list.next, struct drbd_work, list);
1663			list_del_init(&w->list);
1664			w->cb(mdev, w, 1);
1665			i++; /* dead debugging code */
1666		}
1667
1668		spin_lock_irq(&mdev->data.work.q_lock);
1669	}
1670	sema_init(&mdev->data.work.s, 0);
1671	/* DANGEROUS race: if someone did queue his work within the spinlock,
1672	 * but up() ed outside the spinlock, we could get an up() on the
1673	 * semaphore without corresponding list entry.
1674	 * So don't do that.
1675	 */
1676	spin_unlock_irq(&mdev->data.work.q_lock);
1677
1678	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1679	/* _drbd_set_state only uses stop_nowait.
1680	 * wait here for the Exiting receiver. */
1681	drbd_thread_stop(&mdev->receiver);
1682	drbd_mdev_cleanup(mdev);
1683
1684	dev_info(DEV, "worker terminated\n");
1685
1686	clear_bit(DEVICE_DYING, &mdev->flags);
1687	clear_bit(CONFIG_PENDING, &mdev->flags);
1688	wake_up(&mdev->state_wait);
1689
1690	return 0;
1691}
1692