drbd_worker.c revision a18e9d1eb0660621eb9911e59a9b4d664cbad4d9
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_work *w, int cancel);
42
43
44/* endio handlers:
45 *   drbd_md_io_complete (defined here)
46 *   drbd_request_endio (defined here)
47 *   drbd_peer_request_endio (defined here)
48 *   bm_async_io_complete (defined in drbd_bitmap.c)
49 *
50 * For all these callbacks, note the following:
51 * The callbacks will be called in irq context by the IDE drivers,
52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
53 * Try to get the locking right :)
54 *
55 */
56
57
58/* About the global_state_lock
59   Each state transition on a device holds a read lock. In case we have
60   to evaluate the sync after dependencies, we grab a write lock, because
61   we need stable states on all devices for that.  */
62rwlock_t global_state_lock;
63
64/* used for synchronous meta data and bitmap IO
65 * submitted by drbd_md_sync_page_io()
66 */
67void drbd_md_io_complete(struct bio *bio, int error)
68{
69	struct drbd_md_io *md_io;
70
71	md_io = (struct drbd_md_io *)bio->bi_private;
72	md_io->error = error;
73
74	complete(&md_io->event);
75}
76
77/* reads on behalf of the partner,
78 * "submitted" by the receiver
79 */
80void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
81{
82	unsigned long flags = 0;
83	struct drbd_conf *mdev = peer_req->w.mdev;
84
85	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
86	mdev->read_cnt += peer_req->i.size >> 9;
87	list_del(&peer_req->w.list);
88	if (list_empty(&mdev->read_ee))
89		wake_up(&mdev->ee_wait);
90	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
91		__drbd_chk_io_error(mdev, false);
92	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
93
94	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
95	put_ldev(mdev);
96}
97
98/* writes on behalf of the partner, or resync writes,
99 * "submitted" by the receiver, final stage.  */
100static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101{
102	unsigned long flags = 0;
103	struct drbd_conf *mdev = peer_req->w.mdev;
104	struct drbd_interval i;
105	int do_wake;
106	u64 block_id;
107	int do_al_complete_io;
108
109	/* after we moved peer_req to done_ee,
110	 * we may no longer access it,
111	 * it may be freed/reused already!
112	 * (as soon as we release the req_lock) */
113	i = peer_req->i;
114	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
115	block_id = peer_req->block_id;
116
117	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
118	mdev->writ_cnt += peer_req->i.size >> 9;
119	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
120	list_add_tail(&peer_req->w.list, &mdev->done_ee);
121
122	/*
123	 * Do not remove from the write_requests tree here: we did not send the
124	 * Ack yet and did not wake possibly waiting conflicting requests.
125	 * It is removed from the tree in "drbd_process_done_ee", by the
126	 * appropriate w.cb (e_end_block/e_end_resync_block), or by
127	 * _drbd_clear_done_ee.
128	 */
129
130	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
131
132	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
133		__drbd_chk_io_error(mdev, false);
134	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
135
136	if (block_id == ID_SYNCER)
137		drbd_rs_complete_io(mdev, i.sector);
138
139	if (do_wake)
140		wake_up(&mdev->ee_wait);
141
142	if (do_al_complete_io)
143		drbd_al_complete_io(mdev, &i);
144
145	wake_asender(mdev->tconn);
146	put_ldev(mdev);
147}
148
149/* writes on behalf of the partner, or resync writes,
150 * "submitted" by the receiver.
151 */
152void drbd_peer_request_endio(struct bio *bio, int error)
153{
154	struct drbd_peer_request *peer_req = bio->bi_private;
155	struct drbd_conf *mdev = peer_req->w.mdev;
156	int uptodate = bio_flagged(bio, BIO_UPTODATE);
157	int is_write = bio_data_dir(bio) == WRITE;
158
159	if (error && __ratelimit(&drbd_ratelimit_state))
160		dev_warn(DEV, "%s: error=%d s=%llus\n",
161				is_write ? "write" : "read", error,
162				(unsigned long long)peer_req->i.sector);
163	if (!error && !uptodate) {
164		if (__ratelimit(&drbd_ratelimit_state))
165			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
166					is_write ? "write" : "read",
167					(unsigned long long)peer_req->i.sector);
168		/* strange behavior of some lower level drivers...
169		 * fail the request by clearing the uptodate flag,
170		 * but do not return any error?! */
171		error = -EIO;
172	}
173
174	if (error)
175		set_bit(__EE_WAS_ERROR, &peer_req->flags);
176
177	bio_put(bio); /* no need for the bio anymore */
178	if (atomic_dec_and_test(&peer_req->pending_bios)) {
179		if (is_write)
180			drbd_endio_write_sec_final(peer_req);
181		else
182			drbd_endio_read_sec_final(peer_req);
183	}
184}
185
186/* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
187 */
188void drbd_request_endio(struct bio *bio, int error)
189{
190	unsigned long flags;
191	struct drbd_request *req = bio->bi_private;
192	struct drbd_conf *mdev = req->w.mdev;
193	struct bio_and_error m;
194	enum drbd_req_event what;
195	int uptodate = bio_flagged(bio, BIO_UPTODATE);
196
197	if (!error && !uptodate) {
198		dev_warn(DEV, "p %s: setting error to -EIO\n",
199			 bio_data_dir(bio) == WRITE ? "write" : "read");
200		/* strange behavior of some lower level drivers...
201		 * fail the request by clearing the uptodate flag,
202		 * but do not return any error?! */
203		error = -EIO;
204	}
205
206	/* to avoid recursion in __req_mod */
207	if (unlikely(error)) {
208		what = (bio_data_dir(bio) == WRITE)
209			? WRITE_COMPLETED_WITH_ERROR
210			: (bio_rw(bio) == READ)
211			  ? READ_COMPLETED_WITH_ERROR
212			  : READ_AHEAD_COMPLETED_WITH_ERROR;
213	} else
214		what = COMPLETED_OK;
215
216	bio_put(req->private_bio);
217	req->private_bio = ERR_PTR(error);
218
219	/* not req_mod(), we need irqsave here! */
220	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
221	__req_mod(req, what, &m);
222	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
223
224	if (m.bio)
225		complete_master_bio(mdev, &m);
226}
227
228int w_read_retry_remote(struct drbd_work *w, int cancel)
229{
230	struct drbd_request *req = container_of(w, struct drbd_request, w);
231	struct drbd_conf *mdev = w->mdev;
232
233	/* We should not detach for read io-error,
234	 * but try to WRITE the P_DATA_REPLY to the failed location,
235	 * to give the disk the chance to relocate that block */
236
237	spin_lock_irq(&mdev->tconn->req_lock);
238	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
239		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
240		spin_unlock_irq(&mdev->tconn->req_lock);
241		return 0;
242	}
243	spin_unlock_irq(&mdev->tconn->req_lock);
244
245	return w_send_read_req(w, 0);
246}
247
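/**
 * drbd_csum_ee() - Compute a digest over the page chain of a peer request
 * @mdev:	DRBD device.
 * @tfm:	crypto hash transform to use (csums or verify algorithm).
 * @peer_req:	peer request whose pages are hashed.
 * @digest:	buffer of crypto_hash_digestsize(@tfm) bytes for the result.
 *
 * All pages but the last are hashed in full; the last page may only be
 * partially covered, depending on peer_req->i.size.
 */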
248void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
249		  struct drbd_peer_request *peer_req, void *digest)
250{
251	struct hash_desc desc;
252	struct scatterlist sg;
253	struct page *page = peer_req->pages;
254	struct page *tmp;
255	unsigned len;
256
257	desc.tfm = tfm;
258	desc.flags = 0;
259
260	sg_init_table(&sg, 1);
261	crypto_hash_init(&desc);
262
263	while ((tmp = page_chain_next(page))) {
264		/* all but the last page will be fully used */
265		sg_set_page(&sg, page, PAGE_SIZE, 0);
266		crypto_hash_update(&desc, &sg, sg.length);
267		page = tmp;
268	}
269	/* and now the last, possibly only partially used page */
270	len = peer_req->i.size & (PAGE_SIZE - 1);
271	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
272	crypto_hash_update(&desc, &sg, sg.length);
273	crypto_hash_final(&desc, digest);
274}
275
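/**
 * drbd_csum_bio() - Compute a digest over all segments of a bio
 * @mdev:	DRBD device.
 * @tfm:	crypto hash transform to use.
 * @bio:	bio whose bio_vec segments are hashed.
 * @digest:	buffer of crypto_hash_digestsize(@tfm) bytes for the result.
 */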
276void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
277{
278	struct hash_desc desc;
279	struct scatterlist sg;
280	struct bio_vec *bvec;
281	int i;
282
283	desc.tfm = tfm;
284	desc.flags = 0;
285
286	sg_init_table(&sg, 1);
287	crypto_hash_init(&desc);
288
289	__bio_for_each_segment(bvec, bio, i, 0) {
290		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
291		crypto_hash_update(&desc, &sg, sg.length);
292	}
293	crypto_hash_final(&desc, digest);
294}
295
296/* MAYBE merge common code with w_e_end_ov_req */
297static int w_e_send_csum(struct drbd_work *w, int cancel)
298{
299	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
300	struct drbd_conf *mdev = w->mdev;
301	int digest_size;
302	void *digest;
303	int err = 0;
304
305	if (unlikely(cancel))
306		goto out;
307
308	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
309		goto out;
310
311	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
312	digest = kmalloc(digest_size, GFP_NOIO);
313	if (digest) {
314		sector_t sector = peer_req->i.sector;
315		unsigned int size = peer_req->i.size;
316		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
317		/* Free peer_req and pages before send.
318		 * In case we block on congestion, we could otherwise run into
319		 * some distributed deadlock, if the other side blocks on
320		 * congestion as well, because our receiver blocks in
321		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
322		drbd_free_peer_req(mdev, peer_req);
323		peer_req = NULL;
324		inc_rs_pending(mdev);
325		err = drbd_send_drequest_csum(mdev, sector, size,
326					      digest, digest_size,
327					      P_CSUM_RS_REQUEST);
328		kfree(digest);
329	} else {
330		dev_err(DEV, "kmalloc() of digest failed.\n");
331		err = -ENOMEM;
332	}
333
334out:
335	if (peer_req)
336		drbd_free_peer_req(mdev, peer_req);
337
338	if (unlikely(err))
339		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
340	return err;
341}
342
343#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
344
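/* Submit a local read of @size bytes at @sector, so that w_e_send_csum can
 * later send the block's checksum to the peer (checksum based resync).
 * Returns 0 on success, -EIO if we have no usable local disk, and -EAGAIN
 * if the request should be retried later (memory pressure, throttling, or
 * a failed submit). */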
345static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
346{
347	struct drbd_peer_request *peer_req;
348
349	if (!get_ldev(mdev))
350		return -EIO;
351
352	if (drbd_rs_should_slow_down(mdev, sector))
353		goto defer;
354
355	/* GFP_TRY, because if there is no memory available right now, this may
356	 * be rescheduled for later. It is "only" background resync, after all. */
357	peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
358				       size, GFP_TRY);
359	if (!peer_req)
360		goto defer;
361
362	peer_req->w.cb = w_e_send_csum;
363	spin_lock_irq(&mdev->tconn->req_lock);
364	list_add(&peer_req->w.list, &mdev->read_ee);
365	spin_unlock_irq(&mdev->tconn->req_lock);
366
367	atomic_add(size >> 9, &mdev->rs_sect_ev);
368	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
369		return 0;
370
371	/* If it failed because of ENOMEM, retry should help.  If it failed
372	 * because bio_add_page failed (probably broken lower level driver),
373	 * retry may or may not help.
374	 * If it does not, you may need to force disconnect. */
375	spin_lock_irq(&mdev->tconn->req_lock);
376	list_del(&peer_req->w.list);
377	spin_unlock_irq(&mdev->tconn->req_lock);
378
379	drbd_free_peer_req(mdev, peer_req);
380defer:
381	put_ldev(mdev);
382	return -EAGAIN;
383}
384
385int w_resync_timer(struct drbd_work *w, int cancel)
386{
387	struct drbd_conf *mdev = w->mdev;
388	switch (mdev->state.conn) {
389	case C_VERIFY_S:
390		w_make_ov_request(w, cancel);
391		break;
392	case C_SYNC_TARGET:
393		w_make_resync_request(w, cancel);
394		break;
395	}
396
397	return 0;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402	struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404	if (list_empty(&mdev->resync_work.list))
405		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406}
407
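/* The following small helpers operate on struct fifo_buffer, the ring of
 * planned request amounts (rs_plan_s) used by the dynamic resync speed
 * controller below.  fifo_push() returns the oldest slot and stores a new
 * value in its place, advancing head_index; fifo_add_val() spreads a
 * correction evenly over all slots; fifo_set() (re)initializes them.
 *
 * Illustrative walk-through (made-up values): with size == 3 and all slots
 * 0, fifo_add_val(fb, 2) makes the ring {2, 2, 2}; three subsequent
 * fifo_push(fb, 0) calls then return 2, 2, 2 while refilling with zeros,
 * i.e. the planned correction drains over the next three control steps. */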
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410	int i;
411
412	for (i = 0; i < fb->size; i++)
413		fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418	int ov;
419
420	ov = fb->values[fb->head_index];
421	fb->values[fb->head_index++] = value;
422
423	if (fb->head_index >= fb->size)
424		fb->head_index = 0;
425
426	return ov;
427}
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431	int i;
432
433	for (i = 0; i < fb->size; i++)
434		fb->values[i] += value;
435}
436
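/**
 * drbd_rs_controller() - Dynamic resync speed controller
 * @mdev:	DRBD device.
 *
 * Returns the number of sectors to request during the next SLEEP_TIME
 * interval.  The controller tries to keep "want" sectors in flight to the
 * peer: either the configured c_fill_target, or an amount derived from the
 * recently observed incoming rate and c_delay_target.  The difference
 * between "want" and what is currently in flight or already planned is
 * spread over the plan-ahead FIFO, and the result is capped by c_max_rate.
 *
 * Illustrative example (made-up numbers): want = 1000 sectors, 500 already
 * in flight, nothing planned, a 10-slot plan: correction = 500, cps = 50,
 * so roughly 50 additional sectors are requested per control step, on top
 * of what just came in, until the target fill level is reached.
 */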
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440	unsigned int want;     /* The number of sectors we want in the proxy */
441	int req_sect; /* Number of sectors to request in this turn */
442	int correction; /* Number of additional sectors we need in the proxy */
443	int cps; /* correction per invocation of drbd_rs_controller() */
444	int steps; /* Number of time steps to plan ahead */
445	int curr_corr;
446	int max_sect;
447
448	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449	mdev->rs_in_flight -= sect_in;
450
451	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453	steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456		want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
457	} else { /* normal path */
458		want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
459			sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
460	}
461
462	correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464	/* Plan ahead */
465	cps = correction / steps;
466	fifo_add_val(&mdev->rs_plan_s, cps);
467	mdev->rs_planed += cps * steps;
468
469	/* What we do in this step */
470	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471	spin_unlock(&mdev->peer_seq_lock);
472	mdev->rs_planed -= curr_corr;
473
474	req_sect = sect_in + curr_corr;
475	if (req_sect < 0)
476		req_sect = 0;
477
478	max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
479	if (req_sect > max_sect)
480		req_sect = max_sect;
481
482	/*
483	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484		 sect_in, mdev->rs_in_flight, want, correction,
485		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
486	*/
487
488	return req_sect;
489}
490
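/* Number of resync requests (of BM_BLOCK_SIZE, i.e. one bitmap bit, each)
 * to generate for the next SLEEP_TIME interval: taken from the controller
 * above when a plan is configured, otherwise derived from the static
 * resync_rate.  Updates mdev->c_sync_rate, the currently effective sync
 * rate, as a side effect. */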
491static int drbd_rs_number_requests(struct drbd_conf *mdev)
492{
493	int number;
494	if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
495		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497	} else {
498		mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
499		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500	}
501
502	/* ignore the amount of pending requests, the resync controller should
503	 * throttle down to incoming reply rate soon enough anyways. */
504	return number;
505}
506
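/**
 * w_make_resync_request() - Worker callback that generates resync requests
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 *
 * Scans the bitmap from bm_resync_fo for out-of-sync bits, merges adjacent
 * bits into larger, aligned requests (up to max_bio_size), and either sends
 * P_RS_DATA_REQUEST directly or, with a checksum algorithm configured and
 * protocol >= 89, schedules a local read via read_for_csum() first.
 * Re-arms the resync timer whenever it has to back off (send buffer half
 * full, throttling, busy resync extents, or allocation failure).
 */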
507int w_make_resync_request(struct drbd_work *w, int cancel)
508{
509	struct drbd_conf *mdev = w->mdev;
510	unsigned long bit;
511	sector_t sector;
512	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513	int max_bio_size;
514	int number, rollback_i, size;
515	int align, queued, sndbuf;
516	int i = 0;
517
518	if (unlikely(cancel))
519		return 0;
520
521	if (mdev->rs_total == 0) {
522		/* empty resync? */
523		drbd_resync_finished(mdev);
524		return 0;
525	}
526
527	if (!get_ldev(mdev)) {
528		/* Since we only need to access mdev->rsync, a
529		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
530		   continuing the resync with a broken disk makes no sense at
531		   all */
532		dev_err(DEV, "Disk broke down during resync!\n");
533		return 0;
534	}
535
536	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537	number = drbd_rs_number_requests(mdev);
538	if (number == 0)
539		goto requeue;
540
541	for (i = 0; i < number; i++) {
542		/* Stop generating RS requests when half of the send buffer is filled */
543		mutex_lock(&mdev->tconn->data.mutex);
544		if (mdev->tconn->data.socket) {
545			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547		} else {
548			queued = 1;
549			sndbuf = 0;
550		}
551		mutex_unlock(&mdev->tconn->data.mutex);
552		if (queued > sndbuf / 2)
553			goto requeue;
554
555next_sector:
556		size = BM_BLOCK_SIZE;
557		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559		if (bit == DRBD_END_OF_BITMAP) {
560			mdev->bm_resync_fo = drbd_bm_bits(mdev);
561			put_ldev(mdev);
562			return 0;
563		}
564
565		sector = BM_BIT_TO_SECT(bit);
566
567		if (drbd_rs_should_slow_down(mdev, sector) ||
568		    drbd_try_rs_begin_io(mdev, sector)) {
569			mdev->bm_resync_fo = bit;
570			goto requeue;
571		}
572		mdev->bm_resync_fo = bit + 1;
573
574		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575			drbd_rs_complete_io(mdev, sector);
576			goto next_sector;
577		}
578
579#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580		/* try to find some adjacent bits.
581		 * we stop once we already have the maximum request size.
582		 *
583		 * Additionally always align bigger requests, in order to
584		 * be prepared for all stripe sizes of software RAIDs.
585		 */
586		align = 1;
587		rollback_i = i;
588		for (;;) {
589			if (size + BM_BLOCK_SIZE > max_bio_size)
590				break;
591
592			/* Be always aligned */
593			if (sector & ((1<<(align+3))-1))
594				break;
595
596			/* do not cross extent boundaries */
597			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598				break;
599			/* now, is it actually dirty, after all?
600			 * caution, drbd_bm_test_bit is tri-state for some
601			 * obscure reason; ( b == 0 ) would get the out-of-band
602			 * only accidentally right because of the "oddly sized"
603			 * adjustment below */
604			if (drbd_bm_test_bit(mdev, bit+1) != 1)
605				break;
606			bit++;
607			size += BM_BLOCK_SIZE;
608			if ((BM_BLOCK_SIZE << align) <= size)
609				align++;
610			i++;
611		}
612		/* if we merged some,
613		 * reset the offset to start the next drbd_bm_find_next from */
614		if (size > BM_BLOCK_SIZE)
615			mdev->bm_resync_fo = bit + 1;
616#endif
617
618		/* adjust very last sectors, in case we are oddly sized */
619		if (sector + (size>>9) > capacity)
620			size = (capacity-sector)<<9;
621		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
622			switch (read_for_csum(mdev, sector, size)) {
623			case -EIO: /* Disk failure */
624				put_ldev(mdev);
625				return -EIO;
626			case -EAGAIN: /* allocation failed, or ldev busy */
627				drbd_rs_complete_io(mdev, sector);
628				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629				i = rollback_i;
630				goto requeue;
631			case 0:
632				/* everything ok */
633				break;
634			default:
635				BUG();
636			}
637		} else {
638			int err;
639
640			inc_rs_pending(mdev);
641			err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
642						 sector, size, ID_SYNCER);
643			if (err) {
644				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
645				dec_rs_pending(mdev);
646				put_ldev(mdev);
647				return err;
648			}
649		}
650	}
651
652	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
653		/* last syncer _request_ was sent,
654		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
655		 * next sync group will resume), as soon as we receive the last
656		 * resync data block, and the last bit is cleared.
657		 * until then resync "work" is "inactive" ...
658		 */
659		put_ldev(mdev);
660		return 0;
661	}
662
663 requeue:
664	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
665	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
666	put_ldev(mdev);
667	return 0;
668}
669
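/* Generate online-verify requests: walk forward from ov_position and send
 * one P_OV_REQUEST per BM_BLOCK_SIZE chunk, throttled by the same
 * number-of-requests logic as the resync path above.  Re-arms the resync
 * timer when it needs to back off. */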
670static int w_make_ov_request(struct drbd_work *w, int cancel)
671{
672	struct drbd_conf *mdev = w->mdev;
673	int number, i, size;
674	sector_t sector;
675	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
676
677	if (unlikely(cancel))
678		return 1;
679
680	number = drbd_rs_number_requests(mdev);
681
682	sector = mdev->ov_position;
683	for (i = 0; i < number; i++) {
684		if (sector >= capacity) {
685			return 1;
686		}
687
688		size = BM_BLOCK_SIZE;
689
690		if (drbd_rs_should_slow_down(mdev, sector) ||
691		    drbd_try_rs_begin_io(mdev, sector)) {
692			mdev->ov_position = sector;
693			goto requeue;
694		}
695
696		if (sector + (size>>9) > capacity)
697			size = (capacity-sector)<<9;
698
699		inc_rs_pending(mdev);
700		if (drbd_send_ov_request(mdev, sector, size)) {
701			dec_rs_pending(mdev);
702			return 0;
703		}
704		sector += BM_SECT_PER_BIT;
705	}
706	mdev->ov_position = sector;
707
708 requeue:
709	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
710	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
711	return 1;
712}
713
714int w_ov_finished(struct drbd_work *w, int cancel)
715{
716	struct drbd_conf *mdev = w->mdev;
717	kfree(w);
718	ov_out_of_sync_print(mdev);
719	drbd_resync_finished(mdev);
720
721	return 0;
722}
723
724static int w_resync_finished(struct drbd_work *w, int cancel)
725{
726	struct drbd_conf *mdev = w->mdev;
727	kfree(w);
728
729	drbd_resync_finished(mdev);
730
731	return 0;
732}
733
734static void ping_peer(struct drbd_conf *mdev)
735{
736	struct drbd_tconn *tconn = mdev->tconn;
737
738	clear_bit(GOT_PING_ACK, &tconn->flags);
739	request_ping(tconn);
740	wait_event(tconn->ping_wait,
741		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
742}
743
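/**
 * drbd_resync_finished() - Clean up after a resync or online verify run
 * @mdev:	DRBD device.
 *
 * Empties the resync LRU (retrying via the worker if that is not possible
 * yet), logs the achieved throughput, and computes the new disk/peer-disk
 * states and UUIDs depending on whether we were sync source or target and
 * whether any blocks failed.  May invoke the "out-of-sync" or
 * "after-resync-target" user space helpers.
 */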
744int drbd_resync_finished(struct drbd_conf *mdev)
745{
746	unsigned long db, dt, dbdt;
747	unsigned long n_oos;
748	union drbd_state os, ns;
749	struct drbd_work *w;
750	char *khelper_cmd = NULL;
751	int verify_done = 0;
752
753	/* Remove all elements from the resync LRU. If future actions
754	 * were to set bits in the (main) bitmap, the entries in the
755	 * resync LRU would be wrong. */
756	if (drbd_rs_del_all(mdev)) {
757		/* In case this is not possible now, most probably because
758		 * there are P_RS_DATA_REPLY packets lingering on the worker's
759		 * queue (or even the read operations for those packets
760		 * are not finished by now).  Retry in 100ms. */
761
762		schedule_timeout_interruptible(HZ / 10);
763		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
764		if (w) {
765			w->cb = w_resync_finished;
766			drbd_queue_work(&mdev->tconn->data.work, w);
767			return 1;
768		}
769		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
770	}
771
772	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
773	if (dt <= 0)
774		dt = 1;
775	db = mdev->rs_total;
776	dbdt = Bit2KB(db/dt);
777	mdev->rs_paused /= HZ;
778
779	if (!get_ldev(mdev))
780		goto out;
781
782	ping_peer(mdev);
783
784	spin_lock_irq(&mdev->tconn->req_lock);
785	os = drbd_read_state(mdev);
786
787	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
788
789	/* This protects us against multiple calls (that can happen in the presence
790	   of application IO), and against connectivity loss just before we arrive here. */
791	if (os.conn <= C_CONNECTED)
792		goto out_unlock;
793
794	ns = os;
795	ns.conn = C_CONNECTED;
796
797	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
798	     verify_done ? "Online verify " : "Resync",
799	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
800
801	n_oos = drbd_bm_total_weight(mdev);
802
803	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
804		if (n_oos) {
805			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
806			      n_oos, Bit2KB(1));
807			khelper_cmd = "out-of-sync";
808		}
809	} else {
810		D_ASSERT((n_oos - mdev->rs_failed) == 0);
811
812		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
813			khelper_cmd = "after-resync-target";
814
815		if (mdev->tconn->csums_tfm && mdev->rs_total) {
816			const unsigned long s = mdev->rs_same_csum;
817			const unsigned long t = mdev->rs_total;
818			const int ratio =
819				(t == 0)     ? 0 :
820			(t < 100000) ? ((s*100)/t) : (s/(t/100));
821			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
822			     "transferred %luK total %luK\n",
823			     ratio,
824			     Bit2KB(mdev->rs_same_csum),
825			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
826			     Bit2KB(mdev->rs_total));
827		}
828	}
829
830	if (mdev->rs_failed) {
831		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
832
833		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
834			ns.disk = D_INCONSISTENT;
835			ns.pdsk = D_UP_TO_DATE;
836		} else {
837			ns.disk = D_UP_TO_DATE;
838			ns.pdsk = D_INCONSISTENT;
839		}
840	} else {
841		ns.disk = D_UP_TO_DATE;
842		ns.pdsk = D_UP_TO_DATE;
843
844		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
845			if (mdev->p_uuid) {
846				int i;
847				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
848					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
849				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
850				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
851			} else {
852				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
853			}
854		}
855
856		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
857			/* for verify runs, we don't update uuids here,
858			 * so there would be nothing to report. */
859			drbd_uuid_set_bm(mdev, 0UL);
860			drbd_print_uuids(mdev, "updated UUIDs");
861			if (mdev->p_uuid) {
862				/* Now the two UUID sets are equal, update what we
863				 * know of the peer. */
864				int i;
865				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
866					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
867			}
868		}
869	}
870
871	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
872out_unlock:
873	spin_unlock_irq(&mdev->tconn->req_lock);
874	put_ldev(mdev);
875out:
876	mdev->rs_total  = 0;
877	mdev->rs_failed = 0;
878	mdev->rs_paused = 0;
879	if (verify_done)
880		mdev->ov_start_sector = 0;
881
882	drbd_md_sync(mdev);
883
884	if (khelper_cmd)
885		drbd_khelper(mdev, khelper_cmd);
886
887	return 1;
888}
889
890/* helper */
891static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
892{
893	if (drbd_peer_req_has_active_page(peer_req)) {
894		/* This might happen if sendpage() has not finished */
895		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
896		atomic_add(i, &mdev->pp_in_use_by_net);
897		atomic_sub(i, &mdev->pp_in_use);
898		spin_lock_irq(&mdev->tconn->req_lock);
899		list_add_tail(&peer_req->w.list, &mdev->net_ee);
900		spin_unlock_irq(&mdev->tconn->req_lock);
901		wake_up(&drbd_pp_wait);
902	} else
903		drbd_free_peer_req(mdev, peer_req);
904}
905
906/**
907 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
908 * @mdev:	DRBD device.
909 * @w:		work object.
910 * @cancel:	The connection will be closed anyway
911 */
912int w_e_end_data_req(struct drbd_work *w, int cancel)
913{
914	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
915	struct drbd_conf *mdev = w->mdev;
916	int err;
917
918	if (unlikely(cancel)) {
919		drbd_free_peer_req(mdev, peer_req);
920		dec_unacked(mdev);
921		return 0;
922	}
923
924	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
925		err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
926	} else {
927		if (__ratelimit(&drbd_ratelimit_state))
928			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
929			    (unsigned long long)peer_req->i.sector);
930
931		err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
932	}
933
934	dec_unacked(mdev);
935
936	move_to_net_ee_or_free(mdev, peer_req);
937
938	if (unlikely(err))
939		dev_err(DEV, "drbd_send_block() failed\n");
940	return err;
941}
942
943/**
944 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
945 * @mdev:	DRBD device.
946 * @w:		work object.
947 * @cancel:	The connection will be closed anyway
948 */
949int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
950{
951	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
952	struct drbd_conf *mdev = w->mdev;
953	int err;
954
955	if (unlikely(cancel)) {
956		drbd_free_peer_req(mdev, peer_req);
957		dec_unacked(mdev);
958		return 0;
959	}
960
961	if (get_ldev_if_state(mdev, D_FAILED)) {
962		drbd_rs_complete_io(mdev, peer_req->i.sector);
963		put_ldev(mdev);
964	}
965
966	if (mdev->state.conn == C_AHEAD) {
967		err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
968	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
969		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
970			inc_rs_pending(mdev);
971			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
972		} else {
973			if (__ratelimit(&drbd_ratelimit_state))
974				dev_err(DEV, "Not sending RSDataReply, "
975				    "partner DISKLESS!\n");
976			err = 0;
977		}
978	} else {
979		if (__ratelimit(&drbd_ratelimit_state))
980			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
981			    (unsigned long long)peer_req->i.sector);
982
983		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
984
985		/* update resync data with failure */
986		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
987	}
988
989	dec_unacked(mdev);
990
991	move_to_net_ee_or_free(mdev, peer_req);
992
993	if (unlikely(err))
994		dev_err(DEV, "drbd_send_block() failed\n");
995	return err;
996}
997
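/**
 * w_e_end_csum_rs_req() - Worker callback for checksum based resync requests
 * @w:		work object.
 * @cancel:	The connection will be closed anyway.
 *
 * Compares the locally computed digest with the one received in the
 * P_CSUM_RS_REQUEST.  If they match, the block is marked in sync and a
 * P_RS_IS_IN_SYNC ack is sent; otherwise the full block is sent back in a
 * P_RS_DATA_REPLY.
 */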
998int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
999{
1000	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1001	struct drbd_conf *mdev = w->mdev;
1002	struct digest_info *di;
1003	int digest_size;
1004	void *digest = NULL;
1005	int err, eq = 0;
1006
1007	if (unlikely(cancel)) {
1008		drbd_free_peer_req(mdev, peer_req);
1009		dec_unacked(mdev);
1010		return 0;
1011	}
1012
1013	if (get_ldev(mdev)) {
1014		drbd_rs_complete_io(mdev, peer_req->i.sector);
1015		put_ldev(mdev);
1016	}
1017
1018	di = peer_req->digest;
1019
1020	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1021		/* quick hack to try to avoid a race against reconfiguration.
1022		 * a real fix would be much more involved,
1023		 * introducing more locking mechanisms */
1024		if (mdev->tconn->csums_tfm) {
1025			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1026			D_ASSERT(digest_size == di->digest_size);
1027			digest = kmalloc(digest_size, GFP_NOIO);
1028		}
1029		if (digest) {
1030			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1031			eq = !memcmp(digest, di->digest, digest_size);
1032			kfree(digest);
1033		}
1034
1035		if (eq) {
1036			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1037			/* rs_same_csums unit is BM_BLOCK_SIZE */
1038			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1039			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1040		} else {
1041			inc_rs_pending(mdev);
1042			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1043			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1044			kfree(di);
1045			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1046		}
1047	} else {
1048		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1049		if (__ratelimit(&drbd_ratelimit_state))
1050			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1051	}
1052
1053	dec_unacked(mdev);
1054	move_to_net_ee_or_free(mdev, peer_req);
1055
1056	if (unlikely(err))
1057		dev_err(DEV, "drbd_send_block/ack() failed\n");
1058	return err;
1059}
1060
1061int w_e_end_ov_req(struct drbd_work *w, int cancel)
1062{
1063	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1064	struct drbd_conf *mdev = w->mdev;
1065	sector_t sector = peer_req->i.sector;
1066	unsigned int size = peer_req->i.size;
1067	int digest_size;
1068	void *digest;
1069	int err = 0;
1070
1071	if (unlikely(cancel))
1072		goto out;
1073
1074	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1075	digest = kmalloc(digest_size, GFP_NOIO);
1076	if (!digest) {
1077		err = 1;	/* terminate the connection in case the allocation failed */
1078		goto out;
1079	}
1080
1081	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1082		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1083	else
1084		memset(digest, 0, digest_size);
1085
1086	/* Free peer_req and pages before send.
1087	 * In case we block on congestion, we could otherwise run into
1088	 * some distributed deadlock, if the other side blocks on
1089	 * congestion as well, because our receiver blocks in
1090	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1091	drbd_free_peer_req(mdev, peer_req);
1092	peer_req = NULL;
1093	inc_rs_pending(mdev);
1094	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1095	if (err)
1096		dec_rs_pending(mdev);
1097	kfree(digest);
1098
1099out:
1100	if (peer_req)
1101		drbd_free_peer_req(mdev, peer_req);
1102	dec_unacked(mdev);
1103	return err;
1104}
1105
1106void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1107{
1108	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109		mdev->ov_last_oos_size += size>>9;
1110	} else {
1111		mdev->ov_last_oos_start = sector;
1112		mdev->ov_last_oos_size = size>>9;
1113	}
1114	drbd_set_out_of_sync(mdev, sector, size);
1115}
1116
1117int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118{
1119	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120	struct drbd_conf *mdev = w->mdev;
1121	struct digest_info *di;
1122	void *digest;
1123	sector_t sector = peer_req->i.sector;
1124	unsigned int size = peer_req->i.size;
1125	int digest_size;
1126	int err, eq = 0;
1127
1128	if (unlikely(cancel)) {
1129		drbd_free_peer_req(mdev, peer_req);
1130		dec_unacked(mdev);
1131		return 0;
1132	}
1133
1134	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135	 * the resync lru has been cleaned up already */
1136	if (get_ldev(mdev)) {
1137		drbd_rs_complete_io(mdev, peer_req->i.sector);
1138		put_ldev(mdev);
1139	}
1140
1141	di = peer_req->digest;
1142
1143	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1145		digest = kmalloc(digest_size, GFP_NOIO);
1146		if (digest) {
1147			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1148
1149			D_ASSERT(digest_size == di->digest_size);
1150			eq = !memcmp(digest, di->digest, digest_size);
1151			kfree(digest);
1152		}
1153	}
1154
1155	/* Free peer_req and pages before send.
1156	 * In case we block on congestion, we could otherwise run into
1157	 * some distributed deadlock, if the other side blocks on
1158	 * congestion as well, because our receiver blocks in
1159	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1160	drbd_free_peer_req(mdev, peer_req);
1161	if (!eq)
1162		drbd_ov_out_of_sync_found(mdev, sector, size);
1163	else
1164		ov_out_of_sync_print(mdev);
1165
1166	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169	dec_unacked(mdev);
1170
1171	--mdev->ov_left;
1172
1173	/* let's advance progress step marks only for every other megabyte */
1174	if ((mdev->ov_left & 0x200) == 0x200)
1175		drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177	if (mdev->ov_left == 0) {
1178		ov_out_of_sync_print(mdev);
1179		drbd_resync_finished(mdev);
1180	}
1181
1182	return err;
1183}
1184
1185int w_prev_work_done(struct drbd_work *w, int cancel)
1186{
1187	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189	complete(&b->done);
1190	return 0;
1191}
1192
1193int w_send_barrier(struct drbd_work *w, int cancel)
1194{
1195	struct drbd_socket *sock;
1196	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1197	struct drbd_conf *mdev = w->mdev;
1198	struct p_barrier *p;
1199
1200	/* really avoid racing with tl_clear.  w.cb may have been referenced
1201	 * just before it was reassigned and re-queued, so double check that.
1202	 * actually, this race was harmless, since we only try to send the
1203	 * barrier packet here, and otherwise do nothing with the object.
1204	 * but compare with the head of w_clear_epoch */
1205	spin_lock_irq(&mdev->tconn->req_lock);
1206	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207		cancel = 1;
1208	spin_unlock_irq(&mdev->tconn->req_lock);
1209	if (cancel)
1210		return 0;
1211
1212	sock = &mdev->tconn->data;
1213	p = drbd_prepare_command(mdev, sock);
1214	if (!p)
1215		return -EIO;
1216	p->barrier = b->br_number;
1217	/* inc_ap_pending was done where this was queued.
1218	 * dec_ap_pending will be done in got_BarrierAck
1219	 * or (on connection loss) in w_clear_epoch.  */
1220	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
1221}
1222
1223int w_send_write_hint(struct drbd_work *w, int cancel)
1224{
1225	struct drbd_conf *mdev = w->mdev;
1226	struct drbd_socket *sock;
1227
1228	if (cancel)
1229		return 0;
1230	sock = &mdev->tconn->data;
1231	if (!drbd_prepare_command(mdev, sock))
1232		return -EIO;
1233	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1234}
1235
1236int w_send_out_of_sync(struct drbd_work *w, int cancel)
1237{
1238	struct drbd_request *req = container_of(w, struct drbd_request, w);
1239	struct drbd_conf *mdev = w->mdev;
1240	int err;
1241
1242	if (unlikely(cancel)) {
1243		req_mod(req, SEND_CANCELED);
1244		return 0;
1245	}
1246
1247	err = drbd_send_out_of_sync(mdev, req);
1248	req_mod(req, OOS_HANDED_TO_NETWORK);
1249
1250	return err;
1251}
1252
1253/**
1254 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1255 * @mdev:	DRBD device.
1256 * @w:		work object.
1257 * @cancel:	The connection will be closed anyway
1258 */
1259int w_send_dblock(struct drbd_work *w, int cancel)
1260{
1261	struct drbd_request *req = container_of(w, struct drbd_request, w);
1262	struct drbd_conf *mdev = w->mdev;
1263	int err;
1264
1265	if (unlikely(cancel)) {
1266		req_mod(req, SEND_CANCELED);
1267		return 0;
1268	}
1269
1270	err = drbd_send_dblock(mdev, req);
1271	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1272
1273	return err;
1274}
1275
1276/**
1277 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1278 * @mdev:	DRBD device.
1279 * @w:		work object.
1280 * @cancel:	The connection will be closed anyway
1281 */
1282int w_send_read_req(struct drbd_work *w, int cancel)
1283{
1284	struct drbd_request *req = container_of(w, struct drbd_request, w);
1285	struct drbd_conf *mdev = w->mdev;
1286	int err;
1287
1288	if (unlikely(cancel)) {
1289		req_mod(req, SEND_CANCELED);
1290		return 0;
1291	}
1292
1293	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1294				 (unsigned long)req);
1295
1296	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1297
1298	return err;
1299}
1300
1301int w_restart_disk_io(struct drbd_work *w, int cancel)
1302{
1303	struct drbd_request *req = container_of(w, struct drbd_request, w);
1304	struct drbd_conf *mdev = w->mdev;
1305
1306	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1307		drbd_al_begin_io(mdev, &req->i);
1308	/* Calling drbd_al_begin_io() out of the worker might deadlock
1309	   theoretically. Practically it cannot deadlock, since this is
1310	   only used when unfreezing IOs. All the extents of the requests
1311	   that made it into the TL are already active */
1312
1313	drbd_req_make_private_bio(req, req->master_bio);
1314	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1315	generic_make_request(req->private_bio);
1316
1317	return 0;
1318}
1319
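/* Follow the resync-after dependency chain of @mdev and return 1 if this
 * device may resync now, i.e. no device it depends on is itself syncing or
 * has its aftr_isp, peer_isp or user_isp flag set; 0 otherwise.
 * Callers in this file take global_state_lock first (see the comment at
 * its definition). */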
1320static int _drbd_may_sync_now(struct drbd_conf *mdev)
1321{
1322	struct drbd_conf *odev = mdev;
1323
1324	while (1) {
1325		if (!odev->ldev)
1326			return 1;
1327		if (odev->ldev->dc.resync_after == -1)
1328			return 1;
1329		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1330		if (!expect(odev))
1331			return 1;
1332		if ((odev->state.conn >= C_SYNC_SOURCE &&
1333		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1334		    odev->state.aftr_isp || odev->state.peer_isp ||
1335		    odev->state.user_isp)
1336			return 0;
1337	}
1338}
1339
1340/**
1341 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1342 * @mdev:	DRBD device.
1343 *
1344 * Called from process context only (admin command and after_state_ch).
1345 */
1346static int _drbd_pause_after(struct drbd_conf *mdev)
1347{
1348	struct drbd_conf *odev;
1349	int i, rv = 0;
1350
1351	rcu_read_lock();
1352	idr_for_each_entry(&minors, odev, i) {
1353		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1354			continue;
1355		if (!_drbd_may_sync_now(odev))
1356			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1357			       != SS_NOTHING_TO_DO);
1358	}
1359	rcu_read_unlock();
1360
1361	return rv;
1362}
1363
1364/**
1365 * _drbd_resume_next() - Resume resync on all devices that may resync now
1366 * @mdev:	DRBD device.
1367 *
1368 * Called from process context only (admin command and worker).
1369 */
1370static int _drbd_resume_next(struct drbd_conf *mdev)
1371{
1372	struct drbd_conf *odev;
1373	int i, rv = 0;
1374
1375	rcu_read_lock();
1376	idr_for_each_entry(&minors, odev, i) {
1377		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1378			continue;
1379		if (odev->state.aftr_isp) {
1380			if (_drbd_may_sync_now(odev))
1381				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1382							CS_HARD, NULL)
1383				       != SS_NOTHING_TO_DO) ;
1384		}
1385	}
1386	rcu_read_unlock();
1387	return rv;
1388}
1389
1390void resume_next_sg(struct drbd_conf *mdev)
1391{
1392	write_lock_irq(&global_state_lock);
1393	_drbd_resume_next(mdev);
1394	write_unlock_irq(&global_state_lock);
1395}
1396
1397void suspend_other_sg(struct drbd_conf *mdev)
1398{
1399	write_lock_irq(&global_state_lock);
1400	_drbd_pause_after(mdev);
1401	write_unlock_irq(&global_state_lock);
1402}
1403
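/* Check whether using minor @o_minor as resync-after dependency for @mdev
 * would be valid: returns NO_ERROR, ERR_SYNC_AFTER if the minor does not
 * exist, or ERR_SYNC_AFTER_CYCLE if it would create a dependency cycle.
 * Called with global_state_lock held for writing (see drbd_alter_sa()). */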
1404static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1405{
1406	struct drbd_conf *odev;
1407
1408	if (o_minor == -1)
1409		return NO_ERROR;
1410	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1411		return ERR_SYNC_AFTER;
1412
1413	/* check for loops */
1414	odev = minor_to_mdev(o_minor);
1415	while (1) {
1416		if (odev == mdev)
1417			return ERR_SYNC_AFTER_CYCLE;
1418
1419		/* dependency chain ends here, no cycles. */
1420		if (odev->ldev->dc.resync_after == -1)
1421			return NO_ERROR;
1422
1423		/* follow the dependency chain */
1424		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1425	}
1426}
1427
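/* Change the resync-after ("sync after") dependency of @mdev to minor @na.
 * On success, the aftr_isp flags of all devices are re-evaluated, pausing
 * and resuming resyncs as needed, until no further state changes occur. */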
1428int drbd_alter_sa(struct drbd_conf *mdev, int na)
1429{
1430	int changes;
1431	int retcode;
1432
1433	write_lock_irq(&global_state_lock);
1434	retcode = sync_after_error(mdev, na);
1435	if (retcode == NO_ERROR) {
1436		mdev->ldev->dc.resync_after = na;
1437		do {
1438			changes  = _drbd_pause_after(mdev);
1439			changes |= _drbd_resume_next(mdev);
1440		} while (changes);
1441	}
1442	write_unlock_irq(&global_state_lock);
1443	return retcode;
1444}
1445
1446void drbd_rs_controller_reset(struct drbd_conf *mdev)
1447{
1448	atomic_set(&mdev->rs_sect_in, 0);
1449	atomic_set(&mdev->rs_sect_ev, 0);
1450	mdev->rs_in_flight = 0;
1451	mdev->rs_planed = 0;
1452	spin_lock(&mdev->peer_seq_lock);
1453	fifo_set(&mdev->rs_plan_s, 0);
1454	spin_unlock(&mdev->peer_seq_lock);
1455}
1456
1457void start_resync_timer_fn(unsigned long data)
1458{
1459	struct drbd_conf *mdev = (struct drbd_conf *) data;
1460
1461	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1462}
1463
1464int w_start_resync(struct drbd_work *w, int cancel)
1465{
1466	struct drbd_conf *mdev = w->mdev;
1467
1468	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1469		dev_warn(DEV, "w_start_resync later...\n");
1470		mdev->start_resync_timer.expires = jiffies + HZ/10;
1471		add_timer(&mdev->start_resync_timer);
1472		return 0;
1473	}
1474
1475	drbd_start_resync(mdev, C_SYNC_SOURCE);
1476	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1477	return 0;
1478}
1479
1480/**
1481 * drbd_start_resync() - Start the resync process
1482 * @mdev:	DRBD device.
1483 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1484 *
1485 * This function might bring you directly into one of the
1486 * C_PAUSED_SYNC_* states.
1487 */
1488void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1489{
1490	union drbd_state ns;
1491	int r;
1492
1493	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1494		dev_err(DEV, "Resync already running!\n");
1495		return;
1496	}
1497
1498	if (mdev->state.conn < C_AHEAD) {
1499		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1500		drbd_rs_cancel_all(mdev);
1501		/* This should be done when we abort the resync. We definitely do not
1502		   want to have this for connections going back and forth between
1503		   Ahead/Behind and SyncSource/SyncTarget */
1504	}
1505
1506	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1507		if (side == C_SYNC_TARGET) {
1508			/* Since application IO was locked out during C_WF_BITMAP_T and
1509			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1510			   we check whether we may make the data inconsistent. */
1511			r = drbd_khelper(mdev, "before-resync-target");
1512			r = (r >> 8) & 0xff;
1513			if (r > 0) {
1514				dev_info(DEV, "before-resync-target handler returned %d, "
1515					 "dropping connection.\n", r);
1516				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1517				return;
1518			}
1519		} else /* C_SYNC_SOURCE */ {
1520			r = drbd_khelper(mdev, "before-resync-source");
1521			r = (r >> 8) & 0xff;
1522			if (r > 0) {
1523				if (r == 3) {
1524					dev_info(DEV, "before-resync-source handler returned %d, "
1525						 "ignoring. Old userland tools?", r);
1526				} else {
1527					dev_info(DEV, "before-resync-source handler returned %d, "
1528						 "dropping connection.\n", r);
1529					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1530					return;
1531				}
1532			}
1533		}
1534	}
1535
1536	if (current == mdev->tconn->worker.task) {
1537		/* The worker should not sleep waiting for state_mutex,
1538		   because that can take long */
1539		if (!mutex_trylock(mdev->state_mutex)) {
1540			set_bit(B_RS_H_DONE, &mdev->flags);
1541			mdev->start_resync_timer.expires = jiffies + HZ/5;
1542			add_timer(&mdev->start_resync_timer);
1543			return;
1544		}
1545	} else {
1546		mutex_lock(mdev->state_mutex);
1547	}
1548	clear_bit(B_RS_H_DONE, &mdev->flags);
1549
1550	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1551		mutex_unlock(mdev->state_mutex);
1552		return;
1553	}
1554
1555	write_lock_irq(&global_state_lock);
1556	ns = drbd_read_state(mdev);
1557
1558	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1559
1560	ns.conn = side;
1561
1562	if (side == C_SYNC_TARGET)
1563		ns.disk = D_INCONSISTENT;
1564	else /* side == C_SYNC_SOURCE */
1565		ns.pdsk = D_INCONSISTENT;
1566
1567	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1568	ns = drbd_read_state(mdev);
1569
1570	if (ns.conn < C_CONNECTED)
1571		r = SS_UNKNOWN_ERROR;
1572
1573	if (r == SS_SUCCESS) {
1574		unsigned long tw = drbd_bm_total_weight(mdev);
1575		unsigned long now = jiffies;
1576		int i;
1577
1578		mdev->rs_failed    = 0;
1579		mdev->rs_paused    = 0;
1580		mdev->rs_same_csum = 0;
1581		mdev->rs_last_events = 0;
1582		mdev->rs_last_sect_ev = 0;
1583		mdev->rs_total     = tw;
1584		mdev->rs_start     = now;
1585		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1586			mdev->rs_mark_left[i] = tw;
1587			mdev->rs_mark_time[i] = now;
1588		}
1589		_drbd_pause_after(mdev);
1590	}
1591	write_unlock_irq(&global_state_lock);
1592
1593	if (r == SS_SUCCESS) {
1594		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1595		     drbd_conn_str(ns.conn),
1596		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1597		     (unsigned long) mdev->rs_total);
1598		if (side == C_SYNC_TARGET)
1599			mdev->bm_resync_fo = 0;
1600
1601		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1602		 * with w_send_oos, or the sync target will get confused as to
1603		 * how many bits to resync.  We cannot always do that, because for an
1604		 * empty resync and protocol < 95, we need to do it here, as we call
1605		 * drbd_resync_finished from here in that case.
1606		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1607		 * and from after_state_ch otherwise. */
1608		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1609			drbd_gen_and_send_sync_uuid(mdev);
1610
1611		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1612			/* This still has a race (about when exactly the peers
1613			 * detect connection loss) that can lead to a full sync
1614			 * on next handshake. In 8.3.9 we fixed this with explicit
1615			 * resync-finished notifications, but the fix
1616			 * introduces a protocol change.  Sleeping for some
1617			 * time longer than the ping interval + timeout on the
1618			 * SyncSource, to give the SyncTarget the chance to
1619			 * detect connection loss, then waiting for a ping
1620			 * response (implicit in drbd_resync_finished) reduces
1621			 * the race considerably, but does not solve it. */
1622			if (side == C_SYNC_SOURCE) {
1623				struct net_conf *nc;
1624				int timeo;
1625
1626				rcu_read_lock();
1627				nc = rcu_dereference(mdev->tconn->net_conf);
1628				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1629				rcu_read_unlock();
1630				schedule_timeout_interruptible(timeo);
1631			}
1632			drbd_resync_finished(mdev);
1633		}
1634
1635		drbd_rs_controller_reset(mdev);
1636		/* ns.conn may already be != mdev->state.conn,
1637		 * we may have been paused in between, or become paused until
1638		 * the timer triggers.
1639		 * No matter, that is handled in resync_timer_fn() */
1640		if (ns.conn == C_SYNC_TARGET)
1641			mod_timer(&mdev->resync_timer, jiffies);
1642
1643		drbd_md_sync(mdev);
1644	}
1645	put_ldev(mdev);
1646	mutex_unlock(mdev->state_mutex);
1647}
1648
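/**
 * drbd_worker() - Main loop of the per-connection worker thread
 * @thi:	the drbd_thread this function runs as.
 *
 * Waits on the data.work semaphore, uncorking/corking the data socket
 * around the sleep, dequeues one work item at a time and runs its callback,
 * passing cancel = 1 once the connection state drops below
 * C_WF_REPORT_PARAMS.  On exit it cancels all remaining work, waits for the
 * receiver thread and cleans up all volumes of the connection.
 */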
1649int drbd_worker(struct drbd_thread *thi)
1650{
1651	struct drbd_tconn *tconn = thi->tconn;
1652	struct drbd_work *w = NULL;
1653	struct drbd_conf *mdev;
1654	struct net_conf *nc;
1655	LIST_HEAD(work_list);
1656	int vnr, intr = 0;
1657	int cork;
1658
1659	while (get_t_state(thi) == RUNNING) {
1660		drbd_thread_current_set_cpu(thi);
1661
1662		if (down_trylock(&tconn->data.work.s)) {
1663			mutex_lock(&tconn->data.mutex);
1664
1665			rcu_read_lock();
1666			nc = rcu_dereference(tconn->net_conf);
1667			cork = nc ? !nc->no_cork : 0;
1668			rcu_read_unlock();
1669
1670			if (tconn->data.socket && cork)
1671				drbd_tcp_uncork(tconn->data.socket);
1672			mutex_unlock(&tconn->data.mutex);
1673
1674			intr = down_interruptible(&tconn->data.work.s);
1675
1676			mutex_lock(&tconn->data.mutex);
1677			if (tconn->data.socket  && cork)
1678				drbd_tcp_cork(tconn->data.socket);
1679			mutex_unlock(&tconn->data.mutex);
1680		}
1681
1682		if (intr) {
1683			flush_signals(current);
1684			if (get_t_state(thi) == RUNNING) {
1685				conn_warn(tconn, "Worker got an unexpected signal\n");
1686				continue;
1687			}
1688			break;
1689		}
1690
1691		if (get_t_state(thi) != RUNNING)
1692			break;
1693		/* With this break, we have done a down() but not consumed
1694		   the entry from the list. The cleanup code takes care of
1695		   this...   */
1696
1697		w = NULL;
1698		spin_lock_irq(&tconn->data.work.q_lock);
1699		if (list_empty(&tconn->data.work.q)) {
1700			/* something terribly wrong in our logic.
1701			 * we were able to down() the semaphore,
1702			 * but the list is empty... doh.
1703			 *
1704			 * what is the best thing to do now?
1705			 * try again from scratch, restarting the receiver,
1706			 * asender, whatnot? could break even more ugly,
1707			 * e.g. when we are primary, but no good local data.
1708			 *
1709			 * I'll try to get away just starting over this loop.
1710			 */
1711			conn_warn(tconn, "Work list unexpectedly empty\n");
1712			spin_unlock_irq(&tconn->data.work.q_lock);
1713			continue;
1714		}
1715		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1716		list_del_init(&w->list);
1717		spin_unlock_irq(&tconn->data.work.q_lock);
1718
1719		if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1720			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1721			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1722				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1723		}
1724	}
1725
1726	spin_lock_irq(&tconn->data.work.q_lock);
1727	while (!list_empty(&tconn->data.work.q)) {
1728		list_splice_init(&tconn->data.work.q, &work_list);
1729		spin_unlock_irq(&tconn->data.work.q_lock);
1730
1731		while (!list_empty(&work_list)) {
1732			w = list_entry(work_list.next, struct drbd_work, list);
1733			list_del_init(&w->list);
1734			w->cb(w, 1);
1735		}
1736
1737		spin_lock_irq(&tconn->data.work.q_lock);
1738	}
1739	sema_init(&tconn->data.work.s, 0);
1740	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1741	 * but called up() outside of it, we could get an up() on the
1742	 * semaphore without a corresponding list entry.
1743	 * So don't do that.
1744	 */
1745	spin_unlock_irq(&tconn->data.work.q_lock);
1746
1747	/* _drbd_set_state only uses stop_nowait.
1748	 * wait here for the exiting receiver. */
1749	drbd_thread_stop(&tconn->receiver);
1750
1751	down_read(&drbd_cfg_rwsem);
1752	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1753		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1754		drbd_mdev_cleanup(mdev);
1755	}
1756	up_read(&drbd_cfg_rwsem);
1757
1758	return 0;
1759}
1760