drbd_worker.c revision 89e58e755e37137135c28a90c93be1b28faff485
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_conf *mdev,
43				 struct drbd_work *w, int cancel);
44
45
46
47/* endio handlers:
48 *   drbd_md_io_complete (defined here)
49 *   drbd_endio_pri (defined here)
50 *   drbd_endio_sec (defined here)
51 *   bm_async_io_complete (defined in drbd_bitmap.c)
52 *
53 * For all these callbacks, note the following:
54 * The callbacks will be called in irq context by the IDE drivers,
55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56 * Try to get the locking right :)
57 *
58 */
59
60
61/* About the global_state_lock
62   Each state transition on a device holds a read lock. In case we have
63   to evaluate the sync after dependencies, we grab a write lock, because
64   we need stable states on all devices for that.  */
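/* Illustrative sketch of that convention, with hypothetical callers
   (not a literal quote from the state code):

	read_lock_irq(&global_state_lock);
	// change the state of a single device
	read_unlock_irq(&global_state_lock);

	write_lock_irq(&global_state_lock);
	// walk *all* devices, e.g. _drbd_pause_after()/_drbd_resume_next()
	write_unlock_irq(&global_state_lock);
*/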
65rwlock_t global_state_lock;
66
67/* used for synchronous meta data and bitmap IO
68 * submitted by drbd_md_sync_page_io()
69 */
70void drbd_md_io_complete(struct bio *bio, int error)
71{
72	struct drbd_md_io *md_io;
73
74	md_io = (struct drbd_md_io *)bio->bi_private;
75	md_io->error = error;
76
77	complete(&md_io->event);
78}
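
/* Rough usage sketch of the above (the real submitter, drbd_md_sync_page_io(),
 * lives elsewhere; this only illustrates the expected pairing):
 *
 *	struct drbd_md_io md_io;
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	// md_io.error now holds the error code passed to the completion
 */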
79
80/* reads on behalf of the partner,
81 * "submitted" by the receiver
82 */
83void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
84{
85	unsigned long flags = 0;
86	struct drbd_conf *mdev = e->mdev;
87
88	spin_lock_irqsave(&mdev->req_lock, flags);
89	mdev->read_cnt += e->i.size >> 9;
90	list_del(&e->w.list);
91	if (list_empty(&mdev->read_ee))
92		wake_up(&mdev->ee_wait);
93	if (test_bit(__EE_WAS_ERROR, &e->flags))
94		__drbd_chk_io_error(mdev, false);
95	spin_unlock_irqrestore(&mdev->req_lock, flags);
96
97	drbd_queue_work(&mdev->data.work, &e->w);
98	put_ldev(mdev);
99}
100
101/* writes on behalf of the partner, or resync writes,
102 * "submitted" by the receiver, final stage.  */
103static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
104{
105	unsigned long flags = 0;
106	struct drbd_conf *mdev = e->mdev;
107	sector_t e_sector;
108	int do_wake;
109	u64 block_id;
110	int do_al_complete_io;
111
112	/* after we moved e to done_ee,
113	 * we may no longer access it,
114	 * it may be freed/reused already!
115	 * (as soon as we release the req_lock) */
116	e_sector = e->i.sector;
117	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
118	block_id = e->block_id;
119
120	spin_lock_irqsave(&mdev->req_lock, flags);
121	mdev->writ_cnt += e->i.size >> 9;
122	list_del(&e->w.list); /* has been on active_ee or sync_ee */
123	list_add_tail(&e->w.list, &mdev->done_ee);
124
125	/*
126	 * Do not remove from the epoch_entries tree here: we did not send the
127	 * Ack yet and did not wake possibly waiting conflicting requests.
128	 * Removed from the tree from "drbd_process_done_ee" within the
129	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
130	 * _drbd_clear_done_ee.
131	 */
132
133	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
134
135	if (test_bit(__EE_WAS_ERROR, &e->flags))
136		__drbd_chk_io_error(mdev, false);
137	spin_unlock_irqrestore(&mdev->req_lock, flags);
138
139	if (block_id == ID_SYNCER)
140		drbd_rs_complete_io(mdev, e_sector);
141
142	if (do_wake)
143		wake_up(&mdev->ee_wait);
144
145	if (do_al_complete_io)
146		drbd_al_complete_io(mdev, e_sector);
147
148	wake_asender(mdev);
149	put_ldev(mdev);
150}
151
152/* writes on behalf of the partner, or resync writes,
153 * "submitted" by the receiver.
154 */
155void drbd_endio_sec(struct bio *bio, int error)
156{
157	struct drbd_epoch_entry *e = bio->bi_private;
158	struct drbd_conf *mdev = e->mdev;
159	int uptodate = bio_flagged(bio, BIO_UPTODATE);
160	int is_write = bio_data_dir(bio) == WRITE;
161
162	if (error && __ratelimit(&drbd_ratelimit_state))
163		dev_warn(DEV, "%s: error=%d s=%llus\n",
164				is_write ? "write" : "read", error,
165				(unsigned long long)e->i.sector);
166	if (!error && !uptodate) {
167		if (__ratelimit(&drbd_ratelimit_state))
168			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
169					is_write ? "write" : "read",
170					(unsigned long long)e->i.sector);
171		/* strange behavior of some lower level drivers...
172		 * fail the request by clearing the uptodate flag,
173		 * but do not return any error?! */
174		error = -EIO;
175	}
176
177	if (error)
178		set_bit(__EE_WAS_ERROR, &e->flags);
179
180	bio_put(bio); /* no need for the bio anymore */
181	if (atomic_dec_and_test(&e->pending_bios)) {
182		if (is_write)
183			drbd_endio_write_sec_final(e);
184		else
185			drbd_endio_read_sec_final(e);
186	}
187}
188
189/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
190 */
191void drbd_endio_pri(struct bio *bio, int error)
192{
193	unsigned long flags;
194	struct drbd_request *req = bio->bi_private;
195	struct drbd_conf *mdev = req->mdev;
196	struct bio_and_error m;
197	enum drbd_req_event what;
198	int uptodate = bio_flagged(bio, BIO_UPTODATE);
199
200	if (!error && !uptodate) {
201		dev_warn(DEV, "p %s: setting error to -EIO\n",
202			 bio_data_dir(bio) == WRITE ? "write" : "read");
203		/* strange behavior of some lower level drivers...
204		 * fail the request by clearing the uptodate flag,
205		 * but do not return any error?! */
206		error = -EIO;
207	}
208
209	/* to avoid recursion in __req_mod */
210	if (unlikely(error)) {
211		what = (bio_data_dir(bio) == WRITE)
212			? WRITE_COMPLETED_WITH_ERROR
213			: (bio_rw(bio) == READ)
214			  ? READ_COMPLETED_WITH_ERROR
215			  : READ_AHEAD_COMPLETED_WITH_ERROR;
216	} else
217		what = COMPLETED_OK;
218
219	bio_put(req->private_bio);
220	req->private_bio = ERR_PTR(error);
221
222	/* not req_mod(), we need irqsave here! */
223	spin_lock_irqsave(&mdev->req_lock, flags);
224	__req_mod(req, what, &m);
225	spin_unlock_irqrestore(&mdev->req_lock, flags);
226
227	if (m.bio)
228		complete_master_bio(mdev, &m);
229}
230
231int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
232{
233	struct drbd_request *req = container_of(w, struct drbd_request, w);
234
235	/* We should not detach for read io-error,
236	 * but try to WRITE the P_DATA_REPLY to the failed location,
237	 * to give the disk the chance to relocate that block */
238
239	spin_lock_irq(&mdev->req_lock);
240	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
242		spin_unlock_irq(&mdev->req_lock);
243		return 1;
244	}
245	spin_unlock_irq(&mdev->req_lock);
246
247	return w_send_read_req(mdev, w, 0);
248}
249
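/* Hash the payload of an epoch entry into @digest using the hash transform
 * @tfm.  Every page in e->pages except the last is hashed in full; the last
 * page contributes e->i.size % PAGE_SIZE bytes, or a full page when the size
 * is page aligned.  For example (4 KiB pages assumed), an entry of 6144 bytes
 * hashes one full page plus 2048 bytes of the second. */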
250void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
251{
252	struct hash_desc desc;
253	struct scatterlist sg;
254	struct page *page = e->pages;
255	struct page *tmp;
256	unsigned len;
257
258	desc.tfm = tfm;
259	desc.flags = 0;
260
261	sg_init_table(&sg, 1);
262	crypto_hash_init(&desc);
263
264	while ((tmp = page_chain_next(page))) {
265		/* all but the last page will be fully used */
266		sg_set_page(&sg, page, PAGE_SIZE, 0);
267		crypto_hash_update(&desc, &sg, sg.length);
268		page = tmp;
269	}
270	/* and now the last, possibly only partially used page */
271	len = e->i.size & (PAGE_SIZE - 1);
272	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
273	crypto_hash_update(&desc, &sg, sg.length);
274	crypto_hash_final(&desc, digest);
275}
276
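/* Like drbd_csum_ee(), but over the segments of a bio: each bio_vec is fed
 * to the hash at its own page offset and length. */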
277void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
278{
279	struct hash_desc desc;
280	struct scatterlist sg;
281	struct bio_vec *bvec;
282	int i;
283
284	desc.tfm = tfm;
285	desc.flags = 0;
286
287	sg_init_table(&sg, 1);
288	crypto_hash_init(&desc);
289
290	__bio_for_each_segment(bvec, bio, i, 0) {
291		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
292		crypto_hash_update(&desc, &sg, sg.length);
293	}
294	crypto_hash_final(&desc, digest);
295}
296
297/* TODO merge common code with w_e_end_ov_req */
298int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
299{
300	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
301	int digest_size;
302	void *digest;
303	int ok = 1;
304
305	if (unlikely(cancel))
306		goto out;
307
308	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
309		goto out;
310
311	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
312	digest = kmalloc(digest_size, GFP_NOIO);
313	if (digest) {
314		sector_t sector = e->i.sector;
315		unsigned int size = e->i.size;
316		drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
317		/* Free e and pages before send.
318		 * In case we block on congestion, we could otherwise run into
319		 * some distributed deadlock, if the other side blocks on
320		 * congestion as well, because our receiver blocks in
321		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
322		drbd_free_ee(mdev, e);
323		e = NULL;
324		inc_rs_pending(mdev);
325		ok = drbd_send_drequest_csum(mdev, sector, size,
326					     digest, digest_size,
327					     P_CSUM_RS_REQUEST);
328		kfree(digest);
329	} else {
330		dev_err(DEV, "kmalloc() of digest failed.\n");
331		ok = 0;
332	}
333
334out:
335	if (e)
336		drbd_free_ee(mdev, e);
337
338	if (unlikely(!ok))
339		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340	return ok;
341}
342
343#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
344
345static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346{
347	struct drbd_epoch_entry *e;
348
349	if (!get_ldev(mdev))
350		return -EIO;
351
352	if (drbd_rs_should_slow_down(mdev, sector))
353		goto defer;
354
355	/* GFP_TRY, because if there is no memory available right now, this may
356	 * be rescheduled for later. It is "only" background resync, after all. */
357	e = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
358	if (!e)
359		goto defer;
360
361	e->w.cb = w_e_send_csum;
362	spin_lock_irq(&mdev->req_lock);
363	list_add(&e->w.list, &mdev->read_ee);
364	spin_unlock_irq(&mdev->req_lock);
365
366	atomic_add(size >> 9, &mdev->rs_sect_ev);
367	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
368		return 0;
369
370	/* If it failed because of ENOMEM, retry should help.  If it failed
371	 * because bio_add_page failed (probably broken lower level driver),
372	 * retry may or may not help.
373	 * If it does not, you may need to force disconnect. */
374	spin_lock_irq(&mdev->req_lock);
375	list_del(&e->w.list);
376	spin_unlock_irq(&mdev->req_lock);
377
378	drbd_free_ee(mdev, e);
379defer:
380	put_ldev(mdev);
381	return -EAGAIN;
382}
383
384int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
385{
386	switch (mdev->state.conn) {
387	case C_VERIFY_S:
388		w_make_ov_request(mdev, w, cancel);
389		break;
390	case C_SYNC_TARGET:
391		w_make_resync_request(mdev, w, cancel);
392		break;
393	}
394
395	return 1;
396}
397
398void resync_timer_fn(unsigned long data)
399{
400	struct drbd_conf *mdev = (struct drbd_conf *) data;
401
402	if (list_empty(&mdev->resync_work.list))
403		drbd_queue_work(&mdev->data.work, &mdev->resync_work);
404}
405
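/* The following small helpers treat rs_plan_s as a ring buffer for the
 * resync controller below: fifo_set() initializes every slot, fifo_add_val()
 * adds a value to every slot, and fifo_push() stores @value at the head and
 * returns the slot content it overwrites, i.e. the correction that falls due
 * in the current time step. */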
406static void fifo_set(struct fifo_buffer *fb, int value)
407{
408	int i;
409
410	for (i = 0; i < fb->size; i++)
411		fb->values[i] = value;
412}
413
414static int fifo_push(struct fifo_buffer *fb, int value)
415{
416	int ov;
417
418	ov = fb->values[fb->head_index];
419	fb->values[fb->head_index++] = value;
420
421	if (fb->head_index >= fb->size)
422		fb->head_index = 0;
423
424	return ov;
425}
426
427static void fifo_add_val(struct fifo_buffer *fb, int value)
428{
429	int i;
430
431	for (i = 0; i < fb->size; i++)
432		fb->values[i] += value;
433}
434
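/* A step-wise feedback controller, run once per SLEEP_TIME interval:
 * "want" is the desired amount of resync data in flight, either the fixed
 * c_fill_target or derived from c_delay_target and the observed reply rate
 * sect_in (at the very start of a resync it is seeded from the configured
 * rate instead).  The difference to what is already in flight or planned is
 * spread over "steps" future invocations via the rs_plan_s ring; each call
 * then requests sect_in plus the correction due in this step, capped at
 * c_max_rate.
 *
 * Worked example with made-up numbers: steps = 10, sect_in = 1000 sectors,
 * rs_in_flight = 2000, rs_planed = 0, c_fill_target = 4000 sectors.  Then
 * correction = 2000, cps = 200 is added to every slot of the plan, and if
 * the plan was empty before, curr_corr = 200 falls due now, so
 * req_sect = 1000 + 200 = 1200 sectors are requested in this turn. */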
435static int drbd_rs_controller(struct drbd_conf *mdev)
436{
437	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
438	unsigned int want;     /* The number of sectors we want in the proxy */
439	int req_sect; /* Number of sectors to request in this turn */
440	int correction; /* Number of sectors more we need in the proxy*/
441	int cps; /* correction per invocation of drbd_rs_controller() */
442	int steps; /* Number of time steps to plan ahead */
443	int curr_corr;
444	int max_sect;
445
446	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
447	mdev->rs_in_flight -= sect_in;
448
449	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
450
451	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
452
453	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
454		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
455	} else { /* normal path */
456		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
457			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
458	}
459
460	correction = want - mdev->rs_in_flight - mdev->rs_planed;
461
462	/* Plan ahead */
463	cps = correction / steps;
464	fifo_add_val(&mdev->rs_plan_s, cps);
465	mdev->rs_planed += cps * steps;
466
467	/* What we do in this step */
468	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
469	spin_unlock(&mdev->peer_seq_lock);
470	mdev->rs_planed -= curr_corr;
471
472	req_sect = sect_in + curr_corr;
473	if (req_sect < 0)
474		req_sect = 0;
475
476	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
477	if (req_sect > max_sect)
478		req_sect = max_sect;
479
480	/*
481	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
482		 sect_in, mdev->rs_in_flight, want, correction,
483		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
484	*/
485
486	return req_sect;
487}
488
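/* Translate the controller's answer (in 512-byte sectors) into the number of
 * BM_BLOCK_SIZE sized requests for one SLEEP_TIME interval; the shift by
 * (BM_BLOCK_SHIFT - 9) converts sectors to bitmap blocks.  Without a plan
 * (c_plan_ahead == 0) fall back to the configured fixed rate in KiB/s. */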
489static int drbd_rs_number_requests(struct drbd_conf *mdev)
490{
491	int number;
492	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
493		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
494		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
495	} else {
496		mdev->c_sync_rate = mdev->sync_conf.rate;
497		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
498	}
499
500	/* ignore the amount of pending requests; the resync controller should
501	 * throttle down to the incoming reply rate soon enough anyway. */
502	return number;
503}
504
505static int w_make_resync_request(struct drbd_conf *mdev,
506				 struct drbd_work *w, int cancel)
507{
508	unsigned long bit;
509	sector_t sector;
510	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
511	int max_bio_size;
512	int number, rollback_i, size;
513	int align, queued, sndbuf;
514	int i = 0;
515
516	if (unlikely(cancel))
517		return 1;
518
519	if (mdev->rs_total == 0) {
520		/* empty resync? */
521		drbd_resync_finished(mdev);
522		return 1;
523	}
524
525	if (!get_ldev(mdev)) {
526		/* Since we only need to access mdev->rsync, a
527		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
528		   continuing a resync on a broken disk makes no sense at
529		   all. */
530		dev_err(DEV, "Disk broke down during resync!\n");
531		return 1;
532	}
533
534	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
535	number = drbd_rs_number_requests(mdev);
536	if (number == 0)
537		goto requeue;
538
539	for (i = 0; i < number; i++) {
540		/* Stop generating RS requests when half of the send buffer is filled */
541		mutex_lock(&mdev->data.mutex);
542		if (mdev->data.socket) {
543			queued = mdev->data.socket->sk->sk_wmem_queued;
544			sndbuf = mdev->data.socket->sk->sk_sndbuf;
545		} else {
546			queued = 1;
547			sndbuf = 0;
548		}
549		mutex_unlock(&mdev->data.mutex);
550		if (queued > sndbuf / 2)
551			goto requeue;
552
553next_sector:
554		size = BM_BLOCK_SIZE;
555		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
556
557		if (bit == DRBD_END_OF_BITMAP) {
558			mdev->bm_resync_fo = drbd_bm_bits(mdev);
559			put_ldev(mdev);
560			return 1;
561		}
562
563		sector = BM_BIT_TO_SECT(bit);
564
565		if (drbd_rs_should_slow_down(mdev, sector) ||
566		    drbd_try_rs_begin_io(mdev, sector)) {
567			mdev->bm_resync_fo = bit;
568			goto requeue;
569		}
570		mdev->bm_resync_fo = bit + 1;
571
572		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
573			drbd_rs_complete_io(mdev, sector);
574			goto next_sector;
575		}
576
577#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
578		/* try to find some adjacent bits.
579		 * we stop if we already have the maximum req size.
580		 *
581		 * Additionally always align bigger requests, in order to
582		 * be prepared for all stripe sizes of software RAIDs.
583		 */
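		/* Note on the "+3" below: "align" counts BM_BLOCK_SIZE units
		 * (4 KiB, i.e. 8 sectors), so a merged request of 2^align
		 * blocks must start on a 2^(align+3)-sector boundary. */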
584		align = 1;
585		rollback_i = i;
586		for (;;) {
587			if (size + BM_BLOCK_SIZE > max_bio_size)
588				break;
589
590			/* Be always aligned */
591			if (sector & ((1<<(align+3))-1))
592				break;
593
594			/* do not cross extent boundaries */
595			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
596				break;
597			/* now, is it actually dirty, after all?
598			 * caution, drbd_bm_test_bit is tri-state for some
599			 * obscure reason; ( b == 0 ) would get the out-of-bounds case
600			 * only accidentally right because of the "oddly sized"
601			 * adjustment below */
602			if (drbd_bm_test_bit(mdev, bit+1) != 1)
603				break;
604			bit++;
605			size += BM_BLOCK_SIZE;
606			if ((BM_BLOCK_SIZE << align) <= size)
607				align++;
608			i++;
609		}
610		/* if we merged some,
611		 * reset the offset to start the next drbd_bm_find_next from */
612		if (size > BM_BLOCK_SIZE)
613			mdev->bm_resync_fo = bit + 1;
614#endif
615
616		/* adjust very last sectors, in case we are oddly sized */
617		if (sector + (size>>9) > capacity)
618			size = (capacity-sector)<<9;
619		if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
620			switch (read_for_csum(mdev, sector, size)) {
621			case -EIO: /* Disk failure */
622				put_ldev(mdev);
623				return 0;
624			case -EAGAIN: /* allocation failed, or ldev busy */
625				drbd_rs_complete_io(mdev, sector);
626				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
627				i = rollback_i;
628				goto requeue;
629			case 0:
630				/* everything ok */
631				break;
632			default:
633				BUG();
634			}
635		} else {
636			inc_rs_pending(mdev);
637			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
638					       sector, size, ID_SYNCER)) {
639				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
640				dec_rs_pending(mdev);
641				put_ldev(mdev);
642				return 0;
643			}
644		}
645	}
646
647	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
648		/* last syncer _request_ was sent,
649		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
650		 * next sync group will resume), as soon as we receive the last
651		 * resync data block, and the last bit is cleared.
652		 * until then resync "work" is "inactive" ...
653		 */
654		put_ldev(mdev);
655		return 1;
656	}
657
658 requeue:
659	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
660	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
661	put_ldev(mdev);
662	return 1;
663}
664
665static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
666{
667	int number, i, size;
668	sector_t sector;
669	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
670
671	if (unlikely(cancel))
672		return 1;
673
674	number = drbd_rs_number_requests(mdev);
675
676	sector = mdev->ov_position;
677	for (i = 0; i < number; i++) {
678		if (sector >= capacity) {
679			return 1;
680		}
681
682		size = BM_BLOCK_SIZE;
683
684		if (drbd_rs_should_slow_down(mdev, sector) ||
685		    drbd_try_rs_begin_io(mdev, sector)) {
686			mdev->ov_position = sector;
687			goto requeue;
688		}
689
690		if (sector + (size>>9) > capacity)
691			size = (capacity-sector)<<9;
692
693		inc_rs_pending(mdev);
694		if (!drbd_send_ov_request(mdev, sector, size)) {
695			dec_rs_pending(mdev);
696			return 0;
697		}
698		sector += BM_SECT_PER_BIT;
699	}
700	mdev->ov_position = sector;
701
702 requeue:
703	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
704	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
705	return 1;
706}
707
708
709void start_resync_timer_fn(unsigned long data)
710{
711	struct drbd_conf *mdev = (struct drbd_conf *) data;
712
713	drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
714}
715
716int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
717{
718	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
719		dev_warn(DEV, "w_start_resync later...\n");
720		mdev->start_resync_timer.expires = jiffies + HZ/10;
721		add_timer(&mdev->start_resync_timer);
722		return 1;
723	}
724
725	drbd_start_resync(mdev, C_SYNC_SOURCE);
726	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
727	return 1;
728}
729
730int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
731{
732	kfree(w);
733	ov_oos_print(mdev);
734	drbd_resync_finished(mdev);
735
736	return 1;
737}
738
739static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
740{
741	kfree(w);
742
743	drbd_resync_finished(mdev);
744
745	return 1;
746}
747
748static void ping_peer(struct drbd_conf *mdev)
749{
750	clear_bit(GOT_PING_ACK, &mdev->flags);
751	request_ping(mdev);
752	wait_event(mdev->misc_wait,
753		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
754}
755
756int drbd_resync_finished(struct drbd_conf *mdev)
757{
758	unsigned long db, dt, dbdt;
759	unsigned long n_oos;
760	union drbd_state os, ns;
761	struct drbd_work *w;
762	char *khelper_cmd = NULL;
763	int verify_done = 0;
764
765	/* Remove all elements from the resync LRU. Since future actions
766	 * might set bits in the (main) bitmap, the entries in the
767	 * resync LRU would otherwise be wrong. */
768	if (drbd_rs_del_all(mdev)) {
769		/* In case this is not possible now, most probably because
770		 * there are P_RS_DATA_REPLY packets lingering on the worker's
771		 * queue (or even the read operations for those packets
772		 * are not finished by now).  Retry in 100ms. */
773
774		schedule_timeout_interruptible(HZ / 10);
775		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
776		if (w) {
777			w->cb = w_resync_finished;
778			drbd_queue_work(&mdev->data.work, w);
779			return 1;
780		}
781		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
782	}
783
784	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
785	if (dt <= 0)
786		dt = 1;
787	db = mdev->rs_total;
788	dbdt = Bit2KB(db/dt);
789	mdev->rs_paused /= HZ;
790
791	if (!get_ldev(mdev))
792		goto out;
793
794	ping_peer(mdev);
795
796	spin_lock_irq(&mdev->req_lock);
797	os = mdev->state;
798
799	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
800
801	/* This protects us against multiple calls (that can happen in the presence
802	   of application IO), and against connectivity loss just before we arrive here. */
803	if (os.conn <= C_CONNECTED)
804		goto out_unlock;
805
806	ns = os;
807	ns.conn = C_CONNECTED;
808
809	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
810	     verify_done ? "Online verify " : "Resync",
811	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
812
813	n_oos = drbd_bm_total_weight(mdev);
814
815	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
816		if (n_oos) {
817			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
818			      n_oos, Bit2KB(1));
819			khelper_cmd = "out-of-sync";
820		}
821	} else {
822		D_ASSERT((n_oos - mdev->rs_failed) == 0);
823
824		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
825			khelper_cmd = "after-resync-target";
826
827		if (mdev->csums_tfm && mdev->rs_total) {
828			const unsigned long s = mdev->rs_same_csum;
829			const unsigned long t = mdev->rs_total;
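			/* for large totals, divide t first so that s*100
			 * cannot overflow an unsigned long on 32-bit; for
			 * small totals keep the extra precision */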
830			const int ratio =
831				(t == 0)     ? 0 :
832			(t < 100000) ? ((s*100)/t) : (s/(t/100));
833			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
834			     "transferred %luK total %luK\n",
835			     ratio,
836			     Bit2KB(mdev->rs_same_csum),
837			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
838			     Bit2KB(mdev->rs_total));
839		}
840	}
841
842	if (mdev->rs_failed) {
843		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
844
845		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
846			ns.disk = D_INCONSISTENT;
847			ns.pdsk = D_UP_TO_DATE;
848		} else {
849			ns.disk = D_UP_TO_DATE;
850			ns.pdsk = D_INCONSISTENT;
851		}
852	} else {
853		ns.disk = D_UP_TO_DATE;
854		ns.pdsk = D_UP_TO_DATE;
855
856		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
857			if (mdev->p_uuid) {
858				int i;
859				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
860					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
861				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
862				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
863			} else {
864				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
865			}
866		}
867
868		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
869			/* for verify runs, we don't update uuids here,
870			 * so there would be nothing to report. */
871			drbd_uuid_set_bm(mdev, 0UL);
872			drbd_print_uuids(mdev, "updated UUIDs");
873			if (mdev->p_uuid) {
874				/* Now the two UUID sets are equal, update what we
875				 * know of the peer. */
876				int i;
877				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
878					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
879			}
880		}
881	}
882
883	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
884out_unlock:
885	spin_unlock_irq(&mdev->req_lock);
886	put_ldev(mdev);
887out:
888	mdev->rs_total  = 0;
889	mdev->rs_failed = 0;
890	mdev->rs_paused = 0;
891	if (verify_done)
892		mdev->ov_start_sector = 0;
893
894	drbd_md_sync(mdev);
895
896	if (khelper_cmd)
897		drbd_khelper(mdev, khelper_cmd);
898
899	return 1;
900}
901
902/* helper */
903static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
904{
905	if (drbd_ee_has_active_page(e)) {
906		/* This might happen if sendpage() has not finished */
907		int i = (e->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
908		atomic_add(i, &mdev->pp_in_use_by_net);
909		atomic_sub(i, &mdev->pp_in_use);
910		spin_lock_irq(&mdev->req_lock);
911		list_add_tail(&e->w.list, &mdev->net_ee);
912		spin_unlock_irq(&mdev->req_lock);
913		wake_up(&drbd_pp_wait);
914	} else
915		drbd_free_ee(mdev, e);
916}
917
918/**
919 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
920 * @mdev:	DRBD device.
921 * @w:		work object.
922 * @cancel:	The connection will be closed anyway
923 */
924int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
925{
926	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
927	int ok;
928
929	if (unlikely(cancel)) {
930		drbd_free_ee(mdev, e);
931		dec_unacked(mdev);
932		return 1;
933	}
934
935	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
936		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
937	} else {
938		if (__ratelimit(&drbd_ratelimit_state))
939			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
940			    (unsigned long long)e->i.sector);
941
942		ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
943	}
944
945	dec_unacked(mdev);
946
947	move_to_net_ee_or_free(mdev, e);
948
949	if (unlikely(!ok))
950		dev_err(DEV, "drbd_send_block() failed\n");
951	return ok;
952}
953
954/**
955 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
956 * @mdev:	DRBD device.
957 * @w:		work object.
958 * @cancel:	The connection will be closed anyway
959 */
960int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
961{
962	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
963	int ok;
964
965	if (unlikely(cancel)) {
966		drbd_free_ee(mdev, e);
967		dec_unacked(mdev);
968		return 1;
969	}
970
971	if (get_ldev_if_state(mdev, D_FAILED)) {
972		drbd_rs_complete_io(mdev, e->i.sector);
973		put_ldev(mdev);
974	}
975
976	if (mdev->state.conn == C_AHEAD) {
977		ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
978	} else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
979		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
980			inc_rs_pending(mdev);
981			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
982		} else {
983			if (__ratelimit(&drbd_ratelimit_state))
984				dev_err(DEV, "Not sending RSDataReply, "
985				    "partner DISKLESS!\n");
986			ok = 1;
987		}
988	} else {
989		if (__ratelimit(&drbd_ratelimit_state))
990			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
991			    (unsigned long long)e->i.sector);
992
993		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
994
995		/* update resync data with failure */
996		drbd_rs_failed_io(mdev, e->i.sector, e->i.size);
997	}
998
999	dec_unacked(mdev);
1000
1001	move_to_net_ee_or_free(mdev, e);
1002
1003	if (unlikely(!ok))
1004		dev_err(DEV, "drbd_send_block() failed\n");
1005	return ok;
1006}
1007
1008int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1009{
1010	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1011	struct digest_info *di;
1012	int digest_size;
1013	void *digest = NULL;
1014	int ok, eq = 0;
1015
1016	if (unlikely(cancel)) {
1017		drbd_free_ee(mdev, e);
1018		dec_unacked(mdev);
1019		return 1;
1020	}
1021
1022	if (get_ldev(mdev)) {
1023		drbd_rs_complete_io(mdev, e->i.sector);
1024		put_ldev(mdev);
1025	}
1026
1027	di = e->digest;
1028
1029	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1030		/* quick hack to try to avoid a race against reconfiguration.
1031		 * a real fix would be much more involved,
1032		 * introducing more locking mechanisms */
1033		if (mdev->csums_tfm) {
1034			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1035			D_ASSERT(digest_size == di->digest_size);
1036			digest = kmalloc(digest_size, GFP_NOIO);
1037		}
1038		if (digest) {
1039			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1040			eq = !memcmp(digest, di->digest, digest_size);
1041			kfree(digest);
1042		}
1043
1044		if (eq) {
1045			drbd_set_in_sync(mdev, e->i.sector, e->i.size);
1046			/* rs_same_csums unit is BM_BLOCK_SIZE */
1047			mdev->rs_same_csum += e->i.size >> BM_BLOCK_SHIFT;
1048			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1049		} else {
1050			inc_rs_pending(mdev);
1051			e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1052			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1053			kfree(di);
1054			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1055		}
1056	} else {
1057		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1058		if (__ratelimit(&drbd_ratelimit_state))
1059			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1060	}
1061
1062	dec_unacked(mdev);
1063	move_to_net_ee_or_free(mdev, e);
1064
1065	if (unlikely(!ok))
1066		dev_err(DEV, "drbd_send_block/ack() failed\n");
1067	return ok;
1068}
1069
1070/* TODO merge common code with w_e_send_csum */
1071int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1072{
1073	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1074	sector_t sector = e->i.sector;
1075	unsigned int size = e->i.size;
1076	int digest_size;
1077	void *digest;
1078	int ok = 1;
1079
1080	if (unlikely(cancel))
1081		goto out;
1082
1083	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1084	digest = kmalloc(digest_size, GFP_NOIO);
1085	if (!digest) {
1086		ok = 0;	/* terminate the connection in case the allocation failed */
1087		goto out;
1088	}
1089
1090	if (likely(!(e->flags & EE_WAS_ERROR)))
1091		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1092	else
1093		memset(digest, 0, digest_size);
1094
1095	/* Free e and pages before send.
1096	 * In case we block on congestion, we could otherwise run into
1097	 * some distributed deadlock, if the other side blocks on
1098	 * congestion as well, because our receiver blocks in
1099	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1100	drbd_free_ee(mdev, e);
1101	e = NULL;
1102	inc_rs_pending(mdev);
1103	ok = drbd_send_drequest_csum(mdev, sector, size,
1104				     digest, digest_size,
1105				     P_OV_REPLY);
1106	if (!ok)
1107		dec_rs_pending(mdev);
1108	kfree(digest);
1109
1110out:
1111	if (e)
1112		drbd_free_ee(mdev, e);
1113	dec_unacked(mdev);
1114	return ok;
1115}
1116
1117void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1118{
1119	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1120		mdev->ov_last_oos_size += size>>9;
1121	} else {
1122		mdev->ov_last_oos_start = sector;
1123		mdev->ov_last_oos_size = size>>9;
1124	}
1125	drbd_set_out_of_sync(mdev, sector, size);
1126}
1127
1128int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1129{
1130	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1131	struct digest_info *di;
1132	void *digest;
1133	sector_t sector = e->i.sector;
1134	unsigned int size = e->i.size;
1135	int digest_size;
1136	int ok, eq = 0;
1137
1138	if (unlikely(cancel)) {
1139		drbd_free_ee(mdev, e);
1140		dec_unacked(mdev);
1141		return 1;
1142	}
1143
1144	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1145	 * the resync lru has been cleaned up already */
1146	if (get_ldev(mdev)) {
1147		drbd_rs_complete_io(mdev, e->i.sector);
1148		put_ldev(mdev);
1149	}
1150
1151	di = e->digest;
1152
1153	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1154		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1155		digest = kmalloc(digest_size, GFP_NOIO);
1156		if (digest) {
1157			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1158
1159			D_ASSERT(digest_size == di->digest_size);
1160			eq = !memcmp(digest, di->digest, digest_size);
1161			kfree(digest);
1162		}
1163	}
1164
1165	/* Free e and pages before send.
1166	 * In case we block on congestion, we could otherwise run into
1167	 * some distributed deadlock, if the other side blocks on
1168	 * congestion as well, because our receiver blocks in
1169	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1170	drbd_free_ee(mdev, e);
1171	if (!eq)
1172		drbd_ov_oos_found(mdev, sector, size);
1173	else
1174		ov_oos_print(mdev);
1175
1176	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1177			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1178
1179	dec_unacked(mdev);
1180
1181	--mdev->ov_left;
1182
1183	/* let's advance progress step marks only for every other megabyte */
1184	if ((mdev->ov_left & 0x200) == 0x200)
1185		drbd_advance_rs_marks(mdev, mdev->ov_left);
1186
1187	if (mdev->ov_left == 0) {
1188		ov_oos_print(mdev);
1189		drbd_resync_finished(mdev);
1190	}
1191
1192	return ok;
1193}
1194
1195int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1196{
1197	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1198	complete(&b->done);
1199	return 1;
1200}
1201
1202int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1203{
1204	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1205	struct p_barrier *p = &mdev->data.sbuf.barrier;
1206	int ok = 1;
1207
1208	/* really avoid racing with tl_clear.  w.cb may have been referenced
1209	 * just before it was reassigned and re-queued, so double check that.
1210	 * actually, this race was harmless, since we only try to send the
1211	 * barrier packet here, and otherwise do nothing with the object.
1212	 * but compare with the head of w_clear_epoch */
1213	spin_lock_irq(&mdev->req_lock);
1214	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1215		cancel = 1;
1216	spin_unlock_irq(&mdev->req_lock);
1217	if (cancel)
1218		return 1;
1219
1220	if (!drbd_get_data_sock(mdev))
1221		return 0;
1222	p->barrier = b->br_number;
1223	/* inc_ap_pending was done where this was queued.
1224	 * dec_ap_pending will be done in got_BarrierAck
1225	 * or (on connection loss) in w_clear_epoch.  */
1226	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1227				(struct p_header80 *)p, sizeof(*p), 0);
1228	drbd_put_data_sock(mdev);
1229
1230	return ok;
1231}
1232
1233int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1234{
1235	if (cancel)
1236		return 1;
1237	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1238}
1239
1240int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1241{
1242	struct drbd_request *req = container_of(w, struct drbd_request, w);
1243	int ok;
1244
1245	if (unlikely(cancel)) {
1246		req_mod(req, SEND_CANCELED);
1247		return 1;
1248	}
1249
1250	ok = drbd_send_oos(mdev, req);
1251	req_mod(req, OOS_HANDED_TO_NETWORK);
1252
1253	return ok;
1254}
1255
1256/**
1257 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1258 * @mdev:	DRBD device.
1259 * @w:		work object.
1260 * @cancel:	The connection will be closed anyway
1261 */
1262int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1263{
1264	struct drbd_request *req = container_of(w, struct drbd_request, w);
1265	int ok;
1266
1267	if (unlikely(cancel)) {
1268		req_mod(req, SEND_CANCELED);
1269		return 1;
1270	}
1271
1272	ok = drbd_send_dblock(mdev, req);
1273	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1274
1275	return ok;
1276}
1277
1278/**
1279 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1280 * @mdev:	DRBD device.
1281 * @w:		work object.
1282 * @cancel:	The connection will be closed anyway
1283 */
1284int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1285{
1286	struct drbd_request *req = container_of(w, struct drbd_request, w);
1287	int ok;
1288
1289	if (unlikely(cancel)) {
1290		req_mod(req, SEND_CANCELED);
1291		return 1;
1292	}
1293
1294	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1295				(unsigned long)req);
1296
1297	if (!ok) {
1298		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1299		 * so this is probably redundant */
1300		if (mdev->state.conn >= C_CONNECTED)
1301			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1302	}
1303	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1304
1305	return ok;
1306}
1307
1308int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1309{
1310	struct drbd_request *req = container_of(w, struct drbd_request, w);
1311
1312	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1313		drbd_al_begin_io(mdev, req->i.sector);
1314	/* Calling drbd_al_begin_io() out of the worker might theoretically
1315	   deadlock. In practice it cannot, since this is only used when
1316	   unfreezing IOs. All the extents of the requests that made it
1317	   into the TL are already active. */
1318
1319	drbd_req_make_private_bio(req, req->master_bio);
1320	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1321	generic_make_request(req->private_bio);
1322
1323	return 1;
1324}
1325
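/* Follow the sync-after dependency chain of @mdev: return 0 if any device we
 * depend on is currently resyncing or has one of its sync-pause flags
 * (aftr_isp/peer_isp/user_isp) set, 1 if we may resync now (end of chain, or
 * the configured minor does not resolve to a device). */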
1326static int _drbd_may_sync_now(struct drbd_conf *mdev)
1327{
1328	struct drbd_conf *odev = mdev;
1329
1330	while (1) {
1331		if (odev->sync_conf.after == -1)
1332			return 1;
1333		odev = minor_to_mdev(odev->sync_conf.after);
1334		if (!expect(odev))
1335			return 1;
1336		if ((odev->state.conn >= C_SYNC_SOURCE &&
1337		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1338		    odev->state.aftr_isp || odev->state.peer_isp ||
1339		    odev->state.user_isp)
1340			return 0;
1341	}
1342}
1343
1344/**
1345 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1346 * @mdev:	DRBD device.
1347 *
1348 * Called from process context only (admin command and after_state_ch).
1349 */
1350static int _drbd_pause_after(struct drbd_conf *mdev)
1351{
1352	struct drbd_conf *odev;
1353	int i, rv = 0;
1354
1355	for (i = 0; i < minor_count; i++) {
1356		odev = minor_to_mdev(i);
1357		if (!odev)
1358			continue;
1359		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1360			continue;
1361		if (!_drbd_may_sync_now(odev))
1362			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1363			       != SS_NOTHING_TO_DO);
1364	}
1365
1366	return rv;
1367}
1368
1369/**
1370 * _drbd_resume_next() - Resume resync on all devices that may resync now
1371 * @mdev:	DRBD device.
1372 *
1373 * Called from process context only (admin command and worker).
1374 */
1375static int _drbd_resume_next(struct drbd_conf *mdev)
1376{
1377	struct drbd_conf *odev;
1378	int i, rv = 0;
1379
1380	for (i = 0; i < minor_count; i++) {
1381		odev = minor_to_mdev(i);
1382		if (!odev)
1383			continue;
1384		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1385			continue;
1386		if (odev->state.aftr_isp) {
1387			if (_drbd_may_sync_now(odev))
1388				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1389							CS_HARD, NULL)
1390				       != SS_NOTHING_TO_DO) ;
1391		}
1392	}
1393	return rv;
1394}
1395
1396void resume_next_sg(struct drbd_conf *mdev)
1397{
1398	write_lock_irq(&global_state_lock);
1399	_drbd_resume_next(mdev);
1400	write_unlock_irq(&global_state_lock);
1401}
1402
1403void suspend_other_sg(struct drbd_conf *mdev)
1404{
1405	write_lock_irq(&global_state_lock);
1406	_drbd_pause_after(mdev);
1407	write_unlock_irq(&global_state_lock);
1408}
1409
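/* Validate a proposed sync-after dependency of @mdev on minor @o_minor:
 * NO_ERROR if o_minor is -1 (no dependency) or the chain starting there
 * terminates, ERR_SYNC_AFTER if the minor is invalid, ERR_SYNC_AFTER_CYCLE
 * if following the chain would lead back to @mdev. */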
1410static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1411{
1412	struct drbd_conf *odev;
1413
1414	if (o_minor == -1)
1415		return NO_ERROR;
1416	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1417		return ERR_SYNC_AFTER;
1418
1419	/* check for loops */
1420	odev = minor_to_mdev(o_minor);
1421	while (1) {
1422		if (odev == mdev)
1423			return ERR_SYNC_AFTER_CYCLE;
1424
1425		/* dependency chain ends here, no cycles. */
1426		if (odev->sync_conf.after == -1)
1427			return NO_ERROR;
1428
1429		/* follow the dependency chain */
1430		odev = minor_to_mdev(odev->sync_conf.after);
1431	}
1432}
1433
1434int drbd_alter_sa(struct drbd_conf *mdev, int na)
1435{
1436	int changes;
1437	int retcode;
1438
1439	write_lock_irq(&global_state_lock);
1440	retcode = sync_after_error(mdev, na);
1441	if (retcode == NO_ERROR) {
1442		mdev->sync_conf.after = na;
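		/* pausing one device may allow another to resume and vice
		 * versa, so re-evaluate both directions until nothing
		 * changes any more */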
1443		do {
1444			changes  = _drbd_pause_after(mdev);
1445			changes |= _drbd_resume_next(mdev);
1446		} while (changes);
1447	}
1448	write_unlock_irq(&global_state_lock);
1449	return retcode;
1450}
1451
1452void drbd_rs_controller_reset(struct drbd_conf *mdev)
1453{
1454	atomic_set(&mdev->rs_sect_in, 0);
1455	atomic_set(&mdev->rs_sect_ev, 0);
1456	mdev->rs_in_flight = 0;
1457	mdev->rs_planed = 0;
1458	spin_lock(&mdev->peer_seq_lock);
1459	fifo_set(&mdev->rs_plan_s, 0);
1460	spin_unlock(&mdev->peer_seq_lock);
1461}
1462
1463/**
1464 * drbd_start_resync() - Start the resync process
1465 * @mdev:	DRBD device.
1466 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1467 *
1468 * This function might bring you directly into one of the
1469 * C_PAUSED_SYNC_* states.
1470 */
1471void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1472{
1473	union drbd_state ns;
1474	int r;
1475
1476	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1477		dev_err(DEV, "Resync already running!\n");
1478		return;
1479	}
1480
1481	if (mdev->state.conn < C_AHEAD) {
1482		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1483		drbd_rs_cancel_all(mdev);
1484		/* This should be done when we abort the resync. We definitely do not
1485		   want to have this for connections going back and forth between
1486		   Ahead/Behind and SyncSource/SyncTarget */
1487	}
1488
1489	if (side == C_SYNC_TARGET) {
1490		/* Since application IO was locked out during C_WF_BITMAP_T and
1491		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1492		   which will make our data inconsistent, the handler below may veto. */
1493		r = drbd_khelper(mdev, "before-resync-target");
1494		r = (r >> 8) & 0xff;
1495		if (r > 0) {
1496			dev_info(DEV, "before-resync-target handler returned %d, "
1497			     "dropping connection.\n", r);
1498			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1499			return;
1500		}
1501	} else /* C_SYNC_SOURCE */ {
1502		r = drbd_khelper(mdev, "before-resync-source");
1503		r = (r >> 8) & 0xff;
1504		if (r > 0) {
1505			if (r == 3) {
1506				dev_info(DEV, "before-resync-source handler returned %d, "
1507					 "ignoring. Old userland tools?\n", r);
1508			} else {
1509				dev_info(DEV, "before-resync-source handler returned %d, "
1510					 "dropping connection.\n", r);
1511				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1512				return;
1513			}
1514		}
1515	}
1516
1517	drbd_state_lock(mdev);
1518
1519	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1520		drbd_state_unlock(mdev);
1521		return;
1522	}
1523
1524	write_lock_irq(&global_state_lock);
1525	ns = mdev->state;
1526
1527	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1528
1529	ns.conn = side;
1530
1531	if (side == C_SYNC_TARGET)
1532		ns.disk = D_INCONSISTENT;
1533	else /* side == C_SYNC_SOURCE */
1534		ns.pdsk = D_INCONSISTENT;
1535
1536	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1537	ns = mdev->state;
1538
1539	if (ns.conn < C_CONNECTED)
1540		r = SS_UNKNOWN_ERROR;
1541
1542	if (r == SS_SUCCESS) {
1543		unsigned long tw = drbd_bm_total_weight(mdev);
1544		unsigned long now = jiffies;
1545		int i;
1546
1547		mdev->rs_failed    = 0;
1548		mdev->rs_paused    = 0;
1549		mdev->rs_same_csum = 0;
1550		mdev->rs_last_events = 0;
1551		mdev->rs_last_sect_ev = 0;
1552		mdev->rs_total     = tw;
1553		mdev->rs_start     = now;
1554		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1555			mdev->rs_mark_left[i] = tw;
1556			mdev->rs_mark_time[i] = now;
1557		}
1558		_drbd_pause_after(mdev);
1559	}
1560	write_unlock_irq(&global_state_lock);
1561
1562	if (r == SS_SUCCESS) {
1563		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1564		     drbd_conn_str(ns.conn),
1565		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1566		     (unsigned long) mdev->rs_total);
1567		if (side == C_SYNC_TARGET)
1568			mdev->bm_resync_fo = 0;
1569
1570		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1571		 * with w_send_oos, or the sync target will get confused as to
1572		 * how much bits to resync.  We cannot do that always, because for an
1573		 * empty resync and protocol < 95, we need to do it here, as we call
1574		 * drbd_resync_finished from here in that case.
1575		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1576		 * and from after_state_ch otherwise. */
1577		if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1578			drbd_gen_and_send_sync_uuid(mdev);
1579
1580		if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1581			/* This still has a race (about when exactly the peers
1582			 * detect connection loss) that can lead to a full sync
1583			 * on next handshake. In 8.3.9 we fixed this with explicit
1584			 * resync-finished notifications, but the fix
1585			 * introduces a protocol change.  Sleeping for some
1586			 * time longer than the ping interval + timeout on the
1587			 * SyncSource, to give the SyncTarget the chance to
1588			 * detect connection loss, then waiting for a ping
1589			 * response (implicit in drbd_resync_finished) reduces
1590			 * the race considerably, but does not solve it. */
1591			if (side == C_SYNC_SOURCE)
1592				schedule_timeout_interruptible(
1593					mdev->tconn->net_conf->ping_int * HZ +
1594					mdev->tconn->net_conf->ping_timeo*HZ/9);
1595			drbd_resync_finished(mdev);
1596		}
1597
1598		drbd_rs_controller_reset(mdev);
1599		/* ns.conn may already be != mdev->state.conn,
1600		 * we may have been paused in between, or become paused until
1601		 * the timer triggers.
1602		 * No matter, that is handled in resync_timer_fn() */
1603		if (ns.conn == C_SYNC_TARGET)
1604			mod_timer(&mdev->resync_timer, jiffies);
1605
1606		drbd_md_sync(mdev);
1607	}
1608	put_ldev(mdev);
1609	drbd_state_unlock(mdev);
1610}
1611
1612int drbd_worker(struct drbd_thread *thi)
1613{
1614	struct drbd_conf *mdev = thi->mdev;
1615	struct drbd_work *w = NULL;
1616	LIST_HEAD(work_list);
1617	int intr = 0, i;
1618
1619	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1620
1621	while (get_t_state(thi) == RUNNING) {
1622		drbd_thread_current_set_cpu(mdev);
1623
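		/* no work immediately available: uncork the data socket so
		 * everything queued so far actually goes out to the peer,
		 * sleep on the work semaphore, then cork it again so later
		 * packets can be batched (unless no_cork is configured) */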
1624		if (down_trylock(&mdev->data.work.s)) {
1625			mutex_lock(&mdev->data.mutex);
1626			if (mdev->data.socket && !mdev->tconn->net_conf->no_cork)
1627				drbd_tcp_uncork(mdev->data.socket);
1628			mutex_unlock(&mdev->data.mutex);
1629
1630			intr = down_interruptible(&mdev->data.work.s);
1631
1632			mutex_lock(&mdev->data.mutex);
1633			if (mdev->data.socket  && !mdev->tconn->net_conf->no_cork)
1634				drbd_tcp_cork(mdev->data.socket);
1635			mutex_unlock(&mdev->data.mutex);
1636		}
1637
1638		if (intr) {
1639			D_ASSERT(intr == -EINTR);
1640			flush_signals(current);
1641			if (!expect(get_t_state(thi) != RUNNING))
1642				continue;
1643			break;
1644		}
1645
1646		if (get_t_state(thi) != RUNNING)
1647			break;
1648		/* With this break, we have done a down() but not consumed
1649		   the entry from the list. The cleanup code takes care of
1650		   this...   */
1651
1652		w = NULL;
1653		spin_lock_irq(&mdev->data.work.q_lock);
1654		if (!expect(!list_empty(&mdev->data.work.q))) {
1655			/* something terribly wrong in our logic.
1656			 * we were able to down() the semaphore,
1657			 * but the list is empty... doh.
1658			 *
1659			 * what is the best thing to do now?
1660			 * try again from scratch, restarting the receiver,
1661			 * asender, whatnot? could break even more ugly,
1662			 * e.g. when we are primary, but no good local data.
1663			 *
1664			 * I'll try to get away just starting over this loop.
1665			 */
1666			spin_unlock_irq(&mdev->data.work.q_lock);
1667			continue;
1668		}
1669		w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1670		list_del_init(&w->list);
1671		spin_unlock_irq(&mdev->data.work.q_lock);
1672
1673		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1674			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1675			if (mdev->state.conn >= C_CONNECTED)
1676				drbd_force_state(mdev,
1677						NS(conn, C_NETWORK_FAILURE));
1678		}
1679	}
1680	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1681	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1682
1683	spin_lock_irq(&mdev->data.work.q_lock);
1684	i = 0;
1685	while (!list_empty(&mdev->data.work.q)) {
1686		list_splice_init(&mdev->data.work.q, &work_list);
1687		spin_unlock_irq(&mdev->data.work.q_lock);
1688
1689		while (!list_empty(&work_list)) {
1690			w = list_entry(work_list.next, struct drbd_work, list);
1691			list_del_init(&w->list);
1692			w->cb(mdev, w, 1);
1693			i++; /* dead debugging code */
1694		}
1695
1696		spin_lock_irq(&mdev->data.work.q_lock);
1697	}
1698	sema_init(&mdev->data.work.s, 0);
1699	/* DANGEROUS race: if someone did queue his work within the spinlock,
1700	 * but up() ed outside the spinlock, we could get an up() on the
1701	 * semaphore without corresponding list entry.
1702	 * So don't do that.
1703	 */
1704	spin_unlock_irq(&mdev->data.work.q_lock);
1705
1706	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1707	/* _drbd_set_state only uses stop_nowait.
1708	 * wait here for the EXITING receiver. */
1709	drbd_thread_stop(&mdev->receiver);
1710	drbd_mdev_cleanup(mdev);
1711
1712	dev_info(DEV, "worker terminated\n");
1713
1714	clear_bit(DEVICE_DYING, &mdev->flags);
1715	clear_bit(CONFIG_PENDING, &mdev->flags);
1716	wake_up(&mdev->state_wait);
1717
1718	return 0;
1719}
1720