dm-block-manager.c revision 8d44c98aac540cdf3cb5385bc6ef8d56930c7d70
/*
 * Copyright (C) 2011 Red Hat, Inc.
 *
 * This file is released under the GPL.
 */
#include "dm-block-manager.h"
#include "dm-persistent-data-internal.h"
#include "../dm-bufio.h"

#include <linux/crc32c.h>
#include <linux/module.h>
#include <linux/slab.h>
#include <linux/rwsem.h>
#include <linux/device-mapper.h>
#include <linux/stacktrace.h>

#define DM_MSG_PREFIX "block manager"

/*----------------------------------------------------------------*/

/*
 * This is a read/write semaphore with a couple of differences.
 *
 * i) There is a restriction on the number of concurrent read locks that
 * may be held at once.  This is just an implementation detail.
 *
 * ii) Recursive locking attempts are detected and return -EINVAL.  A stack
 * trace is also emitted for the previous lock acquisition.
 *
 * iii) Priority is given to write locks.
 */
#define MAX_HOLDERS 4
#define MAX_STACK 10

typedef unsigned long stack_entries[MAX_STACK];

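/*
 * Lock state is encoded in 'count': 0 means unlocked, a positive value is
 * the number of read holders (at most MAX_HOLDERS), and -1 means write
 * locked.  'holders' records the tasks currently holding the lock so that
 * recursive acquisition can be detected.
 */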
struct block_lock {
	spinlock_t lock;
	__s32 count;
	struct list_head waiters;
	struct task_struct *holders[MAX_HOLDERS];

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_trace traces[MAX_HOLDERS];
	stack_entries entries[MAX_HOLDERS];
#endif
};

struct waiter {
	struct list_head list;
	struct task_struct *task;
	int wants_write;
};

static unsigned __find_holder(struct block_lock *lock,
			      struct task_struct *task)
{
	unsigned i;

	for (i = 0; i < MAX_HOLDERS; i++)
		if (lock->holders[i] == task)
			break;

	BUG_ON(i == MAX_HOLDERS);
	return i;
}

/* call this *after* you increment lock->count */
static void __add_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, NULL);
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	struct stack_trace *t;
#endif

	get_task_struct(task);
	lock->holders[h] = task;

#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	t = lock->traces + h;
	t->nr_entries = 0;
	t->max_entries = MAX_STACK;
	t->entries = lock->entries[h];
	t->skip = 2;
	save_stack_trace(t);
#endif
}

/* call this *before* you decrement lock->count */
static void __del_holder(struct block_lock *lock, struct task_struct *task)
{
	unsigned h = __find_holder(lock, task);
	lock->holders[h] = NULL;
	put_task_struct(task);
}

static int __check_holder(struct block_lock *lock)
{
	unsigned i;
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
	static struct stack_trace t;
	static stack_entries entries;
#endif

	for (i = 0; i < MAX_HOLDERS; i++) {
		if (lock->holders[i] == current) {
			DMERR("recursive lock detected in pool metadata");
#ifdef CONFIG_DM_DEBUG_BLOCK_STACK_TRACING
			DMERR("previously held here:");
			print_stack_trace(lock->traces + i, 4);

			DMERR("subsequent acquisition attempted here:");
			t.nr_entries = 0;
			t.max_entries = MAX_STACK;
			t.entries = entries;
			t.skip = 3;
			save_stack_trace(&t);
			print_stack_trace(&t, 4);
#endif
			return -EINVAL;
		}
	}

	return 0;
}

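/*
 * A waiter sleeps in __wait() until w->task has been cleared.
 * __wake_waiter() removes the waiter from the list, then issues a full
 * barrier before clearing w->task: the struct waiter lives on the
 * sleeper's stack, so it must not be touched once the sleeper can see
 * NULL and return.
 */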
static void __wait(struct waiter *w)
{
	for (;;) {
		set_task_state(current, TASK_UNINTERRUPTIBLE);

		if (!w->task)
			break;

		schedule();
	}

	set_task_state(current, TASK_RUNNING);
}

static void __wake_waiter(struct waiter *w)
{
	struct task_struct *task;

	list_del(&w->list);
	task = w->task;
	smp_mb();
	w->task = NULL;
	wake_up_process(task);
}

/*
 * We either wake a few readers or a single writer.
 */
static void __wake_many(struct block_lock *lock)
{
	struct waiter *w, *tmp;

	BUG_ON(lock->count < 0);
	list_for_each_entry_safe(w, tmp, &lock->waiters, list) {
		if (lock->count >= MAX_HOLDERS)
			return;

		if (w->wants_write) {
			if (lock->count > 0)
				return; /* still read locked */

			lock->count = -1;
			__add_holder(lock, w->task);
			__wake_waiter(w);
			return;
		}

		lock->count++;
		__add_holder(lock, w->task);
		__wake_waiter(w);
	}
}

static void bl_init(struct block_lock *lock)
{
	int i;

	spin_lock_init(&lock->lock);
	lock->count = 0;
	INIT_LIST_HEAD(&lock->waiters);
	for (i = 0; i < MAX_HOLDERS; i++)
		lock->holders[i] = NULL;
}

static int __available_for_read(struct block_lock *lock)
{
	return lock->count >= 0 &&
		lock->count < MAX_HOLDERS &&
		list_empty(&lock->waiters);
}

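/*
 * Acquire a read lock, sleeping if necessary.  Returns -EINVAL if the
 * caller already holds this lock (recursive acquisition).
 */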
static int bl_down_read(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);

	w.task = current;
	w.wants_write = 0;
	list_add_tail(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);
	return 0;
}

static int bl_down_read_nonblock(struct block_lock *lock)
{
	int r;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r)
		goto out;

	if (__available_for_read(lock)) {
		lock->count++;
		__add_holder(lock, current);
		r = 0;
	} else
		r = -EWOULDBLOCK;

out:
	spin_unlock(&lock->lock);
	return r;
}

static void bl_up_read(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	BUG_ON(lock->count <= 0);
	__del_holder(lock, current);
	--lock->count;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

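/*
 * Acquire the write lock, sleeping until all readers have drained.
 * Returns -EINVAL on recursive acquisition.
 */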
static int bl_down_write(struct block_lock *lock)
{
	int r;
	struct waiter w;

	spin_lock(&lock->lock);
	r = __check_holder(lock);
	if (r) {
		spin_unlock(&lock->lock);
		return r;
	}

	if (lock->count == 0 && list_empty(&lock->waiters)) {
		lock->count = -1;
		__add_holder(lock, current);
		spin_unlock(&lock->lock);
		return 0;
	}

	get_task_struct(current);
	w.task = current;
	w.wants_write = 1;

	/*
	 * Writers are given priority: we add them to the front of the
	 * waiter list.  We know there's only one mutator in the system, so
	 * we can live with the ordering reversal this causes.
	 */
	list_add(&w.list, &lock->waiters);
	spin_unlock(&lock->lock);

	__wait(&w);
	put_task_struct(current);

	return 0;
}

static void bl_up_write(struct block_lock *lock)
{
	spin_lock(&lock->lock);
	__del_holder(lock, current);
	lock->count = 0;
	if (!list_empty(&lock->waiters))
		__wake_many(lock);
	spin_unlock(&lock->lock);
}

static void report_recursive_bug(dm_block_t b, int r)
{
	if (r == -EINVAL)
		DMERR("recursive acquisition of block %llu requested.",
		      (unsigned long long) b);
}

/*----------------------------------------------------------------*/

/*
 * Block manager is currently implemented using dm-bufio.  struct
 * dm_block_manager and struct dm_block map directly onto a couple of
 * structs in the bufio interface.  I want to retain the freedom to move
 * away from bufio in the future.  So these structs are just cast within
 * this .c file, rather than making it through to the public interface.
 */
static struct dm_buffer *to_buffer(struct dm_block *b)
{
	return (struct dm_buffer *) b;
}

static struct dm_bufio_client *to_bufio(struct dm_block_manager *bm)
{
	return (struct dm_bufio_client *) bm;
}

dm_block_t dm_block_location(struct dm_block *b)
{
	return dm_bufio_get_block_number(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_location);

void *dm_block_data(struct dm_block *b)
{
	return dm_bufio_get_block_data(to_buffer(b));
}
EXPORT_SYMBOL_GPL(dm_block_data);

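/*
 * Per buffer state kept in dm-bufio's aux data: the validator that has
 * vouched for the block's contents, the read/write lock and a flag
 * recording which way the lock was taken.
 */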
struct buffer_aux {
	struct dm_block_validator *validator;
	struct block_lock lock;
	int write_locked;
};

static void dm_block_manager_alloc_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	aux->validator = NULL;
	bl_init(&aux->lock);
}

static void dm_block_manager_write_callback(struct dm_buffer *buf)
{
	struct buffer_aux *aux = dm_bufio_get_aux_data(buf);
	if (aux->validator) {
		aux->validator->prepare_for_write(aux->validator, (struct dm_block *) buf,
			 dm_bufio_get_block_size(dm_bufio_get_client(buf)));
	}
}

/*----------------------------------------------------------------
 * Public interface
 *--------------------------------------------------------------*/
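/*
 * The block manager is a thin wrapper around a dm-bufio client.  Note
 * that cache_size is not passed through to dm-bufio.
 */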
struct dm_block_manager *dm_block_manager_create(struct block_device *bdev,
						 unsigned block_size,
						 unsigned cache_size,
						 unsigned max_held_per_thread)
{
	return (struct dm_block_manager *)
		dm_bufio_client_create(bdev, block_size, max_held_per_thread,
				       sizeof(struct buffer_aux),
				       dm_block_manager_alloc_callback,
				       dm_block_manager_write_callback);
}
EXPORT_SYMBOL_GPL(dm_block_manager_create);

void dm_block_manager_destroy(struct dm_block_manager *bm)
{
	return dm_bufio_client_destroy(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_block_manager_destroy);

unsigned dm_bm_block_size(struct dm_block_manager *bm)
{
	return dm_bufio_get_block_size(to_bufio(bm));
}
EXPORT_SYMBOL_GPL(dm_bm_block_size);

dm_block_t dm_bm_nr_blocks(struct dm_block_manager *bm)
{
	return dm_bufio_get_device_size(to_bufio(bm));
}

static int dm_bm_validate_buffer(struct dm_block_manager *bm,
				 struct dm_buffer *buf,
				 struct buffer_aux *aux,
				 struct dm_block_validator *v)
{
	if (unlikely(!aux->validator)) {
		int r;
		if (!v)
			return 0;
		r = v->check(v, (struct dm_block *) buf,
			     dm_bufio_get_block_size(to_bufio(bm)));
		if (unlikely(r))
			return r;
		aux->validator = v;
	} else {
		if (unlikely(aux->validator != v)) {
			DMERR("validator mismatch (old=%s vs new=%s) for block %llu",
				aux->validator->name, v ? v->name : "NULL",
				(unsigned long long)
					dm_bufio_get_block_number(buf));
			return -EINVAL;
		}
	}

	return 0;
}

int dm_bm_read_lock(struct dm_block_manager *bm, dm_block_t b,
		    struct dm_block_validator *v,
		    struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read(&aux->lock);
	if (unlikely(r)) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_read_lock);

int dm_bm_write_lock(struct dm_block_manager *bm,
		     dm_block_t b, struct dm_block_validator *v,
		     struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_read(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}

	aux->write_locked = 1;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_write(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_write_lock);

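/*
 * A non-blocking read lock: fails with -EWOULDBLOCK if the block is not
 * already in the cache (dm_bufio_get) or if the lock cannot be taken
 * without waiting.
 */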
int dm_bm_read_try_lock(struct dm_block_manager *bm,
			dm_block_t b, struct dm_block_validator *v,
			struct dm_block **result)
{
	struct buffer_aux *aux;
	void *p;
	int r;

	p = dm_bufio_get(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);
	if (unlikely(!p))
		return -EWOULDBLOCK;

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_read_nonblock(&aux->lock);
	if (r < 0) {
		dm_bufio_release(to_buffer(*result));
		report_recursive_bug(b, r);
		return r;
	}
	aux->write_locked = 0;

	r = dm_bm_validate_buffer(bm, to_buffer(*result), aux, v);
	if (unlikely(r)) {
		bl_up_read(&aux->lock);
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	return 0;
}

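/*
 * As dm_bm_write_lock(), except the block's data is zeroed and the
 * validator is installed without being run; freshly zeroed data has
 * nothing to check.
 */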
int dm_bm_write_lock_zero(struct dm_block_manager *bm,
			  dm_block_t b, struct dm_block_validator *v,
			  struct dm_block **result)
{
	int r;
	struct buffer_aux *aux;
	void *p;

	p = dm_bufio_new(to_bufio(bm), b, (struct dm_buffer **) result);
	if (unlikely(IS_ERR(p)))
		return PTR_ERR(p);

	memset(p, 0, dm_bm_block_size(bm));

	aux = dm_bufio_get_aux_data(to_buffer(*result));
	r = bl_down_write(&aux->lock);
	if (r) {
		dm_bufio_release(to_buffer(*result));
		return r;
	}

	aux->write_locked = 1;
	aux->validator = v;

	return 0;
}

int dm_bm_unlock(struct dm_block *b)
{
	struct buffer_aux *aux;
	aux = dm_bufio_get_aux_data(to_buffer(b));

	if (aux->write_locked) {
		dm_bufio_mark_buffer_dirty(to_buffer(b));
		bl_up_write(&aux->lock);
	} else
		bl_up_read(&aux->lock);

	dm_bufio_release(to_buffer(b));

	return 0;
}
EXPORT_SYMBOL_GPL(dm_bm_unlock);
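
/*
 * A lock call is paired with dm_bm_unlock().  A minimal usage sketch
 * (block_nr, my_validator and use_data() below are purely illustrative):
 *
 *	struct dm_block *blk;
 *	int r = dm_bm_read_lock(bm, block_nr, my_validator, &blk);
 *
 *	if (!r) {
 *		use_data(dm_block_data(blk));
 *		dm_bm_unlock(blk);
 *	}
 */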

int dm_bm_unlock_move(struct dm_block *b, dm_block_t n)
{
	struct buffer_aux *aux;

	aux = dm_bufio_get_aux_data(to_buffer(b));

	if (aux->write_locked) {
		dm_bufio_mark_buffer_dirty(to_buffer(b));
		bl_up_write(&aux->lock);
	} else
		bl_up_read(&aux->lock);

	dm_bufio_release_move(to_buffer(b), n);
	return 0;
}

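/*
 * Write out all dirty buffers, then unlock the superblock (marking it
 * dirty if it was write locked) and flush again, so the superblock only
 * reaches the disk after everything it refers to.
 */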
int dm_bm_flush_and_unlock(struct dm_block_manager *bm,
			   struct dm_block *superblock)
{
	int r;

	r = dm_bufio_write_dirty_buffers(to_bufio(bm));
	if (unlikely(r)) {
		dm_bm_unlock(superblock);
		return r;
	}

	dm_bm_unlock(superblock);

	return dm_bufio_write_dirty_buffers(to_bufio(bm));
}

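/*
 * crc32c of the data, seeded with all ones; the result is xor'd with
 * init_xor so callers can vary the checksum per use.
 */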
u32 dm_bm_checksum(const void *data, size_t len, u32 init_xor)
{
	return crc32c(~(u32) 0, data, len) ^ init_xor;
}
EXPORT_SYMBOL_GPL(dm_bm_checksum);

/*----------------------------------------------------------------*/

MODULE_LICENSE("GPL");
MODULE_AUTHOR("Joe Thornber <dm-devel@redhat.com>");
MODULE_DESCRIPTION("Immutable metadata library for dm");

/*----------------------------------------------------------------*/