dm-raid1.c revision 1f965b19437017cea6d3f3f46acdc5acae5fd011
1/*
2 * Copyright (C) 2003 Sistina Software Limited.
3 * Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved.
4 *
5 * This file is released under the GPL.
6 */
7
8#include "dm-bio-list.h"
9#include "dm-bio-record.h"
10
11#include <linux/init.h>
12#include <linux/mempool.h>
13#include <linux/module.h>
14#include <linux/pagemap.h>
15#include <linux/slab.h>
16#include <linux/workqueue.h>
17#include <linux/device-mapper.h>
18#include <linux/dm-io.h>
19#include <linux/dm-dirty-log.h>
20#include <linux/dm-kcopyd.h>
21#include <linux/dm-region-hash.h>
22
23#define DM_MSG_PREFIX "raid1"
24
25#define MAX_RECOVERY 1	/* Maximum number of regions recovered in parallel. */
26#define DM_IO_PAGES 64
27#define DM_KCOPYD_PAGES 64
28
29#define DM_RAID1_HANDLE_ERRORS 0x01
30#define errors_handled(p)	((p)->features & DM_RAID1_HANDLE_ERRORS)
31
32static DECLARE_WAIT_QUEUE_HEAD(_kmirrord_recovery_stopped);
33
34/*-----------------------------------------------------------------
35 * Mirror set structures.
36 *---------------------------------------------------------------*/
37enum dm_raid1_error {
38	DM_RAID1_WRITE_ERROR,
39	DM_RAID1_SYNC_ERROR,
40	DM_RAID1_READ_ERROR
41};
42
43struct mirror {
44	struct mirror_set *ms;
45	atomic_t error_count;
46	unsigned long error_type;
47	struct dm_dev *dev;
48	sector_t offset;
49};
50
51struct mirror_set {
52	struct dm_target *ti;
53	struct list_head list;
54
55	uint64_t features;
56
57	spinlock_t lock;	/* protects the lists */
58	struct bio_list reads;
59	struct bio_list writes;
60	struct bio_list failures;
61
62	struct dm_region_hash *rh;
63	struct dm_kcopyd_client *kcopyd_client;
64	struct dm_io_client *io_client;
65	mempool_t *read_record_pool;
66
67	/* recovery */
68	region_t nr_regions;
69	int in_sync;
70	int log_failure;
71	atomic_t suspend;
72
73	atomic_t default_mirror;	/* Default mirror */
74
75	struct workqueue_struct *kmirrord_wq;
76	struct work_struct kmirrord_work;
77	struct timer_list timer;
78	unsigned long timer_pending;
79
80	struct work_struct trigger_event;
81
82	unsigned nr_mirrors;
83	struct mirror mirror[0];
84};
85
86static void wakeup_mirrord(void *context)
87{
88	struct mirror_set *ms = context;
89
90	queue_work(ms->kmirrord_wq, &ms->kmirrord_work);
91}
92
93static void delayed_wake_fn(unsigned long data)
94{
95	struct mirror_set *ms = (struct mirror_set *) data;
96
97	clear_bit(0, &ms->timer_pending);
98	wakeup_mirrord(ms);
99}
100
101static void delayed_wake(struct mirror_set *ms)
102{
103	if (test_and_set_bit(0, &ms->timer_pending))
104		return;
105
106	ms->timer.expires = jiffies + HZ / 5;
107	ms->timer.data = (unsigned long) ms;
108	ms->timer.function = delayed_wake_fn;
109	add_timer(&ms->timer);
110}
111
112static void wakeup_all_recovery_waiters(void *context)
113{
114	wake_up_all(&_kmirrord_recovery_stopped);
115}
116
117static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
118{
119	unsigned long flags;
120	int should_wake = 0;
121	struct bio_list *bl;
122
123	bl = (rw == WRITE) ? &ms->writes : &ms->reads;
124	spin_lock_irqsave(&ms->lock, flags);
125	should_wake = !(bl->head);
126	bio_list_add(bl, bio);
127	spin_unlock_irqrestore(&ms->lock, flags);
128
129	if (should_wake)
130		wakeup_mirrord(ms);
131}
132
133static void dispatch_bios(void *context, struct bio_list *bio_list)
134{
135	struct mirror_set *ms = context;
136	struct bio *bio;
137
138	while ((bio = bio_list_pop(bio_list)))
139		queue_bio(ms, bio, WRITE);
140}
141
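/*
 * A dm_raid1_read_record is allocated from read_record_pool for each
 * read that mirror_map() remaps directly: it remembers which leg was
 * chosen and keeps a dm_bio_record() snapshot of the bio, so that
 * mirror_end_io() can restore the bio and requeue it to another
 * in-sync leg if the read fails.
 */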
142#define MIN_READ_RECORDS 20
143struct dm_raid1_read_record {
144	struct mirror *m;
145	struct dm_bio_details details;
146};
147
148/*
149 * Every mirror should look like this one.
150 */
151#define DEFAULT_MIRROR 0
152
153/*
154 * This is yucky.  We squirrel the mirror struct away inside
155 * bi_next for read/write buffers.  This is safe since the bio
156 * doesn't get submitted to the lower levels of the block layer.
157 */
158static struct mirror *bio_get_m(struct bio *bio)
159{
160	return (struct mirror *) bio->bi_next;
161}
162
163static void bio_set_m(struct bio *bio, struct mirror *m)
164{
165	bio->bi_next = (struct bio *) m;
166}
167
168static struct mirror *get_default_mirror(struct mirror_set *ms)
169{
170	return &ms->mirror[atomic_read(&ms->default_mirror)];
171}
172
173static void set_default_mirror(struct mirror *m)
174{
175	struct mirror_set *ms = m->ms;
176	struct mirror *m0 = &(ms->mirror[0]);
177
178	atomic_set(&ms->default_mirror, m - m0);
179}
180
181/* fail_mirror
182 * @m: mirror device to fail
183 * @error_type: one of the enum dm_raid1_error values (DM_RAID1_*_ERROR)
184 *
185 * If errors are being handled, record the type of
186 * error encountered for this device.  If this type
187 * of error has already been recorded, we can return;
188 * otherwise, we must signal userspace by triggering
189 * an event.  Additionally, if the device is the
190 * primary device, we must choose a new primary, but
191 * only if the mirror is in-sync.
192 *
193 * This function must not block.
194 */
195static void fail_mirror(struct mirror *m, enum dm_raid1_error error_type)
196{
197	struct mirror_set *ms = m->ms;
198	struct mirror *new;
199
200	if (!errors_handled(ms))
201		return;
202
203	/*
204	 * error_count is used for nothing more than a
205	 * simple way to tell if a device has encountered
206	 * errors.
207	 */
208	atomic_inc(&m->error_count);
209
210	if (test_and_set_bit(error_type, &m->error_type))
211		return;
212
213	if (m != get_default_mirror(ms))
214		goto out;
215
216	if (!ms->in_sync) {
217		/*
218		 * Better to issue requests to the same failing device
219		 * than to risk returning corrupt data.
220		 */
221		DMERR("Primary mirror (%s) failed while out-of-sync: "
222		      "Reads may fail.", m->dev->name);
223		goto out;
224	}
225
226	for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
227		if (!atomic_read(&new->error_count)) {
228			set_default_mirror(new);
229			break;
230		}
231
232	if (unlikely(new == ms->mirror + ms->nr_mirrors))
233		DMWARN("All sides of mirror have failed.");
234
235out:
236	schedule_work(&ms->trigger_event);
237}
238
239/*-----------------------------------------------------------------
240 * Recovery.
241 *
242 * When a mirror is first activated we may find that some regions
243 * are in the no-sync state.  We have to recover these by
244 * recopying from the default mirror to all the others.
245 *---------------------------------------------------------------*/
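/*
 * Roughly, as implemented below: do_recovery() asks the region hash to
 * quiesce regions that need recovery and hands each quiesced region to
 * recover(), which builds one dm_io_region source (the default mirror)
 * plus one destination per remaining leg and submits them to
 * dm_kcopyd_copy().  recovery_complete() then reports success or
 * failure back to the region hash via dm_rh_recovery_end().
 */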
246static void recovery_complete(int read_err, unsigned long write_err,
247			      void *context)
248{
249	struct dm_region *reg = context;
250	struct mirror_set *ms = dm_rh_region_context(reg);
251	int m, bit = 0;
252
253	if (read_err) {
254		/* Read error means the failure of default mirror. */
255		DMERR_LIMIT("Unable to read primary mirror during recovery");
256		fail_mirror(get_default_mirror(ms), DM_RAID1_SYNC_ERROR);
257	}
258
259	if (write_err) {
260		DMERR_LIMIT("Write error during recovery (error = 0x%lx)",
261			    write_err);
262		/*
263		 * Bits correspond to devices (excluding default mirror).
264		 * The default mirror cannot change during recovery.
265		 */
266		for (m = 0; m < ms->nr_mirrors; m++) {
267			if (&ms->mirror[m] == get_default_mirror(ms))
268				continue;
269			if (test_bit(bit, &write_err))
270				fail_mirror(ms->mirror + m,
271					    DM_RAID1_SYNC_ERROR);
272			bit++;
273		}
274	}
275
276	dm_rh_recovery_end(reg, !(read_err || write_err));
277}
278
279static int recover(struct mirror_set *ms, struct dm_region *reg)
280{
281	int r;
282	unsigned i;
283	struct dm_io_region from, to[DM_KCOPYD_MAX_REGIONS], *dest;
284	struct mirror *m;
285	unsigned long flags = 0;
286	region_t key = dm_rh_get_region_key(reg);
287	sector_t region_size = dm_rh_get_region_size(ms->rh);
288
289	/* fill in the source */
290	m = get_default_mirror(ms);
291	from.bdev = m->dev->bdev;
292	from.sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
293	if (key == (ms->nr_regions - 1)) {
294		/*
295		 * The final region may be smaller than
296		 * region_size.
297		 */
298		from.count = ms->ti->len & (region_size - 1);
299		if (!from.count)
300			from.count = region_size;
301	} else
302		from.count = region_size;
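	/*
	 * Example of the final-region arithmetic above (numbers purely
	 * illustrative): with region_size = 1024 sectors and ti->len =
	 * 10000 sectors there are 10 regions, and the last one covers
	 * only 10000 & 1023 = 784 sectors.  The mask works because
	 * _check_region_size() guarantees region_size is a power of two;
	 * an exact multiple would yield 0, in which case the full
	 * region_size is used.
	 */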
303
304	/* fill in the destinations */
305	for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
306		if (&ms->mirror[i] == get_default_mirror(ms))
307			continue;
308
309		m = ms->mirror + i;
310		dest->bdev = m->dev->bdev;
311		dest->sector = m->offset + dm_rh_region_to_sector(ms->rh, key);
312		dest->count = from.count;
313		dest++;
314	}
315
316	/* hand to kcopyd */
317	if (!errors_handled(ms))
318		set_bit(DM_KCOPYD_IGNORE_ERROR, &flags);
319
320	r = dm_kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to,
321			   flags, recovery_complete, reg);
322
323	return r;
324}
325
326static void do_recovery(struct mirror_set *ms)
327{
328	struct dm_region *reg;
329	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
330	int r;
331
332	/*
333	 * Start quiescing some regions.
334	 */
335	dm_rh_recovery_prepare(ms->rh);
336
337	/*
338	 * Copy any already quiesced regions.
339	 */
340	while ((reg = dm_rh_recovery_start(ms->rh))) {
341		r = recover(ms, reg);
342		if (r)
343			dm_rh_recovery_end(reg, 0);
344	}
345
346	/*
347	 * Update the in sync flag.
348	 */
349	if (!ms->in_sync &&
350	    (log->type->get_sync_count(log) == ms->nr_regions)) {
351		/* the sync is complete */
352		dm_table_event(ms->ti->table);
353		ms->in_sync = 1;
354	}
355}
356
357/*-----------------------------------------------------------------
358 * Reads
359 *---------------------------------------------------------------*/
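/*
 * Note: choose_mirror() does no read balancing in this version; the
 * sector argument is unused.  It starts at the current default mirror
 * and walks backwards through ms->mirror[] (wrapping from the first
 * entry to the last) until it finds a leg with a zero error_count,
 * returning NULL only if every leg has failed.
 */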
360static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
361{
362	struct mirror *m = get_default_mirror(ms);
363
364	do {
365		if (likely(!atomic_read(&m->error_count)))
366			return m;
367
368		if (m-- == ms->mirror)
369			m += ms->nr_mirrors;
370	} while (m != get_default_mirror(ms));
371
372	return NULL;
373}
374
375static int default_ok(struct mirror *m)
376{
377	struct mirror *default_mirror = get_default_mirror(m->ms);
378
379	return !atomic_read(&default_mirror->error_count);
380}
381
382static int mirror_available(struct mirror_set *ms, struct bio *bio)
383{
384	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
385	region_t region = dm_rh_bio_to_region(ms->rh, bio);
386
387	if (log->type->in_sync(log, region, 0))
388		return choose_mirror(ms, bio->bi_sector) ? 1 : 0;
389
390	return 0;
391}
392
393/*
394 * remap a buffer to a particular mirror.
395 */
396static sector_t map_sector(struct mirror *m, struct bio *bio)
397{
398	return m->offset + (bio->bi_sector - m->ms->ti->begin);
399}
400
401static void map_bio(struct mirror *m, struct bio *bio)
402{
403	bio->bi_bdev = m->dev->bdev;
404	bio->bi_sector = map_sector(m, bio);
405}
406
407static void map_region(struct dm_io_region *io, struct mirror *m,
408		       struct bio *bio)
409{
410	io->bdev = m->dev->bdev;
411	io->sector = map_sector(m, bio);
412	io->count = bio->bi_size >> 9;
413}
414
415/*-----------------------------------------------------------------
416 * Reads
417 *---------------------------------------------------------------*/
418static void read_callback(unsigned long error, void *context)
419{
420	struct bio *bio = context;
421	struct mirror *m;
422
423	m = bio_get_m(bio);
424	bio_set_m(bio, NULL);
425
426	if (likely(!error)) {
427		bio_endio(bio, 0);
428		return;
429	}
430
431	fail_mirror(m, DM_RAID1_READ_ERROR);
432
433	if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
434		DMWARN_LIMIT("Read failure on mirror device %s.  "
435			     "Trying alternative device.",
436			     m->dev->name);
437		queue_bio(m->ms, bio, bio_rw(bio));
438		return;
439	}
440
441	DMERR_LIMIT("Read failure on mirror device %s.  Failing I/O.",
442		    m->dev->name);
443	bio_endio(bio, -EIO);
444}
445
446/* Asynchronous read. */
447static void read_async_bio(struct mirror *m, struct bio *bio)
448{
449	struct dm_io_region io;
450	struct dm_io_request io_req = {
451		.bi_rw = READ,
452		.mem.type = DM_IO_BVEC,
453		.mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
454		.notify.fn = read_callback,
455		.notify.context = bio,
456		.client = m->ms->io_client,
457	};
458
459	map_region(&io, m, bio);
460	bio_set_m(bio, m);
461	BUG_ON(dm_io(&io_req, 1, &io, NULL));
462}
463
464static inline int region_in_sync(struct mirror_set *ms, region_t region,
465				 int may_block)
466{
467	int state = dm_rh_get_state(ms->rh, region, may_block);
468	return state == DM_RH_CLEAN || state == DM_RH_DIRTY;
469}
470
471static void do_reads(struct mirror_set *ms, struct bio_list *reads)
472{
473	region_t region;
474	struct bio *bio;
475	struct mirror *m;
476
477	while ((bio = bio_list_pop(reads))) {
478		region = dm_rh_bio_to_region(ms->rh, bio);
479		m = get_default_mirror(ms);
480
481		/*
482		 * We can only read balance if the region is in sync.
483		 */
484		if (likely(region_in_sync(ms, region, 1)))
485			m = choose_mirror(ms, bio->bi_sector);
486		else if (m && atomic_read(&m->error_count))
487			m = NULL;
488
489		if (likely(m))
490			read_async_bio(m, bio);
491		else
492			bio_endio(bio, -EIO);
493	}
494}
495
496/*-----------------------------------------------------------------
497 * Writes.
498 *
499 * We do different things with the write io depending on the
500 * state of the region that it's in:
501 *
502 * SYNC: 	increment pending, use dm-io to write to *all* mirrors
503 * RECOVERING:	delay the io until recovery completes
504 * NOSYNC:	increment pending, just write to the default mirror
505 *---------------------------------------------------------------*/
506
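/*
 * Roughly, as implemented below: do_writes() classifies each bio with
 * dm_rh_get_state().  Writes to in-sync regions go through do_write(),
 * which issues a single dm_io to every leg and completes in
 * write_callback(); writes to regions being recovered are parked via
 * dm_rh_delay(); writes to out-of-sync regions are remapped to the
 * default mirror and sent with generic_make_request().  If the dirty
 * log cannot be flushed, the in-sync writes are diverted to the
 * failures list instead.
 */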
507
508static void write_callback(unsigned long error, void *context)
509{
510	unsigned i, ret = 0;
511	struct bio *bio = (struct bio *) context;
512	struct mirror_set *ms;
513	int uptodate = 0;
514	int should_wake = 0;
515	unsigned long flags;
516
517	ms = bio_get_m(bio)->ms;
518	bio_set_m(bio, NULL);
519
520	/*
521	 * NOTE: We don't decrement the pending count here;
522	 * instead it is done by the target's end_io function.
523	 * This way we handle both writes to SYNC and NOSYNC
524	 * regions with the same code.
525	 */
526	if (likely(!error))
527		goto out;
528
529	for (i = 0; i < ms->nr_mirrors; i++)
530		if (test_bit(i, &error))
531			fail_mirror(ms->mirror + i, DM_RAID1_WRITE_ERROR);
532		else
533			uptodate = 1;
534
535	if (unlikely(!uptodate)) {
536		DMERR("All replicated volumes dead, failing I/O");
537		/* None of the writes succeeded, fail the I/O. */
538		ret = -EIO;
539	} else if (errors_handled(ms)) {
540		/*
541		 * Need to raise event.  Since raising
542		 * events can block, we need to do it in
543		 * the main thread.
544		 */
545		spin_lock_irqsave(&ms->lock, flags);
546		if (!ms->failures.head)
547			should_wake = 1;
548		bio_list_add(&ms->failures, bio);
549		spin_unlock_irqrestore(&ms->lock, flags);
550		if (should_wake)
551			wakeup_mirrord(ms);
552		return;
553	}
554out:
555	bio_endio(bio, ret);
556}
557
558static void do_write(struct mirror_set *ms, struct bio *bio)
559{
560	unsigned int i;
561	struct dm_io_region io[ms->nr_mirrors], *dest = io;
562	struct mirror *m;
563	struct dm_io_request io_req = {
564		.bi_rw = WRITE,
565		.mem.type = DM_IO_BVEC,
566		.mem.ptr.bvec = bio->bi_io_vec + bio->bi_idx,
567		.notify.fn = write_callback,
568		.notify.context = bio,
569		.client = ms->io_client,
570	};
571
572	for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
573		map_region(dest++, m, bio);
574
575	/*
576	 * Use the default mirror because we only need it to retrieve the reference
577	 * to the mirror set in write_callback().
578	 */
579	bio_set_m(bio, get_default_mirror(ms));
580
581	BUG_ON(dm_io(&io_req, ms->nr_mirrors, io, NULL));
582}
583
584static void do_writes(struct mirror_set *ms, struct bio_list *writes)
585{
586	int state;
587	struct bio *bio;
588	struct bio_list sync, nosync, recover, *this_list = NULL;
589
590	if (!writes->head)
591		return;
592
593	/*
594	 * Classify each write.
595	 */
596	bio_list_init(&sync);
597	bio_list_init(&nosync);
598	bio_list_init(&recover);
599
600	while ((bio = bio_list_pop(writes))) {
601		state = dm_rh_get_state(ms->rh,
602					dm_rh_bio_to_region(ms->rh, bio), 1);
603		switch (state) {
604		case DM_RH_CLEAN:
605		case DM_RH_DIRTY:
606			this_list = &sync;
607			break;
608
609		case DM_RH_NOSYNC:
610			this_list = &nosync;
611			break;
612
613		case DM_RH_RECOVERING:
614			this_list = &recover;
615			break;
616		}
617
618		bio_list_add(this_list, bio);
619	}
620
621	/*
622	 * Increment the pending counts for any regions that will
623	 * be written to (writes to recover regions are going to
624	 * be delayed).
625	 */
626	dm_rh_inc_pending(ms->rh, &sync);
627	dm_rh_inc_pending(ms->rh, &nosync);
628	ms->log_failure = dm_rh_flush(ms->rh) ? 1 : 0;
629
630	/*
631	 * Dispatch io.
632	 */
633	if (unlikely(ms->log_failure)) {
634		spin_lock_irq(&ms->lock);
635		bio_list_merge(&ms->failures, &sync);
636		spin_unlock_irq(&ms->lock);
637		wakeup_mirrord(ms);
638	} else
639		while ((bio = bio_list_pop(&sync)))
640			do_write(ms, bio);
641
642	while ((bio = bio_list_pop(&recover)))
643		dm_rh_delay(ms->rh, bio);
644
645	while ((bio = bio_list_pop(&nosync))) {
646		map_bio(get_default_mirror(ms), bio);
647		generic_make_request(bio);
648	}
649}
650
651static void do_failures(struct mirror_set *ms, struct bio_list *failures)
652{
653	struct bio *bio;
654
655	if (!failures->head)
656		return;
657
658	if (!ms->log_failure) {
659		ms->in_sync = 0;
660		while ((bio = bio_list_pop(failures)))
661			dm_rh_mark_nosync(ms->rh, bio, bio->bi_size, 0);
662		return;
663	}
664
665	/*
666	 * If the log has failed, unattempted writes are being
667	 * put on the failures list.  We can't issue those writes
668	 * until a log has been marked, so we must store them.
669	 *
670	 * If a 'noflush' suspend is in progress, we can requeue
671	 * the I/Os to the core.  This gives userspace a chance
672	 * to reconfigure the mirror, at which point the core
673	 * will reissue the writes.  If the 'noflush' flag is
674	 * not set, we have no choice but to return errors.
675	 *
676	 * Some writes on the failures list may have been
677	 * submitted before the log failure and represent a
678	 * failure to write to one of the devices.  It is ok
679	 * for us to treat them the same and requeue them
680	 * as well.
681	 */
682	if (dm_noflush_suspending(ms->ti)) {
683		while ((bio = bio_list_pop(failures)))
684			bio_endio(bio, DM_ENDIO_REQUEUE);
685		return;
686	}
687
688	if (atomic_read(&ms->suspend)) {
689		while ((bio = bio_list_pop(failures)))
690			bio_endio(bio, -EIO);
691		return;
692	}
693
694	spin_lock_irq(&ms->lock);
695	bio_list_merge(&ms->failures, failures);
696	spin_unlock_irq(&ms->lock);
697
698	delayed_wake(ms);
699}
700
701static void trigger_event(struct work_struct *work)
702{
703	struct mirror_set *ms =
704		container_of(work, struct mirror_set, trigger_event);
705
706	dm_table_event(ms->ti->table);
707}
708
709/*-----------------------------------------------------------------
710 * kmirrord
711 *---------------------------------------------------------------*/
712static void do_mirror(struct work_struct *work)
713{
714	struct mirror_set *ms = container_of(work, struct mirror_set,
715					     kmirrord_work);
716	struct bio_list reads, writes, failures;
717	unsigned long flags;
718
719	spin_lock_irqsave(&ms->lock, flags);
720	reads = ms->reads;
721	writes = ms->writes;
722	failures = ms->failures;
723	bio_list_init(&ms->reads);
724	bio_list_init(&ms->writes);
725	bio_list_init(&ms->failures);
726	spin_unlock_irqrestore(&ms->lock, flags);
727
728	dm_rh_update_states(ms->rh, errors_handled(ms));
729	do_recovery(ms);
730	do_reads(ms, &reads);
731	do_writes(ms, &writes);
732	do_failures(ms, &failures);
733
734	dm_table_unplug_all(ms->ti->table);
735}
736
737/*-----------------------------------------------------------------
738 * Target functions
739 *---------------------------------------------------------------*/
740static struct mirror_set *alloc_context(unsigned int nr_mirrors,
741					uint32_t region_size,
742					struct dm_target *ti,
743					struct dm_dirty_log *dl)
744{
745	size_t len;
746	struct mirror_set *ms = NULL;
747
748	len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
749
750	ms = kzalloc(len, GFP_KERNEL);
751	if (!ms) {
752		ti->error = "Cannot allocate mirror context";
753		return NULL;
754	}
755
756	spin_lock_init(&ms->lock);
757
758	ms->ti = ti;
759	ms->nr_mirrors = nr_mirrors;
760	ms->nr_regions = dm_sector_div_up(ti->len, region_size);
761	ms->in_sync = 0;
762	ms->log_failure = 0;
763	atomic_set(&ms->suspend, 0);
764	atomic_set(&ms->default_mirror, DEFAULT_MIRROR);
765
766	len = sizeof(struct dm_raid1_read_record);
767	ms->read_record_pool = mempool_create_kmalloc_pool(MIN_READ_RECORDS,
768							   len);
769	if (!ms->read_record_pool) {
770		ti->error = "Error creating mirror read_record_pool";
771		kfree(ms);
772		return NULL;
773	}
774
775	ms->io_client = dm_io_client_create(DM_IO_PAGES);
776	if (IS_ERR(ms->io_client)) {
777		ti->error = "Error creating dm_io client";
778		mempool_destroy(ms->read_record_pool);
779		kfree(ms);
780		return NULL;
781	}
782
783	ms->rh = dm_region_hash_create(ms, dispatch_bios, wakeup_mirrord,
784				       wakeup_all_recovery_waiters,
785				       ms->ti->begin, MAX_RECOVERY,
786				       dl, region_size, ms->nr_regions);
787	if (IS_ERR(ms->rh)) {
788		ti->error = "Error creating dirty region hash";
789		dm_io_client_destroy(ms->io_client);
790		mempool_destroy(ms->read_record_pool);
791		kfree(ms);
792		return NULL;
793	}
794
795	return ms;
796}
797
798static void free_context(struct mirror_set *ms, struct dm_target *ti,
799			 unsigned int m)
800{
801	while (m--)
802		dm_put_device(ti, ms->mirror[m].dev);
803
804	dm_io_client_destroy(ms->io_client);
805	dm_region_hash_destroy(ms->rh);
806	mempool_destroy(ms->read_record_pool);
807	kfree(ms);
808}
809
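/*
 * The region size reported by the dirty log must be a power of two, a
 * multiple of the page size in sectors (PAGE_SIZE >> 9, e.g. 8 sectors
 * with 4KiB pages) and no larger than the target itself.
 */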
810static inline int _check_region_size(struct dm_target *ti, uint32_t size)
811{
812	return !(size % (PAGE_SIZE >> 9) || !is_power_of_2(size) ||
813		 size > ti->len);
814}
815
816static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
817		      unsigned int mirror, char **argv)
818{
819	unsigned long long offset;
820
821	if (sscanf(argv[1], "%llu", &offset) != 1) {
822		ti->error = "Invalid offset";
823		return -EINVAL;
824	}
825
826	if (dm_get_device(ti, argv[0], offset, ti->len,
827			  dm_table_get_mode(ti->table),
828			  &ms->mirror[mirror].dev)) {
829		ti->error = "Device lookup failure";
830		return -ENXIO;
831	}
832
833	ms->mirror[mirror].ms = ms;
834	atomic_set(&(ms->mirror[mirror].error_count), 0);
835	ms->mirror[mirror].error_type = 0;
836	ms->mirror[mirror].offset = offset;
837
838	return 0;
839}
840
841/*
842 * Create dirty log: log_type #log_params <log_params>
843 */
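/*
 * For example (log device name purely illustrative), "core 1 1024"
 * requests an in-memory log with 1024-sector regions, while
 * "disk 2 /dev/vg0/mlog 1024" adds a persistent log device; either
 * way *args_used ends up as 2 + #log_params.
 */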
844static struct dm_dirty_log *create_dirty_log(struct dm_target *ti,
845					     unsigned argc, char **argv,
846					     unsigned *args_used)
847{
848	unsigned param_count;
849	struct dm_dirty_log *dl;
850
851	if (argc < 2) {
852		ti->error = "Insufficient mirror log arguments";
853		return NULL;
854	}
855
856	if (sscanf(argv[1], "%u", &param_count) != 1) {
857		ti->error = "Invalid mirror log argument count";
858		return NULL;
859	}
860
861	*args_used = 2 + param_count;
862
863	if (argc < *args_used) {
864		ti->error = "Insufficient mirror log arguments";
865		return NULL;
866	}
867
868	dl = dm_dirty_log_create(argv[0], ti, param_count, argv + 2);
869	if (!dl) {
870		ti->error = "Error creating mirror dirty log";
871		return NULL;
872	}
873
874	if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
875		ti->error = "Invalid region size";
876		dm_dirty_log_destroy(dl);
877		return NULL;
878	}
879
880	return dl;
881}
882
883static int parse_features(struct mirror_set *ms, unsigned argc, char **argv,
884			  unsigned *args_used)
885{
886	unsigned num_features;
887	struct dm_target *ti = ms->ti;
888
889	*args_used = 0;
890
891	if (!argc)
892		return 0;
893
894	if (sscanf(argv[0], "%u", &num_features) != 1) {
895		ti->error = "Invalid number of features";
896		return -EINVAL;
897	}
898
899	argc--;
900	argv++;
901	(*args_used)++;
902
903	if (num_features > argc) {
904		ti->error = "Not enough arguments to support feature count";
905		return -EINVAL;
906	}
907
908	if (!strcmp("handle_errors", argv[0]))
909		ms->features |= DM_RAID1_HANDLE_ERRORS;
910	else {
911		ti->error = "Unrecognised feature requested";
912		return -EINVAL;
913	}
914
915	(*args_used)++;
916
917	return 0;
918}
919
920/*
921 * Construct a mirror mapping:
922 *
923 * log_type #log_params <log_params>
924 * #mirrors [mirror_path offset]{2,}
925 * [#features <features>]
926 *
927 * log_type is "core" or "disk"
928 * #log_params is between 1 and 3
929 *
930 * If present, features must be "handle_errors".
931 */
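/*
 * A hypothetical table line using the syntax above (device names are
 * illustrative only):
 *
 *   0 2097152 mirror core 1 1024 2 /dev/sda1 0 /dev/sdb1 0 1 handle_errors
 *
 * i.e. a core log with 1024-sector regions, two legs each starting at
 * offset 0, and the optional handle_errors feature.  Device-mapper
 * core strips the leading "start length target" triple before argv
 * reaches mirror_ctr().
 */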
932static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
933{
934	int r;
935	unsigned int nr_mirrors, m, args_used;
936	struct mirror_set *ms;
937	struct dm_dirty_log *dl;
938
939	dl = create_dirty_log(ti, argc, argv, &args_used);
940	if (!dl)
941		return -EINVAL;
942
943	argv += args_used;
944	argc -= args_used;
945
946	if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
947	    nr_mirrors < 2 || nr_mirrors > DM_KCOPYD_MAX_REGIONS + 1) {
948		ti->error = "Invalid number of mirrors";
949		dm_dirty_log_destroy(dl);
950		return -EINVAL;
951	}
952
953	argv++, argc--;
954
955	if (argc < nr_mirrors * 2) {
956		ti->error = "Too few mirror arguments";
957		dm_dirty_log_destroy(dl);
958		return -EINVAL;
959	}
960
961	ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
962	if (!ms) {
963		dm_dirty_log_destroy(dl);
964		return -ENOMEM;
965	}
966
967	/* Get the mirror parameter sets */
968	for (m = 0; m < nr_mirrors; m++) {
969		r = get_mirror(ms, ti, m, argv);
970		if (r) {
971			free_context(ms, ti, m);
972			return r;
973		}
974		argv += 2;
975		argc -= 2;
976	}
977
978	ti->private = ms;
979	ti->split_io = dm_rh_get_region_size(ms->rh);
980
981	ms->kmirrord_wq = create_singlethread_workqueue("kmirrord");
982	if (!ms->kmirrord_wq) {
983		DMERR("couldn't start kmirrord");
984		r = -ENOMEM;
985		goto err_free_context;
986	}
987	INIT_WORK(&ms->kmirrord_work, do_mirror);
988	init_timer(&ms->timer);
989	ms->timer_pending = 0;
990	INIT_WORK(&ms->trigger_event, trigger_event);
991
992	r = parse_features(ms, argc, argv, &args_used);
993	if (r)
994		goto err_destroy_wq;
995
996	argv += args_used;
997	argc -= args_used;
998
999	/*
1000	 * Any read-balancing addition depends on the
1001	 * DM_RAID1_HANDLE_ERRORS flag being present.
1002	 * This is because the decision to balance depends
1003	 * on the sync state of a region.  If the above
1004	 * flag is not present, we ignore errors and the
1005	 * sync state may be inaccurate.
1006	 */
1007
1008	if (argc) {
1009		ti->error = "Too many mirror arguments";
1010		r = -EINVAL;
1011		goto err_destroy_wq;
1012	}
1013
1014	r = dm_kcopyd_client_create(DM_KCOPYD_PAGES, &ms->kcopyd_client);
1015	if (r)
1016		goto err_destroy_wq;
1017
1018	wakeup_mirrord(ms);
1019	return 0;
1020
1021err_destroy_wq:
1022	destroy_workqueue(ms->kmirrord_wq);
1023err_free_context:
1024	free_context(ms, ti, ms->nr_mirrors);
1025	return r;
1026}
1027
1028static void mirror_dtr(struct dm_target *ti)
1029{
1030	struct mirror_set *ms = (struct mirror_set *) ti->private;
1031
1032	del_timer_sync(&ms->timer);
1033	flush_workqueue(ms->kmirrord_wq);
1034	dm_kcopyd_client_destroy(ms->kcopyd_client);
1035	destroy_workqueue(ms->kmirrord_wq);
1036	free_context(ms, ti, ms->nr_mirrors);
1037}
1038
1039/*
1040 * Mirror mapping function
1041 */
1042static int mirror_map(struct dm_target *ti, struct bio *bio,
1043		      union map_info *map_context)
1044{
1045	int r, rw = bio_rw(bio);
1046	struct mirror *m;
1047	struct mirror_set *ms = ti->private;
1048	struct dm_raid1_read_record *read_record = NULL;
1049	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1050
1051	if (rw == WRITE) {
1052		/* Save region for mirror_end_io() handler */
1053		map_context->ll = dm_rh_bio_to_region(ms->rh, bio);
1054		queue_bio(ms, bio, rw);
1055		return DM_MAPIO_SUBMITTED;
1056	}
1057
1058	r = log->type->in_sync(log, dm_rh_bio_to_region(ms->rh, bio), 0);
1059	if (r < 0 && r != -EWOULDBLOCK)
1060		return r;
1061
1062	/*
1063	 * If region is not in-sync queue the bio.
1064	 */
1065	if (!r || (r == -EWOULDBLOCK)) {
1066		if (rw == READA)
1067			return -EWOULDBLOCK;
1068
1069		queue_bio(ms, bio, rw);
1070		return DM_MAPIO_SUBMITTED;
1071	}
1072
1073	/*
1074	 * The region is in-sync and we can perform reads directly.
1075	 * Store enough information so we can retry if it fails.
1076	 */
1077	m = choose_mirror(ms, bio->bi_sector);
1078	if (unlikely(!m))
1079		return -EIO;
1080
1081	read_record = mempool_alloc(ms->read_record_pool, GFP_NOIO);
1082	if (likely(read_record)) {
1083		dm_bio_record(&read_record->details, bio);
1084		map_context->ptr = read_record;
1085		read_record->m = m;
1086	}
1087
1088	map_bio(m, bio);
1089
1090	return DM_MAPIO_REMAPPED;
1091}
1092
1093static int mirror_end_io(struct dm_target *ti, struct bio *bio,
1094			 int error, union map_info *map_context)
1095{
1096	int rw = bio_rw(bio);
1097	struct mirror_set *ms = (struct mirror_set *) ti->private;
1098	struct mirror *m = NULL;
1099	struct dm_bio_details *bd = NULL;
1100	struct dm_raid1_read_record *read_record = map_context->ptr;
1101
1102	/*
1103	 * We need to dec pending if this was a write.
1104	 */
1105	if (rw == WRITE) {
1106		dm_rh_dec(ms->rh, map_context->ll);
1107		return error;
1108	}
1109
1110	if (error == -EOPNOTSUPP)
1111		goto out;
1112
1113	if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
1114		goto out;
1115
1116	if (unlikely(error)) {
1117		if (!read_record) {
1118			/*
1119			 * There wasn't enough memory to record necessary
1120			 * information for a retry or there was no other
1121			 * mirror in-sync.
1122			 */
1123			DMERR_LIMIT("Mirror read failed.");
1124			return -EIO;
1125		}
1126
1127		m = read_record->m;
1128
1129		DMERR("Mirror read failed from %s. Trying alternative device.",
1130		      m->dev->name);
1131
1132		fail_mirror(m, DM_RAID1_READ_ERROR);
1133
1134		/*
1135		 * A failed read is requeued for another attempt using an intact
1136		 * mirror.
1137		 */
1138		if (default_ok(m) || mirror_available(ms, bio)) {
1139			bd = &read_record->details;
1140
1141			dm_bio_restore(bd, bio);
1142			mempool_free(read_record, ms->read_record_pool);
1143			map_context->ptr = NULL;
1144			queue_bio(ms, bio, rw);
1145			return 1;
1146		}
1147		DMERR("All replicated volumes dead, failing I/O");
1148	}
1149
1150out:
1151	if (read_record) {
1152		mempool_free(read_record, ms->read_record_pool);
1153		map_context->ptr = NULL;
1154	}
1155
1156	return error;
1157}
1158
1159static void mirror_presuspend(struct dm_target *ti)
1160{
1161	struct mirror_set *ms = (struct mirror_set *) ti->private;
1162	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1163
1164	atomic_set(&ms->suspend, 1);
1165
1166	/*
1167	 * We must finish up all the work that we've
1168	 * generated (i.e. recovery work).
1169	 */
1170	dm_rh_stop_recovery(ms->rh);
1171
1172	wait_event(_kmirrord_recovery_stopped,
1173		   !dm_rh_recovery_in_flight(ms->rh));
1174
1175	if (log->type->presuspend && log->type->presuspend(log))
1176		/* FIXME: need better error handling */
1177		DMWARN("log presuspend failed");
1178
1179	/*
1180	 * Now that recovery is complete/stopped and the
1181	 * delayed bios are queued, we need to wait for
1182	 * the worker thread to complete.  This way,
1183	 * we know that all of our I/O has been pushed.
1184	 */
1185	flush_workqueue(ms->kmirrord_wq);
1186}
1187
1188static void mirror_postsuspend(struct dm_target *ti)
1189{
1190	struct mirror_set *ms = ti->private;
1191	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1192
1193	if (log->type->postsuspend && log->type->postsuspend(log))
1194		/* FIXME: need better error handling */
1195		DMWARN("log postsuspend failed");
1196}
1197
1198static void mirror_resume(struct dm_target *ti)
1199{
1200	struct mirror_set *ms = ti->private;
1201	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1202
1203	atomic_set(&ms->suspend, 0);
1204	if (log->type->resume && log->type->resume(log))
1205		/* FIXME: need better error handling */
1206		DMWARN("log resume failed");
1207	dm_rh_start_recovery(ms->rh);
1208}
1209
1210/*
1211 * device_status_char
1212 * @m: mirror device/leg we want the status of
1213 *
1214 * We return one character representing the most severe error
1215 * we have encountered.
1216 *    A => Alive - No failures
1217 *    D => Dead - A write failure occurred leaving the mirror out-of-sync
1218 *    S => Sync - A synchronization failure occurred, mirror out-of-sync
1219 *    R => Read - A read failure occurred, mirror data unaffected
1220 *
1221 * Returns: <char>
1222 */
1223static char device_status_char(struct mirror *m)
1224{
1225	if (!atomic_read(&(m->error_count)))
1226		return 'A';
1227
1228	return (test_bit(DM_RAID1_WRITE_ERROR, &(m->error_type))) ? 'D' :
1229		(test_bit(DM_RAID1_SYNC_ERROR, &(m->error_type))) ? 'S' :
1230		(test_bit(DM_RAID1_READ_ERROR, &(m->error_type))) ? 'R' : 'U';
1231}
1232
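/*
 * mirror_status() below concatenates one such character per leg, so a
 * healthy two-leg mirror reports "AA", while one whose second leg saw
 * a write failure reports "AD" (example strings only).
 */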
1233
1234static int mirror_status(struct dm_target *ti, status_type_t type,
1235			 char *result, unsigned int maxlen)
1236{
1237	unsigned int m, sz = 0;
1238	struct mirror_set *ms = (struct mirror_set *) ti->private;
1239	struct dm_dirty_log *log = dm_rh_dirty_log(ms->rh);
1240	char buffer[ms->nr_mirrors + 1];
1241
1242	switch (type) {
1243	case STATUSTYPE_INFO:
1244		DMEMIT("%d ", ms->nr_mirrors);
1245		for (m = 0; m < ms->nr_mirrors; m++) {
1246			DMEMIT("%s ", ms->mirror[m].dev->name);
1247			buffer[m] = device_status_char(&(ms->mirror[m]));
1248		}
1249		buffer[m] = '\0';
1250
1251		DMEMIT("%llu/%llu 1 %s ",
1252		      (unsigned long long)log->type->get_sync_count(log),
1253		      (unsigned long long)ms->nr_regions, buffer);
1254
1255		sz += log->type->status(log, type, result+sz, maxlen-sz);
1256
1257		break;
1258
1259	case STATUSTYPE_TABLE:
1260		sz = log->type->status(log, type, result, maxlen);
1261
1262		DMEMIT("%d", ms->nr_mirrors);
1263		for (m = 0; m < ms->nr_mirrors; m++)
1264			DMEMIT(" %s %llu", ms->mirror[m].dev->name,
1265			       (unsigned long long)ms->mirror[m].offset);
1266
1267		if (ms->features & DM_RAID1_HANDLE_ERRORS)
1268			DMEMIT(" 1 handle_errors");
1269	}
1270
1271	return 0;
1272}
1273
1274static struct target_type mirror_target = {
1275	.name	 = "mirror",
1276	.version = {1, 0, 20},
1277	.module	 = THIS_MODULE,
1278	.ctr	 = mirror_ctr,
1279	.dtr	 = mirror_dtr,
1280	.map	 = mirror_map,
1281	.end_io	 = mirror_end_io,
1282	.presuspend = mirror_presuspend,
1283	.postsuspend = mirror_postsuspend,
1284	.resume	 = mirror_resume,
1285	.status	 = mirror_status,
1286};
1287
1288static int __init dm_mirror_init(void)
1289{
1290	int r;
1291
1292	r = dm_register_target(&mirror_target);
1293	if (r < 0)
1294		DMERR("Failed to register mirror target");
1295
1296	return r;
1297}
1298
1299static void __exit dm_mirror_exit(void)
1300{
1301	int r;
1302
1303	r = dm_unregister_target(&mirror_target);
1304	if (r < 0)
1305		DMERR("unregister failed %d", r);
1306}
1307
1308/* Module hooks */
1309module_init(dm_mirror_init);
1310module_exit(dm_mirror_exit);
1311
1312MODULE_DESCRIPTION(DM_NAME " mirror target");
1313MODULE_AUTHOR("Joe Thornber");
1314MODULE_LICENSE("GPL");
1315