drbd_worker.c revision daeda1cca91d58bb6c8e45f6734f021bab9c28b7
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44/* endio handlers:
45 *   drbd_md_io_complete (defined here)
46 *   drbd_request_endio (defined here)
47 *   drbd_peer_request_endio (defined here)
48 *   bm_async_io_complete (defined in drbd_bitmap.c)
49 *
50 * For all these callbacks, note the following:
51 * The callbacks will be called in irq context by the IDE drivers,
52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53 * Try to get the locking right :)
54 *
55 */
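/* Illustration only (grounded in the handlers below, not a general rule):
 * because completion may happen in hard-irq context, the final stages
 * (drbd_endio_read_sec_final, drbd_endio_write_sec_final, drbd_request_endio)
 * take tconn->req_lock with spin_lock_irqsave()/spin_unlock_irqrestore()
 * rather than the plain or _irq variants. */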
56
57
58/* About the global_state_lock
59   Each state transition on a device holds a read lock. In case we have
60   to evaluate the resync-after dependencies, we grab a write lock, because
61   we need stable states on all devices for that.  */
62rwlock_t global_state_lock;
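/* Usage sketch: the read side is what the comment above describes (each
 * single-device state change); the write side can be seen further down in
 * resume_next_sg()/suspend_other_sg():
 *
 *	write_lock_irq(&global_state_lock);
 *	_drbd_resume_next(mdev);	(or _drbd_pause_after(mdev))
 *	write_unlock_irq(&global_state_lock);
 */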
63
64/* used for synchronous meta data and bitmap IO
65 * submitted by drbd_md_sync_page_io()
66 */
67void drbd_md_io_complete(struct bio *bio, int error)
68{
69	struct drbd_md_io *md_io;
70
71	md_io = (struct drbd_md_io *)bio->bi_private;
72	md_io->error = error;
73
74	complete(&md_io->event);
75}
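/* For orientation, a rough sketch of how the submitter pairs with this
 * completion handler (illustrative only, the real code in
 * drbd_md_sync_page_io() differs in details):
 *
 *	init_completion(&md_io.event);
 *	bio->bi_private = &md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *	wait_for_completion(&md_io.event);
 *	err = md_io.error;
 */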
76
77/* reads on behalf of the partner,
78 * "submitted" by the receiver
79 */
80void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81{
82	unsigned long flags = 0;
83	struct drbd_conf *mdev = peer_req->w.mdev;
84
85	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86	mdev->read_cnt += peer_req->i.size >> 9;
87	list_del(&peer_req->w.list);
88	if (list_empty(&mdev->read_ee))
89		wake_up(&mdev->ee_wait);
90	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91		__drbd_chk_io_error(mdev, false);
92	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95	put_ldev(mdev);
96}
97
98/* writes on behalf of the partner, or resync writes,
99 * "submitted" by the receiver, final stage.  */
100static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101{
102	unsigned long flags = 0;
103	struct drbd_conf *mdev = peer_req->w.mdev;
104	struct drbd_interval i;
105	int do_wake;
106	u64 block_id;
107	int do_al_complete_io;
108
109	/* after we moved peer_req to done_ee,
110	 * we may no longer access it,
111	 * it may be freed/reused already!
112	 * (as soon as we release the req_lock) */
113	i = peer_req->i;
114	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115	block_id = peer_req->block_id;
116
117	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118	mdev->writ_cnt += peer_req->i.size >> 9;
119	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120	list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122	/*
123	 * Do not remove from the write_requests tree here: we did not send the
124	 * Ack yet and did not wake possibly waiting conflicting requests.
125	 * Removal from the tree happens in "drbd_process_done_ee" within the
126	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
127	 * _drbd_clear_done_ee.
128	 */
129
130	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133		__drbd_chk_io_error(mdev, false);
134	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136	if (block_id == ID_SYNCER)
137		drbd_rs_complete_io(mdev, i.sector);
138
139	if (do_wake)
140		wake_up(&mdev->ee_wait);
141
142	if (do_al_complete_io)
143		drbd_al_complete_io(mdev, &i);
144
145	wake_asender(mdev->tconn);
146	put_ldev(mdev);
147}
148
149/* writes on behalf of the partner, or resync writes,
150 * "submitted" by the receiver.
151 */
152void drbd_peer_request_endio(struct bio *bio, int error)
153{
154	struct drbd_peer_request *peer_req = bio->bi_private;
155	struct drbd_conf *mdev = peer_req->w.mdev;
156	int uptodate = bio_flagged(bio, BIO_UPTODATE);
157	int is_write = bio_data_dir(bio) == WRITE;
158
159	if (error && __ratelimit(&drbd_ratelimit_state))
160		dev_warn(DEV, "%s: error=%d s=%llus\n",
161				is_write ? "write" : "read", error,
162				(unsigned long long)peer_req->i.sector);
163	if (!error && !uptodate) {
164		if (__ratelimit(&drbd_ratelimit_state))
165			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166					is_write ? "write" : "read",
167					(unsigned long long)peer_req->i.sector);
168		/* strange behavior of some lower level drivers...
169		 * fail the request by clearing the uptodate flag,
170		 * but do not return any error?! */
171		error = -EIO;
172	}
173
174	if (error)
175		set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177	bio_put(bio); /* no need for the bio anymore */
178	if (atomic_dec_and_test(&peer_req->pending_bios)) {
179		if (is_write)
180			drbd_endio_write_sec_final(peer_req);
181		else
182			drbd_endio_read_sec_final(peer_req);
183	}
184}
185
186/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
187 */
188void drbd_request_endio(struct bio *bio, int error)
189{
190	unsigned long flags;
191	struct drbd_request *req = bio->bi_private;
192	struct drbd_conf *mdev = req->w.mdev;
193	struct bio_and_error m;
194	enum drbd_req_event what;
195	int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197	if (!error && !uptodate) {
198		dev_warn(DEV, "p %s: setting error to -EIO\n",
199			 bio_data_dir(bio) == WRITE ? "write" : "read");
200		/* strange behavior of some lower level drivers...
201		 * fail the request by clearing the uptodate flag,
202		 * but do not return any error?! */
203		error = -EIO;
204	}
205
206	/* to avoid recursion in __req_mod */
207	if (unlikely(error)) {
208		what = (bio_data_dir(bio) == WRITE)
209			? WRITE_COMPLETED_WITH_ERROR
210			: (bio_rw(bio) == READ)
211			  ? READ_COMPLETED_WITH_ERROR
212			  : READ_AHEAD_COMPLETED_WITH_ERROR;
213	} else
214		what = COMPLETED_OK;
215
216	bio_put(req->private_bio);
217	req->private_bio = ERR_PTR(error);
218
219	/* not req_mod(), we need irqsave here! */
220	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221	__req_mod(req, what, &m);
222	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224	if (m.bio)
225		complete_master_bio(mdev, &m);
226}
227
228int w_read_retry_remote(struct drbd_work *w, int cancel)
229{
230	struct drbd_request *req = container_of(w, struct drbd_request, w);
231	struct drbd_conf *mdev = w->mdev;
232
233	/* We should not detach for read io-error,
234	 * but try to WRITE the P_DATA_REPLY to the failed location,
235	 * to give the disk the chance to relocate that block */
236
237	spin_lock_irq(&mdev->tconn->req_lock);
238	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
240		spin_unlock_irq(&mdev->tconn->req_lock);
241		return 0;
242	}
243	spin_unlock_irq(&mdev->tconn->req_lock);
244
245	return w_send_read_req(w, 0);
246}
247
248void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249		  struct drbd_peer_request *peer_req, void *digest)
250{
251	struct hash_desc desc;
252	struct scatterlist sg;
253	struct page *page = peer_req->pages;
254	struct page *tmp;
255	unsigned len;
256
257	desc.tfm = tfm;
258	desc.flags = 0;
259
260	sg_init_table(&sg, 1);
261	crypto_hash_init(&desc);
262
263	while ((tmp = page_chain_next(page))) {
264		/* all but the last page will be fully used */
265		sg_set_page(&sg, page, PAGE_SIZE, 0);
266		crypto_hash_update(&desc, &sg, sg.length);
267		page = tmp;
268	}
269	/* and now the last, possibly only partially used page */
270	len = peer_req->i.size & (PAGE_SIZE - 1);
271	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272	crypto_hash_update(&desc, &sg, sg.length);
273	crypto_hash_final(&desc, digest);
274}
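/* Worked example (hypothetical numbers, assuming 4 KiB pages): a peer_req
 * with i.size == 10240 spans three pages; the loop above hashes the first
 * two with PAGE_SIZE each, and the last page is hashed with
 * len = 10240 & 4095 == 2048.  If i.size were an exact multiple of
 * PAGE_SIZE, len would be 0 and the "len ?: PAGE_SIZE" fallback hashes the
 * whole last page.  The single scatterlist entry is simply re-used for
 * every page. */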
275
276void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277{
278	struct hash_desc desc;
279	struct scatterlist sg;
280	struct bio_vec *bvec;
281	int i;
282
283	desc.tfm = tfm;
284	desc.flags = 0;
285
286	sg_init_table(&sg, 1);
287	crypto_hash_init(&desc);
288
289	__bio_for_each_segment(bvec, bio, i, 0) {
290		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291		crypto_hash_update(&desc, &sg, sg.length);
292	}
293	crypto_hash_final(&desc, digest);
294}
295
296/* MAYBE merge common code with w_e_end_ov_req */
297static int w_e_send_csum(struct drbd_work *w, int cancel)
298{
299	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300	struct drbd_conf *mdev = w->mdev;
301	int digest_size;
302	void *digest;
303	int err = 0;
304
305	if (unlikely(cancel))
306		goto out;
307
308	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309		goto out;
310
311	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312	digest = kmalloc(digest_size, GFP_NOIO);
313	if (digest) {
314		sector_t sector = peer_req->i.sector;
315		unsigned int size = peer_req->i.size;
316		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317		/* Free peer_req and pages before send.
318		 * In case we block on congestion, we could otherwise run into
319		 * some distributed deadlock, if the other side blocks on
320		 * congestion as well, because our receiver blocks in
321		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
322		drbd_free_peer_req(mdev, peer_req);
323		peer_req = NULL;
324		inc_rs_pending(mdev);
325		err = drbd_send_drequest_csum(mdev, sector, size,
326					      digest, digest_size,
327					      P_CSUM_RS_REQUEST);
328		kfree(digest);
329	} else {
330		dev_err(DEV, "kmalloc() of digest failed.\n");
331		err = -ENOMEM;
332	}
333
334out:
335	if (peer_req)
336		drbd_free_peer_req(mdev, peer_req);
337
338	if (unlikely(err))
339		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340	return err;
341}
342
343#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
344
345static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346{
347	struct drbd_peer_request *peer_req;
348
349	if (!get_ldev(mdev))
350		return -EIO;
351
352	if (drbd_rs_should_slow_down(mdev, sector))
353		goto defer;
354
355	/* GFP_TRY, because if there is no memory available right now, this may
356	 * be rescheduled for later. It is "only" background resync, after all. */
357	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
358				       size, GFP_TRY);
359	if (!peer_req)
360		goto defer;
361
362	peer_req->w.cb = w_e_send_csum;
363	spin_lock_irq(&mdev->tconn->req_lock);
364	list_add(&peer_req->w.list, &mdev->read_ee);
365	spin_unlock_irq(&mdev->tconn->req_lock);
366
367	atomic_add(size >> 9, &mdev->rs_sect_ev);
368	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369		return 0;
370
371	/* If it failed because of ENOMEM, retry should help.  If it failed
372	 * because bio_add_page failed (probably broken lower level driver),
373	 * retry may or may not help.
374	 * If it does not, you may need to force disconnect. */
375	spin_lock_irq(&mdev->tconn->req_lock);
376	list_del(&peer_req->w.list);
377	spin_unlock_irq(&mdev->tconn->req_lock);
378
379	drbd_free_peer_req(mdev, peer_req);
380defer:
381	put_ldev(mdev);
382	return -EAGAIN;
383}
384
385int w_resync_timer(struct drbd_work *w, int cancel)
386{
387	struct drbd_conf *mdev = w->mdev;
388	switch (mdev->state.conn) {
389	case C_VERIFY_S:
390		w_make_ov_request(w, cancel);
391		break;
392	case C_SYNC_TARGET:
393		w_make_resync_request(w, cancel);
394		break;
395	}
396
397	return 0;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402	struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404	if (list_empty(&mdev->resync_work.list))
405		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406}
407
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410	int i;
411
412	for (i = 0; i < fb->size; i++)
413		fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418	int ov;
419
420	ov = fb->values[fb->head_index];
421	fb->values[fb->head_index++] = value;
422
423	if (fb->head_index >= fb->size)
424		fb->head_index = 0;
425
426	return ov;
427}
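/* Behaviour sketch (hypothetical numbers): with fb->size == 3, a value
 * planned now via fifo_add_val() comes back out of fifo_push() three
 * invocations later; fifo_push(fb, 0) returns whatever had accumulated in
 * the slot that falls due this turn and re-arms that slot with 0.  The
 * resync controller below uses exactly this pair: spread the correction
 * over all future steps, then pop the share that is due now. */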
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431	int i;
432
433	for (i = 0; i < fb->size; i++)
434		fb->values[i] += value;
435}
436
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439	struct disk_conf *dc;
440	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
441	unsigned int want;     /* The number of sectors we want in the proxy */
442	int req_sect; /* Number of sectors to request in this turn */
443	int correction; /* How many more sectors we need in the proxy */
444	int cps; /* correction per invocation of drbd_rs_controller() */
445	int steps; /* Number of time steps to plan ahead */
446	int curr_corr;
447	int max_sect;
448
449	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
450	mdev->rs_in_flight -= sect_in;
451
452	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
453	rcu_read_lock();
454	dc = rcu_dereference(mdev->ldev->disk_conf);
455
456	steps = mdev->rs_plan_s.size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
457
458	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
459		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
460	} else { /* normal path */
461		want = dc->c_fill_target ? dc->c_fill_target :
462			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
463	}
464
465	correction = want - mdev->rs_in_flight - mdev->rs_planed;
466
467	/* Plan ahead */
468	cps = correction / steps;
469	fifo_add_val(&mdev->rs_plan_s, cps);
470	mdev->rs_planed += cps * steps;
471
472	/* What we do in this step */
473	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
474	mdev->rs_planed -= curr_corr;
475
476	req_sect = sect_in + curr_corr;
477	if (req_sect < 0)
478		req_sect = 0;
479
480	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
481	if (req_sect > max_sect)
482		req_sect = max_sect;
483
484	/*
485	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
486		 sect_in, mdev->rs_in_flight, want, correction,
487		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
488	*/
489	rcu_read_unlock();
490	spin_unlock(&mdev->peer_seq_lock);
491
492	return req_sect;
493}
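/* Worked example (hypothetical configuration; assumes the usual DRBD units,
 * i.e. rates in KiB/s and c_delay_target in tenths of a second, and
 * SLEEP_TIME of HZ/10 == 100ms): with c_fill_target == 0,
 * c_delay_target == 10 (1 second) and sect_in == 2000 sectors received
 * during the last turn,
 *
 *	want       = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors (~10 MB)
 *	correction = 20000 - rs_in_flight - rs_planed
 *
 * The correction is spread over "steps" future turns via the plan fifo, and
 * the request for this turn, sect_in + curr_corr, is clipped to
 * max_sect = c_max_rate * 2 * SLEEP_TIME / HZ, e.g. 20480 sectors per turn
 * for c_max_rate == 102400 KiB/s. */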
494
495static int drbd_rs_number_requests(struct drbd_conf *mdev)
496{
497	int number;
498	if (mdev->rs_plan_s.size) { /* rcu_dereference(mdev->ldev->disk_conf)->c_plan_ahead */
499		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
500		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
501	} else {
502		rcu_read_lock();
503		mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
504		rcu_read_unlock();
505		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
506	}
507
508	/* ignore the number of pending requests; the resync controller should
509	 * throttle down to the incoming reply rate soon enough anyways. */
510	return number;
511}
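/* Worked example (hypothetical numbers, assuming BM_BLOCK_SIZE of 4 KiB and
 * SLEEP_TIME of HZ/10): with a fixed resync_rate of 4000 KiB/s the fallback
 * branch yields number = (HZ/10) * 4000 / (4 * HZ) = 100, i.e. one hundred
 * 4 KiB requests per 100ms turn, which is 4000 KiB/s again.  With the
 * dynamic controller active, its sector count is shifted down by
 * (BM_BLOCK_SHIFT - 9) to turn sectors into bitmap-block-sized requests. */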
512
513int w_make_resync_request(struct drbd_work *w, int cancel)
514{
515	struct drbd_conf *mdev = w->mdev;
516	unsigned long bit;
517	sector_t sector;
518	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
519	int max_bio_size;
520	int number, rollback_i, size;
521	int align, queued, sndbuf;
522	int i = 0;
523
524	if (unlikely(cancel))
525		return 0;
526
527	if (mdev->rs_total == 0) {
528		/* empty resync? */
529		drbd_resync_finished(mdev);
530		return 0;
531	}
532
533	if (!get_ldev(mdev)) {
534		/* Since we only need to access mdev->resync, a
535		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
536		   continuing a resync with a broken disk makes no sense at
537		   all */
538		dev_err(DEV, "Disk broke down during resync!\n");
539		return 0;
540	}
541
542	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
543	number = drbd_rs_number_requests(mdev);
544	if (number == 0)
545		goto requeue;
546
547	for (i = 0; i < number; i++) {
548		/* Stop generating RS requests when half of the send buffer is filled */
549		mutex_lock(&mdev->tconn->data.mutex);
550		if (mdev->tconn->data.socket) {
551			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
552			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
553		} else {
554			queued = 1;
555			sndbuf = 0;
556		}
557		mutex_unlock(&mdev->tconn->data.mutex);
558		if (queued > sndbuf / 2)
559			goto requeue;
560
561next_sector:
562		size = BM_BLOCK_SIZE;
563		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
564
565		if (bit == DRBD_END_OF_BITMAP) {
566			mdev->bm_resync_fo = drbd_bm_bits(mdev);
567			put_ldev(mdev);
568			return 0;
569		}
570
571		sector = BM_BIT_TO_SECT(bit);
572
573		if (drbd_rs_should_slow_down(mdev, sector) ||
574		    drbd_try_rs_begin_io(mdev, sector)) {
575			mdev->bm_resync_fo = bit;
576			goto requeue;
577		}
578		mdev->bm_resync_fo = bit + 1;
579
580		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
581			drbd_rs_complete_io(mdev, sector);
582			goto next_sector;
583		}
584
585#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
586		/* try to find some adjacent bits.
587		 * we stop once we have reached the maximum request size.
588		 *
589		 * Additionally always align bigger requests, in order to
590		 * be prepared for all stripe sizes of software RAIDs.
591		 */
592		align = 1;
593		rollback_i = i;
594		for (;;) {
595			if (size + BM_BLOCK_SIZE > max_bio_size)
596				break;
597
598			/* always stay aligned */
599			if (sector & ((1<<(align+3))-1))
600				break;
601
602			/* do not cross extent boundaries */
603			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
604				break;
605			/* now, is it actually dirty, after all?
606			 * caution, drbd_bm_test_bit is tri-state for some
607			 * obscure reason; ( b == 0 ) would get the out-of-band
608			 * only accidentally right because of the "oddly sized"
609			 * adjustment below */
610			if (drbd_bm_test_bit(mdev, bit+1) != 1)
611				break;
612			bit++;
613			size += BM_BLOCK_SIZE;
614			if ((BM_BLOCK_SIZE << align) <= size)
615				align++;
616			i++;
617		}
618		/* if we merged some,
619		 * reset the offset to start the next drbd_bm_find_next from */
620		if (size > BM_BLOCK_SIZE)
621			mdev->bm_resync_fo = bit + 1;
622#endif
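		/* Illustration of the merge loop above (hypothetical case,
		 * 4 KiB bitmap blocks): a dirty area starting at sector 0 may
		 * keep growing until max_bio_size, an extent boundary or a
		 * clean bit stops it, while one starting at sector 16 (8 KiB
		 * into an extent) grows to 8 KiB and then stops, because its
		 * start is not aligned for the next power-of-two size. */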
623
624		/* adjust very last sectors, in case we are oddly sized */
625		if (sector + (size>>9) > capacity)
626			size = (capacity-sector)<<9;
627		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
628			switch (read_for_csum(mdev, sector, size)) {
629			case -EIO: /* Disk failure */
630				put_ldev(mdev);
631				return -EIO;
632			case -EAGAIN: /* allocation failed, or ldev busy */
633				drbd_rs_complete_io(mdev, sector);
634				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
635				i = rollback_i;
636				goto requeue;
637			case 0:
638				/* everything ok */
639				break;
640			default:
641				BUG();
642			}
643		} else {
644			int err;
645
646			inc_rs_pending(mdev);
647			err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
648						 sector, size, ID_SYNCER);
649			if (err) {
650				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
651				dec_rs_pending(mdev);
652				put_ldev(mdev);
653				return err;
654			}
655		}
656	}
657
658	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
659		/* last syncer _request_ was sent,
660		 * but the P_RS_DATA_REPLY has not been received yet.  sync will end (and
661		 * next sync group will resume), as soon as we receive the last
662		 * resync data block, and the last bit is cleared.
663		 * until then resync "work" is "inactive" ...
664		 */
665		put_ldev(mdev);
666		return 0;
667	}
668
669 requeue:
670	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
671	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
672	put_ldev(mdev);
673	return 0;
674}
675
676static int w_make_ov_request(struct drbd_work *w, int cancel)
677{
678	struct drbd_conf *mdev = w->mdev;
679	int number, i, size;
680	sector_t sector;
681	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
682
683	if (unlikely(cancel))
684		return 1;
685
686	number = drbd_rs_number_requests(mdev);
687
688	sector = mdev->ov_position;
689	for (i = 0; i < number; i++) {
690		if (sector >= capacity) {
691			return 1;
692		}
693
694		size = BM_BLOCK_SIZE;
695
696		if (drbd_rs_should_slow_down(mdev, sector) ||
697		    drbd_try_rs_begin_io(mdev, sector)) {
698			mdev->ov_position = sector;
699			goto requeue;
700		}
701
702		if (sector + (size>>9) > capacity)
703			size = (capacity-sector)<<9;
704
705		inc_rs_pending(mdev);
706		if (drbd_send_ov_request(mdev, sector, size)) {
707			dec_rs_pending(mdev);
708			return 0;
709		}
710		sector += BM_SECT_PER_BIT;
711	}
712	mdev->ov_position = sector;
713
714 requeue:
715	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
716	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
717	return 1;
718}
719
720int w_ov_finished(struct drbd_work *w, int cancel)
721{
722	struct drbd_conf *mdev = w->mdev;
723	kfree(w);
724	ov_out_of_sync_print(mdev);
725	drbd_resync_finished(mdev);
726
727	return 0;
728}
729
730static int w_resync_finished(struct drbd_work *w, int cancel)
731{
732	struct drbd_conf *mdev = w->mdev;
733	kfree(w);
734
735	drbd_resync_finished(mdev);
736
737	return 0;
738}
739
740static void ping_peer(struct drbd_conf *mdev)
741{
742	struct drbd_tconn *tconn = mdev->tconn;
743
744	clear_bit(GOT_PING_ACK, &tconn->flags);
745	request_ping(tconn);
746	wait_event(tconn->ping_wait,
747		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
748}
749
750int drbd_resync_finished(struct drbd_conf *mdev)
751{
752	unsigned long db, dt, dbdt;
753	unsigned long n_oos;
754	union drbd_state os, ns;
755	struct drbd_work *w;
756	char *khelper_cmd = NULL;
757	int verify_done = 0;
758
759	/* Remove all elements from the resync LRU. Since future actions
760	 * might set bits in the (main) bitmap, the entries in the
761	 * resync LRU would otherwise be wrong. */
762	if (drbd_rs_del_all(mdev)) {
763		/* In case this is not possible now, most probably because
764		 * there are P_RS_DATA_REPLY packets lingering on the worker's
765		 * queue (or the read operations for those packets have
766		 * not finished yet).   Retry in 100ms. */
767
768		schedule_timeout_interruptible(HZ / 10);
769		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
770		if (w) {
771			w->cb = w_resync_finished;
772			drbd_queue_work(&mdev->tconn->data.work, w);
773			return 1;
774		}
775		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
776	}
777
778	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
779	if (dt <= 0)
780		dt = 1;
781	db = mdev->rs_total;
782	dbdt = Bit2KB(db/dt);
783	mdev->rs_paused /= HZ;
784
785	if (!get_ldev(mdev))
786		goto out;
787
788	ping_peer(mdev);
789
790	spin_lock_irq(&mdev->tconn->req_lock);
791	os = drbd_read_state(mdev);
792
793	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
794
795	/* This protects us against multiple calls (that can happen in the presence
796	   of application IO), and against connectivity loss just before we arrive here. */
797	if (os.conn <= C_CONNECTED)
798		goto out_unlock;
799
800	ns = os;
801	ns.conn = C_CONNECTED;
802
803	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
804	     verify_done ? "Online verify " : "Resync",
805	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
806
807	n_oos = drbd_bm_total_weight(mdev);
808
809	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
810		if (n_oos) {
811			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
812			      n_oos, Bit2KB(1));
813			khelper_cmd = "out-of-sync";
814		}
815	} else {
816		D_ASSERT((n_oos - mdev->rs_failed) == 0);
817
818		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
819			khelper_cmd = "after-resync-target";
820
821		if (mdev->tconn->csums_tfm && mdev->rs_total) {
822			const unsigned long s = mdev->rs_same_csum;
823			const unsigned long t = mdev->rs_total;
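			/* The two branches below are an overflow guard: for
			 * small totals (s*100)/t keeps full precision, while
			 * for t >= 100000 bits s/(t/100) avoids overflowing
			 * the multiplication and t/100 is precise enough. */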
824			const int ratio =
825				(t == 0)     ? 0 :
826			(t < 100000) ? ((s*100)/t) : (s/(t/100));
827			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
828			     "transferred %luK total %luK\n",
829			     ratio,
830			     Bit2KB(mdev->rs_same_csum),
831			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
832			     Bit2KB(mdev->rs_total));
833		}
834	}
835
836	if (mdev->rs_failed) {
837		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
838
839		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
840			ns.disk = D_INCONSISTENT;
841			ns.pdsk = D_UP_TO_DATE;
842		} else {
843			ns.disk = D_UP_TO_DATE;
844			ns.pdsk = D_INCONSISTENT;
845		}
846	} else {
847		ns.disk = D_UP_TO_DATE;
848		ns.pdsk = D_UP_TO_DATE;
849
850		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
851			if (mdev->p_uuid) {
852				int i;
853				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
854					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
855				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
856				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
857			} else {
858				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
859			}
860		}
861
862		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
863			/* for verify runs, we don't update uuids here,
864			 * so there would be nothing to report. */
865			drbd_uuid_set_bm(mdev, 0UL);
866			drbd_print_uuids(mdev, "updated UUIDs");
867			if (mdev->p_uuid) {
868				/* Now the two UUID sets are equal, update what we
869				 * know of the peer. */
870				int i;
871				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
872					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
873			}
874		}
875	}
876
877	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
878out_unlock:
879	spin_unlock_irq(&mdev->tconn->req_lock);
880	put_ldev(mdev);
881out:
882	mdev->rs_total  = 0;
883	mdev->rs_failed = 0;
884	mdev->rs_paused = 0;
885	if (verify_done)
886		mdev->ov_start_sector = 0;
887
888	drbd_md_sync(mdev);
889
890	if (khelper_cmd)
891		drbd_khelper(mdev, khelper_cmd);
892
893	return 1;
894}
895
896/* helper */
897static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
898{
899	if (drbd_peer_req_has_active_page(peer_req)) {
900		/* This might happen if sendpage() has not finished */
901		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
902		atomic_add(i, &mdev->pp_in_use_by_net);
903		atomic_sub(i, &mdev->pp_in_use);
904		spin_lock_irq(&mdev->tconn->req_lock);
905		list_add_tail(&peer_req->w.list, &mdev->net_ee);
906		spin_unlock_irq(&mdev->tconn->req_lock);
907		wake_up(&drbd_pp_wait);
908	} else
909		drbd_free_peer_req(mdev, peer_req);
910}
911
912/**
913 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
914 * @mdev:	DRBD device.
915 * @w:		work object.
916 * @cancel:	The connection will be closed anyways
917 */
918int w_e_end_data_req(struct drbd_work *w, int cancel)
919{
920	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
921	struct drbd_conf *mdev = w->mdev;
922	int err;
923
924	if (unlikely(cancel)) {
925		drbd_free_peer_req(mdev, peer_req);
926		dec_unacked(mdev);
927		return 0;
928	}
929
930	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
931		err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
932	} else {
933		if (__ratelimit(&drbd_ratelimit_state))
934			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
935			    (unsigned long long)peer_req->i.sector);
936
937		err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
938	}
939
940	dec_unacked(mdev);
941
942	move_to_net_ee_or_free(mdev, peer_req);
943
944	if (unlikely(err))
945		dev_err(DEV, "drbd_send_block() failed\n");
946	return err;
947}
948
949/**
950 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
951 * @mdev:	DRBD device.
952 * @w:		work object.
953 * @cancel:	The connection will be closed anyways
954 */
955int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
956{
957	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
958	struct drbd_conf *mdev = w->mdev;
959	int err;
960
961	if (unlikely(cancel)) {
962		drbd_free_peer_req(mdev, peer_req);
963		dec_unacked(mdev);
964		return 0;
965	}
966
967	if (get_ldev_if_state(mdev, D_FAILED)) {
968		drbd_rs_complete_io(mdev, peer_req->i.sector);
969		put_ldev(mdev);
970	}
971
972	if (mdev->state.conn == C_AHEAD) {
973		err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
974	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
975		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
976			inc_rs_pending(mdev);
977			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
978		} else {
979			if (__ratelimit(&drbd_ratelimit_state))
980				dev_err(DEV, "Not sending RSDataReply, "
981				    "partner DISKLESS!\n");
982			err = 0;
983		}
984	} else {
985		if (__ratelimit(&drbd_ratelimit_state))
986			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
987			    (unsigned long long)peer_req->i.sector);
988
989		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
990
991		/* update resync data with failure */
992		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
993	}
994
995	dec_unacked(mdev);
996
997	move_to_net_ee_or_free(mdev, peer_req);
998
999	if (unlikely(err))
1000		dev_err(DEV, "drbd_send_block() failed\n");
1001	return err;
1002}
1003
1004int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1005{
1006	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1007	struct drbd_conf *mdev = w->mdev;
1008	struct digest_info *di;
1009	int digest_size;
1010	void *digest = NULL;
1011	int err, eq = 0;
1012
1013	if (unlikely(cancel)) {
1014		drbd_free_peer_req(mdev, peer_req);
1015		dec_unacked(mdev);
1016		return 0;
1017	}
1018
1019	if (get_ldev(mdev)) {
1020		drbd_rs_complete_io(mdev, peer_req->i.sector);
1021		put_ldev(mdev);
1022	}
1023
1024	di = peer_req->digest;
1025
1026	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1027		/* quick hack to try to avoid a race against reconfiguration.
1028		 * a real fix would be much more involved,
1029		 * introducing more locking mechanisms */
1030		if (mdev->tconn->csums_tfm) {
1031			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1032			D_ASSERT(digest_size == di->digest_size);
1033			digest = kmalloc(digest_size, GFP_NOIO);
1034		}
1035		if (digest) {
1036			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1037			eq = !memcmp(digest, di->digest, digest_size);
1038			kfree(digest);
1039		}
1040
1041		if (eq) {
1042			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1043			/* rs_same_csums unit is BM_BLOCK_SIZE */
1044			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1045			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1046		} else {
1047			inc_rs_pending(mdev);
1048			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1049			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1050			kfree(di);
1051			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1052		}
1053	} else {
1054		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1055		if (__ratelimit(&drbd_ratelimit_state))
1056			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1057	}
1058
1059	dec_unacked(mdev);
1060	move_to_net_ee_or_free(mdev, peer_req);
1061
1062	if (unlikely(err))
1063		dev_err(DEV, "drbd_send_block/ack() failed\n");
1064	return err;
1065}
1066
1067int w_e_end_ov_req(struct drbd_work *w, int cancel)
1068{
1069	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1070	struct drbd_conf *mdev = w->mdev;
1071	sector_t sector = peer_req->i.sector;
1072	unsigned int size = peer_req->i.size;
1073	int digest_size;
1074	void *digest;
1075	int err = 0;
1076
1077	if (unlikely(cancel))
1078		goto out;
1079
1080	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1081	digest = kmalloc(digest_size, GFP_NOIO);
1082	if (!digest) {
1083		err = 1;	/* terminate the connection in case the allocation failed */
1084		goto out;
1085	}
1086
1087	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1088		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1089	else
1090		memset(digest, 0, digest_size);
1091
1092	/* Free peer_req and pages before send.
1093	 * In case we block on congestion, we could otherwise run into
1094	 * some distributed deadlock, if the other side blocks on
1095	 * congestion as well, because our receiver blocks in
1096	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1097	drbd_free_peer_req(mdev, peer_req);
1098	peer_req = NULL;
1099	inc_rs_pending(mdev);
1100	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1101	if (err)
1102		dec_rs_pending(mdev);
1103	kfree(digest);
1104
1105out:
1106	if (peer_req)
1107		drbd_free_peer_req(mdev, peer_req);
1108	dec_unacked(mdev);
1109	return err;
1110}
1111
1112void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1113{
1114	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1115		mdev->ov_last_oos_size += size>>9;
1116	} else {
1117		mdev->ov_last_oos_start = sector;
1118		mdev->ov_last_oos_size = size>>9;
1119	}
1120	drbd_set_out_of_sync(mdev, sector, size);
1121}
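/* Example (hypothetical sectors): if the previous out-of-sync run covers
 * sectors 1000..1007 (ov_last_oos_start == 1000, ov_last_oos_size == 8) and
 * the next 4 KiB mismatch starts at sector 1008, the run simply grows to 16
 * sectors; a non-contiguous hit starts a new run instead.  This way
 * ov_out_of_sync_print() reports whole runs rather than single blocks. */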
1122
1123int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1124{
1125	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1126	struct drbd_conf *mdev = w->mdev;
1127	struct digest_info *di;
1128	void *digest;
1129	sector_t sector = peer_req->i.sector;
1130	unsigned int size = peer_req->i.size;
1131	int digest_size;
1132	int err, eq = 0;
1133
1134	if (unlikely(cancel)) {
1135		drbd_free_peer_req(mdev, peer_req);
1136		dec_unacked(mdev);
1137		return 0;
1138	}
1139
1140	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1141	 * the resync lru has been cleaned up already */
1142	if (get_ldev(mdev)) {
1143		drbd_rs_complete_io(mdev, peer_req->i.sector);
1144		put_ldev(mdev);
1145	}
1146
1147	di = peer_req->digest;
1148
1149	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1150		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1151		digest = kmalloc(digest_size, GFP_NOIO);
1152		if (digest) {
1153			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1154
1155			D_ASSERT(digest_size == di->digest_size);
1156			eq = !memcmp(digest, di->digest, digest_size);
1157			kfree(digest);
1158		}
1159	}
1160
1161	/* Free peer_req and pages before send.
1162	 * In case we block on congestion, we could otherwise run into
1163	 * some distributed deadlock, if the other side blocks on
1164	 * congestion as well, because our receiver blocks in
1165	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1166	drbd_free_peer_req(mdev, peer_req);
1167	if (!eq)
1168		drbd_ov_out_of_sync_found(mdev, sector, size);
1169	else
1170		ov_out_of_sync_print(mdev);
1171
1172	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1173			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1174
1175	dec_unacked(mdev);
1176
1177	--mdev->ov_left;
1178
1179	/* let's advance progress step marks only for every other megabyte */
1180	if ((mdev->ov_left & 0x200) == 0x200)
1181		drbd_advance_rs_marks(mdev, mdev->ov_left);
1182
1183	if (mdev->ov_left == 0) {
1184		ov_out_of_sync_print(mdev);
1185		drbd_resync_finished(mdev);
1186	}
1187
1188	return err;
1189}
1190
1191int w_prev_work_done(struct drbd_work *w, int cancel)
1192{
1193	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1194
1195	complete(&b->done);
1196	return 0;
1197}
1198
1199int w_send_barrier(struct drbd_work *w, int cancel)
1200{
1201	struct drbd_socket *sock;
1202	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1203	struct drbd_conf *mdev = w->mdev;
1204	struct p_barrier *p;
1205
1206	/* really avoid racing with tl_clear.  w.cb may have been referenced
1207	 * just before it was reassigned and re-queued, so double check that.
1208	 * actually, this race was harmless, since we only try to send the
1209	 * barrier packet here, and otherwise do nothing with the object.
1210	 * but compare with the head of w_clear_epoch */
1211	spin_lock_irq(&mdev->tconn->req_lock);
1212	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1213		cancel = 1;
1214	spin_unlock_irq(&mdev->tconn->req_lock);
1215	if (cancel)
1216		return 0;
1217
1218	sock = &mdev->tconn->data;
1219	p = drbd_prepare_command(mdev, sock);
1220	if (!p)
1221		return -EIO;
1222	p->barrier = b->br_number;
1223	/* inc_ap_pending was done where this was queued.
1224	 * dec_ap_pending will be done in got_BarrierAck
1225	 * or (on connection loss) in w_clear_epoch.  */
1226	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1227}
1228
1229int w_send_write_hint(struct drbd_work *w, int cancel)
1230{
1231	struct drbd_conf *mdev = w->mdev;
1232	struct drbd_socket *sock;
1233
1234	if (cancel)
1235		return 0;
1236	sock = &mdev->tconn->data;
1237	if (!drbd_prepare_command(mdev, sock))
1238		return -EIO;
1239	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1240}
1241
1242int w_send_out_of_sync(struct drbd_work *w, int cancel)
1243{
1244	struct drbd_request *req = container_of(w, struct drbd_request, w);
1245	struct drbd_conf *mdev = w->mdev;
1246	int err;
1247
1248	if (unlikely(cancel)) {
1249		req_mod(req, SEND_CANCELED);
1250		return 0;
1251	}
1252
1253	err = drbd_send_out_of_sync(mdev, req);
1254	req_mod(req, OOS_HANDED_TO_NETWORK);
1255
1256	return err;
1257}
1258
1259/**
1260 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1261 * @mdev:	DRBD device.
1262 * @w:		work object.
1263 * @cancel:	The connection will be closed anyways
1264 */
1265int w_send_dblock(struct drbd_work *w, int cancel)
1266{
1267	struct drbd_request *req = container_of(w, struct drbd_request, w);
1268	struct drbd_conf *mdev = w->mdev;
1269	int err;
1270
1271	if (unlikely(cancel)) {
1272		req_mod(req, SEND_CANCELED);
1273		return 0;
1274	}
1275
1276	err = drbd_send_dblock(mdev, req);
1277	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1278
1279	return err;
1280}
1281
1282/**
1283 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1284 * @mdev:	DRBD device.
1285 * @w:		work object.
1286 * @cancel:	The connection will be closed anyways
1287 */
1288int w_send_read_req(struct drbd_work *w, int cancel)
1289{
1290	struct drbd_request *req = container_of(w, struct drbd_request, w);
1291	struct drbd_conf *mdev = w->mdev;
1292	int err;
1293
1294	if (unlikely(cancel)) {
1295		req_mod(req, SEND_CANCELED);
1296		return 0;
1297	}
1298
1299	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1300				 (unsigned long)req);
1301
1302	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1303
1304	return err;
1305}
1306
1307int w_restart_disk_io(struct drbd_work *w, int cancel)
1308{
1309	struct drbd_request *req = container_of(w, struct drbd_request, w);
1310	struct drbd_conf *mdev = w->mdev;
1311
1312	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1313		drbd_al_begin_io(mdev, &req->i);
1314	/* Calling drbd_al_begin_io() out of the worker could theoretically
1315	   deadlock. In practice it cannot, since this is only used when
1316	   unfreezing IOs. All the extents of the requests that made it
1317	   into the TL are already active */
1318
1319	drbd_req_make_private_bio(req, req->master_bio);
1320	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1321	generic_make_request(req->private_bio);
1322
1323	return 0;
1324}
1325
1326static int _drbd_may_sync_now(struct drbd_conf *mdev)
1327{
1328	struct drbd_conf *odev = mdev;
1329	int ra;
1330
1331	while (1) {
1332		if (!odev->ldev)
1333			return 1;
1334		rcu_read_lock();
1335		ra = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1336		rcu_read_unlock();
1337		if (ra == -1)
1338			return 1;
1339		odev = minor_to_mdev(ra);
1340		if (!expect(odev))
1341			return 1;
1342		if ((odev->state.conn >= C_SYNC_SOURCE &&
1343		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1344		    odev->state.aftr_isp || odev->state.peer_isp ||
1345		    odev->state.user_isp)
1346			return 0;
1347	}
1348}
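/* Example (hypothetical minors): if this device has resync-after = 2 and
 * minor 2 in turn has resync-after = 0, the loop walks 2, then 0, until a
 * device with resync-after == -1 ends the chain.  If any device along the
 * way is between C_SYNC_SOURCE and C_PAUSED_SYNC_T, or has one of the
 * *_isp "inhibit sync" flags set, this device may not sync now. */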
1349
1350/**
1351 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1352 * @mdev:	DRBD device.
1353 *
1354 * Called from process context only (admin command and after_state_ch).
1355 */
1356static int _drbd_pause_after(struct drbd_conf *mdev)
1357{
1358	struct drbd_conf *odev;
1359	int i, rv = 0;
1360
1361	rcu_read_lock();
1362	idr_for_each_entry(&minors, odev, i) {
1363		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1364			continue;
1365		if (!_drbd_may_sync_now(odev))
1366			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1367			       != SS_NOTHING_TO_DO);
1368	}
1369	rcu_read_unlock();
1370
1371	return rv;
1372}
1373
1374/**
1375 * _drbd_resume_next() - Resume resync on all devices that may resync now
1376 * @mdev:	DRBD device.
1377 *
1378 * Called from process context only (admin command and worker).
1379 */
1380static int _drbd_resume_next(struct drbd_conf *mdev)
1381{
1382	struct drbd_conf *odev;
1383	int i, rv = 0;
1384
1385	rcu_read_lock();
1386	idr_for_each_entry(&minors, odev, i) {
1387		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1388			continue;
1389		if (odev->state.aftr_isp) {
1390			if (_drbd_may_sync_now(odev))
1391				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1392							CS_HARD, NULL)
1393				       != SS_NOTHING_TO_DO) ;
1394		}
1395	}
1396	rcu_read_unlock();
1397	return rv;
1398}
1399
1400void resume_next_sg(struct drbd_conf *mdev)
1401{
1402	write_lock_irq(&global_state_lock);
1403	_drbd_resume_next(mdev);
1404	write_unlock_irq(&global_state_lock);
1405}
1406
1407void suspend_other_sg(struct drbd_conf *mdev)
1408{
1409	write_lock_irq(&global_state_lock);
1410	_drbd_pause_after(mdev);
1411	write_unlock_irq(&global_state_lock);
1412}
1413
1414/* caller must hold global_state_lock */
1415enum drbd_ret_code drbd_sync_after_valid(struct drbd_conf *mdev, int o_minor)
1416{
1417	struct drbd_conf *odev;
1418	int ra;
1419
1420	if (o_minor == -1)
1421		return NO_ERROR;
1422	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1423		return ERR_SYNC_AFTER;
1424
1425	/* check for loops */
1426	odev = minor_to_mdev(o_minor);
1427	while (1) {
1428		if (odev == mdev)
1429			return ERR_SYNC_AFTER_CYCLE;
1430
1431		rcu_read_lock();
1432		ra = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1433		rcu_read_unlock();
1434		/* dependency chain ends here, no cycles. */
1435		if (ra == -1)
1436			return NO_ERROR;
1437
1438		/* follow the dependency chain */
1439		odev = minor_to_mdev(ra);
1440	}
1441}
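/* Example (hypothetical minors): requesting resync-after = 1 for minor 0
 * while minor 1 already has resync-after = 0 would close a cycle; the walk
 * above arrives back at mdev and returns ERR_SYNC_AFTER_CYCLE before such a
 * configuration can be applied. */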
1442
1443/* caller must hold global_state_lock */
1444void drbd_sync_after_changed(struct drbd_conf *mdev)
1445{
1446	int changes;
1447
1448	do {
1449		changes  = _drbd_pause_after(mdev);
1450		changes |= _drbd_resume_next(mdev);
1451	} while (changes);
1452}
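/* This iterates to a fixed point: resuming one device may allow or require
 * pausing/resuming another device further down its resync-after chain, so
 * the pause/resume passes are repeated until a full pass changes nothing. */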
1453
1454void drbd_rs_controller_reset(struct drbd_conf *mdev)
1455{
1456	atomic_set(&mdev->rs_sect_in, 0);
1457	atomic_set(&mdev->rs_sect_ev, 0);
1458	mdev->rs_in_flight = 0;
1459	mdev->rs_planed = 0;
1460	spin_lock(&mdev->peer_seq_lock);
1461	fifo_set(&mdev->rs_plan_s, 0);
1462	spin_unlock(&mdev->peer_seq_lock);
1463}
1464
1465void start_resync_timer_fn(unsigned long data)
1466{
1467	struct drbd_conf *mdev = (struct drbd_conf *) data;
1468
1469	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1470}
1471
1472int w_start_resync(struct drbd_work *w, int cancel)
1473{
1474	struct drbd_conf *mdev = w->mdev;
1475
1476	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1477		dev_warn(DEV, "w_start_resync later...\n");
1478		mdev->start_resync_timer.expires = jiffies + HZ/10;
1479		add_timer(&mdev->start_resync_timer);
1480		return 0;
1481	}
1482
1483	drbd_start_resync(mdev, C_SYNC_SOURCE);
1484	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1485	return 0;
1486}
1487
1488/**
1489 * drbd_start_resync() - Start the resync process
1490 * @mdev:	DRBD device.
1491 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1492 *
1493 * This function might bring you directly into one of the
1494 * C_PAUSED_SYNC_* states.
1495 */
1496void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1497{
1498	union drbd_state ns;
1499	int r;
1500
1501	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1502		dev_err(DEV, "Resync already running!\n");
1503		return;
1504	}
1505
1506	if (mdev->state.conn < C_AHEAD) {
1507		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1508		drbd_rs_cancel_all(mdev);
1509		/* This should be done when we abort the resync. We definitely do not
1510		   want to have this for connections going back and forth between
1511		   Ahead/Behind and SyncSource/SyncTarget */
1512	}
1513
1514	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1515		if (side == C_SYNC_TARGET) {
1516			/* Since application IO was locked out during C_WF_BITMAP_T and
1517			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1518			   and thus making the data inconsistent, give the handler a chance to veto. */
1519			r = drbd_khelper(mdev, "before-resync-target");
1520			r = (r >> 8) & 0xff;
1521			if (r > 0) {
1522				dev_info(DEV, "before-resync-target handler returned %d, "
1523					 "dropping connection.\n", r);
1524				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1525				return;
1526			}
1527		} else /* C_SYNC_SOURCE */ {
1528			r = drbd_khelper(mdev, "before-resync-source");
1529			r = (r >> 8) & 0xff;
1530			if (r > 0) {
1531				if (r == 3) {
1532					dev_info(DEV, "before-resync-source handler returned %d, "
1533						 "ignoring. Old userland tools?", r);
1534				} else {
1535					dev_info(DEV, "before-resync-source handler returned %d, "
1536						 "dropping connection.\n", r);
1537					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1538					return;
1539				}
1540			}
1541		}
1542	}
1543
1544	if (current == mdev->tconn->worker.task) {
1545		/* The worker should not sleep waiting for state_mutex,
1546		   that can take a long time */
1547		if (!mutex_trylock(mdev->state_mutex)) {
1548			set_bit(B_RS_H_DONE, &mdev->flags);
1549			mdev->start_resync_timer.expires = jiffies + HZ/5;
1550			add_timer(&mdev->start_resync_timer);
1551			return;
1552		}
1553	} else {
1554		mutex_lock(mdev->state_mutex);
1555	}
1556	clear_bit(B_RS_H_DONE, &mdev->flags);
1557
1558	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1559		mutex_unlock(mdev->state_mutex);
1560		return;
1561	}
1562
1563	write_lock_irq(&global_state_lock);
1564	ns = drbd_read_state(mdev);
1565
1566	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1567
1568	ns.conn = side;
1569
1570	if (side == C_SYNC_TARGET)
1571		ns.disk = D_INCONSISTENT;
1572	else /* side == C_SYNC_SOURCE */
1573		ns.pdsk = D_INCONSISTENT;
1574
1575	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1576	ns = drbd_read_state(mdev);
1577
1578	if (ns.conn < C_CONNECTED)
1579		r = SS_UNKNOWN_ERROR;
1580
1581	if (r == SS_SUCCESS) {
1582		unsigned long tw = drbd_bm_total_weight(mdev);
1583		unsigned long now = jiffies;
1584		int i;
1585
1586		mdev->rs_failed    = 0;
1587		mdev->rs_paused    = 0;
1588		mdev->rs_same_csum = 0;
1589		mdev->rs_last_events = 0;
1590		mdev->rs_last_sect_ev = 0;
1591		mdev->rs_total     = tw;
1592		mdev->rs_start     = now;
1593		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1594			mdev->rs_mark_left[i] = tw;
1595			mdev->rs_mark_time[i] = now;
1596		}
1597		_drbd_pause_after(mdev);
1598	}
1599	write_unlock_irq(&global_state_lock);
1600
1601	if (r == SS_SUCCESS) {
1602		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1603		     drbd_conn_str(ns.conn),
1604		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1605		     (unsigned long) mdev->rs_total);
1606		if (side == C_SYNC_TARGET)
1607			mdev->bm_resync_fo = 0;
1608
1609		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1610		 * with w_send_oos, or the sync target will get confused as to
1611		 * how many bits to resync.  We cannot always do that, because for an
1612		 * empty resync and protocol < 95, we need to do it here, as we call
1613		 * drbd_resync_finished from here in that case.
1614		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1615		 * and from after_state_ch otherwise. */
1616		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1617			drbd_gen_and_send_sync_uuid(mdev);
1618
1619		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1620			/* This still has a race (about when exactly the peers
1621			 * detect connection loss) that can lead to a full sync
1622			 * on next handshake. In 8.3.9 we fixed this with explicit
1623			 * resync-finished notifications, but the fix
1624			 * introduces a protocol change.  Sleeping for some
1625			 * time longer than the ping interval + timeout on the
1626			 * SyncSource, to give the SyncTarget the chance to
1627			 * detect connection loss, then waiting for a ping
1628			 * response (implicit in drbd_resync_finished) reduces
1629			 * the race considerably, but does not solve it. */
1630			if (side == C_SYNC_SOURCE) {
1631				struct net_conf *nc;
1632				int timeo;
1633
1634				rcu_read_lock();
1635				nc = rcu_dereference(mdev->tconn->net_conf);
1636				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1637				rcu_read_unlock();
1638				schedule_timeout_interruptible(timeo);
1639			}
1640			drbd_resync_finished(mdev);
1641		}
1642
1643		drbd_rs_controller_reset(mdev);
1644		/* ns.conn may already be != mdev->state.conn,
1645		 * we may have been paused in between, or become paused until
1646		 * the timer triggers.
1647		 * No matter, that is handled in resync_timer_fn() */
1648		if (ns.conn == C_SYNC_TARGET)
1649			mod_timer(&mdev->resync_timer, jiffies);
1650
1651		drbd_md_sync(mdev);
1652	}
1653	put_ldev(mdev);
1654	mutex_unlock(mdev->state_mutex);
1655}
1656
1657int drbd_worker(struct drbd_thread *thi)
1658{
1659	struct drbd_tconn *tconn = thi->tconn;
1660	struct drbd_work *w = NULL;
1661	struct drbd_conf *mdev;
1662	struct net_conf *nc;
1663	LIST_HEAD(work_list);
1664	int vnr, intr = 0;
1665	int cork;
1666
1667	while (get_t_state(thi) == RUNNING) {
1668		drbd_thread_current_set_cpu(thi);
1669
1670		if (down_trylock(&tconn->data.work.s)) {
1671			mutex_lock(&tconn->data.mutex);
1672
1673			rcu_read_lock();
1674			nc = rcu_dereference(tconn->net_conf);
1675			cork = nc ? !nc->no_cork : 0;
1676			rcu_read_unlock();
1677
1678			if (tconn->data.socket && cork)
1679				drbd_tcp_uncork(tconn->data.socket);
1680			mutex_unlock(&tconn->data.mutex);
1681
1682			intr = down_interruptible(&tconn->data.work.s);
1683
1684			mutex_lock(&tconn->data.mutex);
1685			if (tconn->data.socket  && cork)
1686				drbd_tcp_cork(tconn->data.socket);
1687			mutex_unlock(&tconn->data.mutex);
1688		}
1689
1690		if (intr) {
1691			flush_signals(current);
1692			if (get_t_state(thi) == RUNNING) {
1693				conn_warn(tconn, "Worker got an unexpected signal\n");
1694				continue;
1695			}
1696			break;
1697		}
1698
1699		if (get_t_state(thi) != RUNNING)
1700			break;
1701		/* With this break, we have done a down() but not consumed
1702		   the entry from the list. The cleanup code takes care of
1703		   this...   */
1704
1705		w = NULL;
1706		spin_lock_irq(&tconn->data.work.q_lock);
1707		if (list_empty(&tconn->data.work.q)) {
1708			/* something terribly wrong in our logic.
1709			 * we were able to down() the semaphore,
1710			 * but the list is empty... doh.
1711			 *
1712			 * what is the best thing to do now?
1713			 * try again from scratch, restarting the receiver,
1714			 * asender, whatnot? could break even more ugly,
1715			 * e.g. when we are primary, but no good local data.
1716			 *
1717			 * I'll try to get away just starting over this loop.
1718			 */
1719			conn_warn(tconn, "Work list unexpectedly empty\n");
1720			spin_unlock_irq(&tconn->data.work.q_lock);
1721			continue;
1722		}
1723		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1724		list_del_init(&w->list);
1725		spin_unlock_irq(&tconn->data.work.q_lock);
1726
1727		if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1728			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1729			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1730				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1731		}
1732	}
1733
1734	spin_lock_irq(&tconn->data.work.q_lock);
1735	while (!list_empty(&tconn->data.work.q)) {
1736		list_splice_init(&tconn->data.work.q, &work_list);
1737		spin_unlock_irq(&tconn->data.work.q_lock);
1738
1739		while (!list_empty(&work_list)) {
1740			w = list_entry(work_list.next, struct drbd_work, list);
1741			list_del_init(&w->list);
1742			w->cb(w, 1);
1743		}
1744
1745		spin_lock_irq(&tconn->data.work.q_lock);
1746	}
1747	sema_init(&tconn->data.work.s, 0);
1748	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1749	 * but called up() outside of it, we could get an up() on the
1750	 * semaphore without a corresponding list entry.
1751	 * So don't do that.
1752	 */
1753	spin_unlock_irq(&tconn->data.work.q_lock);
1754
1755	down_read(&drbd_cfg_rwsem);
1756	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1757		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1758		drbd_mdev_cleanup(mdev);
1759	}
1760	up_read(&drbd_cfg_rwsem);
1761
1762	return 0;
1763}
1764