drbd_worker.c revision 81a5d60ecfe1d94627abb54810445f0fd5892f42
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_work *w, int cancel);
43
44
45
46/* endio handlers:
47 *   drbd_md_io_complete (defined here)
48 *   drbd_request_endio (defined here)
49 *   drbd_peer_request_endio (defined here)
50 *   bm_async_io_complete (defined in drbd_bitmap.c)
51 *
52 * For all these callbacks, note the following:
53 * The callbacks will be called in irq context by the IDE drivers,
54 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
55 * Try to get the locking right :)
56 *
57 */
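/* Illustration only (not DRBD code): the submit side typically wires up
 * these handlers by stashing its context in bio->bi_private and pointing
 * bi_end_io at the callback before submitting, roughly:
 *
 *	bio->bi_private = md_io;
 *	bio->bi_end_io  = drbd_md_io_complete;
 *	submit_bio(rw, bio);
 *
 * Because the callback may then run in hard-irq or softirq context, it must
 * not sleep and may only take spinlocks with the _irqsave variants.
 */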
58
59
60/* About the global_state_lock
61   Each state transition on a device holds a read lock. In case we have
62   to evaluate the sync-after dependencies, we grab a write lock, because
63   we need stable states on all devices for that.  */
64rwlock_t global_state_lock;
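/* Within this file, the write lock is taken by resume_next_sg(),
 * suspend_other_sg(), drbd_alter_sa() and drbd_start_resync() below,
 * exactly for the sync-after evaluation described above. */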
65
66/* used for synchronous meta data and bitmap IO
67 * submitted by drbd_md_sync_page_io()
68 */
69void drbd_md_io_complete(struct bio *bio, int error)
70{
71	struct drbd_md_io *md_io;
72
73	md_io = (struct drbd_md_io *)bio->bi_private;
74	md_io->error = error;
75
76	complete(&md_io->event);
77}
78
79/* reads on behalf of the partner,
80 * "submitted" by the receiver
81 */
82void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
83{
84	unsigned long flags = 0;
85	struct drbd_conf *mdev = peer_req->w.mdev;
86
87	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
88	mdev->read_cnt += peer_req->i.size >> 9;
89	list_del(&peer_req->w.list);
90	if (list_empty(&mdev->read_ee))
91		wake_up(&mdev->ee_wait);
92	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93		__drbd_chk_io_error(mdev, false);
94	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
95
96	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
97	put_ldev(mdev);
98}
99
100/* writes on behalf of the partner, or resync writes,
101 * "submitted" by the receiver, final stage.  */
102static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103{
104	unsigned long flags = 0;
105	struct drbd_conf *mdev = peer_req->w.mdev;
106	sector_t e_sector;
107	int do_wake;
108	u64 block_id;
109	int do_al_complete_io;
110
111	/* after we moved peer_req to done_ee,
112	 * we may no longer access it,
113	 * it may be freed/reused already!
114	 * (as soon as we release the req_lock) */
115	e_sector = peer_req->i.sector;
116	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
117	block_id = peer_req->block_id;
118
119	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
120	mdev->writ_cnt += peer_req->i.size >> 9;
121	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
122	list_add_tail(&peer_req->w.list, &mdev->done_ee);
123
124	/*
125	 * Do not remove from the write_requests tree here: we did not send the
126	 * Ack yet and did not wake possibly waiting conflicting requests.
127	 * It is removed from the tree in "drbd_process_done_ee" within the
128	 * appropriate w.cb (e_end_block/e_end_resync_block) or in
129	 * _drbd_clear_done_ee.
130	 */
131
132	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
133
134	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
135		__drbd_chk_io_error(mdev, false);
136	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
137
138	if (block_id == ID_SYNCER)
139		drbd_rs_complete_io(mdev, e_sector);
140
141	if (do_wake)
142		wake_up(&mdev->ee_wait);
143
144	if (do_al_complete_io)
145		drbd_al_complete_io(mdev, e_sector);
146
147	wake_asender(mdev->tconn);
148	put_ldev(mdev);
149}
150
151/* writes on behalf of the partner, or resync writes,
152 * "submitted" by the receiver.
153 */
154void drbd_peer_request_endio(struct bio *bio, int error)
155{
156	struct drbd_peer_request *peer_req = bio->bi_private;
157	struct drbd_conf *mdev = peer_req->w.mdev;
158	int uptodate = bio_flagged(bio, BIO_UPTODATE);
159	int is_write = bio_data_dir(bio) == WRITE;
160
161	if (error && __ratelimit(&drbd_ratelimit_state))
162		dev_warn(DEV, "%s: error=%d s=%llus\n",
163				is_write ? "write" : "read", error,
164				(unsigned long long)peer_req->i.sector);
165	if (!error && !uptodate) {
166		if (__ratelimit(&drbd_ratelimit_state))
167			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
168					is_write ? "write" : "read",
169					(unsigned long long)peer_req->i.sector);
170		/* strange behavior of some lower level drivers...
171		 * fail the request by clearing the uptodate flag,
172		 * but do not return any error?! */
173		error = -EIO;
174	}
175
176	if (error)
177		set_bit(__EE_WAS_ERROR, &peer_req->flags);
178
179	bio_put(bio); /* no need for the bio anymore */
180	if (atomic_dec_and_test(&peer_req->pending_bios)) {
181		if (is_write)
182			drbd_endio_write_sec_final(peer_req);
183		else
184			drbd_endio_read_sec_final(peer_req);
185	}
186}
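/* Note: a peer request may have been split into several bios by
 * drbd_submit_peer_request(); the atomic_dec_and_test() above makes sure
 * that only the completion of the last bio reaches the *_sec_final()
 * handlers. */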
187
188/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
189 */
190void drbd_request_endio(struct bio *bio, int error)
191{
192	unsigned long flags;
193	struct drbd_request *req = bio->bi_private;
194	struct drbd_conf *mdev = req->w.mdev;
195	struct bio_and_error m;
196	enum drbd_req_event what;
197	int uptodate = bio_flagged(bio, BIO_UPTODATE);
198
199	if (!error && !uptodate) {
200		dev_warn(DEV, "p %s: setting error to -EIO\n",
201			 bio_data_dir(bio) == WRITE ? "write" : "read");
202		/* strange behavior of some lower level drivers...
203		 * fail the request by clearing the uptodate flag,
204		 * but do not return any error?! */
205		error = -EIO;
206	}
207
208	/* to avoid recursion in __req_mod */
209	if (unlikely(error)) {
210		what = (bio_data_dir(bio) == WRITE)
211			? WRITE_COMPLETED_WITH_ERROR
212			: (bio_rw(bio) == READ)
213			  ? READ_COMPLETED_WITH_ERROR
214			  : READ_AHEAD_COMPLETED_WITH_ERROR;
215	} else
216		what = COMPLETED_OK;
217
218	bio_put(req->private_bio);
219	req->private_bio = ERR_PTR(error);
220
221	/* not req_mod(), we need irqsave here! */
222	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
223	__req_mod(req, what, &m);
224	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
225
226	if (m.bio)
227		complete_master_bio(mdev, &m);
228}
229
230int w_read_retry_remote(struct drbd_work *w, int cancel)
231{
232	struct drbd_request *req = container_of(w, struct drbd_request, w);
233	struct drbd_conf *mdev = w->mdev;
234
235	/* We should not detach for read io-error,
236	 * but try to WRITE the P_DATA_REPLY to the failed location,
237	 * to give the disk the chance to relocate that block */
238
239	spin_lock_irq(&mdev->tconn->req_lock);
240	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
242		spin_unlock_irq(&mdev->tconn->req_lock);
243		return 1;
244	}
245	spin_unlock_irq(&mdev->tconn->req_lock);
246
247	return w_send_read_req(w, 0);
248}
249
250void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251		  struct drbd_peer_request *peer_req, void *digest)
252{
253	struct hash_desc desc;
254	struct scatterlist sg;
255	struct page *page = peer_req->pages;
256	struct page *tmp;
257	unsigned len;
258
259	desc.tfm = tfm;
260	desc.flags = 0;
261
262	sg_init_table(&sg, 1);
263	crypto_hash_init(&desc);
264
265	while ((tmp = page_chain_next(page))) {
266		/* all but the last page will be fully used */
267		sg_set_page(&sg, page, PAGE_SIZE, 0);
268		crypto_hash_update(&desc, &sg, sg.length);
269		page = tmp;
270	}
271	/* and now the last, possibly only partially used page */
272	len = peer_req->i.size & (PAGE_SIZE - 1);
273	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274	crypto_hash_update(&desc, &sg, sg.length);
275	crypto_hash_final(&desc, digest);
276}
277
278void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279{
280	struct hash_desc desc;
281	struct scatterlist sg;
282	struct bio_vec *bvec;
283	int i;
284
285	desc.tfm = tfm;
286	desc.flags = 0;
287
288	sg_init_table(&sg, 1);
289	crypto_hash_init(&desc);
290
291	__bio_for_each_segment(bvec, bio, i, 0) {
292		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293		crypto_hash_update(&desc, &sg, sg.length);
294	}
295	crypto_hash_final(&desc, digest);
296}
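/* Usage sketch, mirroring w_e_send_csum() below: size the digest buffer by
 * the transform's digest size, then run the checksum over the peer request:
 *
 *	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest)
 *		drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
 */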
297
298/* MAYBE merge common code with w_e_end_ov_req */
299static int w_e_send_csum(struct drbd_work *w, int cancel)
300{
301	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
302	struct drbd_conf *mdev = w->mdev;
303	int digest_size;
304	void *digest;
305	int ok = 1;
306
307	if (unlikely(cancel))
308		goto out;
309
310	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
311		goto out;
312
313	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
314	digest = kmalloc(digest_size, GFP_NOIO);
315	if (digest) {
316		sector_t sector = peer_req->i.sector;
317		unsigned int size = peer_req->i.size;
318		drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
319		/* Free peer_req and pages before send.
320		 * In case we block on congestion, we could otherwise run into
321		 * some distributed deadlock, if the other side blocks on
322		 * congestion as well, because our receiver blocks in
323		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
324		drbd_free_ee(mdev, peer_req);
325		peer_req = NULL;
326		inc_rs_pending(mdev);
327		ok = drbd_send_drequest_csum(mdev, sector, size,
328					     digest, digest_size,
329					     P_CSUM_RS_REQUEST);
330		kfree(digest);
331	} else {
332		dev_err(DEV, "kmalloc() of digest failed.\n");
333		ok = 0;
334	}
335
336out:
337	if (peer_req)
338		drbd_free_ee(mdev, peer_req);
339
340	if (unlikely(!ok))
341		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342	return ok;
343}
344
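/* Note: GFP_TRY does not include __GFP_WAIT, so allocations using it will
 * not block for reclaim but fail fast instead; see the comment in
 * read_for_csum() below. */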
345#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
346
347static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348{
349	struct drbd_peer_request *peer_req;
350
351	if (!get_ldev(mdev))
352		return -EIO;
353
354	if (drbd_rs_should_slow_down(mdev, sector))
355		goto defer;
356
357	/* GFP_TRY, because if there is no memory available right now, this may
358	 * be rescheduled for later. It is "only" background resync, after all. */
359	peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
360	if (!peer_req)
361		goto defer;
362
363	peer_req->w.cb = w_e_send_csum;
364	spin_lock_irq(&mdev->tconn->req_lock);
365	list_add(&peer_req->w.list, &mdev->read_ee);
366	spin_unlock_irq(&mdev->tconn->req_lock);
367
368	atomic_add(size >> 9, &mdev->rs_sect_ev);
369	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
370		return 0;
371
372	/* If it failed because of ENOMEM, retry should help.  If it failed
373	 * because bio_add_page failed (probably broken lower level driver),
374	 * retry may or may not help.
375	 * If it does not, you may need to force disconnect. */
376	spin_lock_irq(&mdev->tconn->req_lock);
377	list_del(&peer_req->w.list);
378	spin_unlock_irq(&mdev->tconn->req_lock);
379
380	drbd_free_ee(mdev, peer_req);
381defer:
382	put_ldev(mdev);
383	return -EAGAIN;
384}
385
386int w_resync_timer(struct drbd_work *w, int cancel)
387{
388	struct drbd_conf *mdev = w->mdev;
389	switch (mdev->state.conn) {
390	case C_VERIFY_S:
391		w_make_ov_request(w, cancel);
392		break;
393	case C_SYNC_TARGET:
394		w_make_resync_request(w, cancel);
395		break;
396	}
397
398	return 1;
399}
400
401void resync_timer_fn(unsigned long data)
402{
403	struct drbd_conf *mdev = (struct drbd_conf *) data;
404
405	if (list_empty(&mdev->resync_work.list))
406		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
407}
408
409static void fifo_set(struct fifo_buffer *fb, int value)
410{
411	int i;
412
413	for (i = 0; i < fb->size; i++)
414		fb->values[i] = value;
415}
416
417static int fifo_push(struct fifo_buffer *fb, int value)
418{
419	int ov;
420
421	ov = fb->values[fb->head_index];
422	fb->values[fb->head_index++] = value;
423
424	if (fb->head_index >= fb->size)
425		fb->head_index = 0;
426
427	return ov;
428}
429
430static void fifo_add_val(struct fifo_buffer *fb, int value)
431{
432	int i;
433
434	for (i = 0; i < fb->size; i++)
435		fb->values[i] += value;
436}
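/* These three helpers implement the small planning ring used by
 * drbd_rs_controller() below: fifo_add_val() spreads a correction over all
 * future slots, fifo_push(fb, 0) pops what was planned for the current step.
 *
 * Worked example (numbers invented): size == 3, values {4, 4, 4}, head == 0.
 * fifo_add_val(fb, 2) yields {6, 6, 6}; a following fifo_push(fb, 0)
 * returns 6 and leaves {0, 6, 6} with head == 1.
 */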
437
438static int drbd_rs_controller(struct drbd_conf *mdev)
439{
440	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
441	unsigned int want;     /* The number of sectors we want in the proxy */
442	int req_sect; /* Number of sectors to request in this turn */
443	int correction; /* Number of additional sectors we need in the proxy */
444	int cps; /* correction per invocation of drbd_rs_controller() */
445	int steps; /* Number of time steps to plan ahead */
446	int curr_corr;
447	int max_sect;
448
449	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
450	mdev->rs_in_flight -= sect_in;
451
452	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
453
454	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
455
456	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
457		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
458	} else { /* normal path */
459		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
460			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
461	}
462
463	correction = want - mdev->rs_in_flight - mdev->rs_planed;
464
465	/* Plan ahead */
466	cps = correction / steps;
467	fifo_add_val(&mdev->rs_plan_s, cps);
468	mdev->rs_planed += cps * steps;
469
470	/* What we do in this step */
471	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
472	spin_unlock(&mdev->peer_seq_lock);
473	mdev->rs_planed -= curr_corr;
474
475	req_sect = sect_in + curr_corr;
476	if (req_sect < 0)
477		req_sect = 0;
478
479	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
480	if (req_sect > max_sect)
481		req_sect = max_sect;
482
483	/*
484	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
485		 sect_in, mdev->rs_in_flight, want, correction,
486		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
487	*/
488
489	return req_sect;
490}
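/* In short, the controller above tries to keep "want" sectors in flight:
 *
 *	want       = c_fill_target (if set), else
 *	             sect_in * c_delay_target * HZ / (SLEEP_TIME * 10)
 *	correction = want - rs_in_flight - rs_planed
 *
 * (at resync start, want is seeded from sync_conf.rate instead).  The
 * correction is spread evenly over the slots of rs_plan_s (the plan-ahead
 * window), and the per-step request (sect_in + curr_corr) is clamped to
 * c_max_rate.  All values are in units of 512-byte sectors.
 */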
491
492static int drbd_rs_number_requests(struct drbd_conf *mdev)
493{
494	int number;
495	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
496		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
497		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
498	} else {
499		mdev->c_sync_rate = mdev->sync_conf.rate;
500		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
501	}
502
503	/* ignore the number of pending requests; the resync controller should
504	 * throttle down to the incoming reply rate soon enough anyway. */
505	return number;
506}
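/* Example (illustrative numbers, assuming the usual 4 KiB bitmap granularity,
 * i.e. BM_BLOCK_SHIFT == 12): if the controller asks for 2048 sectors (1 MiB)
 * per SLEEP_TIME, the shift by (BM_BLOCK_SHIFT - 9) turns that into 256
 * requests of BM_BLOCK_SIZE each for w_make_resync_request(). */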
507
508static int w_make_resync_request(struct drbd_work *w, int cancel)
509{
510	struct drbd_conf *mdev = w->mdev;
511	unsigned long bit;
512	sector_t sector;
513	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
514	int max_bio_size;
515	int number, rollback_i, size;
516	int align, queued, sndbuf;
517	int i = 0;
518
519	if (unlikely(cancel))
520		return 1;
521
522	if (mdev->rs_total == 0) {
523		/* empty resync? */
524		drbd_resync_finished(mdev);
525		return 1;
526	}
527
528	if (!get_ldev(mdev)) {
529		/* Since we only need to access mdev->resync, a
530		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
531		   continuing the resync with a broken disk makes no sense at
532		   all */
533		dev_err(DEV, "Disk broke down during resync!\n");
534		return 1;
535	}
536
537	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
538	number = drbd_rs_number_requests(mdev);
539	if (number == 0)
540		goto requeue;
541
542	for (i = 0; i < number; i++) {
543		/* Stop generating RS requests when half of the send buffer is filled */
544		mutex_lock(&mdev->tconn->data.mutex);
545		if (mdev->tconn->data.socket) {
546			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
547			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
548		} else {
549			queued = 1;
550			sndbuf = 0;
551		}
552		mutex_unlock(&mdev->tconn->data.mutex);
553		if (queued > sndbuf / 2)
554			goto requeue;
555
556next_sector:
557		size = BM_BLOCK_SIZE;
558		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
559
560		if (bit == DRBD_END_OF_BITMAP) {
561			mdev->bm_resync_fo = drbd_bm_bits(mdev);
562			put_ldev(mdev);
563			return 1;
564		}
565
566		sector = BM_BIT_TO_SECT(bit);
567
568		if (drbd_rs_should_slow_down(mdev, sector) ||
569		    drbd_try_rs_begin_io(mdev, sector)) {
570			mdev->bm_resync_fo = bit;
571			goto requeue;
572		}
573		mdev->bm_resync_fo = bit + 1;
574
575		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
576			drbd_rs_complete_io(mdev, sector);
577			goto next_sector;
578		}
579
580#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
581		/* try to find some adjacent bits.
582		 * we stop once we have reached the maximum request size.
583		 *
584		 * Additionally always align bigger requests, in order to
585		 * be prepared for all stripe sizes of software RAIDs.
586		 */
587		align = 1;
588		rollback_i = i;
589		for (;;) {
590			if (size + BM_BLOCK_SIZE > max_bio_size)
591				break;
592
593			/* Always stay aligned */
594			if (sector & ((1<<(align+3))-1))
595				break;
596
597			/* do not cross extent boundaries */
598			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
599				break;
600			/* now, is it actually dirty, after all?
601			 * caution, drbd_bm_test_bit is tri-state for some
602			 * obscure reason; ( b == 0 ) would get the out-of-band
603			 * only accidentally right because of the "oddly sized"
604			 * adjustment below */
605			if (drbd_bm_test_bit(mdev, bit+1) != 1)
606				break;
607			bit++;
608			size += BM_BLOCK_SIZE;
609			if ((BM_BLOCK_SIZE << align) <= size)
610				align++;
611			i++;
612		}
613		/* if we merged some,
614		 * reset the offset to start the next drbd_bm_find_next from */
615		if (size > BM_BLOCK_SIZE)
616			mdev->bm_resync_fo = bit + 1;
617#endif
618
619		/* adjust very last sectors, in case we are oddly sized */
620		if (sector + (size>>9) > capacity)
621			size = (capacity-sector)<<9;
622		if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
623			switch (read_for_csum(mdev, sector, size)) {
624			case -EIO: /* Disk failure */
625				put_ldev(mdev);
626				return 0;
627			case -EAGAIN: /* allocation failed, or ldev busy */
628				drbd_rs_complete_io(mdev, sector);
629				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
630				i = rollback_i;
631				goto requeue;
632			case 0:
633				/* everything ok */
634				break;
635			default:
636				BUG();
637			}
638		} else {
639			inc_rs_pending(mdev);
640			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
641					       sector, size, ID_SYNCER)) {
642				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
643				dec_rs_pending(mdev);
644				put_ldev(mdev);
645				return 0;
646			}
647		}
648	}
649
650	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
651		/* last syncer _request_ was sent,
652		 * but the P_RS_DATA_REPLY has not yet been received.  Sync will end (and
653		 * the next sync group will resume) as soon as we receive the last
654		 * resync data block and the last bit is cleared.
655		 * Until then resync "work" is "inactive" ...
656		 */
657		put_ldev(mdev);
658		return 1;
659	}
660
661 requeue:
662	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
663	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
664	put_ldev(mdev);
665	return 1;
666}
667
668static int w_make_ov_request(struct drbd_work *w, int cancel)
669{
670	struct drbd_conf *mdev = w->mdev;
671	int number, i, size;
672	sector_t sector;
673	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
674
675	if (unlikely(cancel))
676		return 1;
677
678	number = drbd_rs_number_requests(mdev);
679
680	sector = mdev->ov_position;
681	for (i = 0; i < number; i++) {
682		if (sector >= capacity) {
683			return 1;
684		}
685
686		size = BM_BLOCK_SIZE;
687
688		if (drbd_rs_should_slow_down(mdev, sector) ||
689		    drbd_try_rs_begin_io(mdev, sector)) {
690			mdev->ov_position = sector;
691			goto requeue;
692		}
693
694		if (sector + (size>>9) > capacity)
695			size = (capacity-sector)<<9;
696
697		inc_rs_pending(mdev);
698		if (!drbd_send_ov_request(mdev, sector, size)) {
699			dec_rs_pending(mdev);
700			return 0;
701		}
702		sector += BM_SECT_PER_BIT;
703	}
704	mdev->ov_position = sector;
705
706 requeue:
707	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
708	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
709	return 1;
710}
711
712int w_ov_finished(struct drbd_work *w, int cancel)
713{
714	struct drbd_conf *mdev = w->mdev;
715	kfree(w);
716	ov_oos_print(mdev);
717	drbd_resync_finished(mdev);
718
719	return 1;
720}
721
722static int w_resync_finished(struct drbd_work *w, int cancel)
723{
724	struct drbd_conf *mdev = w->mdev;
725	kfree(w);
726
727	drbd_resync_finished(mdev);
728
729	return 1;
730}
731
732static void ping_peer(struct drbd_conf *mdev)
733{
734	struct drbd_tconn *tconn = mdev->tconn;
735
736	clear_bit(GOT_PING_ACK, &tconn->flags);
737	request_ping(tconn);
738	wait_event(tconn->ping_wait,
739		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
740}
741
742int drbd_resync_finished(struct drbd_conf *mdev)
743{
744	unsigned long db, dt, dbdt;
745	unsigned long n_oos;
746	union drbd_state os, ns;
747	struct drbd_work *w;
748	char *khelper_cmd = NULL;
749	int verify_done = 0;
750
751	/* Remove all elements from the resync LRU. Since future actions
752	 * might set bits in the (main) bitmap, the entries in the
753	 * resync LRU would otherwise be wrong. */
754	if (drbd_rs_del_all(mdev)) {
755		/* This may fail right now, most probably because
756		 * there are P_RS_DATA_REPLY packets lingering on the worker's
757		 * queue (or the read operations for those packets
758		 * have not finished by now).  Retry in 100 ms. */
759
760		schedule_timeout_interruptible(HZ / 10);
761		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
762		if (w) {
763			w->cb = w_resync_finished;
764			drbd_queue_work(&mdev->tconn->data.work, w);
765			return 1;
766		}
767		dev_err(DEV, "Warning: drbd_rs_del_all() failed and kmalloc(w) failed.\n");
768	}
769
770	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
771	if (dt <= 0)
772		dt = 1;
773	db = mdev->rs_total;
774	dbdt = Bit2KB(db/dt);
775	mdev->rs_paused /= HZ;
776
777	if (!get_ldev(mdev))
778		goto out;
779
780	ping_peer(mdev);
781
782	spin_lock_irq(&mdev->tconn->req_lock);
783	os = mdev->state;
784
785	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
786
787	/* This protects us against multiple calls (that can happen in the presence
788	   of application IO), and against connectivity loss just before we arrive here. */
789	if (os.conn <= C_CONNECTED)
790		goto out_unlock;
791
792	ns = os;
793	ns.conn = C_CONNECTED;
794
795	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
796	     verify_done ? "Online verify " : "Resync",
797	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
798
799	n_oos = drbd_bm_total_weight(mdev);
800
801	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
802		if (n_oos) {
803			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
804			      n_oos, Bit2KB(1));
805			khelper_cmd = "out-of-sync";
806		}
807	} else {
808		D_ASSERT((n_oos - mdev->rs_failed) == 0);
809
810		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
811			khelper_cmd = "after-resync-target";
812
813		if (mdev->csums_tfm && mdev->rs_total) {
814			const unsigned long s = mdev->rs_same_csum;
815			const unsigned long t = mdev->rs_total;
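			/* ratio of equal checksums in percent; the second
			 * branch avoids overflowing s*100 for very large t */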
816			const int ratio =
817				(t == 0)     ? 0 :
818				(t < 100000) ? ((s*100)/t) : (s/(t/100));
819			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
820			     "transferred %luK total %luK\n",
821			     ratio,
822			     Bit2KB(mdev->rs_same_csum),
823			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
824			     Bit2KB(mdev->rs_total));
825		}
826	}
827
828	if (mdev->rs_failed) {
829		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
830
831		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
832			ns.disk = D_INCONSISTENT;
833			ns.pdsk = D_UP_TO_DATE;
834		} else {
835			ns.disk = D_UP_TO_DATE;
836			ns.pdsk = D_INCONSISTENT;
837		}
838	} else {
839		ns.disk = D_UP_TO_DATE;
840		ns.pdsk = D_UP_TO_DATE;
841
842		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
843			if (mdev->p_uuid) {
844				int i;
845				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
846					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
847				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
848				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
849			} else {
850				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
851			}
852		}
853
854		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
855			/* for verify runs, we don't update uuids here,
856			 * so there would be nothing to report. */
857			drbd_uuid_set_bm(mdev, 0UL);
858			drbd_print_uuids(mdev, "updated UUIDs");
859			if (mdev->p_uuid) {
860				/* Now the two UUID sets are equal, update what we
861				 * know of the peer. */
862				int i;
863				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
864					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
865			}
866		}
867	}
868
869	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
870out_unlock:
871	spin_unlock_irq(&mdev->tconn->req_lock);
872	put_ldev(mdev);
873out:
874	mdev->rs_total  = 0;
875	mdev->rs_failed = 0;
876	mdev->rs_paused = 0;
877	if (verify_done)
878		mdev->ov_start_sector = 0;
879
880	drbd_md_sync(mdev);
881
882	if (khelper_cmd)
883		drbd_khelper(mdev, khelper_cmd);
884
885	return 1;
886}
887
888/* helper */
889static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
890{
891	if (drbd_ee_has_active_page(peer_req)) {
892		/* This might happen if sendpage() has not finished */
893		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
894		atomic_add(i, &mdev->pp_in_use_by_net);
895		atomic_sub(i, &mdev->pp_in_use);
896		spin_lock_irq(&mdev->tconn->req_lock);
897		list_add_tail(&peer_req->w.list, &mdev->net_ee);
898		spin_unlock_irq(&mdev->tconn->req_lock);
899		wake_up(&drbd_pp_wait);
900	} else
901		drbd_free_ee(mdev, peer_req);
902}
903
904/**
905 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
906 * @mdev:	DRBD device.
907 * @w:		work object.
908 * @cancel:	The connection will be closed anyway
909 */
910int w_e_end_data_req(struct drbd_work *w, int cancel)
911{
912	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
913	struct drbd_conf *mdev = w->mdev;
914	int ok;
915
916	if (unlikely(cancel)) {
917		drbd_free_ee(mdev, peer_req);
918		dec_unacked(mdev);
919		return 1;
920	}
921
922	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
923		ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
924	} else {
925		if (__ratelimit(&drbd_ratelimit_state))
926			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
927			    (unsigned long long)peer_req->i.sector);
928
929		ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
930	}
931
932	dec_unacked(mdev);
933
934	move_to_net_ee_or_free(mdev, peer_req);
935
936	if (unlikely(!ok))
937		dev_err(DEV, "drbd_send_block() failed\n");
938	return ok;
939}
940
941/**
942 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
943 * @mdev:	DRBD device.
944 * @w:		work object.
945 * @cancel:	The connection will be closed anyway
946 */
947int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
948{
949	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
950	struct drbd_conf *mdev = w->mdev;
951	int ok;
952
953	if (unlikely(cancel)) {
954		drbd_free_ee(mdev, peer_req);
955		dec_unacked(mdev);
956		return 1;
957	}
958
959	if (get_ldev_if_state(mdev, D_FAILED)) {
960		drbd_rs_complete_io(mdev, peer_req->i.sector);
961		put_ldev(mdev);
962	}
963
964	if (mdev->state.conn == C_AHEAD) {
965		ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
966	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
967		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
968			inc_rs_pending(mdev);
969			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
970		} else {
971			if (__ratelimit(&drbd_ratelimit_state))
972				dev_err(DEV, "Not sending RSDataReply, "
973				    "partner DISKLESS!\n");
974			ok = 1;
975		}
976	} else {
977		if (__ratelimit(&drbd_ratelimit_state))
978			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
979			    (unsigned long long)peer_req->i.sector);
980
981		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
982
983		/* update resync data with failure */
984		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
985	}
986
987	dec_unacked(mdev);
988
989	move_to_net_ee_or_free(mdev, peer_req);
990
991	if (unlikely(!ok))
992		dev_err(DEV, "drbd_send_block() failed\n");
993	return ok;
994}
995
996int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
997{
998	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999	struct drbd_conf *mdev = w->mdev;
1000	struct digest_info *di;
1001	int digest_size;
1002	void *digest = NULL;
1003	int ok, eq = 0;
1004
1005	if (unlikely(cancel)) {
1006		drbd_free_ee(mdev, peer_req);
1007		dec_unacked(mdev);
1008		return 1;
1009	}
1010
1011	if (get_ldev(mdev)) {
1012		drbd_rs_complete_io(mdev, peer_req->i.sector);
1013		put_ldev(mdev);
1014	}
1015
1016	di = peer_req->digest;
1017
1018	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1019		/* quick hack to try to avoid a race against reconfiguration.
1020		 * a real fix would be much more involved,
1021		 * introducing more locking mechanisms */
1022		if (mdev->csums_tfm) {
1023			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1024			D_ASSERT(digest_size == di->digest_size);
1025			digest = kmalloc(digest_size, GFP_NOIO);
1026		}
1027		if (digest) {
1028			drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
1029			eq = !memcmp(digest, di->digest, digest_size);
1030			kfree(digest);
1031		}
1032
1033		if (eq) {
1034			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1035			/* rs_same_csums unit is BM_BLOCK_SIZE */
1036			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1037			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1038		} else {
1039			inc_rs_pending(mdev);
1040			peer_req->block_id = ID_SYNCER; /* By setting block_id, the digest pointer becomes invalid! */
1041			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1042			kfree(di);
1043			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1044		}
1045	} else {
1046		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1047		if (__ratelimit(&drbd_ratelimit_state))
1048			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1049	}
1050
1051	dec_unacked(mdev);
1052	move_to_net_ee_or_free(mdev, peer_req);
1053
1054	if (unlikely(!ok))
1055		dev_err(DEV, "drbd_send_block/ack() failed\n");
1056	return ok;
1057}
1058
1059int w_e_end_ov_req(struct drbd_work *w, int cancel)
1060{
1061	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1062	struct drbd_conf *mdev = w->mdev;
1063	sector_t sector = peer_req->i.sector;
1064	unsigned int size = peer_req->i.size;
1065	int digest_size;
1066	void *digest;
1067	int ok = 1;
1068
1069	if (unlikely(cancel))
1070		goto out;
1071
1072	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1073	digest = kmalloc(digest_size, GFP_NOIO);
1074	if (!digest) {
1075		ok = 0;	/* terminate the connection in case the allocation failed */
1076		goto out;
1077	}
1078
1079	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1080		drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1081	else
1082		memset(digest, 0, digest_size);
1083
1084	/* Free peer_req and pages before send.
1085	 * In case we block on congestion, we could otherwise run into
1086	 * some distributed deadlock, if the other side blocks on
1087	 * congestion as well, because our receiver blocks in
1088	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1089	drbd_free_ee(mdev, peer_req);
1090	peer_req = NULL;
1091	inc_rs_pending(mdev);
1092	ok = drbd_send_drequest_csum(mdev, sector, size,
1093				     digest, digest_size,
1094				     P_OV_REPLY);
1095	if (!ok)
1096		dec_rs_pending(mdev);
1097	kfree(digest);
1098
1099out:
1100	if (peer_req)
1101		drbd_free_ee(mdev, peer_req);
1102	dec_unacked(mdev);
1103	return ok;
1104}
1105
1106void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1107{
1108	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109		mdev->ov_last_oos_size += size>>9;
1110	} else {
1111		mdev->ov_last_oos_start = sector;
1112		mdev->ov_last_oos_size = size>>9;
1113	}
1114	drbd_set_out_of_sync(mdev, sector, size);
1115}
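/* Adjacent out-of-sync sectors found by online verify are merged above into
 * one running range (ov_last_oos_start/size), so the out-of-sync range can be
 * reported as a single block by ov_oos_print() instead of one message per
 * request. */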
1116
1117int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118{
1119	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120	struct drbd_conf *mdev = w->mdev;
1121	struct digest_info *di;
1122	void *digest;
1123	sector_t sector = peer_req->i.sector;
1124	unsigned int size = peer_req->i.size;
1125	int digest_size;
1126	int ok, eq = 0;
1127
1128	if (unlikely(cancel)) {
1129		drbd_free_ee(mdev, peer_req);
1130		dec_unacked(mdev);
1131		return 1;
1132	}
1133
1134	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135	 * the resync lru has been cleaned up already */
1136	if (get_ldev(mdev)) {
1137		drbd_rs_complete_io(mdev, peer_req->i.sector);
1138		put_ldev(mdev);
1139	}
1140
1141	di = peer_req->digest;
1142
1143	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1145		digest = kmalloc(digest_size, GFP_NOIO);
1146		if (digest) {
1147			drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1148
1149			D_ASSERT(digest_size == di->digest_size);
1150			eq = !memcmp(digest, di->digest, digest_size);
1151			kfree(digest);
1152		}
1153	}
1154
1155	/* Free peer_req and pages before send.
1156	 * In case we block on congestion, we could otherwise run into
1157	 * some distributed deadlock, if the other side blocks on
1158	 * congestion as well, because our receiver blocks in
1159	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1160	drbd_free_ee(mdev, peer_req);
1161	if (!eq)
1162		drbd_ov_oos_found(mdev, sector, size);
1163	else
1164		ov_oos_print(mdev);
1165
1166	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169	dec_unacked(mdev);
1170
1171	--mdev->ov_left;
1172
1173	/* let's advance progress step marks only for every other megabyte */
1174	if ((mdev->ov_left & 0x200) == 0x200)
1175		drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177	if (mdev->ov_left == 0) {
1178		ov_oos_print(mdev);
1179		drbd_resync_finished(mdev);
1180	}
1181
1182	return ok;
1183}
1184
1185int w_prev_work_done(struct drbd_work *w, int cancel)
1186{
1187	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189	complete(&b->done);
1190	return 1;
1191}
1192
1193int w_send_barrier(struct drbd_work *w, int cancel)
1194{
1195	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1196	struct drbd_conf *mdev = w->mdev;
1197	struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1198	int ok = 1;
1199
1200	/* really avoid racing with tl_clear.  w.cb may have been referenced
1201	 * just before it was reassigned and re-queued, so double check that.
1202	 * actually, this race was harmless, since we only try to send the
1203	 * barrier packet here, and otherwise do nothing with the object.
1204	 * but compare with the head of w_clear_epoch */
1205	spin_lock_irq(&mdev->tconn->req_lock);
1206	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207		cancel = 1;
1208	spin_unlock_irq(&mdev->tconn->req_lock);
1209	if (cancel)
1210		return 1;
1211
1212	if (!drbd_get_data_sock(mdev->tconn))
1213		return 0;
1214	p->barrier = b->br_number;
1215	/* inc_ap_pending was done where this was queued.
1216	 * dec_ap_pending will be done in got_BarrierAck
1217	 * or (on connection loss) in w_clear_epoch.  */
1218	ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1219			    &p->head, sizeof(*p), 0);
1220	drbd_put_data_sock(mdev->tconn);
1221
1222	return ok;
1223}
1224
1225int w_send_write_hint(struct drbd_work *w, int cancel)
1226{
1227	struct drbd_conf *mdev = w->mdev;
1228	if (cancel)
1229		return 1;
1230	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1231}
1232
1233int w_send_oos(struct drbd_work *w, int cancel)
1234{
1235	struct drbd_request *req = container_of(w, struct drbd_request, w);
1236	struct drbd_conf *mdev = w->mdev;
1237	int ok;
1238
1239	if (unlikely(cancel)) {
1240		req_mod(req, SEND_CANCELED);
1241		return 1;
1242	}
1243
1244	ok = drbd_send_oos(mdev, req);
1245	req_mod(req, OOS_HANDED_TO_NETWORK);
1246
1247	return ok;
1248}
1249
1250/**
1251 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1252 * @mdev:	DRBD device.
1253 * @w:		work object.
1254 * @cancel:	The connection will be closed anyway
1255 */
1256int w_send_dblock(struct drbd_work *w, int cancel)
1257{
1258	struct drbd_request *req = container_of(w, struct drbd_request, w);
1259	struct drbd_conf *mdev = w->mdev;
1260	int ok;
1261
1262	if (unlikely(cancel)) {
1263		req_mod(req, SEND_CANCELED);
1264		return 1;
1265	}
1266
1267	ok = drbd_send_dblock(mdev, req);
1268	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1269
1270	return ok;
1271}
1272
1273/**
1274 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275 * @mdev:	DRBD device.
1276 * @w:		work object.
1277 * @cancel:	The connection will be closed anyway
1278 */
1279int w_send_read_req(struct drbd_work *w, int cancel)
1280{
1281	struct drbd_request *req = container_of(w, struct drbd_request, w);
1282	struct drbd_conf *mdev = w->mdev;
1283	int ok;
1284
1285	if (unlikely(cancel)) {
1286		req_mod(req, SEND_CANCELED);
1287		return 1;
1288	}
1289
1290	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1291				(unsigned long)req);
1292
1293	if (!ok) {
1294		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1295		 * so this is probably redundant */
1296		if (mdev->state.conn >= C_CONNECTED)
1297			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1298	}
1299	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1300
1301	return ok;
1302}
1303
1304int w_restart_disk_io(struct drbd_work *w, int cancel)
1305{
1306	struct drbd_request *req = container_of(w, struct drbd_request, w);
1307	struct drbd_conf *mdev = w->mdev;
1308
1309	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1310		drbd_al_begin_io(mdev, req->i.sector);
1311	/* Calling drbd_al_begin_io() out of the worker might deadlock
1312	   in theory. In practice it cannot deadlock, since this is
1313	   only used when unfreezing IOs. All the extents of the requests
1314	   that made it into the TL are already active */
1315
1316	drbd_req_make_private_bio(req, req->master_bio);
1317	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1318	generic_make_request(req->private_bio);
1319
1320	return 1;
1321}
1322
1323static int _drbd_may_sync_now(struct drbd_conf *mdev)
1324{
1325	struct drbd_conf *odev = mdev;
1326
1327	while (1) {
1328		if (odev->sync_conf.after == -1)
1329			return 1;
1330		odev = minor_to_mdev(odev->sync_conf.after);
1331		if (!expect(odev))
1332			return 1;
1333		if ((odev->state.conn >= C_SYNC_SOURCE &&
1334		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1335		    odev->state.aftr_isp || odev->state.peer_isp ||
1336		    odev->state.user_isp)
1337			return 0;
1338	}
1339}
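/* Illustration (minor numbers invented): with sync_conf.after forming the
 * chain 2 -> 1 -> -1, minor 2 may only resync while minor 1 is neither in a
 * SyncSource/SyncTarget/PausedSync state nor paused via any *_isp flag;
 * minor 1 has no dependency and may always start. */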
1340
1341/**
1342 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1343 * @mdev:	DRBD device.
1344 *
1345 * Called from process context only (admin command and after_state_ch).
1346 */
1347static int _drbd_pause_after(struct drbd_conf *mdev)
1348{
1349	struct drbd_conf *odev;
1350	int i, rv = 0;
1351
1352	idr_for_each_entry(&minors, odev, i) {
1353		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1354			continue;
1355		if (!_drbd_may_sync_now(odev))
1356			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1357			       != SS_NOTHING_TO_DO);
1358	}
1359
1360	return rv;
1361}
1362
1363/**
1364 * _drbd_resume_next() - Resume resync on all devices that may resync now
1365 * @mdev:	DRBD device.
1366 *
1367 * Called from process context only (admin command and worker).
1368 */
1369static int _drbd_resume_next(struct drbd_conf *mdev)
1370{
1371	struct drbd_conf *odev;
1372	int i, rv = 0;
1373
1374	idr_for_each_entry(&minors, odev, i) {
1375		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1376			continue;
1377		if (odev->state.aftr_isp) {
1378			if (_drbd_may_sync_now(odev))
1379				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1380							CS_HARD, NULL)
1381				       != SS_NOTHING_TO_DO) ;
1382		}
1383	}
1384	return rv;
1385}
1386
1387void resume_next_sg(struct drbd_conf *mdev)
1388{
1389	write_lock_irq(&global_state_lock);
1390	_drbd_resume_next(mdev);
1391	write_unlock_irq(&global_state_lock);
1392}
1393
1394void suspend_other_sg(struct drbd_conf *mdev)
1395{
1396	write_lock_irq(&global_state_lock);
1397	_drbd_pause_after(mdev);
1398	write_unlock_irq(&global_state_lock);
1399}
1400
1401static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1402{
1403	struct drbd_conf *odev;
1404
1405	if (o_minor == -1)
1406		return NO_ERROR;
1407	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1408		return ERR_SYNC_AFTER;
1409
1410	/* check for loops */
1411	odev = minor_to_mdev(o_minor);
1412	while (1) {
1413		if (odev == mdev)
1414			return ERR_SYNC_AFTER_CYCLE;
1415
1416		/* dependency chain ends here, no cycles. */
1417		if (odev->sync_conf.after == -1)
1418			return NO_ERROR;
1419
1420		/* follow the dependency chain */
1421		odev = minor_to_mdev(odev->sync_conf.after);
1422	}
1423}
1424
1425int drbd_alter_sa(struct drbd_conf *mdev, int na)
1426{
1427	int changes;
1428	int retcode;
1429
1430	write_lock_irq(&global_state_lock);
1431	retcode = sync_after_error(mdev, na);
1432	if (retcode == NO_ERROR) {
1433		mdev->sync_conf.after = na;
1434		do {
1435			changes  = _drbd_pause_after(mdev);
1436			changes |= _drbd_resume_next(mdev);
1437		} while (changes);
1438	}
1439	write_unlock_irq(&global_state_lock);
1440	return retcode;
1441}
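/* The pause/resume loop above iterates to a fixed point: changing one
 * sync-after dependency may allow another device to resume, which in turn
 * may require pausing a third, so we repeat until nothing changes anymore. */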
1442
1443void drbd_rs_controller_reset(struct drbd_conf *mdev)
1444{
1445	atomic_set(&mdev->rs_sect_in, 0);
1446	atomic_set(&mdev->rs_sect_ev, 0);
1447	mdev->rs_in_flight = 0;
1448	mdev->rs_planed = 0;
1449	spin_lock(&mdev->peer_seq_lock);
1450	fifo_set(&mdev->rs_plan_s, 0);
1451	spin_unlock(&mdev->peer_seq_lock);
1452}
1453
1454void start_resync_timer_fn(unsigned long data)
1455{
1456	struct drbd_conf *mdev = (struct drbd_conf *) data;
1457
1458	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1459}
1460
1461int w_start_resync(struct drbd_work *w, int cancel)
1462{
1463	struct drbd_conf *mdev = w->mdev;
1464
1465	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1466		dev_warn(DEV, "w_start_resync later...\n");
1467		mdev->start_resync_timer.expires = jiffies + HZ/10;
1468		add_timer(&mdev->start_resync_timer);
1469		return 1;
1470	}
1471
1472	drbd_start_resync(mdev, C_SYNC_SOURCE);
1473	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1474	return 1;
1475}
1476
1477/**
1478 * drbd_start_resync() - Start the resync process
1479 * @mdev:	DRBD device.
1480 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1481 *
1482 * This function might bring you directly into one of the
1483 * C_PAUSED_SYNC_* states.
1484 */
1485void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1486{
1487	union drbd_state ns;
1488	int r;
1489
1490	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1491		dev_err(DEV, "Resync already running!\n");
1492		return;
1493	}
1494
1495	if (mdev->state.conn < C_AHEAD) {
1496		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1497		drbd_rs_cancel_all(mdev);
1498		/* This should be done when we abort the resync. We definitely do not
1499		   want to have this for connections going back and forth between
1500		   Ahead/Behind and SyncSource/SyncTarget */
1501	}
1502
1503	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1504		if (side == C_SYNC_TARGET) {
1505			/* Since application IO was locked out during C_WF_BITMAP_T and
1506			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1507			   we check whether we may make the data inconsistent. */
1508			r = drbd_khelper(mdev, "before-resync-target");
1509			r = (r >> 8) & 0xff;
1510			if (r > 0) {
1511				dev_info(DEV, "before-resync-target handler returned %d, "
1512					 "dropping connection.\n", r);
1513				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1514				return;
1515			}
1516		} else /* C_SYNC_SOURCE */ {
1517			r = drbd_khelper(mdev, "before-resync-source");
1518			r = (r >> 8) & 0xff;
1519			if (r > 0) {
1520				if (r == 3) {
1521					dev_info(DEV, "before-resync-source handler returned %d, "
1522						 "ignoring. Old userland tools?", r);
1523				} else {
1524					dev_info(DEV, "before-resync-source handler returned %d, "
1525						 "dropping connection.\n", r);
1526					drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1527					return;
1528				}
1529			}
1530		}
1531	}
1532
1533	if (current == mdev->tconn->worker.task) {
1534		/* The worker should not sleep waiting for state_mutex,
1535		   because that can take a long time */
1536		if (!mutex_trylock(mdev->state_mutex)) {
1537			set_bit(B_RS_H_DONE, &mdev->flags);
1538			mdev->start_resync_timer.expires = jiffies + HZ/5;
1539			add_timer(&mdev->start_resync_timer);
1540			return;
1541		}
1542	} else {
1543		mutex_lock(mdev->state_mutex);
1544	}
1545	clear_bit(B_RS_H_DONE, &mdev->flags);
1546
1547	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1548		mutex_unlock(mdev->state_mutex);
1549		return;
1550	}
1551
1552	write_lock_irq(&global_state_lock);
1553	ns = mdev->state;
1554
1555	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1556
1557	ns.conn = side;
1558
1559	if (side == C_SYNC_TARGET)
1560		ns.disk = D_INCONSISTENT;
1561	else /* side == C_SYNC_SOURCE */
1562		ns.pdsk = D_INCONSISTENT;
1563
1564	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1565	ns = mdev->state;
1566
1567	if (ns.conn < C_CONNECTED)
1568		r = SS_UNKNOWN_ERROR;
1569
1570	if (r == SS_SUCCESS) {
1571		unsigned long tw = drbd_bm_total_weight(mdev);
1572		unsigned long now = jiffies;
1573		int i;
1574
1575		mdev->rs_failed    = 0;
1576		mdev->rs_paused    = 0;
1577		mdev->rs_same_csum = 0;
1578		mdev->rs_last_events = 0;
1579		mdev->rs_last_sect_ev = 0;
1580		mdev->rs_total     = tw;
1581		mdev->rs_start     = now;
1582		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1583			mdev->rs_mark_left[i] = tw;
1584			mdev->rs_mark_time[i] = now;
1585		}
1586		_drbd_pause_after(mdev);
1587	}
1588	write_unlock_irq(&global_state_lock);
1589
1590	if (r == SS_SUCCESS) {
1591		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1592		     drbd_conn_str(ns.conn),
1593		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1594		     (unsigned long) mdev->rs_total);
1595		if (side == C_SYNC_TARGET)
1596			mdev->bm_resync_fo = 0;
1597
1598		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1599		 * with w_send_oos, or the sync target will get confused as to
1600		 * how much bits to resync.  We cannot do that always, because for an
1601		 * how many bits to resync.  We cannot always do that, because for an
1602		 * drbd_resync_finished from here in that case.
1603		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1604		 * and from after_state_ch otherwise. */
1605		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1606			drbd_gen_and_send_sync_uuid(mdev);
1607
1608		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1609			/* This still has a race (about when exactly the peers
1610			 * detect connection loss) that can lead to a full sync
1611			 * on next handshake. In 8.3.9 we fixed this with explicit
1612			 * resync-finished notifications, but the fix
1613			 * introduces a protocol change.  Sleeping for some
1614			 * time longer than the ping interval + timeout on the
1615			 * SyncSource, to give the SyncTarget the chance to
1616			 * detect connection loss, then waiting for a ping
1617			 * response (implicit in drbd_resync_finished) reduces
1618			 * the race considerably, but does not solve it. */
1619			if (side == C_SYNC_SOURCE)
1620				schedule_timeout_interruptible(
1621					mdev->tconn->net_conf->ping_int * HZ +
1622					mdev->tconn->net_conf->ping_timeo*HZ/9);
1623			drbd_resync_finished(mdev);
1624		}
1625
1626		drbd_rs_controller_reset(mdev);
1627		/* ns.conn may already be != mdev->state.conn,
1628		 * we may have been paused in between, or become paused until
1629		 * the timer triggers.
1630		 * No matter, that is handled in resync_timer_fn() */
1631		if (ns.conn == C_SYNC_TARGET)
1632			mod_timer(&mdev->resync_timer, jiffies);
1633
1634		drbd_md_sync(mdev);
1635	}
1636	put_ldev(mdev);
1637	mutex_unlock(mdev->state_mutex);
1638}
1639
1640int drbd_worker(struct drbd_thread *thi)
1641{
1642	struct drbd_tconn *tconn = thi->tconn;
1643	struct drbd_work *w = NULL;
1644	struct drbd_conf *mdev;
1645	LIST_HEAD(work_list);
1646	int minor, intr = 0;
1647
1648	while (get_t_state(thi) == RUNNING) {
1649		drbd_thread_current_set_cpu(thi);
1650
1651		if (down_trylock(&tconn->data.work.s)) {
1652			mutex_lock(&tconn->data.mutex);
1653			if (tconn->data.socket && !tconn->net_conf->no_cork)
1654				drbd_tcp_uncork(tconn->data.socket);
1655			mutex_unlock(&tconn->data.mutex);
1656
1657			intr = down_interruptible(&tconn->data.work.s);
1658
1659			mutex_lock(&tconn->data.mutex);
1660			if (tconn->data.socket  && !tconn->net_conf->no_cork)
1661				drbd_tcp_cork(tconn->data.socket);
1662			mutex_unlock(&tconn->data.mutex);
1663		}
1664
1665		if (intr) {
1666			flush_signals(current);
1667			if (get_t_state(thi) == RUNNING) {
1668				conn_warn(tconn, "Worker got an unexpected signal\n");
1669				continue;
1670			}
1671			break;
1672		}
1673
1674		if (get_t_state(thi) != RUNNING)
1675			break;
1676		/* With this break, we have done a down() but not consumed
1677		   the entry from the list. The cleanup code takes care of
1678		   this...   */
1679
1680		w = NULL;
1681		spin_lock_irq(&tconn->data.work.q_lock);
1682		if (list_empty(&tconn->data.work.q)) {
1683			/* something terribly wrong in our logic.
1684			 * we were able to down() the semaphore,
1685			 * but the list is empty... doh.
1686			 *
1687			 * what is the best thing to do now?
1688			 * try again from scratch, restarting the receiver,
1689			 * asender, whatnot? could break even more ugly,
1690			 * e.g. when we are primary, but no good local data.
1691			 *
1692			 * I'll try to get away just starting over this loop.
1693			 */
1694			conn_warn(tconn, "Work list unexpectedly empty\n");
1695			spin_unlock_irq(&tconn->data.work.q_lock);
1696			continue;
1697		}
1698		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1699		list_del_init(&w->list);
1700		spin_unlock_irq(&tconn->data.work.q_lock);
1701
1702		if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1703			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1704			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1705				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1706		}
1707	}
1708
1709	spin_lock_irq(&tconn->data.work.q_lock);
1710	while (!list_empty(&tconn->data.work.q)) {
1711		list_splice_init(&tconn->data.work.q, &work_list);
1712		spin_unlock_irq(&tconn->data.work.q_lock);
1713
1714		while (!list_empty(&work_list)) {
1715			w = list_entry(work_list.next, struct drbd_work, list);
1716			list_del_init(&w->list);
1717			w->cb(w, 1);
1718		}
1719
1720		spin_lock_irq(&tconn->data.work.q_lock);
1721	}
1722	sema_init(&tconn->data.work.s, 0);
1723	/* DANGEROUS race: if someone queued their work while holding the spinlock,
1724	 * but called up() outside of it, we could get an up() on the
1725	 * semaphore without a corresponding list entry.
1726	 * So don't do that.
1727	 */
1728	spin_unlock_irq(&tconn->data.work.q_lock);
1729
1730	drbd_thread_stop(&tconn->receiver);
1731	idr_for_each_entry(&tconn->volumes, mdev, minor) {
1732		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1733		/* _drbd_set_state only uses stop_nowait.
1734		 * wait here for the exiting receiver. */
1735		drbd_mdev_cleanup(mdev);
1736	}
1737	clear_bit(OBJECT_DYING, &tconn->flags);
1738	clear_bit(CONFIG_PENDING, &tconn->flags);
1739	wake_up(&tconn->ping_wait);
1740
1741	return 0;
1742}
1743