drbd_worker.c revision 0625ac190d222fd0855bad79e93f1556fc45dd20
1/*
2   drbd_worker.c
3
4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10   drbd is free software; you can redistribute it and/or modify
11   it under the terms of the GNU General Public License as published by
12   the Free Software Foundation; either version 2, or (at your option)
13   any later version.
14
15   drbd is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License for more details.
19
20   You should have received a copy of the GNU General Public License
21   along with drbd; see the file COPYING.  If not, write to
22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26#include <linux/module.h>
27#include <linux/drbd.h>
28#include <linux/sched.h>
29#include <linux/wait.h>
30#include <linux/mm.h>
31#include <linux/memcontrol.h>
32#include <linux/mm_inline.h>
33#include <linux/slab.h>
34#include <linux/random.h>
35#include <linux/string.h>
36#include <linux/scatterlist.h>
37
38#include "drbd_int.h"
39#include "drbd_req.h"
40
/* Forward declarations: both are dispatched from w_resync_timer() below. */
static int w_make_resync_request(struct drbd_conf *mdev, struct drbd_work *w,
				 int cancel);
static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w,
			     int cancel);
44
45
46
47/* endio handlers:
48 *   drbd_md_io_complete (defined here)
49 *   drbd_endio_pri (defined here)
50 *   drbd_endio_sec (defined here)
51 *   bm_async_io_complete (defined in drbd_bitmap.c)
52 *
53 * For all these callbacks, note the following:
54 * The callbacks will be called in irq context by the IDE drivers,
55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
56 * Try to get the locking right :)
57 *
58 */
59
60
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
65rwlock_t global_state_lock;
66
67/* used for synchronous meta data and bitmap IO
68 * submitted by drbd_md_sync_page_io()
69 */
70void drbd_md_io_complete(struct bio *bio, int error)
71{
72	struct drbd_md_io *md_io;
73
74	md_io = (struct drbd_md_io *)bio->bi_private;
75	md_io->error = error;
76
77	complete(&md_io->event);
78}
79
80/* reads on behalf of the partner,
81 * "submitted" by the receiver
82 */
83void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
84{
85	unsigned long flags = 0;
86	struct drbd_conf *mdev = peer_req->mdev;
87
88	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
89	mdev->read_cnt += peer_req->i.size >> 9;
90	list_del(&peer_req->w.list);
91	if (list_empty(&mdev->read_ee))
92		wake_up(&mdev->ee_wait);
93	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
94		__drbd_chk_io_error(mdev, false);
95	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
96
97	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
98	put_ldev(mdev);
99}
100
101/* writes on behalf of the partner, or resync writes,
102 * "submitted" by the receiver, final stage.  */
103static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
104{
105	unsigned long flags = 0;
106	struct drbd_conf *mdev = peer_req->mdev;
107	sector_t e_sector;
108	int do_wake;
109	u64 block_id;
110	int do_al_complete_io;
111
112	/* after we moved peer_req to done_ee,
113	 * we may no longer access it,
114	 * it may be freed/reused already!
115	 * (as soon as we release the req_lock) */
116	e_sector = peer_req->i.sector;
117	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
118	block_id = peer_req->block_id;
119
120	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
121	mdev->writ_cnt += peer_req->i.size >> 9;
122	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
123	list_add_tail(&peer_req->w.list, &mdev->done_ee);
124
125	/*
126	 * Do not remove from the write_requests tree here: we did not send the
127	 * Ack yet and did not wake possibly waiting conflicting requests.
128	 * Removed from the tree from "drbd_process_done_ee" within the
129	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
130	 * _drbd_clear_done_ee.
131	 */
132
133	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
134
135	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
136		__drbd_chk_io_error(mdev, false);
137	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
138
139	if (block_id == ID_SYNCER)
140		drbd_rs_complete_io(mdev, e_sector);
141
142	if (do_wake)
143		wake_up(&mdev->ee_wait);
144
145	if (do_al_complete_io)
146		drbd_al_complete_io(mdev, e_sector);
147
148	wake_asender(mdev->tconn);
149	put_ldev(mdev);
150}
151
152/* writes on behalf of the partner, or resync writes,
153 * "submitted" by the receiver.
154 */
155void drbd_endio_sec(struct bio *bio, int error)
156{
157	struct drbd_peer_request *peer_req = bio->bi_private;
158	struct drbd_conf *mdev = peer_req->mdev;
159	int uptodate = bio_flagged(bio, BIO_UPTODATE);
160	int is_write = bio_data_dir(bio) == WRITE;
161
162	if (error && __ratelimit(&drbd_ratelimit_state))
163		dev_warn(DEV, "%s: error=%d s=%llus\n",
164				is_write ? "write" : "read", error,
165				(unsigned long long)peer_req->i.sector);
166	if (!error && !uptodate) {
167		if (__ratelimit(&drbd_ratelimit_state))
168			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
169					is_write ? "write" : "read",
170					(unsigned long long)peer_req->i.sector);
171		/* strange behavior of some lower level drivers...
172		 * fail the request by clearing the uptodate flag,
173		 * but do not return any error?! */
174		error = -EIO;
175	}
176
177	if (error)
178		set_bit(__EE_WAS_ERROR, &peer_req->flags);
179
180	bio_put(bio); /* no need for the bio anymore */
181	if (atomic_dec_and_test(&peer_req->pending_bios)) {
182		if (is_write)
183			drbd_endio_write_sec_final(peer_req);
184		else
185			drbd_endio_read_sec_final(peer_req);
186	}
187}
188
189/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
190 */
191void drbd_endio_pri(struct bio *bio, int error)
192{
193	unsigned long flags;
194	struct drbd_request *req = bio->bi_private;
195	struct drbd_conf *mdev = req->mdev;
196	struct bio_and_error m;
197	enum drbd_req_event what;
198	int uptodate = bio_flagged(bio, BIO_UPTODATE);
199
200	if (!error && !uptodate) {
201		dev_warn(DEV, "p %s: setting error to -EIO\n",
202			 bio_data_dir(bio) == WRITE ? "write" : "read");
203		/* strange behavior of some lower level drivers...
204		 * fail the request by clearing the uptodate flag,
205		 * but do not return any error?! */
206		error = -EIO;
207	}
208
209	/* to avoid recursion in __req_mod */
210	if (unlikely(error)) {
211		what = (bio_data_dir(bio) == WRITE)
212			? WRITE_COMPLETED_WITH_ERROR
213			: (bio_rw(bio) == READ)
214			  ? READ_COMPLETED_WITH_ERROR
215			  : READ_AHEAD_COMPLETED_WITH_ERROR;
216	} else
217		what = COMPLETED_OK;
218
219	bio_put(req->private_bio);
220	req->private_bio = ERR_PTR(error);
221
222	/* not req_mod(), we need irqsave here! */
223	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
224	__req_mod(req, what, &m);
225	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
226
227	if (m.bio)
228		complete_master_bio(mdev, &m);
229}
230
231int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
232{
233	struct drbd_request *req = container_of(w, struct drbd_request, w);
234
235	/* We should not detach for read io-error,
236	 * but try to WRITE the P_DATA_REPLY to the failed location,
237	 * to give the disk the chance to relocate that block */
238
239	spin_lock_irq(&mdev->tconn->req_lock);
240	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
241		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
242		spin_unlock_irq(&mdev->tconn->req_lock);
243		return 1;
244	}
245	spin_unlock_irq(&mdev->tconn->req_lock);
246
247	return w_send_read_req(mdev, w, 0);
248}
249
250void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
251		  struct drbd_peer_request *peer_req, void *digest)
252{
253	struct hash_desc desc;
254	struct scatterlist sg;
255	struct page *page = peer_req->pages;
256	struct page *tmp;
257	unsigned len;
258
259	desc.tfm = tfm;
260	desc.flags = 0;
261
262	sg_init_table(&sg, 1);
263	crypto_hash_init(&desc);
264
265	while ((tmp = page_chain_next(page))) {
266		/* all but the last page will be fully used */
267		sg_set_page(&sg, page, PAGE_SIZE, 0);
268		crypto_hash_update(&desc, &sg, sg.length);
269		page = tmp;
270	}
271	/* and now the last, possibly only partially used page */
272	len = peer_req->i.size & (PAGE_SIZE - 1);
273	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
274	crypto_hash_update(&desc, &sg, sg.length);
275	crypto_hash_final(&desc, digest);
276}
277
278void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
279{
280	struct hash_desc desc;
281	struct scatterlist sg;
282	struct bio_vec *bvec;
283	int i;
284
285	desc.tfm = tfm;
286	desc.flags = 0;
287
288	sg_init_table(&sg, 1);
289	crypto_hash_init(&desc);
290
291	__bio_for_each_segment(bvec, bio, i, 0) {
292		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
293		crypto_hash_update(&desc, &sg, sg.length);
294	}
295	crypto_hash_final(&desc, digest);
296}
297
298/* TODO merge common code with w_e_end_ov_req */
299int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
300{
301	struct drbd_peer_request *peer_req =
302		container_of(w, struct drbd_peer_request, w);
303	int digest_size;
304	void *digest;
305	int ok = 1;
306
307	if (unlikely(cancel))
308		goto out;
309
310	if (likely((peer_req->flags & EE_WAS_ERROR) != 0))
311		goto out;
312
313	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
314	digest = kmalloc(digest_size, GFP_NOIO);
315	if (digest) {
316		sector_t sector = peer_req->i.sector;
317		unsigned int size = peer_req->i.size;
318		drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
319		/* Free e and pages before send.
320		 * In case we block on congestion, we could otherwise run into
321		 * some distributed deadlock, if the other side blocks on
322		 * congestion as well, because our receiver blocks in
323		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
324		drbd_free_ee(mdev, peer_req);
325		peer_req = NULL;
326		inc_rs_pending(mdev);
327		ok = drbd_send_drequest_csum(mdev, sector, size,
328					     digest, digest_size,
329					     P_CSUM_RS_REQUEST);
330		kfree(digest);
331	} else {
332		dev_err(DEV, "kmalloc() of digest failed.\n");
333		ok = 0;
334	}
335
336out:
337	if (peer_req)
338		drbd_free_ee(mdev, peer_req);
339
340	if (unlikely(!ok))
341		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
342	return ok;
343}
344
345#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
346
347static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
348{
349	struct drbd_peer_request *peer_req;
350
351	if (!get_ldev(mdev))
352		return -EIO;
353
354	if (drbd_rs_should_slow_down(mdev, sector))
355		goto defer;
356
357	/* GFP_TRY, because if there is no memory available right now, this may
358	 * be rescheduled for later. It is "only" background resync, after all. */
359	peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
360	if (!peer_req)
361		goto defer;
362
363	peer_req->w.cb = w_e_send_csum;
364	spin_lock_irq(&mdev->tconn->req_lock);
365	list_add(&peer_req->w.list, &mdev->read_ee);
366	spin_unlock_irq(&mdev->tconn->req_lock);
367
368	atomic_add(size >> 9, &mdev->rs_sect_ev);
369	if (drbd_submit_ee(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
370		return 0;
371
372	/* If it failed because of ENOMEM, retry should help.  If it failed
373	 * because bio_add_page failed (probably broken lower level driver),
374	 * retry may or may not help.
375	 * If it does not, you may need to force disconnect. */
376	spin_lock_irq(&mdev->tconn->req_lock);
377	list_del(&peer_req->w.list);
378	spin_unlock_irq(&mdev->tconn->req_lock);
379
380	drbd_free_ee(mdev, peer_req);
381defer:
382	put_ldev(mdev);
383	return -EAGAIN;
384}
385
386int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
387{
388	switch (mdev->state.conn) {
389	case C_VERIFY_S:
390		w_make_ov_request(mdev, w, cancel);
391		break;
392	case C_SYNC_TARGET:
393		w_make_resync_request(mdev, w, cancel);
394		break;
395	}
396
397	return 1;
398}
399
400void resync_timer_fn(unsigned long data)
401{
402	struct drbd_conf *mdev = (struct drbd_conf *) data;
403
404	if (list_empty(&mdev->resync_work.list))
405		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
406}
407
408static void fifo_set(struct fifo_buffer *fb, int value)
409{
410	int i;
411
412	for (i = 0; i < fb->size; i++)
413		fb->values[i] = value;
414}
415
416static int fifo_push(struct fifo_buffer *fb, int value)
417{
418	int ov;
419
420	ov = fb->values[fb->head_index];
421	fb->values[fb->head_index++] = value;
422
423	if (fb->head_index >= fb->size)
424		fb->head_index = 0;
425
426	return ov;
427}
428
429static void fifo_add_val(struct fifo_buffer *fb, int value)
430{
431	int i;
432
433	for (i = 0; i < fb->size; i++)
434		fb->values[i] += value;
435}
436
437static int drbd_rs_controller(struct drbd_conf *mdev)
438{
439	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
440	unsigned int want;     /* The number of sectors we want in the proxy */
441	int req_sect; /* Number of sectors to request in this turn */
442	int correction; /* Number of sectors more we need in the proxy*/
443	int cps; /* correction per invocation of drbd_rs_controller() */
444	int steps; /* Number of time steps to plan ahead */
445	int curr_corr;
446	int max_sect;
447
448	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
449	mdev->rs_in_flight -= sect_in;
450
451	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
452
453	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
454
455	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
456		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
457	} else { /* normal path */
458		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
459			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
460	}
461
462	correction = want - mdev->rs_in_flight - mdev->rs_planed;
463
464	/* Plan ahead */
465	cps = correction / steps;
466	fifo_add_val(&mdev->rs_plan_s, cps);
467	mdev->rs_planed += cps * steps;
468
469	/* What we do in this step */
470	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
471	spin_unlock(&mdev->peer_seq_lock);
472	mdev->rs_planed -= curr_corr;
473
474	req_sect = sect_in + curr_corr;
475	if (req_sect < 0)
476		req_sect = 0;
477
478	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
479	if (req_sect > max_sect)
480		req_sect = max_sect;
481
482	/*
483	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
484		 sect_in, mdev->rs_in_flight, want, correction,
485		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
486	*/
487
488	return req_sect;
489}
490
491static int drbd_rs_number_requests(struct drbd_conf *mdev)
492{
493	int number;
494	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
495		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
496		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
497	} else {
498		mdev->c_sync_rate = mdev->sync_conf.rate;
499		number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
500	}
501
502	/* ignore the amount of pending requests, the resync controller should
503	 * throttle down to incoming reply rate soon enough anyways. */
504	return number;
505}
506
507static int w_make_resync_request(struct drbd_conf *mdev,
508				 struct drbd_work *w, int cancel)
509{
510	unsigned long bit;
511	sector_t sector;
512	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
513	int max_bio_size;
514	int number, rollback_i, size;
515	int align, queued, sndbuf;
516	int i = 0;
517
518	if (unlikely(cancel))
519		return 1;
520
521	if (mdev->rs_total == 0) {
522		/* empty resync? */
523		drbd_resync_finished(mdev);
524		return 1;
525	}
526
527	if (!get_ldev(mdev)) {
528		/* Since we only need to access mdev->rsync a
529		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
530		   to continue resync with a broken disk makes no sense at
531		   all */
532		dev_err(DEV, "Disk broke down during resync!\n");
533		return 1;
534	}
535
536	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
537	number = drbd_rs_number_requests(mdev);
538	if (number == 0)
539		goto requeue;
540
541	for (i = 0; i < number; i++) {
542		/* Stop generating RS requests, when half of the send buffer is filled */
543		mutex_lock(&mdev->tconn->data.mutex);
544		if (mdev->tconn->data.socket) {
545			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
546			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
547		} else {
548			queued = 1;
549			sndbuf = 0;
550		}
551		mutex_unlock(&mdev->tconn->data.mutex);
552		if (queued > sndbuf / 2)
553			goto requeue;
554
555next_sector:
556		size = BM_BLOCK_SIZE;
557		bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
558
559		if (bit == DRBD_END_OF_BITMAP) {
560			mdev->bm_resync_fo = drbd_bm_bits(mdev);
561			put_ldev(mdev);
562			return 1;
563		}
564
565		sector = BM_BIT_TO_SECT(bit);
566
567		if (drbd_rs_should_slow_down(mdev, sector) ||
568		    drbd_try_rs_begin_io(mdev, sector)) {
569			mdev->bm_resync_fo = bit;
570			goto requeue;
571		}
572		mdev->bm_resync_fo = bit + 1;
573
574		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
575			drbd_rs_complete_io(mdev, sector);
576			goto next_sector;
577		}
578
579#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
580		/* try to find some adjacent bits.
581		 * we stop if we have already the maximum req size.
582		 *
583		 * Additionally always align bigger requests, in order to
584		 * be prepared for all stripe sizes of software RAIDs.
585		 */
586		align = 1;
587		rollback_i = i;
588		for (;;) {
589			if (size + BM_BLOCK_SIZE > max_bio_size)
590				break;
591
592			/* Be always aligned */
593			if (sector & ((1<<(align+3))-1))
594				break;
595
596			/* do not cross extent boundaries */
597			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
598				break;
599			/* now, is it actually dirty, after all?
600			 * caution, drbd_bm_test_bit is tri-state for some
601			 * obscure reason; ( b == 0 ) would get the out-of-band
602			 * only accidentally right because of the "oddly sized"
603			 * adjustment below */
604			if (drbd_bm_test_bit(mdev, bit+1) != 1)
605				break;
606			bit++;
607			size += BM_BLOCK_SIZE;
608			if ((BM_BLOCK_SIZE << align) <= size)
609				align++;
610			i++;
611		}
612		/* if we merged some,
613		 * reset the offset to start the next drbd_bm_find_next from */
614		if (size > BM_BLOCK_SIZE)
615			mdev->bm_resync_fo = bit + 1;
616#endif
617
618		/* adjust very last sectors, in case we are oddly sized */
619		if (sector + (size>>9) > capacity)
620			size = (capacity-sector)<<9;
621		if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
622			switch (read_for_csum(mdev, sector, size)) {
623			case -EIO: /* Disk failure */
624				put_ldev(mdev);
625				return 0;
626			case -EAGAIN: /* allocation failed, or ldev busy */
627				drbd_rs_complete_io(mdev, sector);
628				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
629				i = rollback_i;
630				goto requeue;
631			case 0:
632				/* everything ok */
633				break;
634			default:
635				BUG();
636			}
637		} else {
638			inc_rs_pending(mdev);
639			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
640					       sector, size, ID_SYNCER)) {
641				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
642				dec_rs_pending(mdev);
643				put_ldev(mdev);
644				return 0;
645			}
646		}
647	}
648
649	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
650		/* last syncer _request_ was sent,
651		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
652		 * next sync group will resume), as soon as we receive the last
653		 * resync data block, and the last bit is cleared.
654		 * until then resync "work" is "inactive" ...
655		 */
656		put_ldev(mdev);
657		return 1;
658	}
659
660 requeue:
661	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
662	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
663	put_ldev(mdev);
664	return 1;
665}
666
667static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
668{
669	int number, i, size;
670	sector_t sector;
671	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
672
673	if (unlikely(cancel))
674		return 1;
675
676	number = drbd_rs_number_requests(mdev);
677
678	sector = mdev->ov_position;
679	for (i = 0; i < number; i++) {
680		if (sector >= capacity) {
681			return 1;
682		}
683
684		size = BM_BLOCK_SIZE;
685
686		if (drbd_rs_should_slow_down(mdev, sector) ||
687		    drbd_try_rs_begin_io(mdev, sector)) {
688			mdev->ov_position = sector;
689			goto requeue;
690		}
691
692		if (sector + (size>>9) > capacity)
693			size = (capacity-sector)<<9;
694
695		inc_rs_pending(mdev);
696		if (!drbd_send_ov_request(mdev, sector, size)) {
697			dec_rs_pending(mdev);
698			return 0;
699		}
700		sector += BM_SECT_PER_BIT;
701	}
702	mdev->ov_position = sector;
703
704 requeue:
705	mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
706	mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
707	return 1;
708}
709
/*
 * Worker callback run when online verify is done: frees the work object,
 * calls ov_oos_print() and finishes the run via drbd_resync_finished().
 * Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* the work object is owned by this callback */
	kfree(w);

	ov_oos_print(mdev);
	drbd_resync_finished(mdev);

	return 1;
}
718
/*
 * Worker callback used by drbd_resync_finished() to retry itself later:
 * frees the kmalloc()ed work object and calls drbd_resync_finished()
 * again.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	/* allocated by drbd_resync_finished(), owned by this callback */
	kfree(w);
	drbd_resync_finished(mdev);

	return 1;
}
727
728static void ping_peer(struct drbd_conf *mdev)
729{
730	clear_bit(GOT_PING_ACK, &mdev->flags);
731	request_ping(mdev->tconn);
732	wait_event(mdev->misc_wait,
733		   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
734}
735
736int drbd_resync_finished(struct drbd_conf *mdev)
737{
738	unsigned long db, dt, dbdt;
739	unsigned long n_oos;
740	union drbd_state os, ns;
741	struct drbd_work *w;
742	char *khelper_cmd = NULL;
743	int verify_done = 0;
744
745	/* Remove all elements from the resync LRU. Since future actions
746	 * might set bits in the (main) bitmap, then the entries in the
747	 * resync LRU would be wrong. */
748	if (drbd_rs_del_all(mdev)) {
749		/* In case this is not possible now, most probably because
750		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
751		 * queue (or even the read operations for those packets
752		 * is not finished by now).   Retry in 100ms. */
753
754		schedule_timeout_interruptible(HZ / 10);
755		w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
756		if (w) {
757			w->cb = w_resync_finished;
758			drbd_queue_work(&mdev->tconn->data.work, w);
759			return 1;
760		}
761		dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
762	}
763
764	dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
765	if (dt <= 0)
766		dt = 1;
767	db = mdev->rs_total;
768	dbdt = Bit2KB(db/dt);
769	mdev->rs_paused /= HZ;
770
771	if (!get_ldev(mdev))
772		goto out;
773
774	ping_peer(mdev);
775
776	spin_lock_irq(&mdev->tconn->req_lock);
777	os = mdev->state;
778
779	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
780
781	/* This protects us against multiple calls (that can happen in the presence
782	   of application IO), and against connectivity loss just before we arrive here. */
783	if (os.conn <= C_CONNECTED)
784		goto out_unlock;
785
786	ns = os;
787	ns.conn = C_CONNECTED;
788
789	dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
790	     verify_done ? "Online verify " : "Resync",
791	     dt + mdev->rs_paused, mdev->rs_paused, dbdt);
792
793	n_oos = drbd_bm_total_weight(mdev);
794
795	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
796		if (n_oos) {
797			dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
798			      n_oos, Bit2KB(1));
799			khelper_cmd = "out-of-sync";
800		}
801	} else {
802		D_ASSERT((n_oos - mdev->rs_failed) == 0);
803
804		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
805			khelper_cmd = "after-resync-target";
806
807		if (mdev->csums_tfm && mdev->rs_total) {
808			const unsigned long s = mdev->rs_same_csum;
809			const unsigned long t = mdev->rs_total;
810			const int ratio =
811				(t == 0)     ? 0 :
812			(t < 100000) ? ((s*100)/t) : (s/(t/100));
813			dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
814			     "transferred %luK total %luK\n",
815			     ratio,
816			     Bit2KB(mdev->rs_same_csum),
817			     Bit2KB(mdev->rs_total - mdev->rs_same_csum),
818			     Bit2KB(mdev->rs_total));
819		}
820	}
821
822	if (mdev->rs_failed) {
823		dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
824
825		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
826			ns.disk = D_INCONSISTENT;
827			ns.pdsk = D_UP_TO_DATE;
828		} else {
829			ns.disk = D_UP_TO_DATE;
830			ns.pdsk = D_INCONSISTENT;
831		}
832	} else {
833		ns.disk = D_UP_TO_DATE;
834		ns.pdsk = D_UP_TO_DATE;
835
836		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
837			if (mdev->p_uuid) {
838				int i;
839				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
840					_drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
841				drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
842				_drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
843			} else {
844				dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
845			}
846		}
847
848		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
849			/* for verify runs, we don't update uuids here,
850			 * so there would be nothing to report. */
851			drbd_uuid_set_bm(mdev, 0UL);
852			drbd_print_uuids(mdev, "updated UUIDs");
853			if (mdev->p_uuid) {
854				/* Now the two UUID sets are equal, update what we
855				 * know of the peer. */
856				int i;
857				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
858					mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
859			}
860		}
861	}
862
863	_drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
864out_unlock:
865	spin_unlock_irq(&mdev->tconn->req_lock);
866	put_ldev(mdev);
867out:
868	mdev->rs_total  = 0;
869	mdev->rs_failed = 0;
870	mdev->rs_paused = 0;
871	if (verify_done)
872		mdev->ov_start_sector = 0;
873
874	drbd_md_sync(mdev);
875
876	if (khelper_cmd)
877		drbd_khelper(mdev, khelper_cmd);
878
879	return 1;
880}
881
882/* helper */
883static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
884{
885	if (drbd_ee_has_active_page(peer_req)) {
886		/* This might happen if sendpage() has not finished */
887		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
888		atomic_add(i, &mdev->pp_in_use_by_net);
889		atomic_sub(i, &mdev->pp_in_use);
890		spin_lock_irq(&mdev->tconn->req_lock);
891		list_add_tail(&peer_req->w.list, &mdev->net_ee);
892		spin_unlock_irq(&mdev->tconn->req_lock);
893		wake_up(&drbd_pp_wait);
894	} else
895		drbd_free_ee(mdev, peer_req);
896}
897
898/**
899 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
900 * @mdev:	DRBD device.
901 * @w:		work object.
902 * @cancel:	The connection will be closed anyways
903 */
904int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
905{
906	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
907	int ok;
908
909	if (unlikely(cancel)) {
910		drbd_free_ee(mdev, peer_req);
911		dec_unacked(mdev);
912		return 1;
913	}
914
915	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
916		ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
917	} else {
918		if (__ratelimit(&drbd_ratelimit_state))
919			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
920			    (unsigned long long)peer_req->i.sector);
921
922		ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
923	}
924
925	dec_unacked(mdev);
926
927	move_to_net_ee_or_free(mdev, peer_req);
928
929	if (unlikely(!ok))
930		dev_err(DEV, "drbd_send_block() failed\n");
931	return ok;
932}
933
934/**
935 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
936 * @mdev:	DRBD device.
937 * @w:		work object.
938 * @cancel:	The connection will be closed anyways
939 */
940int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
941{
942	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
943	int ok;
944
945	if (unlikely(cancel)) {
946		drbd_free_ee(mdev, peer_req);
947		dec_unacked(mdev);
948		return 1;
949	}
950
951	if (get_ldev_if_state(mdev, D_FAILED)) {
952		drbd_rs_complete_io(mdev, peer_req->i.sector);
953		put_ldev(mdev);
954	}
955
956	if (mdev->state.conn == C_AHEAD) {
957		ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
958	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
959		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
960			inc_rs_pending(mdev);
961			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
962		} else {
963			if (__ratelimit(&drbd_ratelimit_state))
964				dev_err(DEV, "Not sending RSDataReply, "
965				    "partner DISKLESS!\n");
966			ok = 1;
967		}
968	} else {
969		if (__ratelimit(&drbd_ratelimit_state))
970			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
971			    (unsigned long long)peer_req->i.sector);
972
973		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
974
975		/* update resync data with failure */
976		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
977	}
978
979	dec_unacked(mdev);
980
981	move_to_net_ee_or_free(mdev, peer_req);
982
983	if (unlikely(!ok))
984		dev_err(DEV, "drbd_send_block() failed\n");
985	return ok;
986}
987
988int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
989{
990	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
991	struct digest_info *di;
992	int digest_size;
993	void *digest = NULL;
994	int ok, eq = 0;
995
996	if (unlikely(cancel)) {
997		drbd_free_ee(mdev, peer_req);
998		dec_unacked(mdev);
999		return 1;
1000	}
1001
1002	if (get_ldev(mdev)) {
1003		drbd_rs_complete_io(mdev, peer_req->i.sector);
1004		put_ldev(mdev);
1005	}
1006
1007	di = peer_req->digest;
1008
1009	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010		/* quick hack to try to avoid a race against reconfiguration.
1011		 * a real fix would be much more involved,
1012		 * introducing more locking mechanisms */
1013		if (mdev->csums_tfm) {
1014			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1015			D_ASSERT(digest_size == di->digest_size);
1016			digest = kmalloc(digest_size, GFP_NOIO);
1017		}
1018		if (digest) {
1019			drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
1020			eq = !memcmp(digest, di->digest, digest_size);
1021			kfree(digest);
1022		}
1023
1024		if (eq) {
1025			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1026			/* rs_same_csums unit is BM_BLOCK_SIZE */
1027			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1028			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1029		} else {
1030			inc_rs_pending(mdev);
1031			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1032			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1033			kfree(di);
1034			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1035		}
1036	} else {
1037		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1038		if (__ratelimit(&drbd_ratelimit_state))
1039			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1040	}
1041
1042	dec_unacked(mdev);
1043	move_to_net_ee_or_free(mdev, peer_req);
1044
1045	if (unlikely(!ok))
1046		dev_err(DEV, "drbd_send_block/ack() failed\n");
1047	return ok;
1048}
1049
1050/* TODO merge common code with w_e_send_csum */
1051int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1052{
1053	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1054	sector_t sector = peer_req->i.sector;
1055	unsigned int size = peer_req->i.size;
1056	int digest_size;
1057	void *digest;
1058	int ok = 1;
1059
1060	if (unlikely(cancel))
1061		goto out;
1062
1063	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1064	digest = kmalloc(digest_size, GFP_NOIO);
1065	if (!digest) {
1066		ok = 0;	/* terminate the connection in case the allocation failed */
1067		goto out;
1068	}
1069
1070	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1071		drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1072	else
1073		memset(digest, 0, digest_size);
1074
1075	/* Free e and pages before send.
1076	 * In case we block on congestion, we could otherwise run into
1077	 * some distributed deadlock, if the other side blocks on
1078	 * congestion as well, because our receiver blocks in
1079	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1080	drbd_free_ee(mdev, peer_req);
1081	peer_req = NULL;
1082	inc_rs_pending(mdev);
1083	ok = drbd_send_drequest_csum(mdev, sector, size,
1084				     digest, digest_size,
1085				     P_OV_REPLY);
1086	if (!ok)
1087		dec_rs_pending(mdev);
1088	kfree(digest);
1089
1090out:
1091	if (peer_req)
1092		drbd_free_ee(mdev, peer_req);
1093	dec_unacked(mdev);
1094	return ok;
1095}
1096
1097void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1098{
1099	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1100		mdev->ov_last_oos_size += size>>9;
1101	} else {
1102		mdev->ov_last_oos_start = sector;
1103		mdev->ov_last_oos_size = size>>9;
1104	}
1105	drbd_set_out_of_sync(mdev, sector, size);
1106}
1107
1108int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1109{
1110	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1111	struct digest_info *di;
1112	void *digest;
1113	sector_t sector = peer_req->i.sector;
1114	unsigned int size = peer_req->i.size;
1115	int digest_size;
1116	int ok, eq = 0;
1117
1118	if (unlikely(cancel)) {
1119		drbd_free_ee(mdev, peer_req);
1120		dec_unacked(mdev);
1121		return 1;
1122	}
1123
1124	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1125	 * the resync lru has been cleaned up already */
1126	if (get_ldev(mdev)) {
1127		drbd_rs_complete_io(mdev, peer_req->i.sector);
1128		put_ldev(mdev);
1129	}
1130
1131	di = peer_req->digest;
1132
1133	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1134		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1135		digest = kmalloc(digest_size, GFP_NOIO);
1136		if (digest) {
1137			drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
1138
1139			D_ASSERT(digest_size == di->digest_size);
1140			eq = !memcmp(digest, di->digest, digest_size);
1141			kfree(digest);
1142		}
1143	}
1144
1145		/* Free e and pages before send.
1146		 * In case we block on congestion, we could otherwise run into
1147		 * some distributed deadlock, if the other side blocks on
1148		 * congestion as well, because our receiver blocks in
1149		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1150	drbd_free_ee(mdev, peer_req);
1151	if (!eq)
1152		drbd_ov_oos_found(mdev, sector, size);
1153	else
1154		ov_oos_print(mdev);
1155
1156	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1157			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1158
1159	dec_unacked(mdev);
1160
1161	--mdev->ov_left;
1162
1163	/* let's advance progress step marks only for every other megabyte */
1164	if ((mdev->ov_left & 0x200) == 0x200)
1165		drbd_advance_rs_marks(mdev, mdev->ov_left);
1166
1167	if (mdev->ov_left == 0) {
1168		ov_oos_print(mdev);
1169		drbd_resync_finished(mdev);
1170	}
1171
1172	return ok;
1173}
1174
1175int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1176{
1177	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1178	complete(&b->done);
1179	return 1;
1180}
1181
1182int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1183{
1184	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1185	struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
1186	int ok = 1;
1187
1188	/* really avoid racing with tl_clear.  w.cb may have been referenced
1189	 * just before it was reassigned and re-queued, so double check that.
1190	 * actually, this race was harmless, since we only try to send the
1191	 * barrier packet here, and otherwise do nothing with the object.
1192	 * but compare with the head of w_clear_epoch */
1193	spin_lock_irq(&mdev->tconn->req_lock);
1194	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1195		cancel = 1;
1196	spin_unlock_irq(&mdev->tconn->req_lock);
1197	if (cancel)
1198		return 1;
1199
1200	if (!drbd_get_data_sock(mdev))
1201		return 0;
1202	p->barrier = b->br_number;
1203	/* inc_ap_pending was done where this was queued.
1204	 * dec_ap_pending will be done in got_BarrierAck
1205	 * or (on connection loss) in w_clear_epoch.  */
1206	ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
1207			    &p->head, sizeof(*p), 0);
1208	drbd_put_data_sock(mdev);
1209
1210	return ok;
1211}
1212
1213int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1214{
1215	if (cancel)
1216		return 1;
1217	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1218}
1219
1220int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1221{
1222	struct drbd_request *req = container_of(w, struct drbd_request, w);
1223	int ok;
1224
1225	if (unlikely(cancel)) {
1226		req_mod(req, SEND_CANCELED);
1227		return 1;
1228	}
1229
1230	ok = drbd_send_oos(mdev, req);
1231	req_mod(req, OOS_HANDED_TO_NETWORK);
1232
1233	return ok;
1234}
1235
1236/**
1237 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1238 * @mdev:	DRBD device.
1239 * @w:		work object.
1240 * @cancel:	The connection will be closed anyways
1241 */
1242int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1243{
1244	struct drbd_request *req = container_of(w, struct drbd_request, w);
1245	int ok;
1246
1247	if (unlikely(cancel)) {
1248		req_mod(req, SEND_CANCELED);
1249		return 1;
1250	}
1251
1252	ok = drbd_send_dblock(mdev, req);
1253	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1254
1255	return ok;
1256}
1257
1258/**
1259 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1260 * @mdev:	DRBD device.
1261 * @w:		work object.
1262 * @cancel:	The connection will be closed anyways
1263 */
1264int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1265{
1266	struct drbd_request *req = container_of(w, struct drbd_request, w);
1267	int ok;
1268
1269	if (unlikely(cancel)) {
1270		req_mod(req, SEND_CANCELED);
1271		return 1;
1272	}
1273
1274	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1275				(unsigned long)req);
1276
1277	if (!ok) {
1278		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1279		 * so this is probably redundant */
1280		if (mdev->state.conn >= C_CONNECTED)
1281			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1282	}
1283	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);
1284
1285	return ok;
1286}
1287
1288int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1289{
1290	struct drbd_request *req = container_of(w, struct drbd_request, w);
1291
1292	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1293		drbd_al_begin_io(mdev, req->i.sector);
1294	/* Calling drbd_al_begin_io() out of the worker might deadlocks
1295	   theoretically. Practically it can not deadlock, since this is
1296	   only used when unfreezing IOs. All the extents of the requests
1297	   that made it into the TL are already active */
1298
1299	drbd_req_make_private_bio(req, req->master_bio);
1300	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1301	generic_make_request(req->private_bio);
1302
1303	return 1;
1304}
1305
1306static int _drbd_may_sync_now(struct drbd_conf *mdev)
1307{
1308	struct drbd_conf *odev = mdev;
1309
1310	while (1) {
1311		if (odev->sync_conf.after == -1)
1312			return 1;
1313		odev = minor_to_mdev(odev->sync_conf.after);
1314		if (!expect(odev))
1315			return 1;
1316		if ((odev->state.conn >= C_SYNC_SOURCE &&
1317		     odev->state.conn <= C_PAUSED_SYNC_T) ||
1318		    odev->state.aftr_isp || odev->state.peer_isp ||
1319		    odev->state.user_isp)
1320			return 0;
1321	}
1322}
1323
1324/**
1325 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1326 * @mdev:	DRBD device.
1327 *
1328 * Called from process context only (admin command and after_state_ch).
1329 */
1330static int _drbd_pause_after(struct drbd_conf *mdev)
1331{
1332	struct drbd_conf *odev;
1333	int i, rv = 0;
1334
1335	for (i = 0; i < minor_count; i++) {
1336		odev = minor_to_mdev(i);
1337		if (!odev)
1338			continue;
1339		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1340			continue;
1341		if (!_drbd_may_sync_now(odev))
1342			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1343			       != SS_NOTHING_TO_DO);
1344	}
1345
1346	return rv;
1347}
1348
1349/**
1350 * _drbd_resume_next() - Resume resync on all devices that may resync now
1351 * @mdev:	DRBD device.
1352 *
1353 * Called from process context only (admin command and worker).
1354 */
1355static int _drbd_resume_next(struct drbd_conf *mdev)
1356{
1357	struct drbd_conf *odev;
1358	int i, rv = 0;
1359
1360	for (i = 0; i < minor_count; i++) {
1361		odev = minor_to_mdev(i);
1362		if (!odev)
1363			continue;
1364		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1365			continue;
1366		if (odev->state.aftr_isp) {
1367			if (_drbd_may_sync_now(odev))
1368				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1369							CS_HARD, NULL)
1370				       != SS_NOTHING_TO_DO) ;
1371		}
1372	}
1373	return rv;
1374}
1375
1376void resume_next_sg(struct drbd_conf *mdev)
1377{
1378	write_lock_irq(&global_state_lock);
1379	_drbd_resume_next(mdev);
1380	write_unlock_irq(&global_state_lock);
1381}
1382
1383void suspend_other_sg(struct drbd_conf *mdev)
1384{
1385	write_lock_irq(&global_state_lock);
1386	_drbd_pause_after(mdev);
1387	write_unlock_irq(&global_state_lock);
1388}
1389
1390static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1391{
1392	struct drbd_conf *odev;
1393
1394	if (o_minor == -1)
1395		return NO_ERROR;
1396	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1397		return ERR_SYNC_AFTER;
1398
1399	/* check for loops */
1400	odev = minor_to_mdev(o_minor);
1401	while (1) {
1402		if (odev == mdev)
1403			return ERR_SYNC_AFTER_CYCLE;
1404
1405		/* dependency chain ends here, no cycles. */
1406		if (odev->sync_conf.after == -1)
1407			return NO_ERROR;
1408
1409		/* follow the dependency chain */
1410		odev = minor_to_mdev(odev->sync_conf.after);
1411	}
1412}
1413
1414int drbd_alter_sa(struct drbd_conf *mdev, int na)
1415{
1416	int changes;
1417	int retcode;
1418
1419	write_lock_irq(&global_state_lock);
1420	retcode = sync_after_error(mdev, na);
1421	if (retcode == NO_ERROR) {
1422		mdev->sync_conf.after = na;
1423		do {
1424			changes  = _drbd_pause_after(mdev);
1425			changes |= _drbd_resume_next(mdev);
1426		} while (changes);
1427	}
1428	write_unlock_irq(&global_state_lock);
1429	return retcode;
1430}
1431
1432void drbd_rs_controller_reset(struct drbd_conf *mdev)
1433{
1434	atomic_set(&mdev->rs_sect_in, 0);
1435	atomic_set(&mdev->rs_sect_ev, 0);
1436	mdev->rs_in_flight = 0;
1437	mdev->rs_planed = 0;
1438	spin_lock(&mdev->peer_seq_lock);
1439	fifo_set(&mdev->rs_plan_s, 0);
1440	spin_unlock(&mdev->peer_seq_lock);
1441}
1442
1443void start_resync_timer_fn(unsigned long data)
1444{
1445	struct drbd_conf *mdev = (struct drbd_conf *) data;
1446
1447	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
1448}
1449
1450int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1451{
1452	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1453		dev_warn(DEV, "w_start_resync later...\n");
1454		mdev->start_resync_timer.expires = jiffies + HZ/10;
1455		add_timer(&mdev->start_resync_timer);
1456		return 1;
1457	}
1458
1459	drbd_start_resync(mdev, C_SYNC_SOURCE);
1460	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
1461	return 1;
1462}
1463
1464/**
1465 * drbd_start_resync() - Start the resync process
1466 * @mdev:	DRBD device.
1467 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
1468 *
1469 * This function might bring you directly into one of the
1470 * C_PAUSED_SYNC_* states.
1471 */
1472void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1473{
1474	union drbd_state ns;
1475	int r;
1476
1477	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1478		dev_err(DEV, "Resync already running!\n");
1479		return;
1480	}
1481
1482	if (mdev->state.conn < C_AHEAD) {
1483		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
1484		drbd_rs_cancel_all(mdev);
1485		/* This should be done when we abort the resync. We definitely do not
1486		   want to have this for connections going back and forth between
1487		   Ahead/Behind and SyncSource/SyncTarget */
1488	}
1489
1490	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1491		if (side == C_SYNC_TARGET) {
1492			/* Since application IO was locked out during C_WF_BITMAP_T and
1493			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1494			   we check that we might make the data inconsistent. */
1495			r = drbd_khelper(mdev, "before-resync-target");
1496			r = (r >> 8) & 0xff;
1497			if (r > 0) {
1498				dev_info(DEV, "before-resync-target handler returned %d, "
1499					 "dropping connection.\n", r);
1500				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1501				return;
1502			}
1503		} else /* C_SYNC_SOURCE */ {
1504			r = drbd_khelper(mdev, "before-resync-source");
1505			r = (r >> 8) & 0xff;
1506			if (r > 0) {
1507				if (r == 3) {
1508					dev_info(DEV, "before-resync-source handler returned %d, "
1509						 "ignoring. Old userland tools?", r);
1510				} else {
1511					dev_info(DEV, "before-resync-source handler returned %d, "
1512						 "dropping connection.\n", r);
1513					drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1514					return;
1515				}
1516			}
1517		}
1518	}
1519
1520	if (current == mdev->tconn->worker.task) {
1521		/* The worker should not sleep waiting for drbd_state_lock(),
1522		   that can take long */
1523		if (test_and_set_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
1524			set_bit(B_RS_H_DONE, &mdev->flags);
1525			mdev->start_resync_timer.expires = jiffies + HZ/5;
1526			add_timer(&mdev->start_resync_timer);
1527			return;
1528		}
1529	} else {
1530		drbd_state_lock(mdev);
1531	}
1532	clear_bit(B_RS_H_DONE, &mdev->flags);
1533
1534	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1535		drbd_state_unlock(mdev);
1536		return;
1537	}
1538
1539	write_lock_irq(&global_state_lock);
1540	ns = mdev->state;
1541
1542	ns.aftr_isp = !_drbd_may_sync_now(mdev);
1543
1544	ns.conn = side;
1545
1546	if (side == C_SYNC_TARGET)
1547		ns.disk = D_INCONSISTENT;
1548	else /* side == C_SYNC_SOURCE */
1549		ns.pdsk = D_INCONSISTENT;
1550
1551	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1552	ns = mdev->state;
1553
1554	if (ns.conn < C_CONNECTED)
1555		r = SS_UNKNOWN_ERROR;
1556
1557	if (r == SS_SUCCESS) {
1558		unsigned long tw = drbd_bm_total_weight(mdev);
1559		unsigned long now = jiffies;
1560		int i;
1561
1562		mdev->rs_failed    = 0;
1563		mdev->rs_paused    = 0;
1564		mdev->rs_same_csum = 0;
1565		mdev->rs_last_events = 0;
1566		mdev->rs_last_sect_ev = 0;
1567		mdev->rs_total     = tw;
1568		mdev->rs_start     = now;
1569		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1570			mdev->rs_mark_left[i] = tw;
1571			mdev->rs_mark_time[i] = now;
1572		}
1573		_drbd_pause_after(mdev);
1574	}
1575	write_unlock_irq(&global_state_lock);
1576
1577	if (r == SS_SUCCESS) {
1578		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1579		     drbd_conn_str(ns.conn),
1580		     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1581		     (unsigned long) mdev->rs_total);
1582		if (side == C_SYNC_TARGET)
1583			mdev->bm_resync_fo = 0;
1584
1585		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1586		 * with w_send_oos, or the sync target will get confused as to
1587		 * how much bits to resync.  We cannot do that always, because for an
1588		 * empty resync and protocol < 95, we need to do it here, as we call
1589		 * drbd_resync_finished from here in that case.
1590		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1591		 * and from after_state_ch otherwise. */
1592		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1593			drbd_gen_and_send_sync_uuid(mdev);
1594
1595		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1596			/* This still has a race (about when exactly the peers
1597			 * detect connection loss) that can lead to a full sync
1598			 * on next handshake. In 8.3.9 we fixed this with explicit
1599			 * resync-finished notifications, but the fix
1600			 * introduces a protocol change.  Sleeping for some
1601			 * time longer than the ping interval + timeout on the
1602			 * SyncSource, to give the SyncTarget the chance to
1603			 * detect connection loss, then waiting for a ping
1604			 * response (implicit in drbd_resync_finished) reduces
1605			 * the race considerably, but does not solve it. */
1606			if (side == C_SYNC_SOURCE)
1607				schedule_timeout_interruptible(
1608					mdev->tconn->net_conf->ping_int * HZ +
1609					mdev->tconn->net_conf->ping_timeo*HZ/9);
1610			drbd_resync_finished(mdev);
1611		}
1612
1613		drbd_rs_controller_reset(mdev);
1614		/* ns.conn may already be != mdev->state.conn,
1615		 * we may have been paused in between, or become paused until
1616		 * the timer triggers.
1617		 * No matter, that is handled in resync_timer_fn() */
1618		if (ns.conn == C_SYNC_TARGET)
1619			mod_timer(&mdev->resync_timer, jiffies);
1620
1621		drbd_md_sync(mdev);
1622	}
1623	put_ldev(mdev);
1624	drbd_state_unlock(mdev);
1625}
1626
1627int drbd_worker(struct drbd_thread *thi)
1628{
1629	struct drbd_conf *mdev = thi->mdev;
1630	struct drbd_work *w = NULL;
1631	LIST_HEAD(work_list);
1632	int intr = 0, i;
1633
1634	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1635
1636	while (get_t_state(thi) == RUNNING) {
1637		drbd_thread_current_set_cpu(mdev, thi);
1638
1639		if (down_trylock(&mdev->tconn->data.work.s)) {
1640			mutex_lock(&mdev->tconn->data.mutex);
1641			if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
1642				drbd_tcp_uncork(mdev->tconn->data.socket);
1643			mutex_unlock(&mdev->tconn->data.mutex);
1644
1645			intr = down_interruptible(&mdev->tconn->data.work.s);
1646
1647			mutex_lock(&mdev->tconn->data.mutex);
1648			if (mdev->tconn->data.socket  && !mdev->tconn->net_conf->no_cork)
1649				drbd_tcp_cork(mdev->tconn->data.socket);
1650			mutex_unlock(&mdev->tconn->data.mutex);
1651		}
1652
1653		if (intr) {
1654			D_ASSERT(intr == -EINTR);
1655			flush_signals(current);
1656			if (!expect(get_t_state(thi) != RUNNING))
1657				continue;
1658			break;
1659		}
1660
1661		if (get_t_state(thi) != RUNNING)
1662			break;
1663		/* With this break, we have done a down() but not consumed
1664		   the entry from the list. The cleanup code takes care of
1665		   this...   */
1666
1667		w = NULL;
1668		spin_lock_irq(&mdev->tconn->data.work.q_lock);
1669		if (!expect(!list_empty(&mdev->tconn->data.work.q))) {
1670			/* something terribly wrong in our logic.
1671			 * we were able to down() the semaphore,
1672			 * but the list is empty... doh.
1673			 *
1674			 * what is the best thing to do now?
1675			 * try again from scratch, restarting the receiver,
1676			 * asender, whatnot? could break even more ugly,
1677			 * e.g. when we are primary, but no good local data.
1678			 *
1679			 * I'll try to get away just starting over this loop.
1680			 */
1681			spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1682			continue;
1683		}
1684		w = list_entry(mdev->tconn->data.work.q.next, struct drbd_work, list);
1685		list_del_init(&w->list);
1686		spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1687
1688		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1689			/* dev_warn(DEV, "worker: a callback failed! \n"); */
1690			if (mdev->state.conn >= C_CONNECTED)
1691				drbd_force_state(mdev,
1692						NS(conn, C_NETWORK_FAILURE));
1693		}
1694	}
1695	D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1696	D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1697
1698	spin_lock_irq(&mdev->tconn->data.work.q_lock);
1699	i = 0;
1700	while (!list_empty(&mdev->tconn->data.work.q)) {
1701		list_splice_init(&mdev->tconn->data.work.q, &work_list);
1702		spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1703
1704		while (!list_empty(&work_list)) {
1705			w = list_entry(work_list.next, struct drbd_work, list);
1706			list_del_init(&w->list);
1707			w->cb(mdev, w, 1);
1708			i++; /* dead debugging code */
1709		}
1710
1711		spin_lock_irq(&mdev->tconn->data.work.q_lock);
1712	}
1713	sema_init(&mdev->tconn->data.work.s, 0);
1714	/* DANGEROUS race: if someone did queue his work within the spinlock,
1715	 * but up() ed outside the spinlock, we could get an up() on the
1716	 * semaphore without corresponding list entry.
1717	 * So don't do that.
1718	 */
1719	spin_unlock_irq(&mdev->tconn->data.work.q_lock);
1720
1721	D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1722	/* _drbd_set_state only uses stop_nowait.
1723	 * wait here for the exiting receiver. */
1724	drbd_thread_stop(&mdev->tconn->receiver);
1725	drbd_mdev_cleanup(mdev);
1726
1727	dev_info(DEV, "worker terminated\n");
1728
1729	clear_bit(DEVICE_DYING, &mdev->flags);
1730	clear_bit(CONFIG_PENDING, &mdev->flags);
1731	wake_up(&mdev->state_wait);
1732
1733	return 0;
1734}
1735