dm-raid1.c revision e4c8b3ba34cc1aeab451c7a5cc843c5fd62cbe3d
/*
 * Copyright (C) 2003 Sistina Software Limited.
 *
 * This file is released under the GPL.
 */

#include "dm.h"
#include "dm-bio-list.h"
#include "dm-io.h"
#include "dm-log.h"
#include "kcopyd.h"

#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>

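/*
 * All mirror sets share a single kmirrord workqueue; queueing
 * _kmirrord_work makes do_work() below walk every registered
 * mirror set.
 */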
static struct workqueue_struct *_kmirrord_wq;
static struct work_struct _kmirrord_work;

static inline void wake(void)
{
	queue_work(_kmirrord_wq, &_kmirrord_work);
}

/*-----------------------------------------------------------------
 * Region hash
 *
 * The mirror splits itself up into discrete regions.  Each
 * region can be in one of three states: clean, dirty,
 * nosync.  There is no need to put clean regions in the hash.
 *
 * In addition to being present in the hash table a region _may_
 * be present on one of three lists.
 *
 *   clean_regions: Regions on this list have no io pending to
 *   them, they are in sync, we are no longer interested in them,
 *   they are dull.  rh_update_states() will remove them from the
 *   hash table.
 *
 *   quiesced_regions: These regions have been spun down, ready
 *   for recovery.  rh_recovery_start() will remove regions from
 *   this list and hand them to kmirrord, which will schedule the
 *   recovery io with kcopyd.
 *
 *   recovered_regions: Regions that kcopyd has successfully
 *   recovered.  rh_update_states() will now schedule any delayed
 *   io, up the recovery_count, and remove the region from the
 *   hash.
 *
 * There are 2 locks:
 *   A rw spin lock 'hash_lock' protects just the hash table.
 *   It is never held in write mode from interrupt context, which
 *   I believe means that we only have to disable irqs when
 *   taking the write lock.
 *
 *   An ordinary spin lock 'region_lock' protects the three
 *   lists in the region_hash, along with the 'state', 'list' and
 *   'delayed_bios' fields of the regions.  This is used from irq
 *   context, so all other uses will have to suspend local irqs.
 *---------------------------------------------------------------*/
struct mirror_set;
struct region_hash {
	struct mirror_set *ms;
	uint32_t region_size;
	unsigned region_shift;

	/* holds persistent region state */
	struct dirty_log *log;

	/* hash table */
	rwlock_t hash_lock;
	mempool_t *region_pool;
	unsigned int mask;
	unsigned int nr_buckets;
	struct list_head *buckets;

	spinlock_t region_lock;
	struct semaphore recovery_count;
	struct list_head clean_regions;
	struct list_head quiesced_regions;
	struct list_head recovered_regions;
};

enum {
	RH_CLEAN,
	RH_DIRTY,
	RH_NOSYNC,
	RH_RECOVERING
};

struct region {
	struct region_hash *rh;	/* FIXME: can we get rid of this ? */
	region_t key;
	int state;

	struct list_head hash_list;
	struct list_head list;

	atomic_t pending;
	struct bio_list delayed_bios;
};

/*-----------------------------------------------------------------
 * Mirror set structures.
 *---------------------------------------------------------------*/
struct mirror {
	atomic_t error_count;
	struct dm_dev *dev;
	sector_t offset;
};

struct mirror_set {
	struct dm_target *ti;
	struct list_head list;
	struct region_hash rh;
	struct kcopyd_client *kcopyd_client;

	spinlock_t lock;	/* protects the next two lists */
	struct bio_list reads;
	struct bio_list writes;

	/* recovery */
	region_t nr_regions;
	int in_sync;

	struct mirror *default_mirror;	/* Default mirror */

	unsigned int nr_mirrors;
	struct mirror mirror[0];
};

/*
 * Conversion fns
 */
static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
{
	return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
}

static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
{
	return region << rh->region_shift;
}

/* FIXME move this */
static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);

#define MIN_REGIONS 64
#define MAX_RECOVERY 1
static int rh_init(struct region_hash *rh, struct mirror_set *ms,
		   struct dirty_log *log, uint32_t region_size,
		   region_t nr_regions)
{
	unsigned int nr_buckets, max_buckets;
	size_t i;

	/*
	 * Calculate a suitable number of buckets for our hash
	 * table.
	 */
	max_buckets = nr_regions >> 6;
	for (nr_buckets = 128u; nr_buckets < max_buckets; nr_buckets <<= 1)
		;
	nr_buckets >>= 1;

	rh->ms = ms;
	rh->log = log;
	rh->region_size = region_size;
	rh->region_shift = ffs(region_size) - 1;
	rwlock_init(&rh->hash_lock);
	rh->mask = nr_buckets - 1;
	rh->nr_buckets = nr_buckets;

	rh->buckets = vmalloc(nr_buckets * sizeof(*rh->buckets));
	if (!rh->buckets) {
		DMERR("unable to allocate region hash memory");
		return -ENOMEM;
	}

	for (i = 0; i < nr_buckets; i++)
		INIT_LIST_HEAD(rh->buckets + i);

	spin_lock_init(&rh->region_lock);
	sema_init(&rh->recovery_count, 0);
	INIT_LIST_HEAD(&rh->clean_regions);
	INIT_LIST_HEAD(&rh->quiesced_regions);
	INIT_LIST_HEAD(&rh->recovered_regions);

	rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
						      sizeof(struct region));
	if (!rh->region_pool) {
		vfree(rh->buckets);
		rh->buckets = NULL;
		return -ENOMEM;
	}

	return 0;
}

static void rh_exit(struct region_hash *rh)
{
	unsigned int h;
	struct region *reg, *nreg;

	BUG_ON(!list_empty(&rh->quiesced_regions));
	for (h = 0; h < rh->nr_buckets; h++) {
		list_for_each_entry_safe(reg, nreg, rh->buckets + h, hash_list) {
			BUG_ON(atomic_read(&reg->pending));
			mempool_free(reg, rh->region_pool);
		}
	}

	if (rh->log)
		dm_destroy_dirty_log(rh->log);
	if (rh->region_pool)
		mempool_destroy(rh->region_pool);
	vfree(rh->buckets);
}

#define RH_HASH_MULT 2654435387U

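/*
 * Multiplicative hash: scale the region number by a large constant
 * and fold the result into the bucket mask (nr_buckets is always a
 * power of two, so 'mask' is nr_buckets - 1).
 */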
static inline unsigned int rh_hash(struct region_hash *rh, region_t region)
{
	return (unsigned int) ((region * RH_HASH_MULT) >> 12) & rh->mask;
}

static struct region *__rh_lookup(struct region_hash *rh, region_t region)
{
	struct region *reg;

	list_for_each_entry (reg, rh->buckets + rh_hash(rh, region), hash_list)
		if (reg->key == region)
			return reg;

	return NULL;
}

static void __rh_insert(struct region_hash *rh, struct region *reg)
{
	unsigned int h = rh_hash(rh, reg->key);
	list_add(&reg->hash_list, rh->buckets + h);
}

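/*
 * Called with the read lock held.  Drop it while allocating from the
 * mempool (which may sleep), then retake the hash lock in write mode
 * and recheck for a racing insertion of the same region.
 */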
static struct region *__rh_alloc(struct region_hash *rh, region_t region)
{
	struct region *reg, *nreg;

	read_unlock(&rh->hash_lock);
	nreg = mempool_alloc(rh->region_pool, GFP_NOIO);
	nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
		RH_CLEAN : RH_NOSYNC;
	nreg->rh = rh;
	nreg->key = region;

	INIT_LIST_HEAD(&nreg->list);

	atomic_set(&nreg->pending, 0);
	bio_list_init(&nreg->delayed_bios);
	write_lock_irq(&rh->hash_lock);

	reg = __rh_lookup(rh, region);
	if (reg)
		/* we lost the race */
		mempool_free(nreg, rh->region_pool);

	else {
		__rh_insert(rh, nreg);
		if (nreg->state == RH_CLEAN) {
			spin_lock(&rh->region_lock);
			list_add(&nreg->list, &rh->clean_regions);
			spin_unlock(&rh->region_lock);
		}
		reg = nreg;
	}
	write_unlock_irq(&rh->hash_lock);
	read_lock(&rh->hash_lock);

	return reg;
}

static inline struct region *__rh_find(struct region_hash *rh, region_t region)
{
	struct region *reg;

	reg = __rh_lookup(rh, region);
	if (!reg)
		reg = __rh_alloc(rh, region);

	return reg;
}

static int rh_state(struct region_hash *rh, region_t region, int may_block)
{
	int r;
	struct region *reg;

	read_lock(&rh->hash_lock);
	reg = __rh_lookup(rh, region);
	read_unlock(&rh->hash_lock);

	if (reg)
		return reg->state;

	/*
	 * The region wasn't in the hash, so we fall back to the
	 * dirty log.
	 */
	r = rh->log->type->in_sync(rh->log, region, may_block);

	/*
	 * Any error from the dirty log (eg. -EWOULDBLOCK) gets
	 * taken as a RH_NOSYNC
	 */
	return r == 1 ? RH_CLEAN : RH_NOSYNC;
}

static inline int rh_in_sync(struct region_hash *rh,
			     region_t region, int may_block)
{
	int state = rh_state(rh, region, may_block);
	return state == RH_CLEAN || state == RH_DIRTY;
}

static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
{
	struct bio *bio;

	while ((bio = bio_list_pop(bio_list))) {
		queue_bio(ms, bio, WRITE);
	}
}

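/*
 * Pull the clean and recovered lists out from under the locks, then
 * finish them off lock-free: clear the log bits for clean regions,
 * complete resync work and re-issue delayed bios for recovered ones,
 * and return the region structs to the mempool.
 */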
static void rh_update_states(struct region_hash *rh)
{
	struct region *reg, *next;

	LIST_HEAD(clean);
	LIST_HEAD(recovered);

	/*
	 * Quickly grab the lists.
	 */
	write_lock_irq(&rh->hash_lock);
	spin_lock(&rh->region_lock);
	if (!list_empty(&rh->clean_regions)) {
		list_splice(&rh->clean_regions, &clean);
		INIT_LIST_HEAD(&rh->clean_regions);

		list_for_each_entry (reg, &clean, list) {
			rh->log->type->clear_region(rh->log, reg->key);
			list_del(&reg->hash_list);
		}
	}

	if (!list_empty(&rh->recovered_regions)) {
		list_splice(&rh->recovered_regions, &recovered);
		INIT_LIST_HEAD(&rh->recovered_regions);

		list_for_each_entry (reg, &recovered, list)
			list_del(&reg->hash_list);
	}
	spin_unlock(&rh->region_lock);
	write_unlock_irq(&rh->hash_lock);

	/*
	 * All the regions on the recovered and clean lists have
	 * now been pulled out of the system, so no need to do
	 * any more locking.
	 */
	list_for_each_entry_safe (reg, next, &recovered, list) {
		rh->log->type->clear_region(rh->log, reg->key);
		rh->log->type->complete_resync_work(rh->log, reg->key, 1);
		dispatch_bios(rh->ms, &reg->delayed_bios);
		up(&rh->recovery_count);
		mempool_free(reg, rh->region_pool);
	}

	if (!list_empty(&recovered))
		rh->log->type->flush(rh->log);

	list_for_each_entry_safe (reg, next, &clean, list)
		mempool_free(reg, rh->region_pool);
}

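/*
 * Account one pending write against a region.  The first write to a
 * clean region takes it off the clean list, switches it to RH_DIRTY
 * and marks it in the dirty log; rh_dec() undoes this once the last
 * pending write completes.
 */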
static void rh_inc(struct region_hash *rh, region_t region)
{
	struct region *reg;

	read_lock(&rh->hash_lock);
	reg = __rh_find(rh, region);

	spin_lock_irq(&rh->region_lock);
	atomic_inc(&reg->pending);

	if (reg->state == RH_CLEAN) {
		reg->state = RH_DIRTY;
		list_del_init(&reg->list);	/* take off the clean list */
		spin_unlock_irq(&rh->region_lock);

		rh->log->type->mark_region(rh->log, reg->key);
	} else
		spin_unlock_irq(&rh->region_lock);

	read_unlock(&rh->hash_lock);
}

static void rh_inc_pending(struct region_hash *rh, struct bio_list *bios)
{
	struct bio *bio;

	for (bio = bios->head; bio; bio = bio->bi_next)
		rh_inc(rh, bio_to_region(rh, bio));
}

static void rh_dec(struct region_hash *rh, region_t region)
{
	unsigned long flags;
	struct region *reg;
	int should_wake = 0;

	read_lock(&rh->hash_lock);
	reg = __rh_lookup(rh, region);
	read_unlock(&rh->hash_lock);

	spin_lock_irqsave(&rh->region_lock, flags);
	if (atomic_dec_and_test(&reg->pending)) {
		/*
		 * There is no pending I/O left for this region, so we
		 * can move it to the appropriate list for the next
		 * action.  At this point the region is not on any list.
		 *
		 * An RH_NOSYNC region must be kept off the clean list;
		 * its hash entry stays in memory until the region is
		 * recovered or the map is reloaded.
		 */

		/* do nothing for RH_NOSYNC */
		if (reg->state == RH_RECOVERING) {
			list_add_tail(&reg->list, &rh->quiesced_regions);
		} else if (reg->state == RH_DIRTY) {
			reg->state = RH_CLEAN;
			list_add(&reg->list, &rh->clean_regions);
		}
		should_wake = 1;
	}
	spin_unlock_irqrestore(&rh->region_lock, flags);

	if (should_wake)
		wake();
}

/*
 * Starts quiescing a region in preparation for recovery.
 */
static int __rh_recovery_prepare(struct region_hash *rh)
{
	int r;
	struct region *reg;
	region_t region;

	/*
	 * Ask the dirty log what's next.
	 */
	r = rh->log->type->get_resync_work(rh->log, &region);
	if (r <= 0)
		return r;

	/*
	 * Get this region, and start it quiescing by setting the
	 * recovering flag.
	 */
	read_lock(&rh->hash_lock);
	reg = __rh_find(rh, region);
	read_unlock(&rh->hash_lock);

	spin_lock_irq(&rh->region_lock);
	reg->state = RH_RECOVERING;

	/* Already quiesced ? */
	if (atomic_read(&reg->pending))
		list_del_init(&reg->list);
	else
		list_move(&reg->list, &rh->quiesced_regions);

	spin_unlock_irq(&rh->region_lock);

	return 1;
}

static void rh_recovery_prepare(struct region_hash *rh)
{
	while (!down_trylock(&rh->recovery_count))
		if (__rh_recovery_prepare(rh) <= 0) {
			up(&rh->recovery_count);
			break;
		}
}

/*
 * Returns any quiesced regions.
 */
static struct region *rh_recovery_start(struct region_hash *rh)
{
	struct region *reg = NULL;

	spin_lock_irq(&rh->region_lock);
	if (!list_empty(&rh->quiesced_regions)) {
		reg = list_entry(rh->quiesced_regions.next,
				 struct region, list);
		list_del_init(&reg->list);	/* remove from the quiesced list */
	}
	spin_unlock_irq(&rh->region_lock);

	return reg;
}

/* FIXME: success ignored for now */
static void rh_recovery_end(struct region *reg, int success)
{
	struct region_hash *rh = reg->rh;

	spin_lock_irq(&rh->region_lock);
	list_add(&reg->list, &reg->rh->recovered_regions);
	spin_unlock_irq(&rh->region_lock);

	wake();
}

static void rh_flush(struct region_hash *rh)
{
	rh->log->type->flush(rh->log);
}

static void rh_delay(struct region_hash *rh, struct bio *bio)
{
	struct region *reg;

	read_lock(&rh->hash_lock);
	reg = __rh_find(rh, bio_to_region(rh, bio));
	bio_list_add(&reg->delayed_bios, bio);
	read_unlock(&rh->hash_lock);
}

static void rh_stop_recovery(struct region_hash *rh)
{
	int i;

	/* wait for any recovering regions */
	for (i = 0; i < MAX_RECOVERY; i++)
		down(&rh->recovery_count);
}

static void rh_start_recovery(struct region_hash *rh)
{
	int i;

	for (i = 0; i < MAX_RECOVERY; i++)
		up(&rh->recovery_count);

	wake();
}

/*
 * Every mirror should look like this one.
 */
#define DEFAULT_MIRROR 0

/*
 * This is yucky.  We squirrel the mirror_set struct away inside
 * bi_next for write bios.  This is safe since the bio itself
 * doesn't get submitted to the lower levels of the block layer.
 */
static struct mirror_set *bio_get_ms(struct bio *bio)
{
	return (struct mirror_set *) bio->bi_next;
}

static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
{
	bio->bi_next = (struct bio *) ms;
}

/*-----------------------------------------------------------------
 * Recovery.
 *
 * When a mirror is first activated we may find that some regions
 * are in the no-sync state.  We have to recover these by
 * recopying from the default mirror to all the others.
 *---------------------------------------------------------------*/
static void recovery_complete(int read_err, unsigned int write_err,
			      void *context)
{
	struct region *reg = (struct region *) context;

	/* FIXME: better error handling */
	rh_recovery_end(reg, read_err || write_err);
}

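/*
 * Build a single kcopyd copy for one region: the source is the
 * default mirror, the destinations are every other mirror.  The final
 * region is trimmed to the target length.  recovery_complete() above
 * reports the result back through rh_recovery_end().
 */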
static int recover(struct mirror_set *ms, struct region *reg)
{
	int r;
	unsigned int i;
	struct io_region from, to[KCOPYD_MAX_REGIONS], *dest;
	struct mirror *m;
	unsigned long flags = 0;

	/* fill in the source */
	m = ms->default_mirror;
	from.bdev = m->dev->bdev;
	from.sector = m->offset + region_to_sector(reg->rh, reg->key);
	if (reg->key == (ms->nr_regions - 1)) {
		/*
		 * The final region may be smaller than
		 * region_size.
		 */
		from.count = ms->ti->len & (reg->rh->region_size - 1);
		if (!from.count)
			from.count = reg->rh->region_size;
	} else
		from.count = reg->rh->region_size;

	/* fill in the destinations */
	for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
		if (&ms->mirror[i] == ms->default_mirror)
			continue;

		m = ms->mirror + i;
		dest->bdev = m->dev->bdev;
		dest->sector = m->offset + region_to_sector(reg->rh, reg->key);
		dest->count = from.count;
		dest++;
	}

	/* hand to kcopyd */
	set_bit(KCOPYD_IGNORE_ERROR, &flags);
	r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
			recovery_complete, reg);

	return r;
}

static void do_recovery(struct mirror_set *ms)
{
	int r;
	struct region *reg;
	struct dirty_log *log = ms->rh.log;

	/*
	 * Start quiescing some regions.
	 */
	rh_recovery_prepare(&ms->rh);

	/*
	 * Copy any already quiesced regions.
	 */
	while ((reg = rh_recovery_start(&ms->rh))) {
		r = recover(ms, reg);
		if (r)
			rh_recovery_end(reg, 0);
	}

	/*
	 * Update the in sync flag.
	 */
	if (!ms->in_sync &&
	    (log->type->get_sync_count(log) == ms->nr_regions)) {
		/* the sync is complete */
		dm_table_event(ms->ti->table);
		ms->in_sync = 1;
	}
}

/*-----------------------------------------------------------------
 * Reads
 *---------------------------------------------------------------*/
static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
{
	/* FIXME: add read balancing */
	return ms->default_mirror;
}

/*
 * remap a bio to a particular mirror.
 */
static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio)
{
	bio->bi_bdev = m->dev->bdev;
	bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin);
}

static void do_reads(struct mirror_set *ms, struct bio_list *reads)
{
	region_t region;
	struct bio *bio;
	struct mirror *m;

	while ((bio = bio_list_pop(reads))) {
		region = bio_to_region(&ms->rh, bio);

		/*
		 * We can only read balance if the region is in sync.
		 */
		if (rh_in_sync(&ms->rh, region, 0))
			m = choose_mirror(ms, bio->bi_sector);
		else
			m = ms->default_mirror;

		map_bio(ms, m, bio);
		generic_make_request(bio);
	}
}

/*-----------------------------------------------------------------
 * Writes.
 *
 * We do different things with the write io depending on the
 * state of the region that it's in:
 *
 * SYNC:	increment pending, use dm-io to write to *all* mirrors
 * RECOVERING:	delay the io until recovery completes
 * NOSYNC:	increment pending, just write to the default mirror
 *---------------------------------------------------------------*/
static void write_callback(unsigned long error, void *context)
{
	unsigned int i;
	int uptodate = 1;
	struct bio *bio = (struct bio *) context;
	struct mirror_set *ms;

	ms = bio_get_ms(bio);
	bio_set_ms(bio, NULL);

	/*
	 * NOTE: We don't decrement the pending count here,
	 * instead it is done by the target's end_io function.
	 * This way we handle both writes to SYNC and NOSYNC
	 * regions with the same code.
	 */

	if (error) {
		/*
		 * only error the io if all mirrors failed.
		 * FIXME: bogus
		 */
		uptodate = 0;
		for (i = 0; i < ms->nr_mirrors; i++)
			if (!test_bit(i, &error)) {
				uptodate = 1;
				break;
			}
	}
	bio_endio(bio, bio->bi_size, uptodate ? 0 : -EIO);
}

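/*
 * Issue a write to every mirror via dm-io.  The mirror_set is stashed
 * in bi_next (see bio_set_ms() above) so that write_callback() can
 * retrieve it when the io completes.
 */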
static void do_write(struct mirror_set *ms, struct bio *bio)
{
	unsigned int i;
	struct io_region io[KCOPYD_MAX_REGIONS+1];
	struct mirror *m;

	for (i = 0; i < ms->nr_mirrors; i++) {
		m = ms->mirror + i;

		io[i].bdev = m->dev->bdev;
		io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin);
		io[i].count = bio->bi_size >> 9;
	}

	bio_set_ms(bio, ms);
	dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
			 bio->bi_io_vec + bio->bi_idx,
			 write_callback, bio);
}

static void do_writes(struct mirror_set *ms, struct bio_list *writes)
{
	int state;
	struct bio *bio;
	struct bio_list sync, nosync, recover, *this_list = NULL;

	if (!writes->head)
		return;

	/*
	 * Classify each write.
	 */
	bio_list_init(&sync);
	bio_list_init(&nosync);
	bio_list_init(&recover);

	while ((bio = bio_list_pop(writes))) {
		state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1);
		switch (state) {
		case RH_CLEAN:
		case RH_DIRTY:
			this_list = &sync;
			break;

		case RH_NOSYNC:
			this_list = &nosync;
			break;

		case RH_RECOVERING:
			this_list = &recover;
			break;
		}

		bio_list_add(this_list, bio);
	}

	/*
	 * Increment the pending counts for any regions that will
	 * be written to (writes to recover regions are going to
	 * be delayed).
	 */
	rh_inc_pending(&ms->rh, &sync);
	rh_inc_pending(&ms->rh, &nosync);
	rh_flush(&ms->rh);

	/*
	 * Dispatch io.
	 */
	while ((bio = bio_list_pop(&sync)))
		do_write(ms, bio);

	while ((bio = bio_list_pop(&recover)))
		rh_delay(&ms->rh, bio);

	while ((bio = bio_list_pop(&nosync))) {
		map_bio(ms, ms->default_mirror, bio);
		generic_make_request(bio);
	}
}

/*-----------------------------------------------------------------
 * kmirrord
 *---------------------------------------------------------------*/
static LIST_HEAD(_mirror_sets);
static DECLARE_RWSEM(_mirror_sets_lock);

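/*
 * One pass of the kmirrord worker for a single mirror set: snapshot
 * the read and write lists under ms->lock, retire finished regions,
 * push recovery forward, then service the queued reads and writes.
 */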
static void do_mirror(struct mirror_set *ms)
{
	struct bio_list reads, writes;

	spin_lock(&ms->lock);
	reads = ms->reads;
	writes = ms->writes;
	bio_list_init(&ms->reads);
	bio_list_init(&ms->writes);
	spin_unlock(&ms->lock);

	rh_update_states(&ms->rh);
	do_recovery(ms);
	do_reads(ms, &reads);
	do_writes(ms, &writes);
}

static void do_work(void *ignored)
{
	struct mirror_set *ms;

	down_read(&_mirror_sets_lock);
	list_for_each_entry (ms, &_mirror_sets, list)
		do_mirror(ms);
	up_read(&_mirror_sets_lock);
}

/*-----------------------------------------------------------------
 * Target functions
 *---------------------------------------------------------------*/
static struct mirror_set *alloc_context(unsigned int nr_mirrors,
					uint32_t region_size,
					struct dm_target *ti,
					struct dirty_log *dl)
{
	size_t len;
	struct mirror_set *ms = NULL;

	if (array_too_big(sizeof(*ms), sizeof(ms->mirror[0]), nr_mirrors))
		return NULL;

	len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);

	ms = kmalloc(len, GFP_KERNEL);
	if (!ms) {
		ti->error = "dm-mirror: Cannot allocate mirror context";
		return NULL;
	}

	memset(ms, 0, len);
	spin_lock_init(&ms->lock);

	ms->ti = ti;
	ms->nr_mirrors = nr_mirrors;
	ms->nr_regions = dm_sector_div_up(ti->len, region_size);
	ms->in_sync = 0;
	ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];

	if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
		ti->error = "dm-mirror: Error creating dirty region hash";
		kfree(ms);
		return NULL;
	}

	return ms;
}

static void free_context(struct mirror_set *ms, struct dm_target *ti,
			 unsigned int m)
{
	while (m--)
		dm_put_device(ti, ms->mirror[m].dev);

	rh_exit(&ms->rh);
	kfree(ms);
}

static inline int _check_region_size(struct dm_target *ti, uint32_t size)
{
	return !(size % (PAGE_SIZE >> 9) || (size & (size - 1)) ||
		 size > ti->len);
}

static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
		      unsigned int mirror, char **argv)
{
	unsigned long long offset;

	if (sscanf(argv[1], "%llu", &offset) != 1) {
		ti->error = "dm-mirror: Invalid offset";
		return -EINVAL;
	}

	if (dm_get_device(ti, argv[0], offset, ti->len,
			  dm_table_get_mode(ti->table),
			  &ms->mirror[mirror].dev)) {
		ti->error = "dm-mirror: Device lookup failure";
		return -ENXIO;
	}

	ms->mirror[mirror].offset = offset;

	return 0;
}

static int add_mirror_set(struct mirror_set *ms)
{
	down_write(&_mirror_sets_lock);
	list_add_tail(&ms->list, &_mirror_sets);
	up_write(&_mirror_sets_lock);
	wake();

	return 0;
}

static void del_mirror_set(struct mirror_set *ms)
{
	down_write(&_mirror_sets_lock);
	list_del(&ms->list);
	up_write(&_mirror_sets_lock);
}

/*
 * Create dirty log: log_type #log_params <log_params>
 */
static struct dirty_log *create_dirty_log(struct dm_target *ti,
					  unsigned int argc, char **argv,
					  unsigned int *args_used)
{
	unsigned int param_count;
	struct dirty_log *dl;

	if (argc < 2) {
		ti->error = "dm-mirror: Insufficient mirror log arguments";
		return NULL;
	}

	if (sscanf(argv[1], "%u", &param_count) != 1) {
		ti->error = "dm-mirror: Invalid mirror log argument count";
		return NULL;
	}

	*args_used = 2 + param_count;

	if (argc < *args_used) {
		ti->error = "dm-mirror: Insufficient mirror log arguments";
		return NULL;
	}

	dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
	if (!dl) {
		ti->error = "dm-mirror: Error creating mirror dirty log";
		return NULL;
	}

	if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
		ti->error = "dm-mirror: Invalid region size";
		dm_destroy_dirty_log(dl);
		return NULL;
	}

	return dl;
}

/*
 * Construct a mirror mapping:
 *
 * log_type #log_params <log_params>
 * #mirrors [mirror_path offset]{2,}
 *
 * log_type is "core" or "disk"
 * #log_params is between 1 and 3
 */
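/*
 * For example, a table line for a two-way mirror using a core log
 * with a 1024-sector region size might look like this (device names,
 * lengths and region size are illustrative only):
 *
 *     0 2097152 mirror core 1 1024 2 /dev/sda1 0 /dev/sdb1 0
 */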
#define DM_IO_PAGES 64
static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
{
	int r;
	unsigned int nr_mirrors, m, args_used;
	struct mirror_set *ms;
	struct dirty_log *dl;

	dl = create_dirty_log(ti, argc, argv, &args_used);
	if (!dl)
		return -EINVAL;

	argv += args_used;
	argc -= args_used;

	if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
	    nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
		ti->error = "dm-mirror: Invalid number of mirrors";
		dm_destroy_dirty_log(dl);
		return -EINVAL;
	}

	argv++, argc--;

	if (argc != nr_mirrors * 2) {
		ti->error = "dm-mirror: Wrong number of mirror arguments";
		dm_destroy_dirty_log(dl);
		return -EINVAL;
	}

	ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
	if (!ms) {
		dm_destroy_dirty_log(dl);
		return -ENOMEM;
	}

	/* Get the mirror parameter sets */
	for (m = 0; m < nr_mirrors; m++) {
		r = get_mirror(ms, ti, m, argv);
		if (r) {
			free_context(ms, ti, m);
			return r;
		}
		argv += 2;
		argc -= 2;
	}

	ti->private = ms;
	ti->split_io = ms->rh.region_size;

	r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
	if (r) {
		free_context(ms, ti, ms->nr_mirrors);
		return r;
	}

	add_mirror_set(ms);
	return 0;
}

static void mirror_dtr(struct dm_target *ti)
{
	struct mirror_set *ms = (struct mirror_set *) ti->private;

	del_mirror_set(ms);
	kcopyd_client_destroy(ms->kcopyd_client);
	free_context(ms, ti, ms->nr_mirrors);
}

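/*
 * Add a bio to the mirror set's read or write list and wake kmirrord,
 * but only when the list was previously empty: kmirrord drains the
 * whole list in one pass, so further wake-ups would be redundant.
 */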
static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
{
	int should_wake = 0;
	struct bio_list *bl;

	bl = (rw == WRITE) ? &ms->writes : &ms->reads;
	spin_lock(&ms->lock);
	should_wake = !(bl->head);
	bio_list_add(bl, bio);
	spin_unlock(&ms->lock);

	if (should_wake)
		wake();
}

/*
 * Mirror mapping function
 */
static int mirror_map(struct dm_target *ti, struct bio *bio,
		      union map_info *map_context)
{
	int r, rw = bio_rw(bio);
	struct mirror *m;
	struct mirror_set *ms = ti->private;

	map_context->ll = bio_to_region(&ms->rh, bio);

	if (rw == WRITE) {
		queue_bio(ms, bio, rw);
		return 0;
	}

	r = ms->rh.log->type->in_sync(ms->rh.log,
				      bio_to_region(&ms->rh, bio), 0);
	if (r < 0 && r != -EWOULDBLOCK)
		return r;

	if (r == -EWOULDBLOCK)	/* FIXME: ugly */
		r = 0;

	/*
	 * We don't want to fast track a recovery just for a read
	 * ahead.  So we just let it silently fail.
	 * FIXME: get rid of this.
	 */
	if (!r && rw == READA)
		return -EIO;

	if (!r) {
		/* Pass this io over to the daemon */
		queue_bio(ms, bio, rw);
		return 0;
	}

	m = choose_mirror(ms, bio->bi_sector);
	if (!m)
		return -EIO;

	map_bio(ms, m, bio);
	return 1;
}

static int mirror_end_io(struct dm_target *ti, struct bio *bio,
			 int error, union map_info *map_context)
{
	int rw = bio_rw(bio);
	struct mirror_set *ms = (struct mirror_set *) ti->private;
	region_t region = map_context->ll;

	/*
	 * We need to dec pending if this was a write.
	 */
	if (rw == WRITE)
		rh_dec(&ms->rh, region);

	return 0;
}

static void mirror_postsuspend(struct dm_target *ti)
{
	struct mirror_set *ms = (struct mirror_set *) ti->private;
	struct dirty_log *log = ms->rh.log;

	rh_stop_recovery(&ms->rh);
	if (log->type->suspend && log->type->suspend(log))
		/* FIXME: need better error handling */
		DMWARN("log suspend failed");
}

static void mirror_resume(struct dm_target *ti)
{
	struct mirror_set *ms = (struct mirror_set *) ti->private;
	struct dirty_log *log = ms->rh.log;
	if (log->type->resume && log->type->resume(log))
		/* FIXME: need better error handling */
		DMWARN("log resume failed");
	rh_start_recovery(&ms->rh);
}

static int mirror_status(struct dm_target *ti, status_type_t type,
			 char *result, unsigned int maxlen)
{
	unsigned int m, sz;
	struct mirror_set *ms = (struct mirror_set *) ti->private;

	sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);

	switch (type) {
	case STATUSTYPE_INFO:
		DMEMIT("%d ", ms->nr_mirrors);
		for (m = 0; m < ms->nr_mirrors; m++)
			DMEMIT("%s ", ms->mirror[m].dev->name);

		DMEMIT("%llu/%llu",
			(unsigned long long)ms->rh.log->type->
				get_sync_count(ms->rh.log),
			(unsigned long long)ms->nr_regions);
		break;

	case STATUSTYPE_TABLE:
		DMEMIT("%d ", ms->nr_mirrors);
		for (m = 0; m < ms->nr_mirrors; m++)
			DMEMIT("%s %llu ", ms->mirror[m].dev->name,
				(unsigned long long)ms->mirror[m].offset);
	}

	return 0;
}

static struct target_type mirror_target = {
	.name	 = "mirror",
	.version = {1, 0, 1},
	.module	 = THIS_MODULE,
	.ctr	 = mirror_ctr,
	.dtr	 = mirror_dtr,
	.map	 = mirror_map,
	.end_io	 = mirror_end_io,
	.postsuspend = mirror_postsuspend,
	.resume	 = mirror_resume,
	.status	 = mirror_status,
};

static int __init dm_mirror_init(void)
{
	int r;

	r = dm_dirty_log_init();
	if (r)
		return r;

	_kmirrord_wq = create_singlethread_workqueue("kmirrord");
	if (!_kmirrord_wq) {
		DMERR("couldn't start kmirrord");
		dm_dirty_log_exit();
		return -ENOMEM;
	}
	INIT_WORK(&_kmirrord_work, do_work, NULL);

	r = dm_register_target(&mirror_target);
	if (r < 0) {
		DMERR("%s: Failed to register mirror target",
		      mirror_target.name);
		dm_dirty_log_exit();
		destroy_workqueue(_kmirrord_wq);
	}

	return r;
}

static void __exit dm_mirror_exit(void)
{
	int r;

	r = dm_unregister_target(&mirror_target);
	if (r < 0)
		DMERR("%s: unregister failed %d", mirror_target.name, r);

	destroy_workqueue(_kmirrord_wq);
	dm_dirty_log_exit();
}

/* Module hooks */
module_init(dm_mirror_init);
module_exit(dm_mirror_exit);

MODULE_DESCRIPTION(DM_NAME " mirror target");
MODULE_AUTHOR("Joe Thornber");
MODULE_LICENSE("GPL");