drbd_worker.c revision c37c8ecfee685fa42de8fd418ad8ca1e66408bd8
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44/* endio handlers:
45 *   drbd_md_io_complete (defined here)
46 *   drbd_request_endio (defined here)
47 *   drbd_peer_request_endio (defined here)
48 *   bm_async_io_complete (defined in drbd_bitmap.c)
49 *
50 * For all these callbacks, note the following:
51 * The callbacks will be called in irq context by the IDE drivers,
52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53 * Try to get the locking right :)
54 *
55 */
56
57
58/* About the global_state_lock
59   Each state transition on a device holds a read lock. In case we have
60   to evaluate the sync-after dependencies, we grab a write lock, because
61   we need stable states on all devices for that.  */
62rwlock_t global_state_lock;
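
/*
 * Illustrative sketch only (not used by the driver): the read/write rule
 * described above, written out as two hypothetical helpers.  The
 * hypothetical_*() names are placeholders, not DRBD API.
 */
static inline void hypothetical_single_device_transition(void)
{
	read_lock_irq(&global_state_lock);
	/* ...transition the state of this one device... */
	read_unlock_irq(&global_state_lock);
}

static inline void hypothetical_sync_after_evaluation(void)
{
	write_lock_irq(&global_state_lock);
	/* ...walk the resync-after dependencies; all device states are stable... */
	write_unlock_irq(&global_state_lock);
}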
63
64/* used for synchronous meta data and bitmap IO
65 * submitted by drbd_md_sync_page_io()
66 */
67void drbd_md_io_complete(struct bio *bio, int error)
68{
69	struct drbd_md_io *md_io;
70
71	md_io = (struct drbd_md_io *)bio->bi_private;
72	md_io->error = error;
73
74	complete(&md_io->event);
75}
76
77/* reads on behalf of the partner,
78 * "submitted" by the receiver
79 */
80void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81{
82	unsigned long flags = 0;
83	struct drbd_conf *mdev = peer_req->w.mdev;
84
85	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86	mdev->read_cnt += peer_req->i.size >> 9;
87	list_del(&peer_req->w.list);
88	if (list_empty(&mdev->read_ee))
89		wake_up(&mdev->ee_wait);
90	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91		__drbd_chk_io_error(mdev, false);
92	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95	put_ldev(mdev);
96}
97
98/* writes on behalf of the partner, or resync writes,
99 * "submitted" by the receiver, final stage.  */
100static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101{
102	unsigned long flags = 0;
103	struct drbd_conf *mdev = peer_req->w.mdev;
104	struct drbd_interval i;
105	int do_wake;
106	u64 block_id;
107	int do_al_complete_io;
108
109	/* after we moved peer_req to done_ee,
110	 * we may no longer access it,
111	 * it may be freed/reused already!
112	 * (as soon as we release the req_lock) */
113	i = peer_req->i;
114	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115	block_id = peer_req->block_id;
116
117	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118	mdev->writ_cnt += peer_req->i.size >> 9;
119	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120	list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122	/*
123	 * Do not remove from the write_requests tree here: we did not send the
124	 * Ack yet and did not wake possibly waiting conflicting requests.
125	 * It is removed from the tree in "drbd_process_done_ee" within the
126	 * appropriate w.cb (e_end_block/e_end_resync_block) or in
127	 * _drbd_clear_done_ee.
128	 */
129
130	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133		__drbd_chk_io_error(mdev, false);
134	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136	if (block_id == ID_SYNCER)
137		drbd_rs_complete_io(mdev, i.sector);
138
139	if (do_wake)
140		wake_up(&mdev->ee_wait);
141
142	if (do_al_complete_io)
143		drbd_al_complete_io(mdev, &i);
144
145	wake_asender(mdev->tconn);
146	put_ldev(mdev);
147}
148
149/* writes on behalf of the partner, or resync writes,
150 * "submitted" by the receiver.
151 */
152void drbd_peer_request_endio(struct bio *bio, int error)
153{
154	struct drbd_peer_request *peer_req = bio->bi_private;
155	struct drbd_conf *mdev = peer_req->w.mdev;
156	int uptodate = bio_flagged(bio, BIO_UPTODATE);
157	int is_write = bio_data_dir(bio) == WRITE;
158
159	if (error && __ratelimit(&drbd_ratelimit_state))
160		dev_warn(DEV, "%s: error=%d s=%llus\n",
161				is_write ? "write" : "read", error,
162				(unsigned long long)peer_req->i.sector);
163	if (!error && !uptodate) {
164		if (__ratelimit(&drbd_ratelimit_state))
165			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166					is_write ? "write" : "read",
167					(unsigned long long)peer_req->i.sector);
168		/* strange behavior of some lower level drivers...
169		 * fail the request by clearing the uptodate flag,
170		 * but do not return any error?! */
171		error = -EIO;
172	}
173
174	if (error)
175		set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177	bio_put(bio); /* no need for the bio anymore */
178	if (atomic_dec_and_test(&peer_req->pending_bios)) {
179		if (is_write)
180			drbd_endio_write_sec_final(peer_req);
181		else
182			drbd_endio_read_sec_final(peer_req);
183	}
184}
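
/* Worked example of the pending_bios accounting above (illustrative): a
 * 32 KiB peer request that had to be split into two bios at submit time
 * starts with pending_bios == 2.  Each bio completion decrements the
 * counter; only the completion that drops it to zero runs the matching
 * drbd_endio_{read,write}_sec_final() for the whole peer request. */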
185
186/* read, read-ahead (readA) or write requests on R_PRIMARY coming from drbd_make_request
187 */
188void drbd_request_endio(struct bio *bio, int error)
189{
190	unsigned long flags;
191	struct drbd_request *req = bio->bi_private;
192	struct drbd_conf *mdev = req->w.mdev;
193	struct bio_and_error m;
194	enum drbd_req_event what;
195	int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197	if (!error && !uptodate) {
198		dev_warn(DEV, "p %s: setting error to -EIO\n",
199			 bio_data_dir(bio) == WRITE ? "write" : "read");
200		/* strange behavior of some lower level drivers...
201		 * fail the request by clearing the uptodate flag,
202		 * but do not return any error?! */
203		error = -EIO;
204	}
205
206	/* to avoid recursion in __req_mod */
207	if (unlikely(error)) {
208		what = (bio_data_dir(bio) == WRITE)
209			? WRITE_COMPLETED_WITH_ERROR
210			: (bio_rw(bio) == READ)
211			  ? READ_COMPLETED_WITH_ERROR
212			  : READ_AHEAD_COMPLETED_WITH_ERROR;
213	} else
214		what = COMPLETED_OK;
215
216	bio_put(req->private_bio);
217	req->private_bio = ERR_PTR(error);
218
219	/* not req_mod(), we need irqsave here! */
220	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221	__req_mod(req, what, &m);
222	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224	if (m.bio)
225		complete_master_bio(mdev, &m);
226}
227
228int w_read_retry_remote(struct drbd_work *w, int cancel)
229{
230	struct drbd_request *req = container_of(w, struct drbd_request, w);
231	struct drbd_conf *mdev = w->mdev;
232
233	/* We should not detach on a read I/O error,
234	 * but rather try to WRITE the P_DATA_REPLY to the failed location,
235	 * to give the disk a chance to relocate that block */
236
237	spin_lock_irq(&mdev->tconn->req_lock);
238	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
240		spin_unlock_irq(&mdev->tconn->req_lock);
241		return 0;
242	}
243	spin_unlock_irq(&mdev->tconn->req_lock);
244
245	return w_send_read_req(w, 0);
246}
247
248void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249		  struct drbd_peer_request *peer_req, void *digest)
250{
251	struct hash_desc desc;
252	struct scatterlist sg;
253	struct page *page = peer_req->pages;
254	struct page *tmp;
255	unsigned len;
256
257	desc.tfm = tfm;
258	desc.flags = 0;
259
260	sg_init_table(&sg, 1);
261	crypto_hash_init(&desc);
262
263	while ((tmp = page_chain_next(page))) {
264		/* all but the last page will be fully used */
265		sg_set_page(&sg, page, PAGE_SIZE, 0);
266		crypto_hash_update(&desc, &sg, sg.length);
267		page = tmp;
268	}
269	/* and now the last, possibly only partially used page */
270	len = peer_req->i.size & (PAGE_SIZE - 1);
271	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272	crypto_hash_update(&desc, &sg, sg.length);
273	crypto_hash_final(&desc, digest);
274}
275
276void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277{
278	struct hash_desc desc;
279	struct scatterlist sg;
280	struct bio_vec *bvec;
281	int i;
282
283	desc.tfm = tfm;
284	desc.flags = 0;
285
286	sg_init_table(&sg, 1);
287	crypto_hash_init(&desc);
288
289	__bio_for_each_segment(bvec, bio, i, 0) {
290		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291		crypto_hash_update(&desc, &sg, sg.length);
292	}
293	crypto_hash_final(&desc, digest);
294}
295
296/* MAYBE merge common code with w_e_end_ov_req */
297static int w_e_send_csum(struct drbd_work *w, int cancel)
298{
299	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300	struct drbd_conf *mdev = w->mdev;
301	int digest_size;
302	void *digest;
303	int err = 0;
304
305	if (unlikely(cancel))
306		goto out;
307
308	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309		goto out;
310
311	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312	digest = kmalloc(digest_size, GFP_NOIO);
313	if (digest) {
314		sector_t sector = peer_req->i.sector;
315		unsigned int size = peer_req->i.size;
316		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317		/* Free peer_req and pages before send.
318		 * In case we block on congestion, we could otherwise run into
319		 * some distributed deadlock, if the other side blocks on
320		 * congestion as well, because our receiver blocks in
321		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
322		drbd_free_peer_req(mdev, peer_req);
323		peer_req = NULL;
324		inc_rs_pending(mdev);
325		err = drbd_send_drequest_csum(mdev, sector, size,
326					      digest, digest_size,
327					      P_CSUM_RS_REQUEST);
328		kfree(digest);
329	} else {
330		dev_err(DEV, "kmalloc() of digest failed.\n");
331		err = -ENOMEM;
332	}
333
334out:
335	if (peer_req)
336		drbd_free_peer_req(mdev, peer_req);
337
338	if (unlikely(err))
339		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340	return err;
341}
342
343#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
344
345static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346{
347	struct drbd_peer_request *peer_req;
348
349	if (!get_ldev(mdev))
350		return -EIO;
351
352	if (drbd_rs_should_slow_down(mdev, sector))
353		goto defer;
354
355	/* GFP_TRY, because if there is no memory available right now, this may
356	 * be rescheduled for later. It is "only" background resync, after all. */
357	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
358				       size, GFP_TRY);
359	if (!peer_req)
360		goto defer;
361
362	peer_req->w.cb = w_e_send_csum;
363	spin_lock_irq(&mdev->tconn->req_lock);
364	list_add(&peer_req->w.list, &mdev->read_ee);
365	spin_unlock_irq(&mdev->tconn->req_lock);
366
367	atomic_add(size >> 9, &mdev->rs_sect_ev);
368	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369		return 0;
370
371	/* If it failed because of ENOMEM, retry should help.  If it failed
372	 * because bio_add_page failed (probably broken lower level driver),
373	 * retry may or may not help.
374	 * If it does not, you may need to force disconnect. */
375	spin_lock_irq(&mdev->tconn->req_lock);
376	list_del(&peer_req->w.list);
377	spin_unlock_irq(&mdev->tconn->req_lock);
378
379	drbd_free_peer_req(mdev, peer_req);
380defer:
381	put_ldev(mdev);
382	return -EAGAIN;
383}
384
385int w_resync_timer(struct drbd_work *w, int cancel)
386{
387	struct drbd_conf *mdev = w->mdev;
388	switch (mdev->state.conn) {
389	case C_VERIFY_S:
390		w_make_ov_request(w, cancel);
391		break;
392	case C_SYNC_TARGET:
393		w_make_resync_request(w, cancel);
394		break;
395	}
396
397	return 0;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402	struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404	if (list_empty(&mdev->resync_work.list))
405		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406}
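
/*
 * Illustrative sketch only (not called by the driver): arming the resync
 * timer so that resync_timer_fn() above fires one SLEEP_TIME from now.
 * The actual timer setup lives elsewhere in DRBD; this merely makes the
 * callback's trigger explicit.
 */
static inline void hypothetical_arm_resync_timer(struct drbd_conf *mdev)
{
	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
}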
407
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410	int i;
411
412	for (i = 0; i < fb->size; i++)
413		fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418	int ov;
419
420	ov = fb->values[fb->head_index];
421	fb->values[fb->head_index++] = value;
422
423	if (fb->head_index >= fb->size)
424		fb->head_index = 0;
425
426	return ov;
427}
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431	int i;
432
433	for (i = 0; i < fb->size; i++)
434		fb->values[i] += value;
435}
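
/*
 * Minimal usage sketch of the plan FIFO (illustrative only, mirroring how
 * drbd_rs_controller() below uses it; assumes fb->size > 0, since the
 * controller only runs with a non-empty plan): spread a correction of
 * "corr" sectors evenly over all planned steps, then pop the share planned
 * for the current step.  hypothetical_plan_step() is not part of the driver.
 */
static inline int hypothetical_plan_step(struct fifo_buffer *fb, int corr)
{
	int steps = fb->size;
	int per_step = corr / steps;	/* correction per future step */

	fifo_add_val(fb, per_step);	/* distribute over the whole plan */
	return fifo_push(fb, 0);	/* consume this step's share, plan 0 for the new slot */
}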
436
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440	unsigned int want;     /* The number of sectors we want in the proxy */
441	int req_sect; /* Number of sectors to request in this turn */
442	int correction; /* Number of additional sectors we need in the proxy */
443	int cps; /* correction per invocation of drbd_rs_controller() */
444	int steps; /* Number of time steps to plan ahead */
445	int curr_corr;
446	int max_sect;
447
448	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449	mdev->rs_in_flight -= sect_in;
450
451	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453	steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456		want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
457	} else { /* normal path */
458		want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
459			sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
460	}
461
462	correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464	/* Plan ahead */
465	cps = correction / steps;
466	fifo_add_val(&mdev->rs_plan_s, cps);
467	mdev->rs_planed += cps * steps;
468
469	/* What we do in this step */
470	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471	spin_unlock(&mdev->peer_seq_lock);
472	mdev->rs_planed -= curr_corr;
473
474	req_sect = sect_in + curr_corr;
475	if (req_sect < 0)
476		req_sect = 0;
477
478	max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
479	if (req_sect > max_sect)
480		req_sect = max_sect;
481
482	/*
483	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484		 sect_in, mdev->rs_in_flight, want, correction,
485		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
486	*/
487
488	return req_sect;
489}
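
/*
 * Worked example for the controller above (illustrative; assumes
 * SLEEP_TIME == HZ/10, i.e. one controller turn per 100ms, and
 * c_delay_target given in units of 0.1s): with c_fill_target == 0,
 * c_delay_target == 10 and sect_in == 2000 sectors received in the last
 * turn,
 *	want = sect_in * c_delay_target * HZ / (SLEEP_TIME * 10)
 *	     = 2000 * 10 * HZ / ((HZ/10) * 10) = 20000 sectors,
 * i.e. the controller aims to keep roughly one second's worth of data
 * (at the currently observed reply rate) in flight towards the peer.
 */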
490
491static int drbd_rs_number_requests(struct drbd_conf *mdev)
492{
493	int number;
494	if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
495		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497	} else {
498		mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
499		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500	}
501
502	/* ignore the number of pending requests; the resync controller will
503	 * throttle down to the incoming reply rate soon enough anyway. */
504	return number;
505}
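
/*
 * Unit check for the conversion above (illustrative; assumes
 * BM_BLOCK_SIZE == 4 KiB and SLEEP_TIME == HZ/10): the controller returns
 * 512-byte sectors, so ">> (BM_BLOCK_SHIFT - 9)" turns them into 4 KiB
 * resync requests.  With "number" such requests issued per 100ms turn, the
 * effective rate is number * 4 KiB / 0.1s == number * 40 KiB/s, which is
 * what the c_sync_rate computation stores.
 */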
506
507int w_make_resync_request(struct drbd_work *w, int cancel)
508{
509	struct drbd_conf *mdev = w->mdev;
510	unsigned long bit;
511	sector_t sector;
512	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513	int max_bio_size;
514	int number, rollback_i, size;
515	int align, queued, sndbuf;
516	int i = 0;
517
518	if (unlikely(cancel))
519		return 0;
520
521	if (mdev->rs_total == 0) {
522		/* empty resync? */
523		drbd_resync_finished(mdev);
524		return 0;
525	}
526
527	if (!get_ldev(mdev)) {
528		/* Since we only need to access mdev->rsync, a
529		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
530		   continuing resync with a broken disk makes no sense at
531		   all */
532		dev_err(DEV, "Disk broke down during resync!\n");
533		return 0;
534	}
535
536	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537	number = drbd_rs_number_requests(mdev);
538	if (number == 0)
539		goto requeue;
540
541	for (i = 0; i < number; i++) {
542		/* Stop generating RS requests when half of the send buffer is filled */
543		mutex_lock(&mdev->tconn->data.mutex);
544		if (mdev->tconn->data.socket) {
545			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547		} else {
548			queued = 1;
549			sndbuf = 0;
550		}
551		mutex_unlock(&mdev->tconn->data.mutex);
552		if (queued > sndbuf / 2)
553			goto requeue;
554
555next_sector:
556		size = BM_BLOCK_SIZE;
557		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559		if (bit == DRBD_END_OF_BITMAP) {
560			mdev->bm_resync_fo = drbd_bm_bits(mdev);
561			put_ldev(mdev);
562			return 0;
563		}
564
565		sector = BM_BIT_TO_SECT(bit);
566
567		if (drbd_rs_should_slow_down(mdev, sector) ||
568		    drbd_try_rs_begin_io(mdev, sector)) {
569			mdev->bm_resync_fo = bit;
570			goto requeue;
571		}
572		mdev->bm_resync_fo = bit + 1;
573
574		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575			drbd_rs_complete_io(mdev, sector);
576			goto next_sector;
577		}
578
579#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580		/* try to find some adjacent bits.
581		 * we stop if we already have the maximum request size.
582		 *
583		 * Additionally, always align bigger requests, in order to
584		 * be prepared for all stripe sizes of software RAIDs.
585		 */
586		align = 1;
587		rollback_i = i;
588		for (;;) {
589			if (size + BM_BLOCK_SIZE > max_bio_size)
590				break;
591
592		/* Always be aligned */
593			if (sector & ((1<<(align+3))-1))
594				break;
595
596			/* do not cross extent boundaries */
597			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598				break;
599			/* now, is it actually dirty, after all?
600			 * caution, drbd_bm_test_bit is tri-state for some
601			 * obscure reason; ( b == 0 ) would get the out-of-band
602			 * only accidentally right because of the "oddly sized"
603			 * adjustment below */
604			if (drbd_bm_test_bit(mdev, bit+1) != 1)
605				break;
606			bit++;
607			size += BM_BLOCK_SIZE;
608			if ((BM_BLOCK_SIZE << align) <= size)
609				align++;
610			i++;
611		}
612		/* if we merged some,
613		 * reset the offset to start the next drbd_bm_find_next from */
614		if (size > BM_BLOCK_SIZE)
615			mdev->bm_resync_fo = bit + 1;
616#endif
617
618		/* adjust very last sectors, in case we are oddly sized */
619		if (sector + (size>>9) > capacity)
620			size = (capacity-sector)<<9;
621		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
622			switch (read_for_csum(mdev, sector, size)) {
623			case -EIO: /* Disk failure */
624				put_ldev(mdev);
625				return -EIO;
626			case -EAGAIN: /* allocation failed, or ldev busy */
627				drbd_rs_complete_io(mdev, sector);
628				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629				i = rollback_i;
630				goto requeue;
631			case 0:
632				/* everything ok */
633				break;
634			default:
635				BUG();
636			}
637		} else {
638			int err;
639
640			inc_rs_pending(mdev);
641			err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
642						 sector, size, ID_SYNCER);
643			if (err) {
644				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
645				dec_rs_pending(mdev);
646				put_ldev(mdev);
647				return err;
648			}
649		}
650	}
651
652	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
653		/* last syncer _request_ was sent,
654		 * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
655		 * next sync group will resume), as soon as we receive the last
656		 * resync data block, and the last bit is cleared.
657		 * until then resync "work" is "inactive" ...
658		 */
659		put_ldev(mdev);
660		return 0;
661	}
662
663 requeue:
664	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
665	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
666	put_ldev(mdev);
667	return 0;
668}
669
670static int w_make_ov_request(struct drbd_work *w, int cancel)
671{
672	struct drbd_conf *mdev = w->mdev;
673	int number, i, size;
674	sector_t sector;
675	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
676
677	if (unlikely(cancel))
678		return 1;
679
680	number = drbd_rs_number_requests(mdev);
681
682	sector = mdev->ov_position;
683	for (i = 0; i < number; i++) {
684		if (sector >= capacity) {
685			return 1;
686		}
687
688		size = BM_BLOCK_SIZE;
689
690		if (drbd_rs_should_slow_down(mdev, sector) ||
691		    drbd_try_rs_begin_io(mdev, sector)) {
692			mdev->ov_position = sector;
693			goto requeue;
694		}
695
696		if (sector + (size>>9) > capacity)
697			size = (capacity-sector)<<9;
698
699		inc_rs_pending(mdev);
700		if (drbd_send_ov_request(mdev, sector, size)) {
701			dec_rs_pending(mdev);
702			return 0;
703		}
704		sector += BM_SECT_PER_BIT;
705	}
706	mdev->ov_position = sector;
707
708 requeue:
709	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
710	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
711	return 1;
712}
713
714int w_ov_finished(struct drbd_work *w, int cancel)
715{
716	struct drbd_conf *mdev = w->mdev;
717	kfree(w);
718	ov_out_of_sync_print(mdev);
719	drbd_resync_finished(mdev);
720
721	return 0;
722}
723
724static int w_resync_finished(struct drbd_work *w, int cancel)
725{
726	struct drbd_conf *mdev = w->mdev;
727	kfree(w);
728
729	drbd_resync_finished(mdev);
730
731	return 0;
732}
733
734static void ping_peer(struct drbd_conf *mdev)
735{
736	struct drbd_tconn *tconn = mdev->tconn;
737
738	clear_bit(GOT_PING_ACK, &tconn->flags);
739	request_ping(tconn);
740	wait_event(tconn->ping_wait,
741		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
742}
743
744int drbd_resync_finished(struct drbd_conf *mdev)
745{
746	unsigned long db, dt, dbdt;
747	unsigned long n_oos;
748	union drbd_state os, ns;
749	struct drbd_work *w;
750	char *khelper_cmd = NULL;
751	int verify_done = 0;
752
753	/* Remove all elements from the resync LRU. Since future actions
754	 * might set bits in the (main) bitmap, the entries in the
755	 * resync LRU would otherwise be wrong. */
756	if (drbd_rs_del_all(mdev)) {
757		/* In case this is not possible now, most probably because
758		 * there are P_RS_DATA_REPLY packets lingering on the worker's
759		 * queue (or the read operations for those packets
760		 * are not finished yet).   Retry in 100ms. */
761
762		schedule_timeout_interruptible(HZ / 10);
763		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
764		if (w) {
765			w->cb = w_resync_finished;
766			drbd_queue_work(&mdev->tconn->data.work, w);
767			return 1;
768		}
769		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
770	}
771
772	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
773	if (dt <= 0)
774		dt = 1;
775	db = mdev->rs_total;
776	dbdt = Bit2KB(db/dt);
777	mdev->rs_paused /= HZ;
778
779	if (!get_ldev(mdev))
780		goto out;
781
782	ping_peer(mdev);
783
784	spin_lock_irq(&mdev->tconn->req_lock);
785	os = drbd_read_state(mdev);
786
787	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
788
789	/* This protects us against multiple calls (that can happen in the presence
790	   of application IO), and against connectivity loss just before we arrive here. */
791	if (os.conn <= C_CONNECTED)
792		goto out_unlock;
793
794	ns = os;
795	ns.conn = C_CONNECTED;
796
797	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
798	     verify_done ? "Online verify " : "Resync",
799	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
800
801	n_oos = drbd_bm_total_weight(mdev);
802
803	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
804		if (n_oos) {
805			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
806			      n_oos, Bit2KB(1));
807			khelper_cmd = "out-of-sync";
808		}
809	} else {
810		D_ASSERT((n_oos - mdev->rs_failed) == 0);
811
812		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
813			khelper_cmd = "after-resync-target";
814
815		if (mdev->tconn->csums_tfm && mdev->rs_total) {
816			const unsigned long s = mdev->rs_same_csum;
817			const unsigned long t = mdev->rs_total;
818			const int ratio =
819				(t == 0)     ? 0 :
820			(t < 100000) ? ((s*100)/t) : (s/(t/100));
821			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
822			     "transferred %luK total %luK\n",
823			     ratio,
824			     Bit2KB(mdev->rs_same_csum),
825			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
826			     Bit2KB(mdev->rs_total));
827		}
828	}
829
830	if (mdev->rs_failed) {
831		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
832
833		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
834			ns.disk = D_INCONSISTENT;
835			ns.pdsk = D_UP_TO_DATE;
836		} else {
837			ns.disk = D_UP_TO_DATE;
838			ns.pdsk = D_INCONSISTENT;
839		}
840	} else {
841		ns.disk = D_UP_TO_DATE;
842		ns.pdsk = D_UP_TO_DATE;
843
844		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
845			if (mdev->p_uuid) {
846				int i;
847				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
848					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
849				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
850				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
851			} else {
852				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
853			}
854		}
855
856		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
857			/* for verify runs, we don't update uuids here,
858			 * so there would be nothing to report. */
859			drbd_uuid_set_bm(mdev, 0UL);
860			drbd_print_uuids(mdev, "updated UUIDs");
861			if (mdev->p_uuid) {
862				/* Now the two UUID sets are equal, update what we
863				 * know of the peer. */
864				int i;
865				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
866					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
867			}
868		}
869	}
870
871	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
872out_unlock:
873	spin_unlock_irq(&mdev->tconn->req_lock);
874	put_ldev(mdev);
875out:
876	mdev->rs_total  = 0;
877	mdev->rs_failed = 0;
878	mdev->rs_paused = 0;
879	if (verify_done)
880		mdev->ov_start_sector = 0;
881
882	drbd_md_sync(mdev);
883
884	if (khelper_cmd)
885		drbd_khelper(mdev, khelper_cmd);
886
887	return 1;
888}
889
890/* helper */
891static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
892{
893	if (drbd_peer_req_has_active_page(peer_req)) {
894		/* This might happen if sendpage() has not finished */
895		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
896		atomic_add(i, &mdev->pp_in_use_by_net);
897		atomic_sub(i, &mdev->pp_in_use);
898		spin_lock_irq(&mdev->tconn->req_lock);
899		list_add_tail(&peer_req->w.list, &mdev->net_ee);
900		spin_unlock_irq(&mdev->tconn->req_lock);
901		wake_up(&drbd_pp_wait);
902	} else
903		drbd_free_peer_req(mdev, peer_req);
904}
905
906/**
907 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
908 * @mdev:	DRBD device.
909 * @w:		work object.
910 * @cancel:	The connection will be closed anyway
911 */
912int w_e_end_data_req(struct drbd_work *w, int cancel)
913{
914	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
915	struct drbd_conf *mdev = w->mdev;
916	int err;
917
918	if (unlikely(cancel)) {
919		drbd_free_peer_req(mdev, peer_req);
920		dec_unacked(mdev);
921		return 0;
922	}
923
924	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
925		err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
926	} else {
927		if (__ratelimit(&drbd_ratelimit_state))
928			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
929			    (unsigned long long)peer_req->i.sector);
930
931		err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
932	}
933
934	dec_unacked(mdev);
935
936	move_to_net_ee_or_free(mdev, peer_req);
937
938	if (unlikely(err))
939		dev_err(DEV, "drbd_send_block() failed\n");
940	return err;
941}
942
943/**
944 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
945 * @mdev:	DRBD device.
946 * @w:		work object.
947 * @cancel:	The connection will be closed anyway
948 */
949int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
950{
951	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
952	struct drbd_conf *mdev = w->mdev;
953	int err;
954
955	if (unlikely(cancel)) {
956		drbd_free_peer_req(mdev, peer_req);
957		dec_unacked(mdev);
958		return 0;
959	}
960
961	if (get_ldev_if_state(mdev, D_FAILED)) {
962		drbd_rs_complete_io(mdev, peer_req->i.sector);
963		put_ldev(mdev);
964	}
965
966	if (mdev->state.conn == C_AHEAD) {
967		err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
968	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
969		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
970			inc_rs_pending(mdev);
971			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
972		} else {
973			if (__ratelimit(&drbd_ratelimit_state))
974				dev_err(DEV, "Not sending RSDataReply, "
975				    "partner DISKLESS!\n");
976			err = 0;
977		}
978	} else {
979		if (__ratelimit(&drbd_ratelimit_state))
980			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
981			    (unsigned long long)peer_req->i.sector);
982
983		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
984
985		/* update resync data with failure */
986		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
987	}
988
989	dec_unacked(mdev);
990
991	move_to_net_ee_or_free(mdev, peer_req);
992
993	if (unlikely(err))
994		dev_err(DEV, "drbd_send_block() failed\n");
995	return err;
996}
997
998int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
999{
1000	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1001	struct drbd_conf *mdev = w->mdev;
1002	struct digest_info *di;
1003	int digest_size;
1004	void *digest = NULL;
1005	int err, eq = 0;
1006
1007	if (unlikely(cancel)) {
1008		drbd_free_peer_req(mdev, peer_req);
1009		dec_unacked(mdev);
1010		return 0;
1011	}
1012
1013	if (get_ldev(mdev)) {
1014		drbd_rs_complete_io(mdev, peer_req->i.sector);
1015		put_ldev(mdev);
1016	}
1017
1018	di = peer_req->digest;
1019
1020	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1021		/* quick hack to try to avoid a race against reconfiguration.
1022		 * a real fix would be much more involved,
1023		 * introducing more locking mechanisms */
1024		if (mdev->tconn->csums_tfm) {
1025			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1026			D_ASSERT(digest_size == di->digest_size);
1027			digest = kmalloc(digest_size, GFP_NOIO);
1028		}
1029		if (digest) {
1030			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1031			eq = !memcmp(digest, di->digest, digest_size);
1032			kfree(digest);
1033		}
1034
1035		if (eq) {
1036			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1037			/* rs_same_csums unit is BM_BLOCK_SIZE */
1038			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1039			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1040		} else {
1041			inc_rs_pending(mdev);
1042			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1043			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1044			kfree(di);
1045			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1046		}
1047	} else {
1048		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1049		if (__ratelimit(&drbd_ratelimit_state))
1050			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1051	}
1052
1053	dec_unacked(mdev);
1054	move_to_net_ee_or_free(mdev, peer_req);
1055
1056	if (unlikely(err))
1057		dev_err(DEV, "drbd_send_block/ack() failed\n");
1058	return err;
1059}
1060
1061int w_e_end_ov_req(struct drbd_work *w, int cancel)
1062{
1063	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1064	struct drbd_conf *mdev = w->mdev;
1065	sector_t sector = peer_req->i.sector;
1066	unsigned int size = peer_req->i.size;
1067	int digest_size;
1068	void *digest;
1069	int err = 0;
1070
1071	if (unlikely(cancel))
1072		goto out;
1073
1074	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1075	digest = kmalloc(digest_size, GFP_NOIO);
1076	if (!digest) {
1077		err = 1;	/* terminate the connection in case the allocation failed */
1078		goto out;
1079	}
1080
1081	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1082		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1083	else
1084		memset(digest, 0, digest_size);
1085
1086	/* Free e and pages before send.
1087	 * In case we block on congestion, we could otherwise run into
1088	 * some distributed deadlock, if the other side blocks on
1089	 * congestion as well, because our receiver blocks in
1090	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1091	drbd_free_peer_req(mdev, peer_req);
1092	peer_req = NULL;
1093	inc_rs_pending(mdev);
1094	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1095	if (err)
1096		dec_rs_pending(mdev);
1097	kfree(digest);
1098
1099out:
1100	if (peer_req)
1101		drbd_free_peer_req(mdev, peer_req);
1102	dec_unacked(mdev);
1103	return err;
1104}
1105
1106void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1107{
1108	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109		mdev->ov_last_oos_size += size>>9;
1110	} else {
1111		mdev->ov_last_oos_start = sector;
1112		mdev->ov_last_oos_size = size>>9;
1113	}
1114	drbd_set_out_of_sync(mdev, sector, size);
1115}
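
/* Example of the coalescing above (illustrative): two adjacent 4 KiB
 * out-of-sync chunks reported at sectors 1000 and 1008 extend one recorded
 * range (start 1000, size 16 sectors) instead of producing two log lines
 * when ov_out_of_sync_print() finally runs. */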
1116
1117int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118{
1119	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120	struct drbd_conf *mdev = w->mdev;
1121	struct digest_info *di;
1122	void *digest;
1123	sector_t sector = peer_req->i.sector;
1124	unsigned int size = peer_req->i.size;
1125	int digest_size;
1126	int err, eq = 0;
1127
1128	if (unlikely(cancel)) {
1129		drbd_free_peer_req(mdev, peer_req);
1130		dec_unacked(mdev);
1131		return 0;
1132	}
1133
1134	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135	 * the resync lru has been cleaned up already */
1136	if (get_ldev(mdev)) {
1137		drbd_rs_complete_io(mdev, peer_req->i.sector);
1138		put_ldev(mdev);
1139	}
1140
1141	di = peer_req->digest;
1142
1143	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1145		digest = kmalloc(digest_size, GFP_NOIO);
1146		if (digest) {
1147			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1148
1149			D_ASSERT(digest_size == di->digest_size);
1150			eq = !memcmp(digest, di->digest, digest_size);
1151			kfree(digest);
1152		}
1153	}
1154
1155	/* Free peer_req and pages before send.
1156	 * In case we block on congestion, we could otherwise run into
1157	 * some distributed deadlock, if the other side blocks on
1158	 * congestion as well, because our receiver blocks in
1159	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1160	drbd_free_peer_req(mdev, peer_req);
1161	if (!eq)
1162		drbd_ov_out_of_sync_found(mdev, sector, size);
1163	else
1164		ov_out_of_sync_print(mdev);
1165
1166	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169	dec_unacked(mdev);
1170
1171	--mdev->ov_left;
1172
1173	/* let's advance progress step marks only for every other megabyte */
1174	if ((mdev->ov_left & 0x200) == 0x200)
1175		drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177	if (mdev->ov_left == 0) {
1178		ov_out_of_sync_print(mdev);
1179		drbd_resync_finished(mdev);
1180	}
1181
1182	return err;
1183}
1184
1185int w_prev_work_done(struct drbd_work *w, int cancel)
1186{
1187	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189	complete(&b->done);
1190	return 0;
1191}
1192
1193int w_send_barrier(struct drbd_work *w, int cancel)
1194{
1195	struct drbd_socket *sock;
1196	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1197	struct drbd_conf *mdev = w->mdev;
1198	struct p_barrier *p;
1199
1200	/* really avoid racing with tl_clear.  w.cb may have been referenced
1201	 * just before it was reassigned and re-queued, so double check that.
1202	 * actually, this race was harmless, since we only try to send the
1203	 * barrier packet here, and otherwise do nothing with the object.
1204	 * but compare with the head of w_clear_epoch */
1205	spin_lock_irq(&mdev->tconn->req_lock);
1206	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207		cancel = 1;
1208	spin_unlock_irq(&mdev->tconn->req_lock);
1209	if (cancel)
1210		return 0;
1211
1212	sock = &mdev->tconn->data;
1213	p = drbd_prepare_command(mdev, sock);
1214	if (!p)
1215		return -EIO;
1216	p->barrier = b->br_number;
1217	/* inc_ap_pending was done where this was queued.
1218	 * dec_ap_pending will be done in got_BarrierAck
1219	 * or (on connection loss) in w_clear_epoch.  */
1220	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1221}
1222
1223int w_send_write_hint(struct drbd_work *w, int cancel)
1224{
1225	struct drbd_conf *mdev = w->mdev;
1226	struct drbd_socket *sock;
1227
1228	if (cancel)
1229		return 0;
1230	sock = &mdev->tconn->data;
1231	if (!drbd_prepare_command(mdev, sock))
1232		return -EIO;
1233	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1234}
1235
1236int w_send_out_of_sync(struct drbd_work *w, int cancel)
1237{
1238	struct drbd_request *req = container_of(w, struct drbd_request, w);
1239	struct drbd_conf *mdev = w->mdev;
1240	int err;
1241
1242	if (unlikely(cancel)) {
1243		req_mod(req, SEND_CANCELED);
1244		return 0;
1245	}
1246
1247	err = drbd_send_out_of_sync(mdev, req);
1248	req_mod(req, OOS_HANDED_TO_NETWORK);
1249
1250	return err;
1251}
1252
1253/**
1254 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255 * @mdev:	DRBD device.
1256 * @w:		work object.
1257 * @cancel:	The connection will be closed anyway
1258 */
1259int w_send_dblock(struct drbd_work *w, int cancel)
1260{
1261	struct drbd_request *req = container_of(w, struct drbd_request, w);
1262	struct drbd_conf *mdev = w->mdev;
1263	int err;
1264
1265	if (unlikely(cancel)) {
1266		req_mod(req, SEND_CANCELED);
1267		return 0;
1268	}
1269
1270	err = drbd_send_dblock(mdev, req);
1271	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1272
1273	return err;
1274}
1275
1276/**
1277 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1278 * @mdev:	DRBD device.
1279 * @w:		work object.
1280 * @cancel:	The connection will be closed anyway
1281 */
1282int w_send_read_req(struct drbd_work *w, int cancel)
1283{
1284	struct drbd_request *req = container_of(w, struct drbd_request, w);
1285	struct drbd_conf *mdev = w->mdev;
1286	int err;
1287
1288	if (unlikely(cancel)) {
1289		req_mod(req, SEND_CANCELED);
1290		return 0;
1291	}
1292
1293	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1294				 (unsigned long)req);
1295
1296	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1297
1298	return err;
1299}
1300
1301int w_restart_disk_io(struct drbd_work *w, int cancel)
1302{
1303	struct drbd_request *req = container_of(w, struct drbd_request, w);
1304	struct drbd_conf *mdev = w->mdev;
1305
1306	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1307		drbd_al_begin_io(mdev, &req->i);
1308	/* Calling drbd_al_begin_io() out of the worker could deadlock
1309	   in theory. In practice it cannot deadlock, since this is
1310	   only used when unfreezing IOs. All the extents of the requests
1311	   that made it into the TL are already active */
1312
1313	drbd_req_make_private_bio(req, req->master_bio);
1314	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1315	generic_make_request(req->private_bio);
1316
1317	return 0;
1318}
1319
1320static int _drbd_may_sync_now(struct drbd_conf *mdev)
1321{
1322	struct drbd_conf *odev = mdev;
1323
1324	while (1) {
1325		if (!odev->ldev)
1326			return 1;
1327		if (odev->ldev->dc.resync_after == -1)
1328			return 1;
1329		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1330		if (!expect(odev))
1331			return 1;
1332		if ((odev->state.conn >= C_SYNC_SOURCE &&
1333		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1334		    odev->state.aftr_isp || odev->state.peer_isp ||
1335		    odev->state.user_isp)
1336			return 0;
1337	}
1338}
1339
1340/**
1341 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342 * @mdev:	DRBD device.
1343 *
1344 * Called from process context only (admin command and after_state_ch).
1345 */
1346static int _drbd_pause_after(struct drbd_conf *mdev)
1347{
1348	struct drbd_conf *odev;
1349	int i, rv = 0;
1350
1351	idr_for_each_entry(&minors, odev, i) {
1352		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1353			continue;
1354		if (!_drbd_may_sync_now(odev))
1355			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1356			       != SS_NOTHING_TO_DO);
1357	}
1358
1359	return rv;
1360}
1361
1362/**
1363 * _drbd_resume_next() - Resume resync on all devices that may resync now
1364 * @mdev:	DRBD device.
1365 *
1366 * Called from process context only (admin command and worker).
1367 */
1368static int _drbd_resume_next(struct drbd_conf *mdev)
1369{
1370	struct drbd_conf *odev;
1371	int i, rv = 0;
1372
1373	idr_for_each_entry(&minors, odev, i) {
1374		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1375			continue;
1376		if (odev->state.aftr_isp) {
1377			if (_drbd_may_sync_now(odev))
1378				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1379							CS_HARD, NULL)
1380				       != SS_NOTHING_TO_DO) ;
1381		}
1382	}
1383	return rv;
1384}
1385
1386void resume_next_sg(struct drbd_conf *mdev)
1387{
1388	write_lock_irq(&global_state_lock);
1389	_drbd_resume_next(mdev);
1390	write_unlock_irq(&global_state_lock);
1391}
1392
1393void suspend_other_sg(struct drbd_conf *mdev)
1394{
1395	write_lock_irq(&global_state_lock);
1396	_drbd_pause_after(mdev);
1397	write_unlock_irq(&global_state_lock);
1398}
1399
1400static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1401{
1402	struct drbd_conf *odev;
1403
1404	if (o_minor == -1)
1405		return NO_ERROR;
1406	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1407		return ERR_SYNC_AFTER;
1408
1409	/* check for loops */
1410	odev = minor_to_mdev(o_minor);
1411	while (1) {
1412		if (odev == mdev)
1413			return ERR_SYNC_AFTER_CYCLE;
1414
1415		/* dependency chain ends here, no cycles. */
1416		if (odev->ldev->dc.resync_after == -1)
1417			return NO_ERROR;
1418
1419		/* follow the dependency chain */
1420		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1421	}
1422}
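
/* Example for the cycle check above (illustrative): if minor 0 is
 * configured with resync-after = 1 while minor 1 is configured with
 * resync-after = 0, the walk starting at the other minor eventually comes
 * back to mdev and sync_after_error() returns ERR_SYNC_AFTER_CYCLE, so the
 * configuration is rejected before it can deadlock the resync ordering. */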
1423
1424int drbd_alter_sa(struct drbd_conf *mdev, int na)
1425{
1426	int changes;
1427	int retcode;
1428
1429	write_lock_irq(&global_state_lock);
1430	retcode = sync_after_error(mdev, na);
1431	if (retcode == NO_ERROR) {
1432		mdev->ldev->dc.resync_after = na;
1433		do {
1434			changes  = _drbd_pause_after(mdev);
1435			changes |= _drbd_resume_next(mdev);
1436		} while (changes);
1437	}
1438	write_unlock_irq(&global_state_lock);
1439	return retcode;
1440}
1441
1442void drbd_rs_controller_reset(struct drbd_conf *mdev)
1443{
1444	atomic_set(&mdev->rs_sect_in, 0);
1445	atomic_set(&mdev->rs_sect_ev, 0);
1446	mdev->rs_in_flight = 0;
1447	mdev->rs_planed = 0;
1448	spin_lock(&mdev->peer_seq_lock);
1449	fifo_set(&mdev->rs_plan_s, 0);
1450	spin_unlock(&mdev->peer_seq_lock);
1451}
1452
1453void start_resync_timer_fn(unsigned long data)
1454{
1455	struct drbd_conf *mdev = (struct drbd_conf *) data;
1456
1457	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1458}
1459
1460int w_start_resync(struct drbd_work *w, int cancel)
1461{
1462	struct drbd_conf *mdev = w->mdev;
1463
1464	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1465		dev_warn(DEV, "w_start_resync later...\n");
1466		mdev->start_resync_timer.expires = jiffies + HZ/10;
1467		add_timer(&mdev->start_resync_timer);
1468		return 0;
1469	}
1470
1471	drbd_start_resync(mdev, C_SYNC_SOURCE);
1472	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1473	return 0;
1474}
1475
1476/**
1477 * drbd_start_resync() - Start the resync process
1478 * @mdev:	DRBD device.
1479 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1480 *
1481 * This function might bring you directly into one of the
1482 * C_PAUSED_SYNC_* states.
1483 */
1484void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1485{
1486	union drbd_state ns;
1487	int r;
1488
1489	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1490		dev_err(DEV, "Resync already running!\n");
1491		return;
1492	}
1493
1494	if (mdev->state.conn < C_AHEAD) {
1495		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1496		drbd_rs_cancel_all(mdev);
1497		/* This should be done when we abort the resync. We definitely do not
1498		   want to have this for connections going back and forth between
1499		   Ahead/Behind and SyncSource/SyncTarget */
1500	}
1501
1502	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1503		if (side == C_SYNC_TARGET) {
1504			/* Since application IO was locked out during C_WF_BITMAP_T and
1505			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1506			   we ask the before-resync-target handler whether we may make the data inconsistent. */
1507			r = drbd_khelper(mdev, "before-resync-target");
1508			r = (r >> 8) & 0xff;
1509			if (r > 0) {
1510				dev_info(DEV, "before-resync-target handler returned %d, "
1511					 "dropping connection.\n", r);
1512				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1513				return;
1514			}
1515		} else /* C_SYNC_SOURCE */ {
1516			r = drbd_khelper(mdev, "before-resync-source");
1517			r = (r >> 8) & 0xff;
1518			if (r > 0) {
1519				if (r == 3) {
1520					dev_info(DEV, "before-resync-source handler returned %d, "
1521						 "ignoring. Old userland tools?\n", r);
1522				} else {
1523					dev_info(DEV, "before-resync-source handler returned %d, "
1524						 "dropping connection.\n", r);
1525					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1526					return;
1527				}
1528			}
1529		}
1530	}
1531
1532	if (current == mdev->tconn->worker.task) {
1533		/* The worker should not sleep waiting for state_mutex,
1534		   as that can take a long time */
1535		if (!mutex_trylock(mdev->state_mutex)) {
1536			set_bit(B_RS_H_DONE, &mdev->flags);
1537			mdev->start_resync_timer.expires = jiffies + HZ/5;
1538			add_timer(&mdev->start_resync_timer);
1539			return;
1540		}
1541	} else {
1542		mutex_lock(mdev->state_mutex);
1543	}
1544	clear_bit(B_RS_H_DONE, &mdev->flags);
1545
1546	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1547		mutex_unlock(mdev->state_mutex);
1548		return;
1549	}
1550
1551	write_lock_irq(&global_state_lock);
1552	ns = drbd_read_state(mdev);
1553
1554	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1555
1556	ns.conn = side;
1557
1558	if (side == C_SYNC_TARGET)
1559		ns.disk = D_INCONSISTENT;
1560	else /* side == C_SYNC_SOURCE */
1561		ns.pdsk = D_INCONSISTENT;
1562
1563	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1564	ns = drbd_read_state(mdev);
1565
1566	if (ns.conn < C_CONNECTED)
1567		r = SS_UNKNOWN_ERROR;
1568
1569	if (r == SS_SUCCESS) {
1570		unsigned long tw = drbd_bm_total_weight(mdev);
1571		unsigned long now = jiffies;
1572		int i;
1573
1574		mdev->rs_failed    = 0;
1575		mdev->rs_paused    = 0;
1576		mdev->rs_same_csum = 0;
1577		mdev->rs_last_events = 0;
1578		mdev->rs_last_sect_ev = 0;
1579		mdev->rs_total     = tw;
1580		mdev->rs_start     = now;
1581		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1582			mdev->rs_mark_left[i] = tw;
1583			mdev->rs_mark_time[i] = now;
1584		}
1585		_drbd_pause_after(mdev);
1586	}
1587	write_unlock_irq(&global_state_lock);
1588
1589	if (r == SS_SUCCESS) {
1590		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1591		     drbd_conn_str(ns.conn),
1592		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1593		     (unsigned long) mdev->rs_total);
1594		if (side == C_SYNC_TARGET)
1595			mdev->bm_resync_fo = 0;
1596
1597		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1598		 * with w_send_oos, or the sync target will get confused as to
1599		 * how many bits to resync.  We cannot always do that, because for an
1600		 * empty resync and protocol < 95, we need to do it here, as we call
1601		 * drbd_resync_finished from here in that case.
1602		 * We call drbd_gen_and_send_sync_uuid here for protocol < 96,
1603		 * and from after_state_ch otherwise. */
1604		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1605			drbd_gen_and_send_sync_uuid(mdev);
1606
1607		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1608			/* This still has a race (about when exactly the peers
1609			 * detect connection loss) that can lead to a full sync
1610			 * on next handshake. In 8.3.9 we fixed this with explicit
1611			 * resync-finished notifications, but the fix
1612			 * introduces a protocol change.  Sleeping for some
1613			 * time longer than the ping interval + timeout on the
1614			 * SyncSource, to give the SyncTarget the chance to
1615			 * detect connection loss, then waiting for a ping
1616			 * response (implicit in drbd_resync_finished) reduces
1617			 * the race considerably, but does not solve it. */
1618			if (side == C_SYNC_SOURCE)
1619				schedule_timeout_interruptible(
1620					mdev->tconn->net_conf->ping_int * HZ +
1621					mdev->tconn->net_conf->ping_timeo*HZ/9);
1622			drbd_resync_finished(mdev);
1623		}
1624
1625		drbd_rs_controller_reset(mdev);
1626		/* ns.conn may already be != mdev->state.conn,
1627		 * we may have been paused in between, or become paused until
1628		 * the timer triggers.
1629		 * No matter, that is handled in resync_timer_fn() */
1630		if (ns.conn == C_SYNC_TARGET)
1631			mod_timer(&mdev->resync_timer, jiffies);
1632
1633		drbd_md_sync(mdev);
1634	}
1635	put_ldev(mdev);
1636	mutex_unlock(mdev->state_mutex);
1637}
1638
1639int drbd_worker(struct drbd_thread *thi)
1640{
1641	struct drbd_tconn *tconn = thi->tconn;
1642	struct drbd_work *w = NULL;
1643	struct drbd_conf *mdev;
1644	LIST_HEAD(work_list);
1645	int vnr, intr = 0;
1646
1647	while (get_t_state(thi) == RUNNING) {
1648		drbd_thread_current_set_cpu(thi);
1649
1650		if (down_trylock(&tconn->data.work.s)) {
1651			mutex_lock(&tconn->data.mutex);
1652			if (tconn->data.socket && !tconn->net_conf->no_cork)
1653				drbd_tcp_uncork(tconn->data.socket);
1654			mutex_unlock(&tconn->data.mutex);
1655
1656			intr = down_interruptible(&tconn->data.work.s);
1657
1658			mutex_lock(&tconn->data.mutex);
1659			if (tconn->data.socket  && !tconn->net_conf->no_cork)
1660				drbd_tcp_cork(tconn->data.socket);
1661			mutex_unlock(&tconn->data.mutex);
1662		}
1663
1664		if (intr) {
1665			flush_signals(current);
1666			if (get_t_state(thi) == RUNNING) {
1667				conn_warn(tconn, "Worker got an unexpected signal\n");
1668				continue;
1669			}
1670			break;
1671		}
1672
1673		if (get_t_state(thi) != RUNNING)
1674			break;
1675		/* With this break, we have done a down() but not consumed
1676		   the entry from the list. The cleanup code takes care of
1677		   this...   */
1678
1679		w = NULL;
1680		spin_lock_irq(&tconn->data.work.q_lock);
1681		if (list_empty(&tconn->data.work.q)) {
1682			/* something terribly wrong in our logic.
1683			 * we were able to down() the semaphore,
1684			 * but the list is empty... doh.
1685			 *
1686			 * what is the best thing to do now?
1687			 * try again from scratch, restarting the receiver,
1688			 * asender, whatnot? could break even more badly,
1689			 * e.g. when we are primary, but no good local data.
1690			 *
1691			 * I'll try to get away with just starting over this loop.
1692			 */
1693			conn_warn(tconn, "Work list unexpectedly empty\n");
1694			spin_unlock_irq(&tconn->data.work.q_lock);
1695			continue;
1696		}
1697		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1698		list_del_init(&w->list);
1699		spin_unlock_irq(&tconn->data.work.q_lock);
1700
1701		if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1702			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1703			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1704				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1705		}
1706	}
1707
1708	spin_lock_irq(&tconn->data.work.q_lock);
1709	while (!list_empty(&tconn->data.work.q)) {
1710		list_splice_init(&tconn->data.work.q, &work_list);
1711		spin_unlock_irq(&tconn->data.work.q_lock);
1712
1713		while (!list_empty(&work_list)) {
1714			w = list_entry(work_list.next, struct drbd_work, list);
1715			list_del_init(&w->list);
1716			w->cb(w, 1);
1717		}
1718
1719		spin_lock_irq(&tconn->data.work.q_lock);
1720	}
1721	sema_init(&tconn->data.work.s, 0);
1722	/* DANGEROUS race: if someone queued their work within the spinlock,
1723	 * but up()ed outside the spinlock, we could get an up() on the
1724	 * semaphore without a corresponding list entry.
1725	 * So don't do that.
1726	 */
1727	spin_unlock_irq(&tconn->data.work.q_lock);
1728
1729	drbd_thread_stop(&tconn->receiver);
1730	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1731		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1732		/* _drbd_set_state only uses stop_nowait.
1733		 * wait here for the exiting receiver. */
1734		drbd_mdev_cleanup(mdev);
1735	}
1736	clear_bit(OBJECT_DYING, &tconn->flags);
1737	clear_bit(CONFIG_PENDING, &tconn->flags);
1738	wake_up(&tconn->ping_wait);
1739
1740	return 0;
1741}
1742