dlmglue.c revision 500086300e6dc5308a7328990bd50d17e075162b
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/smp_lock.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

#include <dlm/dlmapi.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "vote.h"

#include "buffer_head_io.h"

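/*
 * A mask waiter lets a task sleep until a lockres' l_flags, masked by
 * mw_mask, reaches mw_goal. Waiters are queued on l_mask_waiters via
 * lockres_add_mask_waiter() and completed from lockres_set_flags().
 */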
struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert (or "vote") thread
	 * after a successful downconvert. The lockres will not be
	 * referenced after this callback is called, so it is safe to
	 * free memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), _func,	\
		_lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);

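/*
 * Lock names are fixed width: one character encoding the lock type,
 * the standard pad, sixteen hex digits of block number and eight hex
 * digits of generation, for OCFS2_LOCK_ID_MAX_LEN - 1 characters in
 * all.
 */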
293				  u64 blkno,
294				  u32 generation,
295				  char *name)
296{
297	int len;
298
299	mlog_entry_void();
300
301	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
302
303	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
304		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
305		       (long long)blkno, generation);
306
307	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
308
309	mlog(0, "built lock resource with name: %s\n", name);
310
311	mlog_exit_void();
312}
313
314static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
315
316static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
317				       struct ocfs2_dlm_debug *dlm_debug)
318{
319	mlog(0, "Add tracking for lockres %s\n", res->l_name);
320
321	spin_lock(&ocfs2_dlm_tracking_lock);
322	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
323	spin_unlock(&ocfs2_dlm_tracking_lock);
324}
325
326static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
327{
328	spin_lock(&ocfs2_dlm_tracking_lock);
329	if (!list_empty(&res->l_debug_list))
330		list_del_init(&res->l_debug_list);
331	spin_unlock(&ocfs2_dlm_tracking_lock);
332}
333
334static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
335				       struct ocfs2_lock_res *res,
336				       enum ocfs2_lock_type type,
337				       struct ocfs2_lock_res_ops *ops,
338				       void *priv)
339{
340	res->l_type          = type;
341	res->l_ops           = ops;
342	res->l_priv          = priv;
343
344	res->l_level         = LKM_IVMODE;
345	res->l_requested     = LKM_IVMODE;
346	res->l_blocking      = LKM_IVMODE;
347	res->l_action        = OCFS2_AST_INVALID;
348	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
349
350	res->l_flags         = OCFS2_LOCK_INITIALIZED;
351
352	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
353}
354
355void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
356{
357	/* This also clears out the lock status block */
358	memset(res, 0, sizeof(struct ocfs2_lock_res));
359	spin_lock_init(&res->l_lock);
360	init_waitqueue_head(&res->l_event);
361	INIT_LIST_HEAD(&res->l_blocked_list);
362	INIT_LIST_HEAD(&res->l_mask_waiters);
363}
364
365void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
366			       enum ocfs2_lock_type type,
367			       unsigned int generation,
368			       struct inode *inode)
369{
370	struct ocfs2_lock_res_ops *ops;
371
372	switch(type) {
373		case OCFS2_LOCK_TYPE_RW:
374			ops = &ocfs2_inode_rw_lops;
375			break;
376		case OCFS2_LOCK_TYPE_META:
377			ops = &ocfs2_inode_meta_lops;
378			break;
379		case OCFS2_LOCK_TYPE_DATA:
380			ops = &ocfs2_inode_data_lops;
381			break;
382		case OCFS2_LOCK_TYPE_OPEN:
383			ops = &ocfs2_inode_open_lops;
384			break;
385		default:
386			mlog_bug_on_msg(1, "type: %d\n", type);
387			ops = NULL; /* thanks, gcc */
388			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()).
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		lockres->l_ex_holders++;
		break;
	case LKM_PRMODE:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case LKM_EXMODE:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case LKM_PRMODE:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
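/*
 * Given the level another node is waiting on, return the highest level
 * we can continue to hold: an EX request forces us down to NL, a PR
 * request lets us keep PR, and anything else leaves us at EX.
 */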
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = LKM_EXMODE;

	if (level == LKM_EXMODE)
		new_level = LKM_NLMODE;
	else if (level == LKM_PRMODE)
		new_level = LKM_PRMODE;
	return new_level;
}

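/*
 * Install a new set of flags on the lockres and complete any mask
 * waiter whose masked view of the flags now matches its goal. Callers
 * must hold the lockres spinlock.
 */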
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct list_head *pos, *tmp;
	struct ocfs2_mask_waiter *mw;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}

static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}

static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

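/*
 * Fired by the dlm when another node has requested a level that
 * conflicts with one we hold. Record the blocking level and, if a
 * downconvert is actually needed, queue the lockres for the vote
 * thread to process.
 */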
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}

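/*
 * Completion ast for our dlmlock() calls. Dispatches on the l_action
 * that was set before entering the dlm (attach, convert or
 * downconvert) and wakes anyone sleeping on l_event.
 */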
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags)
{
	int ret = 0;
	enum dlm_status status = DLM_NORMAL;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = dlmlock(osb->dlm,
			 level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, successful return from dlmlock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

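/*
 * The heart of cluster lock acquiry. Loops until the lockres is held
 * at a compatible level: creates the dlm lock at NL if it doesn't
 * exist yet, upconverts with LKM_CONVERT when a higher level is
 * wanted, and waits out busy or blocked states via a mask waiter.
 */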
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}

static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

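/*
 * Take a brand new lockres straight to PR or EX. The OCFS2_LOCK_LOCAL
 * flag lets the attach action skip the refresh that obtaining a
 * meaningful level would otherwise trigger; see
 * ocfs2_create_new_inode_locks() below for why this is only safe on
 * newly created inodes.
 */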
static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
	unsigned long flags;
	int lkm_flags = local ? LKM_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use LKM_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/*
 * ocfs2_open_lock always takes a PR mode lock.
 */
int ocfs2_open_lock(struct inode *inode)
{
	int status = 0;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take PRMODE open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    LKM_PRMODE, 0, 0);
	if (status < 0)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

int ocfs2_try_open_lock(struct inode *inode, int write)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu try to take %s open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	/*
	 * The file system may already be holding a PRMODE/EXMODE open lock.
	 * Since we pass LKM_NOQUEUE, the request won't block waiting on
	 * other nodes and the -EAGAIN will indicate to the caller that
	 * this inode is still in use.
	 */
	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    level, LKM_NOQUEUE, 0);

out:
	mlog_exit(status);
	return status;
}

/*
 * ocfs2_open_unlock unlocks both PR and EX mode open locks.
 */
void ocfs2_open_unlock(struct inode *inode)
{
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	if (lockres->l_ro_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     LKM_PRMODE);
	if (lockres->l_ex_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     LKM_EXMODE);

out:
	mlog_exit_void();
}

int ocfs2_data_lock_full(struct inode *inode,
			 int write,
			 int arg_flags)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	/* We'll allow faking a readonly data lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
		if (write) {
			status = -EROFS;
			mlog_errno(status);
		}
		goto out;
	}

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_data_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
				    0, arg_flags);
	if (status < 0 && status != -EAGAIN)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

/* see ocfs2_meta_lock_with_page() */
int ocfs2_data_lock_with_page(struct inode *inode,
			      int write,
			      struct page *page)
{
	int ret;

	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		if (ocfs2_data_lock(inode, write) == 0)
			ocfs2_data_unlock(inode, write);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the vote thread pre-emptively when we reach a release
	 * condition. */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch(lockres->l_blocking) {
		case LKM_EXMODE:
			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
				kick = 1;
			break;
		case LKM_PRMODE:
			if (!lockres->l_ex_holders)
				kick = 1;
			break;
		default:
			BUG();
		}
	}

	if (kick)
		ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}

void ocfs2_data_unlock(struct inode *inode,
		       int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
	    !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)

/* LVB only has room for 64 bits of time here so we pack it for
 * now. */
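/*
 * Seconds take the high OCFS2_SEC_BITS of the packed word; nanoseconds
 * are always below 10^9, which is less than 2^OCFS2_SEC_SHIFT = 2^30,
 * so they fit losslessly in the low bits.
 */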
static u64 ocfs2_pack_timespec(struct timespec *spec)
{
	u64 res;
	u64 sec = spec->tv_sec;
	u32 nsec = spec->tv_nsec;

	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);

	return res;
}

/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}

static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}

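/*
 * The read-side counterpart of __ocfs2_stuff_meta_lvb(): pull the
 * inode fields back out of the lock value block so we can skip the
 * disk read entirely when the LVB is trustable.
 */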
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}

static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
					      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	if (lvb->lvb_version == OCFS2_LVB_VERSION
	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
		return 1;
	return 0;
}

/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 *   0 means no refresh needed.
 *
 *   > 0 means you need to refresh this and you MUST call
 *   ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}

/* If status is nonzero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/* may or may not return a bh if it went to disk. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	if (ocfs2_mount_local(osb))
		goto bail;

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	if (ocfs2_mount_local(osb))
		goto local;

	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

local:
	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		if (lockres)
			ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

bail:
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}

/*
 * This is working around a lock inversion between tasks acquiring DLM locks
 * while holding a page lock and the vote thread which blocks dlm lock acquiry
 * while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have blocked
 * waiting for the vote thread.  In that case we unlock our page so the vote
 * thread can make progress.  Once we've done this we have to return
 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
 * into the VFS who will then immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop.  This has the potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
1834 */
1835int ocfs2_meta_lock_with_page(struct inode *inode,
1836			      struct buffer_head **ret_bh,
1837			      int ex,
1838			      struct page *page)
1839{
1840	int ret;
1841
1842	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
1843	if (ret == -EAGAIN) {
1844		unlock_page(page);
1845		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
1846			ocfs2_meta_unlock(inode, ex);
1847		ret = AOP_TRUNCATED_PAGE;
1848	}
1849
1850	return ret;
1851}
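
/*
 * Illustrative sketch only (the callback below is hypothetical, not
 * part of this file): an aop method holding a page lock would use the
 * _with_page variant roughly like this, passing any nonzero return
 * value straight back to the VFS:
 *
 *	static int example_readpage(struct file *file, struct page *page)
 *	{
 *		struct inode *inode = page->mapping->host;
 *		int ret;
 *
 *		ret = ocfs2_meta_lock_with_page(inode, NULL, 0, page);
 *		if (ret != 0)
 *			return ret;	(possibly AOP_TRUNCATED_PAGE)
 *
 *		... do the read under the meta lock ...
 *
 *		ocfs2_meta_unlock(inode, 0);
 *		return 0;
 *	}
 */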
1852
1853int ocfs2_meta_lock_atime(struct inode *inode,
1854			  struct vfsmount *vfsmnt,
1855			  int *level)
1856{
1857	int ret;
1858
1859	mlog_entry_void();
1860	ret = ocfs2_meta_lock(inode, NULL, 0);
1861	if (ret < 0) {
1862		mlog_errno(ret);
1863		return ret;
1864	}
1865
1866	/*
1867	 * If we should update atime, we will get EX lock,
1868	 * otherwise we just get PR lock.
1869	 */
1870	if (ocfs2_should_update_atime(inode, vfsmnt)) {
1871		struct buffer_head *bh = NULL;
1872
1873		ocfs2_meta_unlock(inode, 0);
1874		ret = ocfs2_meta_lock(inode, &bh, 1);
1875		if (ret < 0) {
1876			mlog_errno(ret);
1877			return ret;
1878		}
1879		*level = 1;
1880		if (ocfs2_should_update_atime(inode, vfsmnt))
1881			ocfs2_update_inode_atime(inode, bh);
1882		if (bh)
1883			brelse(bh);
1884	} else
1885		*level = 0;
1886
1887	mlog_exit(ret);
1888	return ret;
1889}
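
/*
 * Minimal usage sketch (the caller shown is hypothetical): the level
 * returned through *level tells the caller which mode it now holds,
 * and therefore which mode it must drop when done:
 *
 *	int level, ret;
 *
 *	ret = ocfs2_meta_lock_atime(inode, filp->f_vfsmnt, &level);
 *	if (ret < 0)
 *		return ret;
 *	... read from the file ...
 *	ocfs2_meta_unlock(inode, level);
 */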
1890
1891void ocfs2_meta_unlock(struct inode *inode,
1892		       int ex)
1893{
1894	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1895	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1896	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1897
1898	mlog_entry_void();
1899
1900	mlog(0, "inode %llu drop %s META lock\n",
1901	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1902	     ex ? "EXMODE" : "PRMODE");
1903
1904	if (!ocfs2_is_hard_readonly(osb) &&
1905	    !ocfs2_mount_local(osb))
1906		ocfs2_cluster_unlock(osb, lockres, level);
1907
1908	mlog_exit_void();
1909}
1910
1911int ocfs2_super_lock(struct ocfs2_super *osb,
1912		     int ex)
1913{
1914	int status = 0;
1915	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1916	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1917	struct buffer_head *bh;
1918	struct ocfs2_slot_info *si = osb->slot_info;
1919
1920	mlog_entry_void();
1921
1922	if (ocfs2_is_hard_readonly(osb))
1923		return -EROFS;
1924
1925	if (ocfs2_mount_local(osb))
1926		goto bail;
1927
1928	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1929	if (status < 0) {
1930		mlog_errno(status);
1931		goto bail;
1932	}
1933
1934	/* The super block lock path is really in the best position to
1935	 * know when resources covered by the lock need to be
1936	 * refreshed, so we do it here. Of course, making sense of
1937	 * everything is up to the caller :) */
1938	status = ocfs2_should_refresh_lock_res(lockres);
1939	if (status < 0) {
1940		mlog_errno(status);
1941		goto bail;
1942	}
1943	if (status) {
1944		bh = si->si_bh;
1945		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1946					  si->si_inode);
1947		if (status == 0)
1948			ocfs2_update_slot_info(si);
1949
1950		ocfs2_complete_lock_res_refresh(lockres, status);
1951
1952		if (status < 0)
1953			mlog_errno(status);
1954	}
1955bail:
1956	mlog_exit(status);
1957	return status;
1958}
1959
1960void ocfs2_super_unlock(struct ocfs2_super *osb,
1961			int ex)
1962{
1963	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1964	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1965
1966	if (!ocfs2_mount_local(osb))
1967		ocfs2_cluster_unlock(osb, lockres, level);
1968}
1969
1970int ocfs2_rename_lock(struct ocfs2_super *osb)
1971{
1972	int status;
1973	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1974
1975	if (ocfs2_is_hard_readonly(osb))
1976		return -EROFS;
1977
1978	if (ocfs2_mount_local(osb))
1979		return 0;
1980
1981	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1982	if (status < 0)
1983		mlog_errno(status);
1984
1985	return status;
1986}
1987
1988void ocfs2_rename_unlock(struct ocfs2_super *osb)
1989{
1990	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1991
1992	if (!ocfs2_mount_local(osb))
1993		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1994}
1995
1996int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1997{
1998	int ret;
1999	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2000	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2001	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2002
2003	BUG_ON(!dl);
2004
2005	if (ocfs2_is_hard_readonly(osb))
2006		return -EROFS;
2007
2008	if (ocfs2_mount_local(osb))
2009		return 0;
2010
2011	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2012	if (ret < 0)
2013		mlog_errno(ret);
2014
2015	return ret;
2016}
2017
2018void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2019{
2020	int level = ex ? LKM_EXMODE : LKM_PRMODE;
2021	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2022	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2023
2024	if (!ocfs2_mount_local(osb))
2025		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2026}
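
/*
 * Pairing sketch (the caller is hypothetical): as with the other
 * cluster locks in this file, a successful ocfs2_dentry_lock() must be
 * matched by an ocfs2_dentry_unlock() at the same level; on local
 * mounts both calls simply short circuit:
 *
 *	ret = ocfs2_dentry_lock(dentry, 0);
 *	if (ret < 0)
 *		return ret;
 *	... use the dentry under the cluster lock ...
 *	ocfs2_dentry_unlock(dentry, 0);
 */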
2027
2028/* Reference counting of the dlm debug structure. We want this because
2029 * open references on the debug inodes can outlive the mount, so
2030 * we can't rely on the ocfs2_super to always exist. */
2031static void ocfs2_dlm_debug_free(struct kref *kref)
2032{
2033	struct ocfs2_dlm_debug *dlm_debug;
2034
2035	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2036
2037	kfree(dlm_debug);
2038}
2039
2040void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2041{
2042	if (dlm_debug)
2043		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2044}
2045
2046static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2047{
2048	kref_get(&debug->d_refcnt);
2049}
2050
2051struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2052{
2053	struct ocfs2_dlm_debug *dlm_debug;
2054
2055	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2056	if (!dlm_debug) {
2057		mlog_errno(-ENOMEM);
2058		goto out;
2059	}
2060
2061	kref_init(&dlm_debug->d_refcnt);
2062	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2063	dlm_debug->d_locking_state = NULL;
2064out:
2065	return dlm_debug;
2066}
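
/*
 * Lifetime sketch for the refcounting above (assumed from the kref
 * pairing in this file): ocfs2_new_dlm_debug() hands the mount its
 * initial reference, each open of the debugfs file takes another via
 * ocfs2_get_dlm_debug(), and every holder drops its reference with
 * ocfs2_put_dlm_debug(). The structure is freed only on the last put,
 * which may come from a debugfs file that outlived the mount itself.
 */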
2067
2068/* Access to this is arbitrated for us via seq_file->sem. */
2069struct ocfs2_dlm_seq_priv {
2070	struct ocfs2_dlm_debug *p_dlm_debug;
2071	struct ocfs2_lock_res p_iter_res;
2072	struct ocfs2_lock_res p_tmp_res;
2073};
2074
2075static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2076						 struct ocfs2_dlm_seq_priv *priv)
2077{
2078	struct ocfs2_lock_res *iter, *ret = NULL;
2079	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2080
2081	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2082
2083	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2084		/* discover the head of the list */
2085		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2086			mlog(0, "End of list found, %p\n", ret);
2087			break;
2088		}
2089
2090		/* We track our "dummy" iteration lockres' by a NULL
2091		 * l_ops field. */
2092		if (iter->l_ops != NULL) {
2093			ret = iter;
2094			break;
2095		}
2096	}
2097
2098	return ret;
2099}
2100
2101static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2102{
2103	struct ocfs2_dlm_seq_priv *priv = m->private;
2104	struct ocfs2_lock_res *iter;
2105
2106	spin_lock(&ocfs2_dlm_tracking_lock);
2107	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2108	if (iter) {
2109		/* Since lockres' have the lifetime of their container
2110		 * (which can be inodes, ocfs2_supers, etc) we want to
2111		 * copy this out to a temporary lockres while still
2112		 * under the spinlock. Obviously after this we can't
2113		 * trust any pointers on the copy returned, but that's
2114		 * ok as the information we want isn't typically held
2115		 * in them. */
2116		priv->p_tmp_res = *iter;
2117		iter = &priv->p_tmp_res;
2118	}
2119	spin_unlock(&ocfs2_dlm_tracking_lock);
2120
2121	return iter;
2122}
2123
2124static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2125{
2126}
2127
2128static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2129{
2130	struct ocfs2_dlm_seq_priv *priv = m->private;
2131	struct ocfs2_lock_res *iter = v;
2132	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2133
2134	spin_lock(&ocfs2_dlm_tracking_lock);
2135	iter = ocfs2_dlm_next_res(iter, priv);
2136	list_del_init(&dummy->l_debug_list);
2137	if (iter) {
2138		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2139		priv->p_tmp_res = *iter;
2140		iter = &priv->p_tmp_res;
2141	}
2142	spin_unlock(&ocfs2_dlm_tracking_lock);
2143
2144	return iter;
2145}
2146
2147/* So that debugfs.ocfs2 can determine which format is being used */
2148#define OCFS2_DLM_DEBUG_STR_VERSION 1
2149static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2150{
2151	int i;
2152	char *lvb;
2153	struct ocfs2_lock_res *lockres = v;
2154
2155	if (!lockres)
2156		return -EINVAL;
2157
2158	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2159
2160	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2161		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2162			   lockres->l_name,
2163			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2164	else
2165		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2166
2167	seq_printf(m, "%d\t"
2168		   "0x%lx\t"
2169		   "0x%x\t"
2170		   "0x%x\t"
2171		   "%u\t"
2172		   "%u\t"
2173		   "%d\t"
2174		   "%d\t",
2175		   lockres->l_level,
2176		   lockres->l_flags,
2177		   lockres->l_action,
2178		   lockres->l_unlock_action,
2179		   lockres->l_ro_holders,
2180		   lockres->l_ex_holders,
2181		   lockres->l_requested,
2182		   lockres->l_blocking);
2183
2184	/* Dump the raw LVB */
2185	lvb = lockres->l_lksb.lvb;
2186	for (i = 0; i < DLM_LVB_LEN; i++)
2187		seq_printf(m, "0x%x\t", (unsigned char)lvb[i]);
2188
2189	/* End the line */
2190	seq_printf(m, "\n");
2191	return 0;
2192}
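
/*
 * For reference, each line emitted above is tab separated: the version
 * tag, the lock name, then l_level, l_flags, l_action, l_unlock_action,
 * the RO and EX holder counts, the requested and blocking levels, and
 * finally all DLM_LVB_LEN raw LVB bytes. debugfs.ocfs2 keys its parsing
 * off the leading version tag.
 */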
2193
2194static struct seq_operations ocfs2_dlm_seq_ops = {
2195	.start =	ocfs2_dlm_seq_start,
2196	.stop =		ocfs2_dlm_seq_stop,
2197	.next =		ocfs2_dlm_seq_next,
2198	.show =		ocfs2_dlm_seq_show,
2199};
2200
2201static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2202{
2203	struct seq_file *seq = (struct seq_file *) file->private_data;
2204	struct ocfs2_dlm_seq_priv *priv = seq->private;
2205	struct ocfs2_lock_res *res = &priv->p_iter_res;
2206
2207	ocfs2_remove_lockres_tracking(res);
2208	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2209	return seq_release_private(inode, file);
2210}
2211
2212static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2213{
2214	int ret;
2215	struct ocfs2_dlm_seq_priv *priv;
2216	struct seq_file *seq;
2217	struct ocfs2_super *osb;
2218
2219	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2220	if (!priv) {
2221		ret = -ENOMEM;
2222		mlog_errno(ret);
2223		goto out;
2224	}
2225	osb = inode->i_private;
2226	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2227	priv->p_dlm_debug = osb->osb_dlm_debug;
2228	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2229
2230	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2231	if (ret) {
2232		kfree(priv);
2233		mlog_errno(ret);
2234		goto out;
2235	}
2236
2237	seq = (struct seq_file *) file->private_data;
2238	seq->private = priv;
2239
2240	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2241				   priv->p_dlm_debug);
2242
2243out:
2244	return ret;
2245}
2246
2247static const struct file_operations ocfs2_dlm_debug_fops = {
2248	.open =		ocfs2_dlm_debug_open,
2249	.release =	ocfs2_dlm_debug_release,
2250	.read =		seq_read,
2251	.llseek =	seq_lseek,
2252};
2253
2254static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2255{
2256	int ret = 0;
2257	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2258
2259	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2260							 S_IFREG|S_IRUSR,
2261							 osb->osb_debug_root,
2262							 osb,
2263							 &ocfs2_dlm_debug_fops);
2264	if (!dlm_debug->d_locking_state) {
2265		ret = -EINVAL;
2266		mlog(ML_ERROR,
2267		     "Unable to create locking state debugfs file.\n");
2268		goto out;
2269	}
2270
2271	ocfs2_get_dlm_debug(dlm_debug);
2272out:
2273	return ret;
2274}
2275
2276static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2277{
2278	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2279
2280	if (dlm_debug) {
2281		debugfs_remove(dlm_debug->d_locking_state);
2282		ocfs2_put_dlm_debug(dlm_debug);
2283	}
2284}
2285
2286int ocfs2_dlm_init(struct ocfs2_super *osb)
2287{
2288	int status = 0;
2289	u32 dlm_key;
2290	struct dlm_ctxt *dlm = NULL;
2291
2292	mlog_entry_void();
2293
2294	if (ocfs2_mount_local(osb))
2295		goto local;
2296
2297	status = ocfs2_dlm_init_debug(osb);
2298	if (status < 0) {
2299		mlog_errno(status);
2300		goto bail;
2301	}
2302
2303	/* launch vote thread */
2304	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2305	if (IS_ERR(osb->vote_task)) {
2306		status = PTR_ERR(osb->vote_task);
2307		osb->vote_task = NULL;
2308		mlog_errno(status);
2309		goto bail;
2310	}
2311
2312	/* used by the dlm code to make message headers unique, each
2313	 * node in this domain must agree on this. */
2314	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2315
2316	/* for now, uuid == domain */
2317	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2318	if (IS_ERR(dlm)) {
2319		status = PTR_ERR(dlm);
2320		mlog_errno(status);
2321		goto bail;
2322	}
2323
2324	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2325
2326local:
2327	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2328	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2329
2330	osb->dlm = dlm;
2331
2332	status = 0;
2333bail:
2334	if (status < 0) {
2335		ocfs2_dlm_shutdown_debug(osb);
2336		if (osb->vote_task)
2337			kthread_stop(osb->vote_task);
2338	}
2339
2340	mlog_exit(status);
2341	return status;
2342}
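
/*
 * A quick sketch of the key derivation above (the uuid string is made
 * up for illustration): every node hashes the same mount UUID, e.g.
 *
 *	dlm_key = crc32_le(0, "0DAB0C9682EC4FA9956B1B0EAD9B0F34", 32);
 *
 * so all nodes joining the domain arrive at an identical 32-bit key
 * with no extra negotiation.
 */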
2343
2344void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2345{
2346	mlog_entry_void();
2347
2348	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2349
2350	ocfs2_drop_osb_locks(osb);
2351
2352	if (osb->vote_task) {
2353		kthread_stop(osb->vote_task);
2354		osb->vote_task = NULL;
2355	}
2356
2357	ocfs2_lock_res_free(&osb->osb_super_lockres);
2358	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2359
2360	dlm_unregister_domain(osb->dlm);
2361	osb->dlm = NULL;
2362
2363	ocfs2_dlm_shutdown_debug(osb);
2364
2365	mlog_exit_void();
2366}
2367
2368static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2369{
2370	struct ocfs2_lock_res *lockres = opaque;
2371	unsigned long flags;
2372
2373	mlog_entry_void();
2374
2375	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2376	     lockres->l_unlock_action);
2377
2378	spin_lock_irqsave(&lockres->l_lock, flags);
2379	/* We tried to cancel a convert request, but it was already
2380	 * granted. All we want to do here is clear our unlock
2381	 * state. The wake_up call done at the bottom is redundant
2382	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2383	 * hurt anything anyway */
2384	if (status == DLM_CANCELGRANT &&
2385	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2386		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2387
2388		/* We don't clear the busy flag in this case as it
2389		 * should have been cleared by the ast which the dlm
2390		 * has called. */
2391		goto complete_unlock;
2392	}
2393
2394	if (status != DLM_NORMAL) {
2395		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2396		     "unlock_action %d\n", status, lockres->l_name,
2397		     lockres->l_unlock_action);
2398		spin_unlock_irqrestore(&lockres->l_lock, flags);
2399		return;
2400	}
2401
2402	switch(lockres->l_unlock_action) {
2403	case OCFS2_UNLOCK_CANCEL_CONVERT:
2404		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2405		lockres->l_action = OCFS2_AST_INVALID;
2406		break;
2407	case OCFS2_UNLOCK_DROP_LOCK:
2408		lockres->l_level = LKM_IVMODE;
2409		break;
2410	default:
2411		BUG();
2412	}
2413
2414	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2415complete_unlock:
2416	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2417	spin_unlock_irqrestore(&lockres->l_lock, flags);
2418
2419	wake_up(&lockres->l_event);
2420
2421	mlog_exit_void();
2422}
2423
2424static int ocfs2_drop_lock(struct ocfs2_super *osb,
2425			   struct ocfs2_lock_res *lockres)
2426{
2427	enum dlm_status status;
2428	unsigned long flags;
2429	int lkm_flags = 0;
2430
2431	/* We didn't get anywhere near actually using this lockres. */
2432	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2433		goto out;
2434
2435	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2436		lkm_flags |= LKM_VALBLK;
2437
2438	spin_lock_irqsave(&lockres->l_lock, flags);
2439
2440	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2441			"lockres %s, flags 0x%lx\n",
2442			lockres->l_name, lockres->l_flags);
2443
2444	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2445		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2446		     "%u, unlock_action = %u\n",
2447		     lockres->l_name, lockres->l_flags, lockres->l_action,
2448		     lockres->l_unlock_action);
2449
2450		spin_unlock_irqrestore(&lockres->l_lock, flags);
2451
2452		/* XXX: Today we just wait on any busy
2453		 * locks... Perhaps we need to cancel converts in the
2454		 * future? */
2455		ocfs2_wait_on_busy_lock(lockres);
2456
2457		spin_lock_irqsave(&lockres->l_lock, flags);
2458	}
2459
2460	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2461		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2462		    lockres->l_level == LKM_EXMODE &&
2463		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2464			lockres->l_ops->set_lvb(lockres);
2465	}
2466
2467	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2468		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2469		     lockres->l_name);
2470	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2471		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2472
2473	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2474		spin_unlock_irqrestore(&lockres->l_lock, flags);
2475		goto out;
2476	}
2477
2478	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2479
2480	/* make sure we never get here while waiting for an ast to
2481	 * fire. */
2482	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2483
2484	/* is this necessary? */
2485	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2486	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2487	spin_unlock_irqrestore(&lockres->l_lock, flags);
2488
2489	mlog(0, "lock %s\n", lockres->l_name);
2490
2491	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2492			   ocfs2_unlock_ast, lockres);
2493	if (status != DLM_NORMAL) {
2494		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2495		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2496		dlm_print_one_lock(lockres->l_lksb.lockid);
2497		BUG();
2498	}
2499	mlog(0, "lock %s, successfull return from dlmunlock\n",
2500	     lockres->l_name);
2501
2502	ocfs2_wait_on_busy_lock(lockres);
2503out:
2504	mlog_exit(0);
2505	return 0;
2506}
2507
2508/* Mark the lockres as being dropped. It will no longer be
2509 * queued if blocking, but we still may have to wait on it
2510 * being dequeued from the vote thread before we can consider
2511 * it safe to drop.
2512 *
2513 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2514void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2515{
2516	int status;
2517	struct ocfs2_mask_waiter mw;
2518	unsigned long flags;
2519
2520	ocfs2_init_mask_waiter(&mw);
2521
2522	spin_lock_irqsave(&lockres->l_lock, flags);
2523	lockres->l_flags |= OCFS2_LOCK_FREEING;
2524	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2525		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2526		spin_unlock_irqrestore(&lockres->l_lock, flags);
2527
2528		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2529
2530		status = ocfs2_wait_for_mask(&mw);
2531		if (status)
2532			mlog_errno(status);
2533
2534		spin_lock_irqsave(&lockres->l_lock, flags);
2535	}
2536	spin_unlock_irqrestore(&lockres->l_lock, flags);
2537}
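
/*
 * Sketch of the mask waiter idiom used above (semantics inferred from
 * its use throughout this file): lockres_add_mask_waiter(lockres, &mw,
 * mask, goal) registers the waiter, and ocfs2_wait_for_mask(&mw) then
 * sleeps until (l_flags & mask) == goal. Here mask is OCFS2_LOCK_QUEUED
 * and goal is 0, i.e. we sleep until the vote thread has dequeued the
 * lockres.
 */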
2538
2539void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2540			       struct ocfs2_lock_res *lockres)
2541{
2542	int ret;
2543
2544	ocfs2_mark_lockres_freeing(lockres);
2545	ret = ocfs2_drop_lock(osb, lockres);
2546	if (ret)
2547		mlog_errno(ret);
2548}
2549
2550static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2551{
2552	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2553	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2554}
2555
2556int ocfs2_drop_inode_locks(struct inode *inode)
2557{
2558	int status, err;
2559
2560	mlog_entry_void();
2561
2562	/* No need to call ocfs2_mark_lockres_freeing here -
2563	 * ocfs2_clear_inode has done it for us. */
2564
2565	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2566			      &OCFS2_I(inode)->ip_open_lockres);
2567	if (err < 0)
2568		mlog_errno(err);
2569
2570	status = err;
2571
2572	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2573			      &OCFS2_I(inode)->ip_data_lockres);
2574	if (err < 0)
2575		mlog_errno(err);
2576	if (err < 0 && !status)
2577		status = err;
2578
2579	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2580			      &OCFS2_I(inode)->ip_meta_lockres);
2581	if (err < 0)
2582		mlog_errno(err);
2583	if (err < 0 && !status)
2584		status = err;
2585
2586	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2587			      &OCFS2_I(inode)->ip_rw_lockres);
2588	if (err < 0)
2589		mlog_errno(err);
2590	if (err < 0 && !status)
2591		status = err;
2592
2593	mlog_exit(status);
2594	return status;
2595}
2596
2597static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2598				      int new_level)
2599{
2600	assert_spin_locked(&lockres->l_lock);
2601
2602	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2603
2604	if (lockres->l_level <= new_level) {
2605		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2606		     lockres->l_level, new_level);
2607		BUG();
2608	}
2609
2610	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2611	     lockres->l_name, new_level, lockres->l_blocking);
2612
2613	lockres->l_action = OCFS2_AST_DOWNCONVERT;
2614	lockres->l_requested = new_level;
2615	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2616}
2617
2618static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2619				  struct ocfs2_lock_res *lockres,
2620				  int new_level,
2621				  int lvb)
2622{
2623	int ret, dlm_flags = LKM_CONVERT;
2624	enum dlm_status status;
2625
2626	mlog_entry_void();
2627
2628	if (lvb)
2629		dlm_flags |= LKM_VALBLK;
2630
2631	status = dlmlock(osb->dlm,
2632			 new_level,
2633			 &lockres->l_lksb,
2634			 dlm_flags,
2635			 lockres->l_name,
2636			 OCFS2_LOCK_ID_MAX_LEN - 1,
2637			 ocfs2_locking_ast,
2638			 lockres,
2639			 ocfs2_blocking_ast);
2640	if (status != DLM_NORMAL) {
2641		ocfs2_log_dlm_error("dlmlock", status, lockres);
2642		ret = -EINVAL;
2643		ocfs2_recover_from_dlm_error(lockres, 1);
2644		goto bail;
2645	}
2646
2647	ret = 0;
2648bail:
2649	mlog_exit(ret);
2650	return ret;
2651}
2652
2653/* returns 1 when the caller should unlock and call dlmunlock */
2654static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2655				        struct ocfs2_lock_res *lockres)
2656{
2657	assert_spin_locked(&lockres->l_lock);
2658
2659	mlog_entry_void();
2660	mlog(0, "lock %s\n", lockres->l_name);
2661
2662	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2663		/* If we're already trying to cancel a lock conversion
2664		 * then just drop the spinlock and allow the caller to
2665		 * requeue this lock. */
2666
2667		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2668		return 0;
2669	}
2670
2671	/* were we in a convert when we got the bast fire? */
2672	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2673	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
2674	/* set things up for the unlockast to know to just
2675	 * clear out the ast_action and unset busy, etc. */
2676	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2677
2678	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2679			"lock %s, invalid flags: 0x%lx\n",
2680			lockres->l_name, lockres->l_flags);
2681
2682	return 1;
2683}
2684
2685static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2686				struct ocfs2_lock_res *lockres)
2687{
2688	int ret;
2689	enum dlm_status status;
2690
2691	mlog_entry_void();
2692	mlog(0, "lock %s\n", lockres->l_name);
2693
2694	ret = 0;
2695	status = dlmunlock(osb->dlm,
2696			   &lockres->l_lksb,
2697			   LKM_CANCEL,
2698			   ocfs2_unlock_ast,
2699			   lockres);
2700	if (status != DLM_NORMAL) {
2701		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2702		ret = -EINVAL;
2703		ocfs2_recover_from_dlm_error(lockres, 0);
2704	}
2705
2706	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2707
2708	mlog_exit(ret);
2709	return ret;
2710}
2711
2712static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2713			      struct ocfs2_lock_res *lockres,
2714			      struct ocfs2_unblock_ctl *ctl)
2715{
2716	unsigned long flags;
2717	int blocking;
2718	int new_level;
2719	int ret = 0;
2720	int set_lvb = 0;
2721
2722	mlog_entry_void();
2723
2724	spin_lock_irqsave(&lockres->l_lock, flags);
2725
2726	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2727
2728recheck:
2729	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2730		ctl->requeue = 1;
2731		ret = ocfs2_prepare_cancel_convert(osb, lockres);
2732		spin_unlock_irqrestore(&lockres->l_lock, flags);
2733		if (ret) {
2734			ret = ocfs2_cancel_convert(osb, lockres);
2735			if (ret < 0)
2736				mlog_errno(ret);
2737		}
2738		goto leave;
2739	}
2740
2741	/* if we're blocking an exclusive and we have *any* holders,
2742	 * then requeue. */
2743	if ((lockres->l_blocking == LKM_EXMODE)
2744	    && (lockres->l_ex_holders || lockres->l_ro_holders))
2745		goto leave_requeue;
2746
2747	/* If it's a PR we're blocking, then only
2748	 * requeue if we've got any EX holders */
2749	if (lockres->l_blocking == LKM_PRMODE &&
2750	    lockres->l_ex_holders)
2751		goto leave_requeue;
2752
2753	/*
2754	 * Can we get a lock in this state if the holder counts are
2755	 * zero? The meta data unblock code used to check this.
2756	 */
2757	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2758	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2759		goto leave_requeue;
2760
2761	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2762
2763	if (lockres->l_ops->check_downconvert
2764	    && !lockres->l_ops->check_downconvert(lockres, new_level))
2765		goto leave_requeue;
2766
2767	/* If we get here, then we know that there are no more
2768	 * incompatible holders (and anyone asking for an incompatible
2769	 * lock is blocked). We can now downconvert the lock */
2770	if (!lockres->l_ops->downconvert_worker)
2771		goto downconvert;
2772
2773	/* Some lockres types want to do a bit of work before
2774	 * downconverting a lock. Allow that here. The worker function
2775	 * may sleep, so we save off a copy of what we're blocking as
2776	 * it may change while we're not holding the spin lock. */
2777	blocking = lockres->l_blocking;
2778	spin_unlock_irqrestore(&lockres->l_lock, flags);
2779
2780	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2781
2782	if (ctl->unblock_action == UNBLOCK_STOP_POST)
2783		goto leave;
2784
2785	spin_lock_irqsave(&lockres->l_lock, flags);
2786	if (blocking != lockres->l_blocking) {
2787		/* If this changed underneath us, then we can't drop
2788		 * it just yet. */
2789		goto recheck;
2790	}
2791
2792downconvert:
2793	ctl->requeue = 0;
2794
2795	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2796		if (lockres->l_level == LKM_EXMODE)
2797			set_lvb = 1;
2798
2799		/*
2800		 * We only set the lvb if the lock has been fully
2801		 * refreshed - otherwise we risk setting stale data.
2802		 * If it hasn't been refreshed, there's no need to
2803		 * clear out the lvb here as its value is still valid.
2804		 */
2805		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2806			lockres->l_ops->set_lvb(lockres);
2807	}
2808
2809	ocfs2_prepare_downconvert(lockres, new_level);
2810	spin_unlock_irqrestore(&lockres->l_lock, flags);
2811	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2812leave:
2813	mlog_exit(ret);
2814	return ret;
2815
2816leave_requeue:
2817	spin_unlock_irqrestore(&lockres->l_lock, flags);
2818	ctl->requeue = 1;
2819
2820	mlog_exit(0);
2821	return 0;
2822}
2823
2824static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2825				     int blocking)
2826{
2827	struct inode *inode;
2828	struct address_space *mapping;
2829
2830	inode = ocfs2_lock_res_inode(lockres);
2831	mapping = inode->i_mapping;
2832
2833	/*
2834	 * We need this before the filemap_fdatawrite() so that it can
2835	 * transfer the dirty bit from the PTE to the
2836	 * page. Unfortunately this means that even for EX->PR
2837	 * downconverts, we'll lose our mappings and have to build
2838	 * them up again.
2839	 */
2840	unmap_mapping_range(mapping, 0, 0, 0);
2841
2842	if (filemap_fdatawrite(mapping)) {
2843		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2844		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2845	}
2846	sync_mapping_buffers(mapping);
2847	if (blocking == LKM_EXMODE) {
2848		truncate_inode_pages(mapping, 0);
2849	} else {
2850		/* We only need to wait on the I/O if we're not also
2851		 * truncating pages because truncate_inode_pages waits
2852		 * for us above. We don't truncate pages if we're
2853		 * blocking anything < EXMODE because we want to keep
2854		 * them around in that case. */
2855		filemap_fdatawait(mapping);
2856	}
2857
2858	return UNBLOCK_CONTINUE;
2859}
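
/*
 * Summary of the worker above: for any downconvert we first unmap user
 * mappings and write back dirty pages; if the blocking request is EX we
 * also drop the page cache entirely, while for anything weaker we keep
 * the now-clean pages around and just wait for the writeback I/O.
 */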
2860
2861static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2862					int new_level)
2863{
2864	struct inode *inode = ocfs2_lock_res_inode(lockres);
2865	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2866
2867	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2868	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2869
2870	if (checkpointed)
2871		return 1;
2872
2873	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2874	return 0;
2875}
2876
2877static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
2878{
2879	struct inode *inode = ocfs2_lock_res_inode(lockres);
2880
2881	__ocfs2_stuff_meta_lvb(inode);
2882}
2883
2884/*
2885 * Does the final reference drop on our dentry lock. Right now this
2886 * happens in the vote thread, but we could choose to simplify the
2887 * dlmglue API and push these off to the ocfs2_wq in the future.
2888 */
2889static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2890				     struct ocfs2_lock_res *lockres)
2891{
2892	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2893	ocfs2_dentry_lock_put(osb, dl);
2894}
2895
2896/*
2897 * d_delete() matching dentries before the lock downconvert.
2898 *
2899 * At this point, any process waiting to destroy the
2900 * dentry_lock due to last ref count is stopped by the
2901 * OCFS2_LOCK_QUEUED flag.
2902 *
2903 * We have two potential problems
2904 *
2905 * 1) If we do the last reference drop on our dentry_lock (via dput)
2906 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2907 *    the downconvert to finish. Instead we take an elevated
2908 *    reference and push the drop until after we've completed our
2909 *    unblock processing.
2910 *
2911 * 2) There might be another process with a final reference,
2912 *    waiting on us to finish processing. If this is the case, we
2913 *    detect it and exit out - there are no more dentries anyway.
2914 */
2915static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2916				       int blocking)
2917{
2918	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2919	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2920	struct dentry *dentry;
2921	unsigned long flags;
2922	int extra_ref = 0;
2923
2924	/*
2925	 * This node is blocking another node from getting a read
2926	 * lock. This happens when we've renamed within a
2927	 * directory. We've forced the other nodes to d_delete(), but
2928	 * we never actually dropped our lock because it's still
2929	 * valid. The downconvert code will retain a PR for this node,
2930	 * so there's no further work to do.
2931	 */
2932	if (blocking == LKM_PRMODE)
2933		return UNBLOCK_CONTINUE;
2934
2935	/*
2936	 * Mark this inode as potentially orphaned. The code in
2937	 * ocfs2_delete_inode() will figure out whether it actually
2938	 * needs to be freed or not.
2939	 */
2940	spin_lock(&oi->ip_lock);
2941	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2942	spin_unlock(&oi->ip_lock);
2943
2944	/*
2945	 * Yuck. We need to make sure, however, that the check of
2946	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2947	 * respect to a reference decrement or the setting of that
2948	 * flag.
2949	 */
2950	spin_lock_irqsave(&lockres->l_lock, flags);
2951	spin_lock(&dentry_attach_lock);
2952	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2953	    && dl->dl_count) {
2954		dl->dl_count++;
2955		extra_ref = 1;
2956	}
2957	spin_unlock(&dentry_attach_lock);
2958	spin_unlock_irqrestore(&lockres->l_lock, flags);
2959
2960	mlog(0, "extra_ref = %d\n", extra_ref);
2961
2962	/*
2963	 * We have a process waiting on us in ocfs2_dentry_iput(),
2964	 * which means we can't have any more outstanding
2965	 * aliases. There's no need to do any more work.
2966	 */
2967	if (!extra_ref)
2968		return UNBLOCK_CONTINUE;
2969
2970	spin_lock(&dentry_attach_lock);
2971	while (1) {
2972		dentry = ocfs2_find_local_alias(dl->dl_inode,
2973						dl->dl_parent_blkno, 1);
2974		if (!dentry)
2975			break;
2976		spin_unlock(&dentry_attach_lock);
2977
2978		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2979		     dentry->d_name.name);
2980
2981		/*
2982		 * The following dcache calls may do an
2983		 * iput(). Normally we don't want that from the
2984		 * downconverting thread, but in this case it's ok
2985		 * because the requesting node already has an
2986		 * exclusive lock on the inode, so it can't be queued
2987		 * for a downconvert.
2988		 */
2989		d_delete(dentry);
2990		dput(dentry);
2991
2992		spin_lock(&dentry_attach_lock);
2993	}
2994	spin_unlock(&dentry_attach_lock);
2995
2996	/*
2997	 * If we are the last holder of this dentry lock, there is no
2998	 * reason to downconvert so skip straight to the unlock.
2999	 */
3000	if (dl->dl_count == 1)
3001		return UNBLOCK_STOP_POST;
3002
3003	return UNBLOCK_CONTINUE_POST;
3004}
3005
3006void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3007				struct ocfs2_lock_res *lockres)
3008{
3009	int status;
3010	struct ocfs2_unblock_ctl ctl = {0, 0,};
3011	unsigned long flags;
3012
3013	/* Our reference to the lockres in this function can be
3014	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3015	 * flag. */
3016
3017	mlog_entry_void();
3018
3019	BUG_ON(!lockres);
3020	BUG_ON(!lockres->l_ops);
3021
3022	mlog(0, "lockres %s blocked.\n", lockres->l_name);
3023
3024	/* Detect whether a lock has been marked as going away while
3025	 * the vote thread was processing other things. A lock can
3026	 * still be marked with OCFS2_LOCK_FREEING after this check,
3027	 * but short circuiting here will still save us some
3028	 * performance. */
3029	spin_lock_irqsave(&lockres->l_lock, flags);
3030	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3031		goto unqueue;
3032	spin_unlock_irqrestore(&lockres->l_lock, flags);
3033
3034	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3035	if (status < 0)
3036		mlog_errno(status);
3037
3038	spin_lock_irqsave(&lockres->l_lock, flags);
3039unqueue:
3040	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
3041		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3042	else
3043		ocfs2_schedule_blocked_lock(osb, lockres);
3044
3045	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3046	     ctl.requeue ? "yes" : "no");
3047	spin_unlock_irqrestore(&lockres->l_lock, flags);
3048
3049	if (ctl.unblock_action != UNBLOCK_CONTINUE
3050	    && lockres->l_ops->post_unlock)
3051		lockres->l_ops->post_unlock(osb, lockres);
3052
3053	mlog_exit_void();
3054}
3055
3056static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3057					struct ocfs2_lock_res *lockres)
3058{
3059	mlog_entry_void();
3060
3061	assert_spin_locked(&lockres->l_lock);
3062
3063	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3064		/* Do not schedule a lock for downconvert when it's on
3065		 * the way to destruction - any nodes wanting access
3066		 * to the resource will get it soon. */
3067		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3068		     lockres->l_name, lockres->l_flags);
3069		return;
3070	}
3071
3072	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3073
3074	spin_lock(&osb->vote_task_lock);
3075	if (list_empty(&lockres->l_blocked_list)) {
3076		list_add_tail(&lockres->l_blocked_list,
3077			      &osb->blocked_lock_list);
3078		osb->blocked_lock_count++;
3079	}
3080	spin_unlock(&osb->vote_task_lock);
3081
3082	mlog_exit_void();
3083}
3084
3085/* This aids in debugging situations where a bad LVB might be involved. */
3086void ocfs2_dump_meta_lvb_info(u64 level,
3087			      const char *function,
3088			      unsigned int line,
3089			      struct ocfs2_lock_res *lockres)
3090{
3091	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3092
3093	mlog(level, "LVB information for %s (called from %s:%u):\n",
3094	     lockres->l_name, function, line);
3095	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3096	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3097	     be32_to_cpu(lvb->lvb_igeneration));
3098	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3099	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3100	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3101	     be16_to_cpu(lvb->lvb_imode));
3102	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3103	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3104	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3105	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3106	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3107	     be32_to_cpu(lvb->lvb_iattr));
3108}
3109