drbd_worker.c revision f399002e68e626e7bc443e6fcab1772704cc197f
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
41static int w_make_ov_request(struct drbd_work *w, int cancel);
42static int w_make_resync_request(struct drbd_work *w, int cancel);
43
44
45
46/* endio handlers:
47 *   drbd_md_io_complete (defined here)
48 *   drbd_request_endio (defined here)
49 *   drbd_peer_request_endio (defined here)
50 *   bm_async_io_complete (defined in drbd_bitmap.c)
51 *
52 * For all these callbacks, note the following:
53 * The callbacks will be called in irq context by the IDE drivers,
54 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
55 * Try to get the locking right :)
56 *
57 */
58
59
60/* About the global_state_lock
61   Each state transition on a device holds a read lock. In case we have
62   to evaluate the sync-after dependencies, we grab a write lock, because
63   we need stable states on all devices for that.  */
64rwlock_t global_state_lock;
65
66/* used for synchronous meta data and bitmap IO
67 * submitted by drbd_md_sync_page_io()
68 */
69void drbd_md_io_complete(struct bio *bio, int error)
70{
71	struct drbd_md_io *md_io;
72
73	md_io = (struct drbd_md_io *)bio->bi_private;
74	md_io->error = error;
75
76	complete(&md_io->event);
77}
78
79/* reads on behalf of the partner,
80 * "submitted" by the receiver
81 */
82void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
83{
84	unsigned long flags = 0;
85	struct drbd_conf *mdev = peer_req->w.mdev;
86
87	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
88	mdev->read_cnt += peer_req->i.size >> 9;
89	list_del(&peer_req->w.list);
90	if (list_empty(&mdev->read_ee))
91		wake_up(&mdev->ee_wait);
92	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93		__drbd_chk_io_error(mdev, false);
94	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
95
96	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
97	put_ldev(mdev);
98}
99
100/* writes on behalf of the partner, or resync writes,
101 * "submitted" by the receiver, final stage.  */
102static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103{
104	unsigned long flags = 0;
105	struct drbd_conf *mdev = peer_req->w.mdev;
106	sector_t e_sector;
107	int do_wake;
108	u64 block_id;
109	int do_al_complete_io;
110
111	/* after we moved peer_req to done_ee,
112	 * we may no longer access it,
113	 * it may be freed/reused already!
114	 * (as soon as we release the req_lock) */
115	e_sector = peer_req->i.sector;
116	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
117	block_id = peer_req->block_id;
118
119	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
120	mdev->writ_cnt += peer_req->i.size >> 9;
121	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
122	list_add_tail(&peer_req->w.list, &mdev->done_ee);
123
124	/*
125	 * Do not remove from the write_requests tree here: we did not send the
126	 * Ack yet and did not wake possibly waiting conflicting requests.
127	 * Removal from the tree happens in "drbd_process_done_ee", via the
128	 * appropriate w.cb (e_end_block/e_end_resync_block), or in
129	 * _drbd_clear_done_ee.
130	 */
131
132	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
133
134	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
135		__drbd_chk_io_error(mdev, false);
136	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
137
138	if (block_id == ID_SYNCER)
139		drbd_rs_complete_io(mdev, e_sector);
140
141	if (do_wake)
142		wake_up(&mdev->ee_wait);
143
144	if (do_al_complete_io)
145		drbd_al_complete_io(mdev, e_sector);
146
147	wake_asender(mdev->tconn);
148	put_ldev(mdev);
149}
150
151/* writes on behalf of the partner, or resync writes,
152 * "submitted" by the receiver.
153 */
154void drbd_peer_request_endio(struct bio *bio, int error)
155{
156	struct drbd_peer_request *peer_req = bio->bi_private;
157	struct drbd_conf *mdev = peer_req->w.mdev;
158	int uptodate = bio_flagged(bio, BIO_UPTODATE);
159	int is_write = bio_data_dir(bio) == WRITE;
160
161	if (error && __ratelimit(&drbd_ratelimit_state))
162		dev_warn(DEV, "%s: error=%d s=%llus\n",
163				is_write ? "write" : "read", error,
164				(unsigned long long)peer_req->i.sector);
165	if (!error && !uptodate) {
166		if (__ratelimit(&drbd_ratelimit_state))
167			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
168					is_write ? "write" : "read",
169					(unsigned long long)peer_req->i.sector);
170		/* strange behavior of some lower level drivers...
171		 * fail the request by clearing the uptodate flag,
172		 * but do not return any error?! */
173		error = -EIO;
174	}
175
176	if (error)
177		set_bit(__EE_WAS_ERROR, &peer_req->flags);
178
179	bio_put(bio); /* no need for the bio anymore */
180	if (atomic_dec_and_test(&peer_req->pending_bios)) {
181		if (is_write)
182			drbd_endio_write_sec_final(peer_req);
183		else
184			drbd_endio_read_sec_final(peer_req);
185	}
186}
187
188/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
189 */
190void drbd_request_endio(struct bio *bio, int error)
191{
192	unsigned long flags;
193	struct drbd_request *req = bio->bi_private;
194	struct drbd_conf *mdev = req->w.mdev;
195	struct bio_and_error m;
196	enum drbd_req_event what;
197	int uptodate = bio_flagged(bio, BIO_UPTODATE);
198
199	if (!error && !uptodate) {
200		dev_warn(DEV, "p %s: setting error to -EIO\n",
201			 bio_data_dir(bio) == WRITE ? "write" : "read");
202		/* strange behavior of some lower level drivers...
203		 * fail the request by clearing the uptodate flag,
204		 * but do not return any error?! */
205		error = -EIO;
206	}
207
208	/* to avoid recursion in __req_mod */
209	if (unlikely(error)) {
210		what = (bio_data_dir(bio) == WRITE)
211			? WRITE_COMPLETED_WITH_ERROR
212			: (bio_rw(bio) == READ)
213			  ? READ_COMPLETED_WITH_ERROR
214			  : READ_AHEAD_COMPLETED_WITH_ERROR;
215	} else
216		what = COMPLETED_OK;
217
218	bio_put(req->private_bio);
219	req->private_bio = ERR_PTR(error);
220
221	/* not req_mod(), we need irqsave here! */
222	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
223	__req_mod(req, what, &m);
224	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
225
226	if (m.bio)
227		complete_master_bio(mdev, &m);
228}
229
230int w_read_retry_remote(struct drbd_work *w, int cancel)
231{
232	struct drbd_request *req = container_of(w, struct drbd_request, w);
233	struct drbd_conf *mdev = w->mdev;
234
235	/* We should not detach for read io-error,
236	 * but try to WRITE the P_DATA_REPLY to the failed location,
237	 * to give the disk the chance to relocate that block */
238
239	spin_lock_irq(&mdev->tconn->req_lock);
240	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
242		spin_unlock_irq(&mdev->tconn->req_lock);
243		return 1;
244	}
245	spin_unlock_irq(&mdev->tconn->req_lock);
246
247	return w_send_read_req(w, 0);
248}
249
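/* Compute a digest over the page chain of a peer request with the given
 * hash transform.  All but the last page are hashed in full; the length
 * of the last page follows from peer_req->i.size (a full page when the
 * size is a multiple of PAGE_SIZE).  drbd_csum_bio() below does the same
 * for the segments of a bio. */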
250void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251		  struct drbd_peer_request *peer_req, void *digest)
252{
253	struct hash_desc desc;
254	struct scatterlist sg;
255	struct page *page = peer_req->pages;
256	struct page *tmp;
257	unsigned len;
258
259	desc.tfm = tfm;
260	desc.flags = 0;
261
262	sg_init_table(&sg, 1);
263	crypto_hash_init(&desc);
264
265	while ((tmp = page_chain_next(page))) {
266		/* all but the last page will be fully used */
267		sg_set_page(&sg, page, PAGE_SIZE, 0);
268		crypto_hash_update(&desc, &sg, sg.length);
269		page = tmp;
270	}
271	/* and now the last, possibly only partially used page */
272	len = peer_req->i.size & (PAGE_SIZE - 1);
273	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274	crypto_hash_update(&desc, &sg, sg.length);
275	crypto_hash_final(&desc, digest);
276}
277
278void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279{
280	struct hash_desc desc;
281	struct scatterlist sg;
282	struct bio_vec *bvec;
283	int i;
284
285	desc.tfm = tfm;
286	desc.flags = 0;
287
288	sg_init_table(&sg, 1);
289	crypto_hash_init(&desc);
290
291	__bio_for_each_segment(bvec, bio, i, 0) {
292		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293		crypto_hash_update(&desc, &sg, sg.length);
294	}
295	crypto_hash_final(&desc, digest);
296}
297
298/* MAYBE merge common code with w_e_end_ov_req */
299static int w_e_send_csum(struct drbd_work *w, int cancel)
300{
301	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
302	struct drbd_conf *mdev = w->mdev;
303	int digest_size;
304	void *digest;
305	int ok = 1;
306
307	if (unlikely(cancel))
308		goto out;
309
310	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
311		goto out;
312
313	digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
314	digest = kmalloc(digest_size, GFP_NOIO);
315	if (digest) {
316		sector_t sector = peer_req->i.sector;
317		unsigned int size = peer_req->i.size;
318		drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
319		/* Free peer_req and pages before send.
320		 * In case we block on congestion, we could otherwise run into
321		 * some distributed deadlock, if the other side blocks on
322		 * congestion as well, because our receiver blocks in
323		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
324		drbd_free_ee(mdev, peer_req);
325		peer_req = NULL;
326		inc_rs_pending(mdev);
327		ok = drbd_send_drequest_csum(mdev, sector, size,
328					     digest, digest_size,
329					     P_CSUM_RS_REQUEST);
330		kfree(digest);
331	} else {
332		dev_err(DEV, "kmalloc() of digest failed.\n");
333		ok = 0;
334	}
335
336out:
337	if (peer_req)
338		drbd_free_ee(mdev, peer_req);
339
340	if (unlikely(!ok))
341		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342	return ok;
343}
344
345#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
346
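/* Submit a local read of [sector, sector + size) to feed the checksum
 * based resync.  On completion the result is handed to w_e_send_csum().
 * Returns 0 if the read was submitted, -EAGAIN if the caller should back
 * off and retry later (throttled, out of memory, or submission failed),
 * and -EIO if the local disk is not available. */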
347static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348{
349	struct drbd_peer_request *peer_req;
350
351	if (!get_ldev(mdev))
352		return -EIO;
353
354	if (drbd_rs_should_slow_down(mdev, sector))
355		goto defer;
356
357	/* GFP_TRY, because if there is no memory available right now, this may
358	 * be rescheduled for later. It is "only" background resync, after all. */
359	peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
360	if (!peer_req)
361		goto defer;
362
363	peer_req->w.cb = w_e_send_csum;
364	spin_lock_irq(&mdev->tconn->req_lock);
365	list_add(&peer_req->w.list, &mdev->read_ee);
366	spin_unlock_irq(&mdev->tconn->req_lock);
367
368	atomic_add(size >> 9, &mdev->rs_sect_ev);
369	if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
370		return 0;
371
372	/* If it failed because of ENOMEM, retry should help.  If it failed
373	 * because bio_add_page failed (probably broken lower level driver),
374	 * retry may or may not help.
375	 * If it does not, you may need to force disconnect. */
376	spin_lock_irq(&mdev->tconn->req_lock);
377	list_del(&peer_req->w.list);
378	spin_unlock_irq(&mdev->tconn->req_lock);
379
380	drbd_free_ee(mdev, peer_req);
381defer:
382	put_ldev(mdev);
383	return -EAGAIN;
384}
385
386int w_resync_timer(struct drbd_work *w, int cancel)
387{
388	struct drbd_conf *mdev = w->mdev;
389	switch (mdev->state.conn) {
390	case C_VERIFY_S:
391		w_make_ov_request(w, cancel);
392		break;
393	case C_SYNC_TARGET:
394		w_make_resync_request(w, cancel);
395		break;
396	}
397
398	return 1;
399}
400
401void resync_timer_fn(unsigned long data)
402{
403	struct drbd_conf *mdev = (struct drbd_conf *) data;
404
405	if (list_empty(&mdev->resync_work.list))
406		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
407}
408
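/* Helpers for the fixed-size ring buffer (struct fifo_buffer) that holds
 * the resync controller's plan, one slot per controller step:
 * fifo_set() initializes all slots, fifo_push() stores a new value at the
 * head and returns the oldest value it replaces, and fifo_add_val() adds
 * a value to every slot. */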
409static void fifo_set(struct fifo_buffer *fb, int value)
410{
411	int i;
412
413	for (i = 0; i < fb->size; i++)
414		fb->values[i] = value;
415}
416
417static int fifo_push(struct fifo_buffer *fb, int value)
418{
419	int ov;
420
421	ov = fb->values[fb->head_index];
422	fb->values[fb->head_index++] = value;
423
424	if (fb->head_index >= fb->size)
425		fb->head_index = 0;
426
427	return ov;
428}
429
430static void fifo_add_val(struct fifo_buffer *fb, int value)
431{
432	int i;
433
434	for (i = 0; i < fb->size; i++)
435		fb->values[i] += value;
436}
437
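/* The dynamic resync speed controller, evaluated once per SLEEP_TIME
 * interval while a resync is running.  It tries to keep a target amount
 * of resync data in flight: either the configured c_fill_target, or
 * enough to meet c_delay_target given the current reply rate (sect_in).
 * The resulting
 *	correction = want - rs_in_flight - rs_planed
 * is spread evenly over the slots of rs_plan_s; what this step
 * contributes (curr_corr) plus sect_in is requested now, clamped to the
 * per-step equivalent of c_max_rate.  All values are in 512 byte
 * sectors; the configured rates are in KiB/s, hence the factor 2. */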
438static int drbd_rs_controller(struct drbd_conf *mdev)
439{
440	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
441	unsigned int want;     /* The number of sectors we want in the proxy */
442	int req_sect; /* Number of sectors to request in this turn */
443	int correction; /* Number of sectors more we need in the proxy*/
444	int cps; /* correction per invocation of drbd_rs_controller() */
445	int steps; /* Number of time steps to plan ahead */
446	int curr_corr;
447	int max_sect;
448
449	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
450	mdev->rs_in_flight -= sect_in;
451
452	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
453
454	steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
455
456	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
457		want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
458	} else { /* normal path */
459		want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target :
460			sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10);
461	}
462
463	correction = want - mdev->rs_in_flight - mdev->rs_planed;
464
465	/* Plan ahead */
466	cps = correction / steps;
467	fifo_add_val(&mdev->rs_plan_s, cps);
468	mdev->rs_planed += cps * steps;
469
470	/* What we do in this step */
471	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
472	spin_unlock(&mdev->peer_seq_lock);
473	mdev->rs_planed -= curr_corr;
474
475	req_sect = sect_in + curr_corr;
476	if (req_sect < 0)
477		req_sect = 0;
478
479	max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ;
480	if (req_sect > max_sect)
481		req_sect = max_sect;
482
483	/*
484	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
485		 sect_in, mdev->rs_in_flight, want, correction,
486		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
487	*/
488
489	return req_sect;
490}
491
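/* Translate the controller output (or the static resync_rate, if no plan
 * is configured) into the number of resync requests to issue during the
 * next SLEEP_TIME interval, one request per BM_BLOCK_SIZE bitmap block,
 * and record the currently effective rate in mdev->c_sync_rate (KiB/s). */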
492static int drbd_rs_number_requests(struct drbd_conf *mdev)
493{
494	int number;
495	if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */
496		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
497		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
498	} else {
499		mdev->c_sync_rate = mdev->ldev->dc.resync_rate;
500		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
501	}
502
503	/* ignore the amount of pending requests, the resync controller should
504	 * throttle down to incoming reply rate soon enough anyway. */
505	return number;
506}
507
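/* Issue up to drbd_rs_number_requests() resync requests for the next
 * dirty bits in the bitmap.  Adjacent dirty bits are merged into a
 * single request up to max_bio_size, kept aligned and within one bitmap
 * extent.  We stop early when the send buffer is half full or the local
 * disk is congested, and re-arm the resync timer to continue later. */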
508static int w_make_resync_request(struct drbd_work *w, int cancel)
509{
510	struct drbd_conf *mdev = w->mdev;
511	unsigned long bit;
512	sector_t sector;
513	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
514	int max_bio_size;
515	int number, rollback_i, size;
516	int align, queued, sndbuf;
517	int i = 0;
518
519	if (unlikely(cancel))
520		return 1;
521
522	if (mdev->rs_total == 0) {
523		/* empty resync? */
524		drbd_resync_finished(mdev);
525		return 1;
526	}
527
528	if (!get_ldev(mdev)) {
529		/* Since we only need to access mdev->rsync, a
530		   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
531		   continuing the resync with a broken disk makes no sense at
532		   all */
533		dev_err(DEV, "Disk broke down during resync!\n");
534		return 1;
535	}
536
537	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
538	number = drbd_rs_number_requests(mdev);
539	if (number == 0)
540		goto requeue;
541
542	for (i = 0; i < number; i++) {
543		/* Stop generating RS requests when half of the send buffer is filled */
544		mutex_lock(&mdev->tconn->data.mutex);
545		if (mdev->tconn->data.socket) {
546			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
547			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
548		} else {
549			queued = 1;
550			sndbuf = 0;
551		}
552		mutex_unlock(&mdev->tconn->data.mutex);
553		if (queued > sndbuf / 2)
554			goto requeue;
555
556next_sector:
557		size = BM_BLOCK_SIZE;
558		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
559
560		if (bit == DRBD_END_OF_BITMAP) {
561			mdev->bm_resync_fo = drbd_bm_bits(mdev);
562			put_ldev(mdev);
563			return 1;
564		}
565
566		sector = BM_BIT_TO_SECT(bit);
567
568		if (drbd_rs_should_slow_down(mdev, sector) ||
569		    drbd_try_rs_begin_io(mdev, sector)) {
570			mdev->bm_resync_fo = bit;
571			goto requeue;
572		}
573		mdev->bm_resync_fo = bit + 1;
574
575		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
576			drbd_rs_complete_io(mdev, sector);
577			goto next_sector;
578		}
579
580#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
581		/* try to find some adjacent bits.
582		 * we stop if we already have the maximum req size.
583		 *
584		 * Additionally always align bigger requests, in order to
585		 * be prepared for all stripe sizes of software RAIDs.
586		 */
587		align = 1;
588		rollback_i = i;
589		for (;;) {
590			if (size + BM_BLOCK_SIZE > max_bio_size)
591				break;
592
593			/* Always be aligned */
594			if (sector & ((1<<(align+3))-1))
595				break;
596
597			/* do not cross extent boundaries */
598			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
599				break;
600			/* now, is it actually dirty, after all?
601			 * caution, drbd_bm_test_bit is tri-state for some
602			 * obscure reason; ( b == 0 ) would get the out-of-band
603			 * only accidentally right because of the "oddly sized"
604			 * adjustment below */
605			if (drbd_bm_test_bit(mdev, bit+1) != 1)
606				break;
607			bit++;
608			size += BM_BLOCK_SIZE;
609			if ((BM_BLOCK_SIZE << align) <= size)
610				align++;
611			i++;
612		}
613		/* if we merged some,
614		 * reset the offset to start the next drbd_bm_find_next from */
615		if (size > BM_BLOCK_SIZE)
616			mdev->bm_resync_fo = bit + 1;
617#endif
618
619		/* adjust very last sectors, in case we are oddly sized */
620		if (sector + (size>>9) > capacity)
621			size = (capacity-sector)<<9;
622		if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
623			switch (read_for_csum(mdev, sector, size)) {
624			case -EIO: /* Disk failure */
625				put_ldev(mdev);
626				return 0;
627			case -EAGAIN: /* allocation failed, or ldev busy */
628				drbd_rs_complete_io(mdev, sector);
629				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
630				i = rollback_i;
631				goto requeue;
632			case 0:
633				/* everything ok */
634				break;
635			default:
636				BUG();
637			}
638		} else {
639			inc_rs_pending(mdev);
640			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
641					       sector, size, ID_SYNCER)) {
642				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
643				dec_rs_pending(mdev);
644				put_ldev(mdev);
645				return 0;
646			}
647		}
648	}
649
650	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
651		/* last syncer _request_ was sent,
652		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
653		 * next sync group will resume), as soon as we receive the last
654		 * resync data block, and the last bit is cleared.
655		 * until then resync "work" is "inactive" ...
656		 */
657		put_ldev(mdev);
658		return 1;
659	}
660
661 requeue:
662	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
663	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
664	put_ldev(mdev);
665	return 1;
666}
667
668static int w_make_ov_request(struct drbd_work *w, int cancel)
669{
670	struct drbd_conf *mdev = w->mdev;
671	int number, i, size;
672	sector_t sector;
673	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
674
675	if (unlikely(cancel))
676		return 1;
677
678	number = drbd_rs_number_requests(mdev);
679
680	sector = mdev->ov_position;
681	for (i = 0; i < number; i++) {
682		if (sector >= capacity) {
683			return 1;
684		}
685
686		size = BM_BLOCK_SIZE;
687
688		if (drbd_rs_should_slow_down(mdev, sector) ||
689		    drbd_try_rs_begin_io(mdev, sector)) {
690			mdev->ov_position = sector;
691			goto requeue;
692		}
693
694		if (sector + (size>>9) > capacity)
695			size = (capacity-sector)<<9;
696
697		inc_rs_pending(mdev);
698		if (!drbd_send_ov_request(mdev, sector, size)) {
699			dec_rs_pending(mdev);
700			return 0;
701		}
702		sector += BM_SECT_PER_BIT;
703	}
704	mdev->ov_position = sector;
705
706 requeue:
707	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
708	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
709	return 1;
710}
711
712int w_ov_finished(struct drbd_work *w, int cancel)
713{
714	struct drbd_conf *mdev = w->mdev;
715	kfree(w);
716	ov_oos_print(mdev);
717	drbd_resync_finished(mdev);
718
719	return 1;
720}
721
722static int w_resync_finished(struct drbd_work *w, int cancel)
723{
724	struct drbd_conf *mdev = w->mdev;
725	kfree(w);
726
727	drbd_resync_finished(mdev);
728
729	return 1;
730}
731
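/* Request a ping from the peer and wait until the corresponding ack
 * arrives or the connection drops below C_CONNECTED. */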
732static void ping_peer(struct drbd_conf *mdev)
733{
734	struct drbd_tconn *tconn = mdev->tconn;
735
736	clear_bit(GOT_PING_ACK, &tconn->flags);
737	request_ping(tconn);
738	wait_event(tconn->ping_wait,
739		   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
740}
741
742int drbd_resync_finished(struct drbd_conf *mdev)
743{
744	unsigned long db, dt, dbdt;
745	unsigned long n_oos;
746	union drbd_state os, ns;
747	struct drbd_work *w;
748	char *khelper_cmd = NULL;
749	int verify_done = 0;
750
751	/* Remove all elements from the resync LRU. Future actions might
752	 * set bits in the (main) bitmap, and then the entries in the
753	 * resync LRU would be wrong. */
754	if (drbd_rs_del_all(mdev)) {
755		/* In case this is not possible now, most probably because
756		 * there are P_RS_DATA_REPLY packets lingering on the worker's
757		 * queue (or even the read operations for those packets
758		 * are not finished by now).  Retry in 100ms. */
759
760		schedule_timeout_interruptible(HZ / 10);
761		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
762		if (w) {
763			w->cb = w_resync_finished;
764			drbd_queue_work(&mdev->tconn->data.work, w);
765			return 1;
766		}
767		dev_err(DEV, "Warning: failed to drbd_rs_del_all() and to kmalloc(w).\n");
768	}
769
770	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
771	if (dt <= 0)
772		dt = 1;
773	db = mdev->rs_total;
774	dbdt = Bit2KB(db/dt);
775	mdev->rs_paused /= HZ;
776
777	if (!get_ldev(mdev))
778		goto out;
779
780	ping_peer(mdev);
781
782	spin_lock_irq(&mdev->tconn->req_lock);
783	os = mdev->state;
784
785	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
786
787	/* This protects us against multiple calls (that can happen in the presence
788	   of application IO), and against connectivity loss just before we arrive here. */
789	if (os.conn <= C_CONNECTED)
790		goto out_unlock;
791
792	ns = os;
793	ns.conn = C_CONNECTED;
794
795	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
796	     verify_done ? "Online verify " : "Resync",
797	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
798
799	n_oos = drbd_bm_total_weight(mdev);
800
801	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
802		if (n_oos) {
803			dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
804			      n_oos, Bit2KB(1));
805			khelper_cmd = "out-of-sync";
806		}
807	} else {
808		D_ASSERT((n_oos - mdev->rs_failed) == 0);
809
810		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
811			khelper_cmd = "after-resync-target";
812
813		if (mdev->tconn->csums_tfm && mdev->rs_total) {
814			const unsigned long s = mdev->rs_same_csum;
815			const unsigned long t = mdev->rs_total;
816			const int ratio =
817				(t == 0)     ? 0 :
818			(t < 100000) ? ((s*100)/t) : (s/(t/100));
819			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
820			     "transferred %luK total %luK\n",
821			     ratio,
822			     Bit2KB(mdev->rs_same_csum),
823			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
824			     Bit2KB(mdev->rs_total));
825		}
826	}
827
828	if (mdev->rs_failed) {
829		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
830
831		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
832			ns.disk = D_INCONSISTENT;
833			ns.pdsk = D_UP_TO_DATE;
834		} else {
835			ns.disk = D_UP_TO_DATE;
836			ns.pdsk = D_INCONSISTENT;
837		}
838	} else {
839		ns.disk = D_UP_TO_DATE;
840		ns.pdsk = D_UP_TO_DATE;
841
842		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
843			if (mdev->p_uuid) {
844				int i;
845				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
846					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
847				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
848				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
849			} else {
850				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
851			}
852		}
853
854		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
855			/* for verify runs, we don't update uuids here,
856			 * so there would be nothing to report. */
857			drbd_uuid_set_bm(mdev, 0UL);
858			drbd_print_uuids(mdev, "updated UUIDs");
859			if (mdev->p_uuid) {
860				/* Now the two UUID sets are equal, update what we
861				 * know of the peer. */
862				int i;
863				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
864					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
865			}
866		}
867	}
868
869	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
870out_unlock:
871	spin_unlock_irq(&mdev->tconn->req_lock);
872	put_ldev(mdev);
873out:
874	mdev->rs_total  = 0;
875	mdev->rs_failed = 0;
876	mdev->rs_paused = 0;
877	if (verify_done)
878		mdev->ov_start_sector = 0;
879
880	drbd_md_sync(mdev);
881
882	if (khelper_cmd)
883		drbd_khelper(mdev, khelper_cmd);
884
885	return 1;
886}
887
888/* helper */
889static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
890{
891	if (drbd_ee_has_active_page(peer_req)) {
892		/* This might happen if sendpage() has not finished */
893		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
894		atomic_add(i, &mdev->pp_in_use_by_net);
895		atomic_sub(i, &mdev->pp_in_use);
896		spin_lock_irq(&mdev->tconn->req_lock);
897		list_add_tail(&peer_req->w.list, &mdev->net_ee);
898		spin_unlock_irq(&mdev->tconn->req_lock);
899		wake_up(&drbd_pp_wait);
900	} else
901		drbd_free_ee(mdev, peer_req);
902}
903
904/**
905 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
906 * @mdev:	DRBD device.
907 * @w:		work object.
908 * @cancel:	The connection will be closed anyway
909 */
910int w_e_end_data_req(struct drbd_work *w, int cancel)
911{
912	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
913	struct drbd_conf *mdev = w->mdev;
914	int ok;
915
916	if (unlikely(cancel)) {
917		drbd_free_ee(mdev, peer_req);
918		dec_unacked(mdev);
919		return 1;
920	}
921
922	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
923		ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
924	} else {
925		if (__ratelimit(&drbd_ratelimit_state))
926			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
927			    (unsigned long long)peer_req->i.sector);
928
929		ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
930	}
931
932	dec_unacked(mdev);
933
934	move_to_net_ee_or_free(mdev, peer_req);
935
936	if (unlikely(!ok))
937		dev_err(DEV, "drbd_send_block() failed\n");
938	return ok;
939}
940
941/**
942 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
943 * @mdev:	DRBD device.
944 * @w:		work object.
945 * @cancel:	The connection will be closed anyway
946 */
947int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
948{
949	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
950	struct drbd_conf *mdev = w->mdev;
951	int ok;
952
953	if (unlikely(cancel)) {
954		drbd_free_ee(mdev, peer_req);
955		dec_unacked(mdev);
956		return 1;
957	}
958
959	if (get_ldev_if_state(mdev, D_FAILED)) {
960		drbd_rs_complete_io(mdev, peer_req->i.sector);
961		put_ldev(mdev);
962	}
963
964	if (mdev->state.conn == C_AHEAD) {
965		ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
966	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
967		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
968			inc_rs_pending(mdev);
969			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
970		} else {
971			if (__ratelimit(&drbd_ratelimit_state))
972				dev_err(DEV, "Not sending RSDataReply, "
973				    "partner DISKLESS!\n");
974			ok = 1;
975		}
976	} else {
977		if (__ratelimit(&drbd_ratelimit_state))
978			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
979			    (unsigned long long)peer_req->i.sector);
980
981		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
982
983		/* update resync data with failure */
984		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
985	}
986
987	dec_unacked(mdev);
988
989	move_to_net_ee_or_free(mdev, peer_req);
990
991	if (unlikely(!ok))
992		dev_err(DEV, "drbd_send_block() failed\n");
993	return ok;
994}
995
996int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
997{
998	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999	struct drbd_conf *mdev = w->mdev;
1000	struct digest_info *di;
1001	int digest_size;
1002	void *digest = NULL;
1003	int ok, eq = 0;
1004
1005	if (unlikely(cancel)) {
1006		drbd_free_ee(mdev, peer_req);
1007		dec_unacked(mdev);
1008		return 1;
1009	}
1010
1011	if (get_ldev(mdev)) {
1012		drbd_rs_complete_io(mdev, peer_req->i.sector);
1013		put_ldev(mdev);
1014	}
1015
1016	di = peer_req->digest;
1017
1018	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1019		/* quick hack to try to avoid a race against reconfiguration.
1020		 * a real fix would be much more involved,
1021		 * introducing more locking mechanisms */
1022		if (mdev->tconn->csums_tfm) {
1023			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1024			D_ASSERT(digest_size == di->digest_size);
1025			digest = kmalloc(digest_size, GFP_NOIO);
1026		}
1027		if (digest) {
1028			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1029			eq = !memcmp(digest, di->digest, digest_size);
1030			kfree(digest);
1031		}
1032
1033		if (eq) {
1034			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1035			/* rs_same_csums unit is BM_BLOCK_SIZE */
1036			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1037			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1038		} else {
1039			inc_rs_pending(mdev);
1040			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1041			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1042			kfree(di);
1043			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1044		}
1045	} else {
1046		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1047		if (__ratelimit(&drbd_ratelimit_state))
1048			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1049	}
1050
1051	dec_unacked(mdev);
1052	move_to_net_ee_or_free(mdev, peer_req);
1053
1054	if (unlikely(!ok))
1055		dev_err(DEV, "drbd_send_block/ack() failed\n");
1056	return ok;
1057}
1058
1059int w_e_end_ov_req(struct drbd_work *w, int cancel)
1060{
1061	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1062	struct drbd_conf *mdev = w->mdev;
1063	sector_t sector = peer_req->i.sector;
1064	unsigned int size = peer_req->i.size;
1065	int digest_size;
1066	void *digest;
1067	int ok = 1;
1068
1069	if (unlikely(cancel))
1070		goto out;
1071
1072	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1073	digest = kmalloc(digest_size, GFP_NOIO);
1074	if (!digest) {
1075		ok = 0;	/* terminate the connection in case the allocation failed */
1076		goto out;
1077	}
1078
1079	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1080		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1081	else
1082		memset(digest, 0, digest_size);
1083
1084	/* Free peer_req and pages before send.
1085	 * In case we block on congestion, we could otherwise run into
1086	 * some distributed deadlock, if the other side blocks on
1087	 * congestion as well, because our receiver blocks in
1088	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1089	drbd_free_ee(mdev, peer_req);
1090	peer_req = NULL;
1091	inc_rs_pending(mdev);
1092	ok = drbd_send_drequest_csum(mdev, sector, size,
1093				     digest, digest_size,
1094				     P_OV_REPLY);
1095	if (!ok)
1096		dec_rs_pending(mdev);
1097	kfree(digest);
1098
1099out:
1100	if (peer_req)
1101		drbd_free_ee(mdev, peer_req);
1102	dec_unacked(mdev);
1103	return ok;
1104}
1105
1106void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1107{
1108	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1109		mdev->ov_last_oos_size += size>>9;
1110	} else {
1111		mdev->ov_last_oos_start = sector;
1112		mdev->ov_last_oos_size = size>>9;
1113	}
1114	drbd_set_out_of_sync(mdev, sector, size);
1115}
1116
1117int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1118{
1119	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1120	struct drbd_conf *mdev = w->mdev;
1121	struct digest_info *di;
1122	void *digest;
1123	sector_t sector = peer_req->i.sector;
1124	unsigned int size = peer_req->i.size;
1125	int digest_size;
1126	int ok, eq = 0;
1127
1128	if (unlikely(cancel)) {
1129		drbd_free_ee(mdev, peer_req);
1130		dec_unacked(mdev);
1131		return 1;
1132	}
1133
1134	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1135	 * the resync lru has been cleaned up already */
1136	if (get_ldev(mdev)) {
1137		drbd_rs_complete_io(mdev, peer_req->i.sector);
1138		put_ldev(mdev);
1139	}
1140
1141	di = peer_req->digest;
1142
1143	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1144		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1145		digest = kmalloc(digest_size, GFP_NOIO);
1146		if (digest) {
1147			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1148
1149			D_ASSERT(digest_size == di->digest_size);
1150			eq = !memcmp(digest, di->digest, digest_size);
1151			kfree(digest);
1152		}
1153	}
1154
1155	/* Free peer_req and pages before send.
1156	 * In case we block on congestion, we could otherwise run into
1157	 * some distributed deadlock, if the other side blocks on
1158	 * congestion as well, because our receiver blocks in
1159	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1160	drbd_free_ee(mdev, peer_req);
1161	if (!eq)
1162		drbd_ov_oos_found(mdev, sector, size);
1163	else
1164		ov_oos_print(mdev);
1165
1166	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1167			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1168
1169	dec_unacked(mdev);
1170
1171	--mdev->ov_left;
1172
1173	/* let's advance progress step marks only for every other megabyte */
1174	if ((mdev->ov_left & 0x200) == 0x200)
1175		drbd_advance_rs_marks(mdev, mdev->ov_left);
1176
1177	if (mdev->ov_left == 0) {
1178		ov_oos_print(mdev);
1179		drbd_resync_finished(mdev);
1180	}
1181
1182	return ok;
1183}
1184
1185int w_prev_work_done(struct drbd_work *w, int cancel)
1186{
1187	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1188
1189	complete(&b->done);
1190	return 1;
1191}
1192
1193int w_send_barrier(struct drbd_work *w, int cancel)
1194{
1195	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1196	struct drbd_conf *mdev = w->mdev;
1197	struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1198	int ok = 1;
1199
1200	/* really avoid racing with tl_clear.  w.cb may have been referenced
1201	 * just before it was reassigned and re-queued, so double check that.
1202	 * actually, this race was harmless, since we only try to send the
1203	 * barrier packet here, and otherwise do nothing with the object.
1204	 * but compare with the head of w_clear_epoch */
1205	spin_lock_irq(&mdev->tconn->req_lock);
1206	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1207		cancel = 1;
1208	spin_unlock_irq(&mdev->tconn->req_lock);
1209	if (cancel)
1210		return 1;
1211
1212	if (!drbd_get_data_sock(mdev->tconn))
1213		return 0;
1214	p->barrier = b->br_number;
1215	/* inc_ap_pending was done where this was queued.
1216	 * dec_ap_pending will be done in got_BarrierAck
1217	 * or (on connection loss) in w_clear_epoch.  */
1218	ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1219			    &p->head, sizeof(*p), 0);
1220	drbd_put_data_sock(mdev->tconn);
1221
1222	return ok;
1223}
1224
1225int w_send_write_hint(struct drbd_work *w, int cancel)
1226{
1227	struct drbd_conf *mdev = w->mdev;
1228	if (cancel)
1229		return 1;
1230	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1231}
1232
1233int w_send_oos(struct drbd_work *w, int cancel)
1234{
1235	struct drbd_request *req = container_of(w, struct drbd_request, w);
1236	struct drbd_conf *mdev = w->mdev;
1237	int ok;
1238
1239	if (unlikely(cancel)) {
1240		req_mod(req, SEND_CANCELED);
1241		return 1;
1242	}
1243
1244	ok = drbd_send_oos(mdev, req);
1245	req_mod(req, OOS_HANDED_TO_NETWORK);
1246
1247	return ok;
1248}
1249
1250/**
1251 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1252 * @mdev:	DRBD device.
1253 * @w:		work object.
1254 * @cancel:	The connection will be closed anyway
1255 */
1256int w_send_dblock(struct drbd_work *w, int cancel)
1257{
1258	struct drbd_request *req = container_of(w, struct drbd_request, w);
1259	struct drbd_conf *mdev = w->mdev;
1260	int ok;
1261
1262	if (unlikely(cancel)) {
1263		req_mod(req, SEND_CANCELED);
1264		return 1;
1265	}
1266
1267	ok = drbd_send_dblock(mdev, req);
1268	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1269
1270	return ok;
1271}
1272
1273/**
1274 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1275 * @mdev:	DRBD device.
1276 * @w:		work object.
1277 * @cancel:	The connection will be closed anyway
1278 */
1279int w_send_read_req(struct drbd_work *w, int cancel)
1280{
1281	struct drbd_request *req = container_of(w, struct drbd_request, w);
1282	struct drbd_conf *mdev = w->mdev;
1283	int ok;
1284
1285	if (unlikely(cancel)) {
1286		req_mod(req, SEND_CANCELED);
1287		return 1;
1288	}
1289
1290	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1291				(unsigned long)req);
1292
1293	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1294
1295	return ok;
1296}
1297
1298int w_restart_disk_io(struct drbd_work *w, int cancel)
1299{
1300	struct drbd_request *req = container_of(w, struct drbd_request, w);
1301	struct drbd_conf *mdev = w->mdev;
1302
1303	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1304		drbd_al_begin_io(mdev, req->i.sector);
1305	/* Calling drbd_al_begin_io() out of the worker might deadlock
1306	   theoretically. Practically it cannot deadlock, since this is
1307	   only used when unfreezing IOs. All the extents of the requests
1308	   that made it into the TL are already active */
1309
1310	drbd_req_make_private_bio(req, req->master_bio);
1311	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1312	generic_make_request(req->private_bio);
1313
1314	return 1;
1315}
1316
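/* Follow the resync-after dependency chain of mdev: returns 0 if any
 * device we (transitively) depend on is currently resyncing or has one
 * of the sync-pause flags set, 1 if this device may resync now. */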
1317static int _drbd_may_sync_now(struct drbd_conf *mdev)
1318{
1319	struct drbd_conf *odev = mdev;
1320
1321	while (1) {
1322		if (odev->ldev->dc.resync_after == -1)
1323			return 1;
1324		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1325		if (!expect(odev))
1326			return 1;
1327		if ((odev->state.conn >= C_SYNC_SOURCE &&
1328		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1329		    odev->state.aftr_isp || odev->state.peer_isp ||
1330		    odev->state.user_isp)
1331			return 0;
1332	}
1333}
1334
1335/**
1336 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1337 * @mdev:	DRBD device.
1338 *
1339 * Called from process context only (admin command and after_state_ch).
1340 */
1341static int _drbd_pause_after(struct drbd_conf *mdev)
1342{
1343	struct drbd_conf *odev;
1344	int i, rv = 0;
1345
1346	idr_for_each_entry(&minors, odev, i) {
1347		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1348			continue;
1349		if (!_drbd_may_sync_now(odev))
1350			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1351			       != SS_NOTHING_TO_DO);
1352	}
1353
1354	return rv;
1355}
1356
1357/**
1358 * _drbd_resume_next() - Resume resync on all devices that may resync now
1359 * @mdev:	DRBD device.
1360 *
1361 * Called from process context only (admin command and worker).
1362 */
1363static int _drbd_resume_next(struct drbd_conf *mdev)
1364{
1365	struct drbd_conf *odev;
1366	int i, rv = 0;
1367
1368	idr_for_each_entry(&minors, odev, i) {
1369		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1370			continue;
1371		if (odev->state.aftr_isp) {
1372			if (_drbd_may_sync_now(odev))
1373				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1374							CS_HARD, NULL)
1375				       != SS_NOTHING_TO_DO) ;
1376		}
1377	}
1378	return rv;
1379}
1380
1381void resume_next_sg(struct drbd_conf *mdev)
1382{
1383	write_lock_irq(&global_state_lock);
1384	_drbd_resume_next(mdev);
1385	write_unlock_irq(&global_state_lock);
1386}
1387
1388void suspend_other_sg(struct drbd_conf *mdev)
1389{
1390	write_lock_irq(&global_state_lock);
1391	_drbd_pause_after(mdev);
1392	write_unlock_irq(&global_state_lock);
1393}
1394
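/* Validate a proposed resync-after minor for mdev, with
 * global_state_lock held for writing: -1 means "no dependency" and is
 * always fine; otherwise the minor must exist, and following the chain
 * of resync-after dependencies from it must not lead back to mdev. */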
1395static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1396{
1397	struct drbd_conf *odev;
1398
1399	if (o_minor == -1)
1400		return NO_ERROR;
1401	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1402		return ERR_SYNC_AFTER;
1403
1404	/* check for loops */
1405	odev = minor_to_mdev(o_minor);
1406	while (1) {
1407		if (odev == mdev)
1408			return ERR_SYNC_AFTER_CYCLE;
1409
1410		/* dependency chain ends here, no cycles. */
1411		if (odev->ldev->dc.resync_after == -1)
1412			return NO_ERROR;
1413
1414		/* follow the dependency chain */
1415		odev = minor_to_mdev(odev->ldev->dc.resync_after);
1416	}
1417}
1418
1419int drbd_alter_sa(struct drbd_conf *mdev, int na)
1420{
1421	int changes;
1422	int retcode;
1423
1424	write_lock_irq(&global_state_lock);
1425	retcode = sync_after_error(mdev, na);
1426	if (retcode == NO_ERROR) {
1427		mdev->ldev->dc.resync_after = na;
1428		do {
1429			changes  = _drbd_pause_after(mdev);
1430			changes |= _drbd_resume_next(mdev);
1431		} while (changes);
1432	}
1433	write_unlock_irq(&global_state_lock);
1434	return retcode;
1435}
1436
1437void drbd_rs_controller_reset(struct drbd_conf *mdev)
1438{
1439	atomic_set(&mdev->rs_sect_in, 0);
1440	atomic_set(&mdev->rs_sect_ev, 0);
1441	mdev->rs_in_flight = 0;
1442	mdev->rs_planed = 0;
1443	spin_lock(&mdev->peer_seq_lock);
1444	fifo_set(&mdev->rs_plan_s, 0);
1445	spin_unlock(&mdev->peer_seq_lock);
1446}
1447
1448void start_resync_timer_fn(unsigned long data)
1449{
1450	struct drbd_conf *mdev = (struct drbd_conf *) data;
1451
1452	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1453}
1454
1455int w_start_resync(struct drbd_work *w, int cancel)
1456{
1457	struct drbd_conf *mdev = w->mdev;
1458
1459	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1460		dev_warn(DEV, "w_start_resync later...\n");
1461		mdev->start_resync_timer.expires = jiffies + HZ/10;
1462		add_timer(&mdev->start_resync_timer);
1463		return 1;
1464	}
1465
1466	drbd_start_resync(mdev, C_SYNC_SOURCE);
1467	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1468	return 1;
1469}
1470
1471/**
1472 * drbd_start_resync() - Start the resync process
1473 * @mdev:	DRBD device.
1474 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1475 *
1476 * This function might bring you directly into one of the
1477 * C_PAUSED_SYNC_* states.
1478 */
1479void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1480{
1481	union drbd_state ns;
1482	int r;
1483
1484	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1485		dev_err(DEV, "Resync already running!\n");
1486		return;
1487	}
1488
1489	if (mdev->state.conn < C_AHEAD) {
1490		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1491		drbd_rs_cancel_all(mdev);
1492		/* This should be done when we abort the resync. We definitely do not
1493		   want to have this for connections going back and forth between
1494		   Ahead/Behind and SyncSource/SyncTarget */
1495	}
1496
1497	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1498		if (side == C_SYNC_TARGET) {
1499			/* Since application IO was locked out during C_WF_BITMAP_T and
1500			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1501			   we check that we might make the data inconsistent. */
1502			r = drbd_khelper(mdev, "before-resync-target");
1503			r = (r >> 8) & 0xff;
1504			if (r > 0) {
1505				dev_info(DEV, "before-resync-target handler returned %d, "
1506					 "dropping connection.\n", r);
1507				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1508				return;
1509			}
1510		} else /* C_SYNC_SOURCE */ {
1511			r = drbd_khelper(mdev, "before-resync-source");
1512			r = (r >> 8) & 0xff;
1513			if (r > 0) {
1514				if (r == 3) {
1515					dev_info(DEV, "before-resync-source handler returned %d, "
1516						 "ignoring. Old userland tools?\n", r);
1517				} else {
1518					dev_info(DEV, "before-resync-source handler returned %d, "
1519						 "dropping connection.\n", r);
1520					conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1521					return;
1522				}
1523			}
1524		}
1525	}
1526
1527	if (current == mdev->tconn->worker.task) {
1528		/* The worker should not sleep waiting for state_mutex,
1529		   because that can take long */
1530		if (!mutex_trylock(mdev->state_mutex)) {
1531			set_bit(B_RS_H_DONE, &mdev->flags);
1532			mdev->start_resync_timer.expires = jiffies + HZ/5;
1533			add_timer(&mdev->start_resync_timer);
1534			return;
1535		}
1536	} else {
1537		mutex_lock(mdev->state_mutex);
1538	}
1539	clear_bit(B_RS_H_DONE, &mdev->flags);
1540
1541	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1542		mutex_unlock(mdev->state_mutex);
1543		return;
1544	}
1545
1546	write_lock_irq(&global_state_lock);
1547	ns = mdev->state;
1548
1549	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1550
1551	ns.conn = side;
1552
1553	if (side == C_SYNC_TARGET)
1554		ns.disk = D_INCONSISTENT;
1555	else /* side == C_SYNC_SOURCE */
1556		ns.pdsk = D_INCONSISTENT;
1557
1558	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1559	ns = mdev->state;
1560
1561	if (ns.conn < C_CONNECTED)
1562		r = SS_UNKNOWN_ERROR;
1563
1564	if (r == SS_SUCCESS) {
1565		unsigned long tw = drbd_bm_total_weight(mdev);
1566		unsigned long now = jiffies;
1567		int i;
1568
1569		mdev->rs_failed    = 0;
1570		mdev->rs_paused    = 0;
1571		mdev->rs_same_csum = 0;
1572		mdev->rs_last_events = 0;
1573		mdev->rs_last_sect_ev = 0;
1574		mdev->rs_total     = tw;
1575		mdev->rs_start     = now;
1576		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1577			mdev->rs_mark_left[i] = tw;
1578			mdev->rs_mark_time[i] = now;
1579		}
1580		_drbd_pause_after(mdev);
1581	}
1582	write_unlock_irq(&global_state_lock);
1583
1584	if (r == SS_SUCCESS) {
1585		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1586		     drbd_conn_str(ns.conn),
1587		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1588		     (unsigned long) mdev->rs_total);
1589		if (side == C_SYNC_TARGET)
1590			mdev->bm_resync_fo = 0;
1591
1592		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1593		 * with w_send_oos, or the sync target will get confused as to
1594		 * how many bits to resync.  We cannot do that always, because for an
1595		 * empty resync and protocol < 95, we need to do it here, as we call
1596		 * drbd_resync_finished from here in that case.
1597		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1598		 * and from after_state_ch otherwise. */
1599		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1600			drbd_gen_and_send_sync_uuid(mdev);
1601
1602		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1603			/* This still has a race (about when exactly the peers
1604			 * detect connection loss) that can lead to a full sync
1605			 * on next handshake. In 8.3.9 we fixed this with explicit
1606			 * resync-finished notifications, but the fix
1607			 * introduces a protocol change.  Sleeping for some
1608			 * time longer than the ping interval + timeout on the
1609			 * SyncSource, to give the SyncTarget the chance to
1610			 * detect connection loss, then waiting for a ping
1611			 * response (implicit in drbd_resync_finished) reduces
1612			 * the race considerably, but does not solve it. */
1613			if (side == C_SYNC_SOURCE)
1614				schedule_timeout_interruptible(
1615					mdev->tconn->net_conf->ping_int * HZ +
1616					mdev->tconn->net_conf->ping_timeo*HZ/9);
1617			drbd_resync_finished(mdev);
1618		}
1619
1620		drbd_rs_controller_reset(mdev);
1621		/* ns.conn may already be != mdev->state.conn,
1622		 * we may have been paused in between, or become paused until
1623		 * the timer triggers.
1624		 * No matter, that is handled in resync_timer_fn() */
1625		if (ns.conn == C_SYNC_TARGET)
1626			mod_timer(&mdev->resync_timer, jiffies);
1627
1628		drbd_md_sync(mdev);
1629	}
1630	put_ldev(mdev);
1631	mutex_unlock(mdev->state_mutex);
1632}
1633
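/* Main loop of the per-connection worker thread: wait for work on the
 * data.work semaphore (uncorking the data socket while idle), run the
 * queued callbacks one by one, and escalate to C_NETWORK_FAILURE if a
 * callback fails while we are connected.  On shutdown, cancel whatever
 * is still queued, stop the receiver and clean up all volumes. */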
1634int drbd_worker(struct drbd_thread *thi)
1635{
1636	struct drbd_tconn *tconn = thi->tconn;
1637	struct drbd_work *w = NULL;
1638	struct drbd_conf *mdev;
1639	LIST_HEAD(work_list);
1640	int vnr, intr = 0;
1641
1642	while (get_t_state(thi) == RUNNING) {
1643		drbd_thread_current_set_cpu(thi);
1644
1645		if (down_trylock(&tconn->data.work.s)) {
1646			mutex_lock(&tconn->data.mutex);
1647			if (tconn->data.socket && !tconn->net_conf->no_cork)
1648				drbd_tcp_uncork(tconn->data.socket);
1649			mutex_unlock(&tconn->data.mutex);
1650
1651			intr = down_interruptible(&tconn->data.work.s);
1652
1653			mutex_lock(&tconn->data.mutex);
1654			if (tconn->data.socket  && !tconn->net_conf->no_cork)
1655				drbd_tcp_cork(tconn->data.socket);
1656			mutex_unlock(&tconn->data.mutex);
1657		}
1658
1659		if (intr) {
1660			flush_signals(current);
1661			if (get_t_state(thi) == RUNNING) {
1662				conn_warn(tconn, "Worker got an unexpected signal\n");
1663				continue;
1664			}
1665			break;
1666		}
1667
1668		if (get_t_state(thi) != RUNNING)
1669			break;
1670		/* With this break, we have done a down() but not consumed
1671		   the entry from the list. The cleanup code takes care of
1672		   this...   */
1673
1674		w = NULL;
1675		spin_lock_irq(&tconn->data.work.q_lock);
1676		if (list_empty(&tconn->data.work.q)) {
1677			/* something terribly wrong in our logic.
1678			 * we were able to down() the semaphore,
1679			 * but the list is empty... doh.
1680			 *
1681			 * what is the best thing to do now?
1682			 * try again from scratch, restarting the receiver,
1683			 * asender, whatnot? could break even more ugly,
1684			 * e.g. when we are primary, but no good local data.
1685			 *
1686			 * I'll try to get away just starting over this loop.
1687			 */
1688			conn_warn(tconn, "Work list unexpectedly empty\n");
1689			spin_unlock_irq(&tconn->data.work.q_lock);
1690			continue;
1691		}
1692		w = list_entry(tconn->data.work.q.next, struct drbd_work, list);
1693		list_del_init(&w->list);
1694		spin_unlock_irq(&tconn->data.work.q_lock);
1695
1696		if (!w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) {
1697			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1698			if (tconn->cstate >= C_WF_REPORT_PARAMS)
1699				conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1700		}
1701	}
1702
1703	spin_lock_irq(&tconn->data.work.q_lock);
1704	while (!list_empty(&tconn->data.work.q)) {
1705		list_splice_init(&tconn->data.work.q, &work_list);
1706		spin_unlock_irq(&tconn->data.work.q_lock);
1707
1708		while (!list_empty(&work_list)) {
1709			w = list_entry(work_list.next, struct drbd_work, list);
1710			list_del_init(&w->list);
1711			w->cb(w, 1);
1712		}
1713
1714		spin_lock_irq(&tconn->data.work.q_lock);
1715	}
1716	sema_init(&tconn->data.work.s, 0);
1717	/* DANGEROUS race: if someone did queue his work within the spinlock,
1718	 * but up() ed outside the spinlock, we could get an up() on the
1719	 * semaphore without corresponding list entry.
1720	 * So don't do that.
1721	 */
1722	spin_unlock_irq(&tconn->data.work.q_lock);
1723
1724	drbd_thread_stop(&tconn->receiver);
1725	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1726		D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1727		/* _drbd_set_state only uses stop_nowait.
1728		 * wait here for the exiting receiver. */
1729		drbd_mdev_cleanup(mdev);
1730	}
1731	clear_bit(OBJECT_DYING, &tconn->flags);
1732	clear_bit(CONFIG_PENDING, &tconn->flags);
1733	wake_up(&tconn->ping_wait);
1734
1735	return 0;
1736}
1737