dlmglue.c revision 2a45f2d13e1dd91bc110801f5818379f2699509c
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 23 * Boston, MA 02111-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/smp_lock.h>
31#include <linux/crc32.h>
32#include <linux/kthread.h>
33#include <linux/pagemap.h>
34#include <linux/debugfs.h>
35#include <linux/seq_file.h>
36
37#include <cluster/heartbeat.h>
38#include <cluster/nodemanager.h>
39#include <cluster/tcp.h>
40
41#include <dlm/dlmapi.h>
42
43#define MLOG_MASK_PREFIX ML_DLM_GLUE
44#include <cluster/masklog.h>
45
46#include "ocfs2.h"
47
48#include "alloc.h"
49#include "dcache.h"
50#include "dlmglue.h"
51#include "extent_map.h"
52#include "heartbeat.h"
53#include "inode.h"
54#include "journal.h"
55#include "slot_map.h"
56#include "super.h"
57#include "uptodate.h"
58#include "vote.h"
59
60#include "buffer_head_io.h"
61
/*
 * Tracks a task waiting for a lockres' l_flags to reach a particular
 * state. Waiters queue on ocfs2_lock_res->l_mask_waiters and are
 * completed from lockres_set_flags() once
 * (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on l_mask_waiters */
	int			mw_status;	/* result handed back to waiter */
	struct completion	mw_complete;	/* signalled when goal reached */
	unsigned long		mw_mask;	/* flag bits being watched */
	unsigned long		mw_goal;	/* required value of masked bits */
};
69
70static void ocfs2_inode_bast_func(void *opaque,
71				  int level);
72static void ocfs2_dentry_bast_func(void *opaque,
73				  int level);
74static void ocfs2_super_bast_func(void *opaque,
75				  int level);
76static void ocfs2_rename_bast_func(void *opaque,
77				   int level);
78
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock() when resolving a lockres that
 * another node is blocked on.
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
93
/* Aggregated outcome of an unblock attempt: whether the lockres needs
 * to be requeued for another pass, plus the unblock verdict above. */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
98
99static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
100			      struct ocfs2_unblock_ctl *ctl);
101static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
102			      struct ocfs2_unblock_ctl *ctl);
103static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
104				    struct ocfs2_unblock_ctl *ctl);
105static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
106				     struct ocfs2_unblock_ctl *ctl);
107static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
108				  struct ocfs2_unblock_ctl *ctl);
109
110static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
111				     struct ocfs2_lock_res *lockres);
112
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/* Blocking AST handler handed to dlmlock() for this lock type. */
	void (*bast)(void *, int);
	/* Resolves a lockres another node is blocked on; fills in the
	 * ocfs2_unblock_ctl verdict. */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);
	/* Optional; fired after unlock when the unblock function asked
	 * for it via UNBLOCK_CONTINUE_POST / UNBLOCK_STOP_POST. */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};
129
130/*
131 * Some locks want to "refresh" potentially stale data when a
132 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
133 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
134 * individual lockres l_flags member from the ast function. It is
135 * expected that the locking wrapper will clear the
136 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
137 */
138#define LOCK_TYPE_REQUIRES_REFRESH 0x1
139
140typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
141static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
142				      struct ocfs2_lock_res *lockres,
143				      struct ocfs2_unblock_ctl *ctl,
144				      ocfs2_convert_worker_t *worker);
145
/* Per-type operation tables. The three inode lock types share a bast
 * handler but differ in unblock strategy and refresh requirements. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unblock	= ocfs2_unblock_inode_lock,
	.flags		= 0,
};

/* Metadata is re-read from disk on lock acquisition, hence REFRESH. */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unblock	= ocfs2_unblock_meta,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.bast		= ocfs2_inode_bast_func,
	.unblock	= ocfs2_unblock_data,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.bast		= ocfs2_super_bast_func,
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.bast		= ocfs2_rename_bast_func,
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= 0,
};

/* Dentry locks are the only type with a post_unlock callback. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.bast		= ocfs2_dentry_bast_func,
	.unblock	= ocfs2_unblock_dentry_lock,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.flags		= 0,
};
182
183static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
184{
185	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
186		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
187		lockres->l_type == OCFS2_LOCK_TYPE_RW;
188}
189
/* True for the (single, per-mount) superblock lockres. */
static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_SUPER;
}
194
/* True for the (single, per-mount) rename lockres. */
static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_RENAME;
}
199
/* l_priv holds the osb for super and rename locks; BUG for any
 * other lock type. */
static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_super_lock(lockres)
	       && !ocfs2_is_rename_lock(lockres));

	return (struct ocfs2_super *) lockres->l_priv;
}
207
/* l_priv holds the owning inode for the three inode lock types. */
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}
214
/* l_priv holds the ocfs2_dentry_lock for dentry-type lockreses
 * (set up in ocfs2_dentry_lock_res_init()). */
static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}
221
222static int ocfs2_lock_create(struct ocfs2_super *osb,
223			     struct ocfs2_lock_res *lockres,
224			     int level,
225			     int dlm_flags);
226static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
227						     int wanted);
228static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
229				 struct ocfs2_lock_res *lockres,
230				 int level);
231static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
232static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
233static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
234static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
235static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
236					struct ocfs2_lock_res *lockres);
237static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
238						int convert);
239#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
240	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
241		"resource %s: %s\n", dlm_errname(_stat), _func,	\
242		_lockres->l_name, dlm_errmsg(_stat));		\
243} while (0)
244static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
245				 struct ocfs2_lock_res *lockres);
246static int ocfs2_meta_lock_update(struct inode *inode,
247				  struct buffer_head **bh);
248static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
249static inline int ocfs2_highest_compat_lock_level(int level);
250static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
251						  struct ocfs2_lock_res *lockres,
252						  int new_level);
253
254static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
255				  u64 blkno,
256				  u32 generation,
257				  char *name)
258{
259	int len;
260
261	mlog_entry_void();
262
263	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
264
265	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
266		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
267		       (long long)blkno, generation);
268
269	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
270
271	mlog(0, "built lock resource with name: %s\n", name);
272
273	mlog_exit_void();
274}
275
276static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
277
/* Hook a newly initialized lockres into the global debugfs tracking
 * list so it shows up in the dlm debug interface. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
287
/* Unhook a lockres from the debugfs tracking list. The list_empty
 * check makes this safe to call more than once on the same res. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
295
/*
 * Shared tail of lockres initialization: record the type, ops table
 * and private pointer, reset all dlm state to invalid/IV mode, mark
 * the resource initialized and add it to the debug tracking list.
 * Callers are expected to have built l_name already.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	/* No dlm lock exists yet: no level granted, requested or
	 * blocking, and no ast/unlock action pending. */
	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
316
/* One-time (slab-constructor style) initialization: zero everything,
 * then set up the spinlock, waitqueue and list heads. */
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}
326
327void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
328			       enum ocfs2_lock_type type,
329			       unsigned int generation,
330			       struct inode *inode)
331{
332	struct ocfs2_lock_res_ops *ops;
333
334	switch(type) {
335		case OCFS2_LOCK_TYPE_RW:
336			ops = &ocfs2_inode_rw_lops;
337			break;
338		case OCFS2_LOCK_TYPE_META:
339			ops = &ocfs2_inode_meta_lops;
340			break;
341		case OCFS2_LOCK_TYPE_DATA:
342			ops = &ocfs2_inode_data_lops;
343			break;
344		default:
345			mlog_bug_on_msg(1, "type: %d\n", type);
346			ops = NULL; /* thanks, gcc */
347			break;
348	};
349
350	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
351			      generation, res->l_name);
352	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
353}
354
/*
 * Dentry lock names embed the target inode's block number as raw
 * big-endian bytes (see ocfs2_dentry_lock_res_init()); pull it back
 * out with memcpy to avoid unaligned-access assumptions.
 */
static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}
364
/*
 * Initialize a dentry lockres. The name is built from the parent
 * directory's block number (as text) followed by the target inode's
 * block number as raw big-endian bytes -- see the comment below for
 * why the standard naming scheme doesn't fit here.
 */
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	/* Binary big-endian inode number follows the text portion;
	 * ocfs2_get_dentry_lock_ino() is the matching decoder. */
	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
404
/* Initialize the per-mount superblock lockres, named after the
 * well-known superblock block number with generation 0. */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}
416
/* Initialize the per-mount rename lockres (block number and
 * generation are both 0 -- it protects no particular block). */
static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
427
/*
 * Tear down a lockres before its memory is released. A res that was
 * never initialized is silently ignored. BUGs if the res is still in
 * use in any way: queued for downconvert, has waiters, holders, or
 * its spinlock held.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
459
460static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
461				     int level)
462{
463	mlog_entry_void();
464
465	BUG_ON(!lockres);
466
467	switch(level) {
468	case LKM_EXMODE:
469		lockres->l_ex_holders++;
470		break;
471	case LKM_PRMODE:
472		lockres->l_ro_holders++;
473		break;
474	default:
475		BUG();
476	}
477
478	mlog_exit_void();
479}
480
481static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
482				     int level)
483{
484	mlog_entry_void();
485
486	BUG_ON(!lockres);
487
488	switch(level) {
489	case LKM_EXMODE:
490		BUG_ON(!lockres->l_ex_holders);
491		lockres->l_ex_holders--;
492		break;
493	case LKM_PRMODE:
494		BUG_ON(!lockres->l_ro_holders);
495		lockres->l_ro_holders--;
496		break;
497	default:
498		BUG();
499	}
500	mlog_exit_void();
501}
502
503/* WARNING: This function lives in a world where the only three lock
504 * levels are EX, PR, and NL. It *will* have to be adjusted when more
505 * lock types are added. */
506static inline int ocfs2_highest_compat_lock_level(int level)
507{
508	int new_level = LKM_EXMODE;
509
510	if (level == LKM_EXMODE)
511		new_level = LKM_NLMODE;
512	else if (level == LKM_PRMODE)
513		new_level = LKM_PRMODE;
514	return new_level;
515}
516
517static void lockres_set_flags(struct ocfs2_lock_res *lockres,
518			      unsigned long newflags)
519{
520	struct list_head *pos, *tmp;
521	struct ocfs2_mask_waiter *mw;
522
523 	assert_spin_locked(&lockres->l_lock);
524
525	lockres->l_flags = newflags;
526
527	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
528		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
529		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
530			continue;
531
532		list_del_init(&mw->mw_item);
533		mw->mw_status = 0;
534		complete(&mw->mw_complete);
535	}
536}
/* Set the given flag bits, waking any satisfied mask waiters. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear the given flag bits, waking any satisfied mask waiters. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
546
/*
 * AST completion for a downconvert: commit the new (lower) level and,
 * if it is now compatible with what the other node wanted, clear the
 * BLOCKED state. Called under l_lock from ocfs2_locking_ast().
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	/* Dropped far enough to satisfy the blocking request? Then
	 * the blocked state is fully resolved. */
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
566
/*
 * AST completion for an upconvert: commit the new (higher) level,
 * marking the lock as needing a refresh when we came up from NL.
 * Called under l_lock from ocfs2_locking_ast().
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
587
588static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
589{
590	mlog_entry_void();
591
592	BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
593	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
594
595	if (lockres->l_requested > LKM_NLMODE &&
596	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
597	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
598		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
599
600	lockres->l_level = lockres->l_requested;
601	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
602	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
603
604	mlog_exit_void();
605}
606
/*
 * Record that another node wants this lock at @level. Returns 1 when
 * a downconvert needs to be scheduled, 0 if one low enough is already
 * pending. Must be called under l_lock.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
632
/*
 * Common blocking-AST body shared by every lock type: note the remote
 * request, queue the lockres for downconvert if needed, wake local
 * waiters and kick the vote thread to do the actual work.
 */
static void ocfs2_generic_bast_func(struct ocfs2_super *osb,
				    struct ocfs2_lock_res *lockres,
				    int level)
{
	int needs_downconvert;
	unsigned long flags;

	mlog_entry_void();

	/* A BAST for an NL request would be meaningless -- NL is
	 * compatible with everything. */
	BUG_ON(level <= LKM_NLMODE);

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}
656
/* Blocking AST for the three inode lock types; thin logging wrapper
 * around ocfs2_generic_bast_func(). @opaque is the lockres. */
static void ocfs2_inode_bast_func(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct inode *inode;
	struct ocfs2_super *osb;

	mlog_entry_void();

	BUG_ON(!ocfs2_is_inode_lock(lockres));

	inode = ocfs2_lock_res_inode(lockres);
	osb = OCFS2_SB(inode->i_sb);

	mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno, level,
	     lockres->l_level, ocfs2_lock_type_string(lockres->l_type));

	ocfs2_generic_bast_func(osb, lockres, level);

	mlog_exit_void();
}
678
/*
 * AST fired by the dlm when a lock request (attach, convert or
 * downconvert) completes. Dispatches on l_action, resets it to
 * INVALID and wakes anyone waiting on the lockres. An unexpected
 * lksb status is logged and the ast is otherwise ignored; an invalid
 * l_action is a bug.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* LOCAL only applies to the initial attach. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
720
/* Blocking AST for the superblock lock; delegates to the generic
 * handler after sanity-checking the lock type. */
static void ocfs2_super_bast_func(void *opaque,
				  int level)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();
	mlog(0, "Superblock BAST fired\n");

	BUG_ON(!ocfs2_is_super_lock(lockres));

	ocfs2_generic_bast_func(ocfs2_lock_res_super(lockres), lockres, level);

	mlog_exit_void();
}
736
/* Blocking AST for the rename lock; delegates to the generic handler
 * after sanity-checking the lock type. */
static void ocfs2_rename_bast_func(void *opaque,
				   int level)
{
	struct ocfs2_lock_res *lockres = opaque;

	mlog_entry_void();

	mlog(0, "Rename BAST fired\n");

	BUG_ON(!ocfs2_is_rename_lock(lockres));

	ocfs2_generic_bast_func(ocfs2_lock_res_super(lockres), lockres, level);

	mlog_exit_void();
}
754
/* Blocking AST for dentry locks. The osb is reached through the
 * dentry lock's inode rather than l_priv. */
static void ocfs2_dentry_bast_func(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_dentry_lock *dl = lockres->l_priv;
	struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);

	mlog(0, "Dentry bast: level: %d, name: %s\n", level,
	     lockres->l_name);

	ocfs2_generic_bast_func(osb, lockres, level);
}
766
/*
 * Roll lockres state back after dlmlock()/dlmunlock() returned an
 * error: clear BUSY and reset the pending action (@convert selects
 * l_action vs l_unlock_action), then wake waiters so they can retry.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
784
785/* Note: If we detect another process working on the lock (i.e.,
786 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
787 * to do the right thing in that case.
788 */
789static int ocfs2_lock_create(struct ocfs2_super *osb,
790			     struct ocfs2_lock_res *lockres,
791			     int level,
792			     int dlm_flags)
793{
794	int ret = 0;
795	enum dlm_status status;
796	unsigned long flags;
797
798	mlog_entry_void();
799
800	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
801	     dlm_flags);
802
803	spin_lock_irqsave(&lockres->l_lock, flags);
804	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
805	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
806		spin_unlock_irqrestore(&lockres->l_lock, flags);
807		goto bail;
808	}
809
810	lockres->l_action = OCFS2_AST_ATTACH;
811	lockres->l_requested = level;
812	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
813	spin_unlock_irqrestore(&lockres->l_lock, flags);
814
815	status = dlmlock(osb->dlm,
816			 level,
817			 &lockres->l_lksb,
818			 dlm_flags,
819			 lockres->l_name,
820			 OCFS2_LOCK_ID_MAX_LEN - 1,
821			 ocfs2_locking_ast,
822			 lockres,
823			 lockres->l_ops->bast);
824	if (status != DLM_NORMAL) {
825		ocfs2_log_dlm_error("dlmlock", status, lockres);
826		ret = -EINVAL;
827		ocfs2_recover_from_dlm_error(lockres, 1);
828	}
829
830	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
831
832bail:
833	mlog_exit(ret);
834	return ret;
835}
836
/* Sample @flag from l_flags under the lockres spinlock; used as the
 * wait_event() condition in the helpers below. */
static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}
849
/* Block (uninterruptibly) until OCFS2_LOCK_BUSY clears. */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}
856
/* Block (uninterruptibly) until OCFS2_LOCK_REFRESHING clears. */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
863
/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it.
 *
 * Only valid while OCFS2_LOCK_BLOCKED is set (asserted below). */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
874
/* Prepare a (typically on-stack) mask waiter for use with
 * lockres_add_mask_waiter(). */
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}
880
/* Sleep until the waiter is completed by lockres_set_flags();
 * returns the status it recorded. */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}
888
/* Queue @mw to be completed once (l_flags & mask) == goal. The
 * waiter must not already be queued; caller holds l_lock. */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
902
/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* If mw_item is already off the list, lockres_set_flags()
	 * completed us first -- leave ret at 0. */
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}
924
/*
 * Acquire @lockres at @level on behalf of the local node, creating
 * and/or upconverting the underlying dlm lock as needed. The core
 * loop retries (via "again") after every state transition -- lock
 * creation, a completed convert, or a wait -- until the granted
 * level covers the request, then bumps the holder count.
 *
 * @lkm_flags are passed through to dlmlock() (LKM_NOQUEUE turns a
 * would-block into -EAGAIN); @arg_flags understands
 * OCFS2_LOCK_NONBLOCK (see the lock-inversion comment near "out").
 * Returns 0 on success, -ERESTARTSYS if interrupted before entering
 * the dlm, -EAGAIN or a negative error otherwise.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

again:
	wait = 0;

	/* Signals are only honored until we commit to a dlm call;
	 * see "catch_signals = 0" below. */
	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* Create at NL; the convert path below raises it. */
		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT|LKM_VALBLK,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 lockres->l_ops->bast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
1071
1072static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1073				 struct ocfs2_lock_res *lockres,
1074				 int level)
1075{
1076	unsigned long flags;
1077
1078	mlog_entry_void();
1079	spin_lock_irqsave(&lockres->l_lock, flags);
1080	ocfs2_dec_holders(lockres, level);
1081	ocfs2_vote_on_unlock(osb, lockres);
1082	spin_unlock_irqrestore(&lockres->l_lock, flags);
1083	mlog_exit_void();
1084}
1085
1086int ocfs2_create_new_lock(struct ocfs2_super *osb,
1087			  struct ocfs2_lock_res *lockres,
1088			  int ex,
1089			  int local)
1090{
1091	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1092	unsigned long flags;
1093	int lkm_flags = local ? LKM_LOCAL : 0;
1094
1095	spin_lock_irqsave(&lockres->l_lock, flags);
1096	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1097	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1098	spin_unlock_irqrestore(&lockres->l_lock, flags);
1099
1100	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1101}
1102
1103/* Grants us an EX lock on the data and metadata resources, skipping
1104 * the normal cluster directory lookup. Use this ONLY on newly created
1105 * inodes which other nodes can't possibly see, and which haven't been
1106 * hashed in the inode hash yet. This can give us a good performance
1107 * increase as it'll skip the network broadcast normally associated
1108 * with creating a new lock resource. */
1109int ocfs2_create_new_inode_locks(struct inode *inode)
1110{
1111	int ret;
1112	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1113
1114	BUG_ON(!inode);
1115	BUG_ON(!ocfs2_inode_is_new(inode));
1116
1117	mlog_entry_void();
1118
1119	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1120
1121	/* NOTE: That we don't increment any of the holder counts, nor
1122	 * do we add anything to a journal handle. Since this is
1123	 * supposed to be a new inode which the cluster doesn't know
1124	 * about yet, there is no need to.  As far as the LVB handling
1125	 * is concerned, this is basically like acquiring an EX lock
1126	 * on a resource which has an invalid one -- we'll set it
1127	 * valid when we release the EX. */
1128
1129	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1130	if (ret) {
1131		mlog_errno(ret);
1132		goto bail;
1133	}
1134
1135	/*
1136	 * We don't want to use LKM_LOCAL on a meta data lock as they
1137	 * don't use a generation in their lock names.
1138	 */
1139	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1140	if (ret) {
1141		mlog_errno(ret);
1142		goto bail;
1143	}
1144
1145	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1146	if (ret) {
1147		mlog_errno(ret);
1148		goto bail;
1149	}
1150
1151bail:
1152	mlog_exit(ret);
1153	return ret;
1154}
1155
1156int ocfs2_rw_lock(struct inode *inode, int write)
1157{
1158	int status, level;
1159	struct ocfs2_lock_res *lockres;
1160
1161	BUG_ON(!inode);
1162
1163	mlog_entry_void();
1164
1165	mlog(0, "inode %llu take %s RW lock\n",
1166	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1167	     write ? "EXMODE" : "PRMODE");
1168
1169	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1170
1171	level = write ? LKM_EXMODE : LKM_PRMODE;
1172
1173	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1174				    0);
1175	if (status < 0)
1176		mlog_errno(status);
1177
1178	mlog_exit(status);
1179	return status;
1180}
1181
1182void ocfs2_rw_unlock(struct inode *inode, int write)
1183{
1184	int level = write ? LKM_EXMODE : LKM_PRMODE;
1185	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1186
1187	mlog_entry_void();
1188
1189	mlog(0, "inode %llu drop %s RW lock\n",
1190	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1191	     write ? "EXMODE" : "PRMODE");
1192
1193	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1194
1195	mlog_exit_void();
1196}
1197
1198int ocfs2_data_lock_full(struct inode *inode,
1199			 int write,
1200			 int arg_flags)
1201{
1202	int status = 0, level;
1203	struct ocfs2_lock_res *lockres;
1204
1205	BUG_ON(!inode);
1206
1207	mlog_entry_void();
1208
1209	mlog(0, "inode %llu take %s DATA lock\n",
1210	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1211	     write ? "EXMODE" : "PRMODE");
1212
1213	/* We'll allow faking a readonly data lock for
1214	 * rodevices. */
1215	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1216		if (write) {
1217			status = -EROFS;
1218			mlog_errno(status);
1219		}
1220		goto out;
1221	}
1222
1223	lockres = &OCFS2_I(inode)->ip_data_lockres;
1224
1225	level = write ? LKM_EXMODE : LKM_PRMODE;
1226
1227	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1228				    0, arg_flags);
1229	if (status < 0 && status != -EAGAIN)
1230		mlog_errno(status);
1231
1232out:
1233	mlog_exit(status);
1234	return status;
1235}
1236
1237/* see ocfs2_meta_lock_with_page() */
1238int ocfs2_data_lock_with_page(struct inode *inode,
1239			      int write,
1240			      struct page *page)
1241{
1242	int ret;
1243
1244	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1245	if (ret == -EAGAIN) {
1246		unlock_page(page);
1247		if (ocfs2_data_lock(inode, write) == 0)
1248			ocfs2_data_unlock(inode, write);
1249		ret = AOP_TRUNCATED_PAGE;
1250	}
1251
1252	return ret;
1253}
1254
1255static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1256				 struct ocfs2_lock_res *lockres)
1257{
1258	int kick = 0;
1259
1260	mlog_entry_void();
1261
1262	/* If we know that another node is waiting on our lock, kick
1263	 * the vote thread * pre-emptively when we reach a release
1264	 * condition. */
1265	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1266		switch(lockres->l_blocking) {
1267		case LKM_EXMODE:
1268			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1269				kick = 1;
1270			break;
1271		case LKM_PRMODE:
1272			if (!lockres->l_ex_holders)
1273				kick = 1;
1274			break;
1275		default:
1276			BUG();
1277		}
1278	}
1279
1280	if (kick)
1281		ocfs2_kick_vote_thread(osb);
1282
1283	mlog_exit_void();
1284}
1285
1286void ocfs2_data_unlock(struct inode *inode,
1287		       int write)
1288{
1289	int level = write ? LKM_EXMODE : LKM_PRMODE;
1290	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1291
1292	mlog_entry_void();
1293
1294	mlog(0, "inode %llu drop %s DATA lock\n",
1295	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1296	     write ? "EXMODE" : "PRMODE");
1297
1298	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1299		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1300
1301	mlog_exit_void();
1302}
1303
1304#define OCFS2_SEC_BITS   34
1305#define OCFS2_SEC_SHIFT  (64 - 34)
1306#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1307
1308/* LVB only has room for 64 bits of time here so we pack it for
1309 * now. */
1310static u64 ocfs2_pack_timespec(struct timespec *spec)
1311{
1312	u64 res;
1313	u64 sec = spec->tv_sec;
1314	u32 nsec = spec->tv_nsec;
1315
1316	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1317
1318	return res;
1319}
1320
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* Publish the current in-memory inode state into the lock
	 * value block (big-endian on the wire), so other nodes can
	 * refresh via ocfs2_refresh_inode_from_lvb() without a disk
	 * read. Times are packed to 64 bits each. */
	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1365
1366static void ocfs2_unpack_timespec(struct timespec *spec,
1367				  u64 packed_time)
1368{
1369	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1370	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1371}
1372
/* Refresh the in-memory inode from the values another node published
 * in the lock value block, saving us a disk read. Counterpart of
 * __ocfs2_stuff_meta_lvb(). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1414
1415static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1416					      struct ocfs2_lock_res *lockres)
1417{
1418	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1419
1420	if (lvb->lvb_version == OCFS2_LVB_VERSION
1421	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1422		return 1;
1423	return 0;
1424}
1425
1426/* Determine whether a lock resource needs to be refreshed, and
1427 * arbitrate who gets to refresh it.
1428 *
1429 *   0 means no refresh needed.
1430 *
1431 *   > 0 means you need to refresh this and you MUST call
1432 *   ocfs2_complete_lock_res_refresh afterwards. */
1433static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1434{
1435	unsigned long flags;
1436	int status = 0;
1437
1438	mlog_entry_void();
1439
1440refresh_check:
1441	spin_lock_irqsave(&lockres->l_lock, flags);
1442	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1443		spin_unlock_irqrestore(&lockres->l_lock, flags);
1444		goto bail;
1445	}
1446
1447	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1448		spin_unlock_irqrestore(&lockres->l_lock, flags);
1449
1450		ocfs2_wait_on_refreshing_lock(lockres);
1451		goto refresh_check;
1452	}
1453
1454	/* Ok, I'll be the one to refresh this lock. */
1455	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1456	spin_unlock_irqrestore(&lockres->l_lock, flags);
1457
1458	status = 1;
1459bail:
1460	mlog_exit(status);
1461	return status;
1462}
1463
/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag. */
1466static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1467						   int status)
1468{
1469	unsigned long flags;
1470	mlog_entry_void();
1471
1472	spin_lock_irqsave(&lockres->l_lock, flags);
1473	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1474	if (!status)
1475		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1476	spin_unlock_irqrestore(&lockres->l_lock, flags);
1477
1478	wake_up(&lockres->l_event);
1479
1480	mlog_exit_void();
1481}
1482
/* may or may not return a bh if it went to disk. */
/* Bring the in-memory inode up to date after winning a meta lock:
 * purge cached metadata, then refresh either from the LVB (cheap) or
 * from disk (in which case *bh is filled in for the caller). */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	/* The inode may have been wiped by another node while we were
	 * waiting on the lock - bail before touching stale state. */
	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	/* Nonzero means we won the arbitration and must call
	 * ocfs2_complete_lock_res_refresh() below. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	/* Nonzero status leaves OCFS2_LOCK_NEEDS_REFRESH set so the
	 * next locker retries the refresh. */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1570
1571static int ocfs2_assign_bh(struct inode *inode,
1572			   struct buffer_head **ret_bh,
1573			   struct buffer_head *passed_bh)
1574{
1575	int status;
1576
1577	if (passed_bh) {
1578		/* Ok, the update went to disk for us, use the
1579		 * returned bh. */
1580		*ret_bh = passed_bh;
1581		get_bh(*ret_bh);
1582
1583		return 0;
1584	}
1585
1586	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1587				  OCFS2_I(inode)->ip_blkno,
1588				  ret_bh,
1589				  OCFS2_BH_CACHED,
1590				  inode);
1591	if (status < 0)
1592		mlog_errno(status);
1593
1594	return status;
1595}
1596
1597/*
1598 * returns < 0 error if the callback will never be called, otherwise
1599 * the result of the lock will be communicated via the callback.
1600 */
1601int ocfs2_meta_lock_full(struct inode *inode,
1602			 struct ocfs2_journal_handle *handle,
1603			 struct buffer_head **ret_bh,
1604			 int ex,
1605			 int arg_flags)
1606{
1607	int status, level, dlm_flags, acquired;
1608	struct ocfs2_lock_res *lockres;
1609	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1610	struct buffer_head *local_bh = NULL;
1611
1612	BUG_ON(!inode);
1613
1614	mlog_entry_void();
1615
1616	mlog(0, "inode %llu, take %s META lock\n",
1617	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1618	     ex ? "EXMODE" : "PRMODE");
1619
1620	status = 0;
1621	acquired = 0;
1622	/* We'll allow faking a readonly metadata lock for
1623	 * rodevices. */
1624	if (ocfs2_is_hard_readonly(osb)) {
1625		if (ex)
1626			status = -EROFS;
1627		goto bail;
1628	}
1629
1630	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1631		wait_event(osb->recovery_event,
1632			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1633
1634	acquired = 0;
1635	lockres = &OCFS2_I(inode)->ip_meta_lockres;
1636	level = ex ? LKM_EXMODE : LKM_PRMODE;
1637	dlm_flags = 0;
1638	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1639		dlm_flags |= LKM_NOQUEUE;
1640
1641	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1642	if (status < 0) {
1643		if (status != -EAGAIN && status != -EIOCBRETRY)
1644			mlog_errno(status);
1645		goto bail;
1646	}
1647
1648	/* Notify the error cleanup path to drop the cluster lock. */
1649	acquired = 1;
1650
1651	/* We wait twice because a node may have died while we were in
1652	 * the lower dlm layers. The second time though, we've
1653	 * committed to owning this lock so we don't allow signals to
1654	 * abort the operation. */
1655	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1656		wait_event(osb->recovery_event,
1657			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1658
1659	/*
1660	 * We only see this flag if we're being called from
1661	 * ocfs2_read_locked_inode(). It means we're locking an inode
1662	 * which hasn't been populated yet, so clear the refresh flag
1663	 * and let the caller handle it.
1664	 */
1665	if (inode->i_state & I_NEW) {
1666		status = 0;
1667		ocfs2_complete_lock_res_refresh(lockres, 0);
1668		goto bail;
1669	}
1670
1671	/* This is fun. The caller may want a bh back, or it may
1672	 * not. ocfs2_meta_lock_update definitely wants one in, but
1673	 * may or may not read one, depending on what's in the
1674	 * LVB. The result of all of this is that we've *only* gone to
1675	 * disk if we have to, so the complexity is worthwhile. */
1676	status = ocfs2_meta_lock_update(inode, &local_bh);
1677	if (status < 0) {
1678		if (status != -ENOENT)
1679			mlog_errno(status);
1680		goto bail;
1681	}
1682
1683	if (ret_bh) {
1684		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1685		if (status < 0) {
1686			mlog_errno(status);
1687			goto bail;
1688		}
1689	}
1690
1691	if (handle) {
1692		status = ocfs2_handle_add_lock(handle, inode);
1693		if (status < 0)
1694			mlog_errno(status);
1695	}
1696
1697bail:
1698	if (status < 0) {
1699		if (ret_bh && (*ret_bh)) {
1700			brelse(*ret_bh);
1701			*ret_bh = NULL;
1702		}
1703		if (acquired)
1704			ocfs2_meta_unlock(inode, ex);
1705	}
1706
1707	if (local_bh)
1708		brelse(local_bh);
1709
1710	mlog_exit(status);
1711	return status;
1712}
1713
1714/*
1715 * This is working around a lock inversion between tasks acquiring DLM locks
1716 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1717 * while acquiring page locks.
1718 *
1719 * ** These _with_page variantes are only intended to be called from aop
1720 * methods that hold page locks and return a very specific *positive* error
1721 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1722 *
1723 * The DLM is called such that it returns -EAGAIN if it would have blocked
1724 * waiting for the vote thread.  In that case we unlock our page so the vote
1725 * thread can make progress.  Once we've done this we have to return
1726 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1727 * into the VFS who will then immediately retry the aop call.
1728 *
1729 * We do a blocking lock and immediate unlock before returning, though, so that
1730 * the lock has a great chance of being cached on this node by the time the VFS
1731 * calls back to retry the aop.    This has a potential to livelock as nodes
1732 * ping locks back and forth, but that's a risk we're willing to take to avoid
1733 * the lock inversion simply.
1734 */
1735int ocfs2_meta_lock_with_page(struct inode *inode,
1736			      struct ocfs2_journal_handle *handle,
1737			      struct buffer_head **ret_bh,
1738			      int ex,
1739			      struct page *page)
1740{
1741	int ret;
1742
1743	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1744				   OCFS2_LOCK_NONBLOCK);
1745	if (ret == -EAGAIN) {
1746		unlock_page(page);
1747		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1748			ocfs2_meta_unlock(inode, ex);
1749		ret = AOP_TRUNCATED_PAGE;
1750	}
1751
1752	return ret;
1753}
1754
1755void ocfs2_meta_unlock(struct inode *inode,
1756		       int ex)
1757{
1758	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1759	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1760
1761	mlog_entry_void();
1762
1763	mlog(0, "inode %llu drop %s META lock\n",
1764	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1765	     ex ? "EXMODE" : "PRMODE");
1766
1767	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1768		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1769
1770	mlog_exit_void();
1771}
1772
/* Take the global superblock lock at EX or PR, refreshing the slot
 * map from disk if we win the refresh arbitration. */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* Re-read the slot map block. NOTE(review): bh starts
		 * as si->si_bh and is passed by address, so
		 * ocfs2_read_block may hand back a (possibly cached)
		 * bh; presumably si_bh stays valid for the mount
		 * lifetime — confirm against slot_map.c. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* Nonzero status leaves NEEDS_REFRESH set for the
		 * next locker to retry. */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1818
1819void ocfs2_super_unlock(struct ocfs2_super *osb,
1820			int ex)
1821{
1822	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1823	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1824
1825	ocfs2_cluster_unlock(osb, lockres, level);
1826}
1827
1828int ocfs2_rename_lock(struct ocfs2_super *osb)
1829{
1830	int status;
1831	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1832
1833	if (ocfs2_is_hard_readonly(osb))
1834		return -EROFS;
1835
1836	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1837	if (status < 0)
1838		mlog_errno(status);
1839
1840	return status;
1841}
1842
1843void ocfs2_rename_unlock(struct ocfs2_super *osb)
1844{
1845	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1846
1847	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1848}
1849
1850int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1851{
1852	int ret;
1853	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1854	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1855	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1856
1857	BUG_ON(!dl);
1858
1859	if (ocfs2_is_hard_readonly(osb))
1860		return -EROFS;
1861
1862	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1863	if (ret < 0)
1864		mlog_errno(ret);
1865
1866	return ret;
1867}
1868
1869void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1870{
1871	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1872	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1873	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1874
1875	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1876}
1877
1878/* Reference counting of the dlm debug structure. We want this because
1879 * open references on the debug inodes can live on after a mount, so
1880 * we can't rely on the ocfs2_super to always exist. */
1881static void ocfs2_dlm_debug_free(struct kref *kref)
1882{
1883	struct ocfs2_dlm_debug *dlm_debug;
1884
1885	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1886
1887	kfree(dlm_debug);
1888}
1889
1890void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1891{
1892	if (dlm_debug)
1893		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1894}
1895
/* Take an extra reference on the debug structure; paired with
 * ocfs2_put_dlm_debug(). */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1900
1901struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1902{
1903	struct ocfs2_dlm_debug *dlm_debug;
1904
1905	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1906	if (!dlm_debug) {
1907		mlog_errno(-ENOMEM);
1908		goto out;
1909	}
1910
1911	kref_init(&dlm_debug->d_refcnt);
1912	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1913	dlm_debug->d_locking_state = NULL;
1914out:
1915	return dlm_debug;
1916}
1917
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* debug struct we hold a ref on */
	struct ocfs2_lock_res p_iter_res;	/* dummy lockres marking our list position */
	struct ocfs2_lock_res p_tmp_res;	/* stable copy handed out to ->show() */
};
1924
/* Walk forward from 'start' on the tracking list and return the next
 * real lockres, or NULL at the end of the list. Caller must hold
 * ocfs2_dlm_tracking_lock. */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1950
1951static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1952{
1953	struct ocfs2_dlm_seq_priv *priv = m->private;
1954	struct ocfs2_lock_res *iter;
1955
1956	spin_lock(&ocfs2_dlm_tracking_lock);
1957	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
1958	if (iter) {
1959		/* Since lockres' have the lifetime of their container
1960		 * (which can be inodes, ocfs2_supers, etc) we want to
1961		 * copy this out to a temporary lockres while still
1962		 * under the spinlock. Obviously after this we can't
1963		 * trust any pointers on the copy returned, but that's
1964		 * ok as the information we want isn't typically held
1965		 * in them. */
1966		priv->p_tmp_res = *iter;
1967		iter = &priv->p_tmp_res;
1968	}
1969	spin_unlock(&ocfs2_dlm_tracking_lock);
1970
1971	return iter;
1972}
1973
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	/* Nothing to tear down - our position is kept by the dummy
	 * lockres which ocfs2_dlm_seq_next() leaves in the list. */
}
1977
/* seq_file ->next: advance to the lockres after v, re-anchoring our
 * dummy iteration marker just past it so later calls resume there. */
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	/* Unhook the marker; if there is a next entry, re-insert the
	 * marker right after it and hand out a stable copy (see
	 * ocfs2_dlm_seq_start for why we must copy). */
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1996
1997/* So that debugfs.ocfs2 can determine which format is being used */
1998#define OCFS2_DLM_DEBUG_STR_VERSION 1
1999static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2000{
2001	int i;
2002	char *lvb;
2003	struct ocfs2_lock_res *lockres = v;
2004
2005	if (!lockres)
2006		return -EINVAL;
2007
2008	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2009
2010	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2011		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2012			   lockres->l_name,
2013			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2014	else
2015		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2016
2017	seq_printf(m, "%d\t"
2018		   "0x%lx\t"
2019		   "0x%x\t"
2020		   "0x%x\t"
2021		   "%u\t"
2022		   "%u\t"
2023		   "%d\t"
2024		   "%d\t",
2025		   lockres->l_level,
2026		   lockres->l_flags,
2027		   lockres->l_action,
2028		   lockres->l_unlock_action,
2029		   lockres->l_ro_holders,
2030		   lockres->l_ex_holders,
2031		   lockres->l_requested,
2032		   lockres->l_blocking);
2033
2034	/* Dump the raw LVB */
2035	lvb = lockres->l_lksb.lvb;
2036	for(i = 0; i < DLM_LVB_LEN; i++)
2037		seq_printf(m, "0x%x\t", lvb[i]);
2038
2039	/* End the line */
2040	seq_printf(m, "\n");
2041	return 0;
2042}
2043
/* Iterator callbacks for the locking_state debugfs file. */
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};
2050
2051static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2052{
2053	struct seq_file *seq = (struct seq_file *) file->private_data;
2054	struct ocfs2_dlm_seq_priv *priv = seq->private;
2055	struct ocfs2_lock_res *res = &priv->p_iter_res;
2056
2057	ocfs2_remove_lockres_tracking(res);
2058	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2059	return seq_release_private(inode, file);
2060}
2061
2062static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2063{
2064	int ret;
2065	struct ocfs2_dlm_seq_priv *priv;
2066	struct seq_file *seq;
2067	struct ocfs2_super *osb;
2068
2069	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2070	if (!priv) {
2071		ret = -ENOMEM;
2072		mlog_errno(ret);
2073		goto out;
2074	}
2075	osb = (struct ocfs2_super *) inode->u.generic_ip;
2076	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2077	priv->p_dlm_debug = osb->osb_dlm_debug;
2078	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2079
2080	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2081	if (ret) {
2082		kfree(priv);
2083		mlog_errno(ret);
2084		goto out;
2085	}
2086
2087	seq = (struct seq_file *) file->private_data;
2088	seq->private = priv;
2089
2090	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2091				   priv->p_dlm_debug);
2092
2093out:
2094	return ret;
2095}
2096
/* File operations for the locking_state debugfs file; reads are
 * serviced by the seq_file iterator above. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};
2103
2104static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2105{
2106	int ret = 0;
2107	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2108
2109	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2110							 S_IFREG|S_IRUSR,
2111							 osb->osb_debug_root,
2112							 osb,
2113							 &ocfs2_dlm_debug_fops);
2114	if (!dlm_debug->d_locking_state) {
2115		ret = -EINVAL;
2116		mlog(ML_ERROR,
2117		     "Unable to create locking state debugfs file.\n");
2118		goto out;
2119	}
2120
2121	ocfs2_get_dlm_debug(dlm_debug);
2122out:
2123	return ret;
2124}
2125
2126static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2127{
2128	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2129
2130	if (dlm_debug) {
2131		debugfs_remove(dlm_debug->d_locking_state);
2132		ocfs2_put_dlm_debug(dlm_debug);
2133	}
2134}
2135
/*
 * Bring up everything this node needs for cluster locking: the
 * debugfs state file, the vote thread, and the DLM domain itself
 * (named by the superblock uuid, keyed by its crc32). On success
 * osb->dlm is live and the osb-wide lock resources are initialized.
 * Returns 0 or a negative error.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL here keeps the error path below (and any later
		 * shutdown) from calling kthread_stop() on an ERR_PTR. */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		/* NOTE(review): if ocfs2_dlm_init_debug() itself failed,
		 * this shutdown drops a debug reference that was never
		 * taken -- verify the refcount balance on that path. */
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2189
/*
 * Tear down in roughly the reverse order of ocfs2_dlm_init(): unhook
 * the eviction callback, drop the osb-wide locks, stop the vote
 * thread, free the osb lock resources, leave the DLM domain and
 * finally remove the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2213
/*
 * DLM callback fired when an unlock or convert-cancel request
 * completes. Clears the unlock state recorded by ocfs2_drop_lock() /
 * ocfs2_cancel_convert() under the lockres spinlock and wakes any
 * waiters on l_event.
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	/* Successful completion: roll back lockres state according to
	 * which kind of unlock was in flight. */
	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2269
/* Optional callback invoked by ocfs2_drop_lock() just before the
 * dlmunlock, while the lockres spinlock is held -- used to stuff
 * last-minute data (e.g. the meta LVB) into the lock. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t	*drop_func;	/* called with drop_data */
	void			*drop_data;	/* opaque arg, e.g. an inode */
};
2276
2277static int ocfs2_drop_lock(struct ocfs2_super *osb,
2278			   struct ocfs2_lock_res *lockres,
2279			   struct drop_lock_cb *dcb)
2280{
2281	enum dlm_status status;
2282	unsigned long flags;
2283
2284	/* We didn't get anywhere near actually using this lockres. */
2285	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2286		goto out;
2287
2288	spin_lock_irqsave(&lockres->l_lock, flags);
2289
2290	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2291			"lockres %s, flags 0x%lx\n",
2292			lockres->l_name, lockres->l_flags);
2293
2294	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2295		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2296		     "%u, unlock_action = %u\n",
2297		     lockres->l_name, lockres->l_flags, lockres->l_action,
2298		     lockres->l_unlock_action);
2299
2300		spin_unlock_irqrestore(&lockres->l_lock, flags);
2301
2302		/* XXX: Today we just wait on any busy
2303		 * locks... Perhaps we need to cancel converts in the
2304		 * future? */
2305		ocfs2_wait_on_busy_lock(lockres);
2306
2307		spin_lock_irqsave(&lockres->l_lock, flags);
2308	}
2309
2310	if (dcb)
2311		dcb->drop_func(lockres, dcb->drop_data);
2312
2313	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2314		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2315		     lockres->l_name);
2316	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2317		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2318
2319	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2320		spin_unlock_irqrestore(&lockres->l_lock, flags);
2321		goto out;
2322	}
2323
2324	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2325
2326	/* make sure we never get here while waiting for an ast to
2327	 * fire. */
2328	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2329
2330	/* is this necessary? */
2331	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2332	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2333	spin_unlock_irqrestore(&lockres->l_lock, flags);
2334
2335	mlog(0, "lock %s\n", lockres->l_name);
2336
2337	status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
2338			   ocfs2_unlock_ast, lockres);
2339	if (status != DLM_NORMAL) {
2340		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2341		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2342		dlm_print_one_lock(lockres->l_lksb.lockid);
2343		BUG();
2344	}
2345	mlog(0, "lock %s, successfull return from dlmunlock\n",
2346	     lockres->l_name);
2347
2348	ocfs2_wait_on_busy_lock(lockres);
2349out:
2350	mlog_exit(0);
2351	return 0;
2352}
2353
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Sleep until the vote thread has dequeued us (QUEUED clears);
	 * the spinlock is dropped around each wait and re-checked. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2384
2385void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2386			       struct ocfs2_lock_res *lockres)
2387{
2388	int ret;
2389
2390	ocfs2_mark_lockres_freeing(lockres);
2391	ret = ocfs2_drop_lock(osb, lockres, NULL);
2392	if (ret)
2393		mlog_errno(ret);
2394}
2395
2396static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2397{
2398	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2399	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2400}
2401
2402static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2403{
2404	struct inode *inode = data;
2405
2406	/* the metadata lock requires a bit more work as we have an
2407	 * LVB to worry about. */
2408	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2409	    lockres->l_level == LKM_EXMODE &&
2410	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2411		__ocfs2_stuff_meta_lvb(inode);
2412}
2413
2414int ocfs2_drop_inode_locks(struct inode *inode)
2415{
2416	int status, err;
2417	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2418
2419	mlog_entry_void();
2420
2421	/* No need to call ocfs2_mark_lockres_freeing here -
2422	 * ocfs2_clear_inode has done it for us. */
2423
2424	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2425			      &OCFS2_I(inode)->ip_data_lockres,
2426			      NULL);
2427	if (err < 0)
2428		mlog_errno(err);
2429
2430	status = err;
2431
2432	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2433			      &OCFS2_I(inode)->ip_meta_lockres,
2434			      &meta_dcb);
2435	if (err < 0)
2436		mlog_errno(err);
2437	if (err < 0 && !status)
2438		status = err;
2439
2440	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2441			      &OCFS2_I(inode)->ip_rw_lockres,
2442			      NULL);
2443	if (err < 0)
2444		mlog_errno(err);
2445	if (err < 0 && !status)
2446		status = err;
2447
2448	mlog_exit(status);
2449	return status;
2450}
2451
/*
 * Record our intent to downconvert this lockres to new_level. Caller
 * holds the lockres spinlock; the actual dlmlock(LKM_CONVERT) is
 * issued afterwards by ocfs2_downconvert_lock() once the spinlock is
 * dropped. new_level must be strictly lower than the current level.
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2472
/*
 * Issue the dlmlock(LKM_CONVERT) prepared by
 * ocfs2_prepare_downconvert(). When lvb is nonzero, LKM_VALBLK sends
 * our LVB contents along with the convert. On dlm error the lockres
 * state is rolled back and -EINVAL is returned.
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 lockres->l_ops->bast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}
2507
/*
 * Decide whether a pending convert on this lockres should be
 * cancelled. Caller holds the lockres spinlock.
 *
 * Returns 1 when the caller should drop the spinlock and issue
 * dlmunlock(LKM_CANCEL); returns 0 when a cancel is already in
 * flight, in which case the lock should simply be requeued.
 */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2539
/*
 * Ask the dlm to cancel an in-flight convert on this lockres. Called
 * without the lockres spinlock held, after
 * ocfs2_prepare_cancel_convert() returned 1. Completion (including a
 * CANCELGRANT race) is handled by ocfs2_unlock_ast().
 */
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;
	enum dlm_status status;

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	ret = 0;
	status = dlmunlock(osb->dlm,
			   &lockres->l_lksb,
			   LKM_CANCEL,
			   ocfs2_unlock_ast,
			   lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);

	mlog_exit(ret);
	return ret;
}
2566
2567static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2568						  struct ocfs2_lock_res *lockres,
2569						  int new_level)
2570{
2571	int ret;
2572
2573	mlog_entry_void();
2574
2575	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2576
2577	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2578		ret = 0;
2579		mlog(0, "lockres %s currently being refreshed -- backing "
2580		     "off!\n", lockres->l_name);
2581	} else if (new_level == LKM_PRMODE)
2582		ret = !lockres->l_ex_holders &&
2583			ocfs2_inode_fully_checkpointed(inode);
2584	else /* Must be NLMODE we're converting to. */
2585		ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2586			ocfs2_inode_fully_checkpointed(inode);
2587
2588	mlog_exit(ret);
2589	return ret;
2590}
2591
/*
 * Attempt to downconvert a blocked meta lock. If the lock is busy we
 * try to cancel the pending convert and requeue. If holders or
 * journal state prevent the downconvert we kick off a checkpoint and
 * requeue. Otherwise, when coming down from EX, the current inode
 * metadata is pushed into the LVB (unless the lock still needs a
 * refresh) before the downconvert is issued.
 */
static int ocfs2_do_unblock_meta(struct inode *inode,
				 int *requeue)
{
	int new_level;
	int set_lvb = 0;
	int ret = 0;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
	unsigned long flags;

	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
	     lockres->l_blocking);

	BUG_ON(lockres->l_level != LKM_EXMODE &&
	       lockres->l_level != LKM_PRMODE);

	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		*requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
	     lockres->l_level, lockres->l_blocking, new_level);

	if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/* If the lock hasn't been refreshed yet (rare), then
		 * our memory inode values are old and we skip
		 * stuffing the lvb. There's no need to actually clear
		 * out the lvb here as it's value is still valid. */
		if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
			if (set_lvb)
				__ocfs2_stuff_meta_lvb(inode);
		} else
			mlog(0, "lockres %s: downconverting stale lock!\n",
			     lockres->l_name);

		mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
		     "l_blocking=%d, new_level=%d\n",
		     lockres->l_level, lockres->l_blocking, new_level);

		ocfs2_prepare_downconvert(lockres, new_level);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
		goto leave;
	}
	/* Downconvert not possible yet -- make sure a checkpoint is
	 * under way so it will be, then requeue. */
	if (!ocfs2_inode_fully_checkpointed(inode))
		ocfs2_start_checkpoint(osb);

	*requeue = 1;
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = 0;
leave:
	mlog_exit(ret);
	return ret;
}
2666
/*
 * Core downconvert engine shared by most lock types. Cancels an
 * in-flight convert when the lock is busy, requeues while
 * incompatible holders remain, optionally runs a per-type worker
 * (which may sleep) before the drop, and finally downconverts to the
 * highest level compatible with l_blocking. Results are reported
 * through ctl (requeue / unblock_action).
 */
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ctl->requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ctl->requeue = 1;
		ret = 0;
		goto leave;
	}

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;
	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0);
leave:
	mlog_exit(ret);
	return ret;
}
2752
/*
 * Worker run before a data lock downconvert: write back the inode's
 * page cache, and when another node wants EX, also truncate and unmap
 * all cached pages. For anything less than EX the pages are kept
 * (readable) and we only wait for the writeback to finish.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

       	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == LKM_EXMODE) {
		truncate_inode_pages(mapping, 0);
		unmap_mapping_range(mapping, 0, 0, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

	return UNBLOCK_CONTINUE;
}
2781
2782int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2783		       struct ocfs2_unblock_ctl *ctl)
2784{
2785	int status;
2786	struct inode *inode;
2787	struct ocfs2_super *osb;
2788
2789	mlog_entry_void();
2790
2791	inode = ocfs2_lock_res_inode(lockres);
2792	osb = OCFS2_SB(inode->i_sb);
2793
2794	mlog(0, "unblock inode %llu\n",
2795	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2796
2797	status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2798					    ocfs2_data_convert_worker);
2799	if (status < 0)
2800		mlog_errno(status);
2801
2802	mlog(0, "inode %llu, requeue = %d\n",
2803	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2804
2805	mlog_exit(status);
2806	return status;
2807}
2808
2809static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2810				    struct ocfs2_unblock_ctl *ctl)
2811{
2812	int status;
2813	struct inode *inode;
2814
2815	mlog_entry_void();
2816
2817	mlog(0, "Unblock lockres %s\n", lockres->l_name);
2818
2819	inode  = ocfs2_lock_res_inode(lockres);
2820
2821	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2822					    lockres, ctl, NULL);
2823	if (status < 0)
2824		mlog_errno(status);
2825
2826	mlog_exit(status);
2827	return status;
2828}
2829
2830static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2831			      struct ocfs2_unblock_ctl *ctl)
2832{
2833	int status;
2834	struct inode *inode;
2835
2836	mlog_entry_void();
2837
2838       	inode = ocfs2_lock_res_inode(lockres);
2839
2840	mlog(0, "unblock inode %llu\n",
2841	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2842
2843	status = ocfs2_do_unblock_meta(inode, &ctl->requeue);
2844	if (status < 0)
2845		mlog_errno(status);
2846
2847	mlog(0, "inode %llu, requeue = %d\n",
2848	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2849
2850	mlog_exit(status);
2851	return status;
2852}
2853
2854/*
2855 * Does the final reference drop on our dentry lock. Right now this
2856 * happens in the vote thread, but we could choose to simplify the
2857 * dlmglue API and push these off to the ocfs2_wq in the future.
2858 */
2859static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2860				     struct ocfs2_lock_res *lockres)
2861{
2862	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2863	ocfs2_dentry_lock_put(osb, dl);
2864}
2865
2866/*
2867 * d_delete() matching dentries before the lock downconvert.
2868 *
2869 * At this point, any process waiting to destroy the
2870 * dentry_lock due to last ref count is stopped by the
2871 * OCFS2_LOCK_QUEUED flag.
2872 *
2873 * We have two potential problems
2874 *
2875 * 1) If we do the last reference drop on our dentry_lock (via dput)
2876 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2877 *    the downconvert to finish. Instead we take an elevated
2878 *    reference and push the drop until after we've completed our
2879 *    unblock processing.
2880 *
2881 * 2) There might be another process with a final reference,
2882 *    waiting on us to finish processing. If this is the case, we
2883 *    detect it and exit out - there's no more dentries anyway.
2884 */
2885static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2886				       int blocking)
2887{
2888	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2889	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2890	struct dentry *dentry;
2891	unsigned long flags;
2892	int extra_ref = 0;
2893
2894	/*
2895	 * This node is blocking another node from getting a read
2896	 * lock. This happens when we've renamed within a
2897	 * directory. We've forced the other nodes to d_delete(), but
2898	 * we never actually dropped our lock because it's still
2899	 * valid. The downconvert code will retain a PR for this node,
2900	 * so there's no further work to do.
2901	 */
2902	if (blocking == LKM_PRMODE)
2903		return UNBLOCK_CONTINUE;
2904
2905	/*
2906	 * Mark this inode as potentially orphaned. The code in
2907	 * ocfs2_delete_inode() will figure out whether it actually
2908	 * needs to be freed or not.
2909	 */
2910	spin_lock(&oi->ip_lock);
2911	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2912	spin_unlock(&oi->ip_lock);
2913
2914	/*
2915	 * Yuck. We need to make sure however that the check of
2916	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2917	 * respect to a reference decrement or the setting of that
2918	 * flag.
2919	 */
2920	spin_lock_irqsave(&lockres->l_lock, flags);
2921	spin_lock(&dentry_attach_lock);
2922	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2923	    && dl->dl_count) {
2924		dl->dl_count++;
2925		extra_ref = 1;
2926	}
2927	spin_unlock(&dentry_attach_lock);
2928	spin_unlock_irqrestore(&lockres->l_lock, flags);
2929
2930	mlog(0, "extra_ref = %d\n", extra_ref);
2931
2932	/*
2933	 * We have a process waiting on us in ocfs2_dentry_iput(),
2934	 * which means we can't have any more outstanding
2935	 * aliases. There's no need to do any more work.
2936	 */
2937	if (!extra_ref)
2938		return UNBLOCK_CONTINUE;
2939
2940	spin_lock(&dentry_attach_lock);
2941	while (1) {
2942		dentry = ocfs2_find_local_alias(dl->dl_inode,
2943						dl->dl_parent_blkno, 1);
2944		if (!dentry)
2945			break;
2946		spin_unlock(&dentry_attach_lock);
2947
2948		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2949		     dentry->d_name.name);
2950
2951		/*
2952		 * The following dcache calls may do an
2953		 * iput(). Normally we don't want that from the
2954		 * downconverting thread, but in this case it's ok
2955		 * because the requesting node already has an
2956		 * exclusive lock on the inode, so it can't be queued
2957		 * for a downconvert.
2958		 */
2959		d_delete(dentry);
2960		dput(dentry);
2961
2962		spin_lock(&dentry_attach_lock);
2963	}
2964	spin_unlock(&dentry_attach_lock);
2965
2966	/*
2967	 * If we are the last holder of this dentry lock, there is no
2968	 * reason to downconvert so skip straight to the unlock.
2969	 */
2970	if (dl->dl_count == 1)
2971		return UNBLOCK_STOP_POST;
2972
2973	return UNBLOCK_CONTINUE_POST;
2974}
2975
2976static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2977				     struct ocfs2_unblock_ctl *ctl)
2978{
2979	int ret;
2980	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2981	struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2982
2983	mlog(0, "unblock dentry lock: %llu\n",
2984	     (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2985
2986	ret = ocfs2_generic_unblock_lock(osb,
2987					 lockres,
2988					 ctl,
2989					 ocfs2_dentry_convert_worker);
2990	if (ret < 0)
2991		mlog_errno(ret);
2992
2993	mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2994
2995	return ret;
2996}
2997
2998/* Generic unblock function for any lockres whose private data is an
2999 * ocfs2_super pointer. */
3000static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
3001				  struct ocfs2_unblock_ctl *ctl)
3002{
3003	int status;
3004	struct ocfs2_super *osb;
3005
3006	mlog_entry_void();
3007
3008	mlog(0, "Unblock lockres %s\n", lockres->l_name);
3009
3010	osb = ocfs2_lock_res_super(lockres);
3011
3012	status = ocfs2_generic_unblock_lock(osb,
3013					    lockres,
3014					    ctl,
3015					    NULL);
3016	if (status < 0)
3017		mlog_errno(status);
3018
3019	mlog_exit(status);
3020	return status;
3021}
3022
/*
 * Called from the vote thread to service a lockres queued for
 * downconvert. Locks marked FREEING are skipped; otherwise the
 * type-specific unblock handler runs and we either clear QUEUED or
 * requeue per its decision. A post_unlock callback, if any, runs
 * outside the spinlock.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
3073
/*
 * Queue a blocked lockres on the vote thread's list for later
 * downconvert processing. Caller holds the lockres spinlock. Locks
 * marked FREEING are not queued -- they are on their way out.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	/* Only add to the vote thread's list once. */
	spin_lock(&osb->vote_task_lock);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}
3102
/* This aids in debugging situations where a bad LVB might be involved.
 * Dumps every field of the meta LVB at the given mlog level; the LVB
 * fields are stored big-endian, hence the be*_to_cpu conversions. */
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}
3127