dlmglue.c revision c271c5c22b0a7ca45fda15f1f4d258bca36a5b94
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/smp_lock.h>
31#include <linux/crc32.h>
32#include <linux/kthread.h>
33#include <linux/pagemap.h>
34#include <linux/debugfs.h>
35#include <linux/seq_file.h>
36
37#include <cluster/heartbeat.h>
38#include <cluster/nodemanager.h>
39#include <cluster/tcp.h>
40
41#include <dlm/dlmapi.h>
42
43#define MLOG_MASK_PREFIX ML_DLM_GLUE
44#include <cluster/masklog.h>
45
46#include "ocfs2.h"
47
48#include "alloc.h"
49#include "dcache.h"
50#include "dlmglue.h"
51#include "extent_map.h"
52#include "file.h"
53#include "heartbeat.h"
54#include "inode.h"
55#include "journal.h"
56#include "slot_map.h"
57#include "super.h"
58#include "uptodate.h"
59#include "vote.h"
60
61#include "buffer_head_io.h"
62
/*
 * A waiter for a particular combination of lockres flags.
 *
 * A waiter is queued on the l_mask_waiters list of a lockres and is
 * completed by lockres_set_flags() as soon as
 * (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	/* Entry on the lockres l_mask_waiters list. */
	struct list_head	mw_item;
	/* Result handed back by ocfs2_wait_for_mask(); 0 when the goal was met. */
	int			mw_status;
	/* Completed when the waited-for flag state is reached. */
	struct completion	mw_complete;
	/* Bits of l_flags this waiter cares about. */
	unsigned long		mw_mask;
	/* Value the masked bits must have for the waiter to be woken. */
	unsigned long		mw_goal;
};
70
/* ->get_osb callbacks, declared early for use in the lock ops tables below. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
73
/*
 * Return values for the ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock().
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock() callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
88
/*
 * Pairs a requeue indicator with an unblock action.
 *
 * NOTE(review): not referenced in the part of the file visible here;
 * presumably filled in by the unblock path for the downconvert thread --
 * confirm against its users.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
93
/*
 * Lock-type specific callbacks, declared early so they can be wired into
 * the ocfs2_lock_res_ops tables below.
 */

/* ->check_downconvert and ->set_lvb for the inode metadata lock. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

/* ->downconvert_worker for the inode data lock. */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

/* ->downconvert_worker for the dentry lock. */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

/* ->post_unlock for the dentry lock. */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);
106
107/*
108 * OCFS2 Lock Resource Operations
109 *
110 * These fine tune the behavior of the generic dlmglue locking infrastructure.
111 *
112 * The most basic of lock types can point ->l_priv to their respective
113 * struct ocfs2_super and allow the default actions to manage things.
114 *
115 * Right now, each lock type also needs to implement an init function,
116 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
117 * should be called when the lock is no longer needed (i.e., object
118 * destruction time).
119 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer.
	 *
	 * When left NULL, ocfs2_get_lockres_osb() returns ->l_priv cast
	 * to an ocfs2_super pointer.
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert (or "vote") thread
	 * after a successful downconvert. The lockres will not be
	 * referenced after this callback is called, so it is safe to
	 * free memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow
	 * the structure definition.
	 */
	int flags;
};
178
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function (on attach
 * of a non-local lock above NLMODE, and on convert up from NLMODE).
 * It is expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1
188
/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined. ocfs2_cluster_lock()
 * adds LKM_VALBLK to the dlm flags for lock types carrying this flag.
 */
#define LOCK_TYPE_USES_LVB		0x2
194
/*
 * Ops for OCFS2_LOCK_TYPE_RW inode locks: ->l_priv is the inode, so the
 * osb has to be looked up through it. No refresh or LVB handling.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};
199
/*
 * Ops for OCFS2_LOCK_TYPE_META inode locks: adds its own downconvert
 * check, populates the lock value block, and asks for a refresh when a
 * meaningful level is first obtained.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};
206
/*
 * Ops for OCFS2_LOCK_TYPE_DATA inode locks: supplies a downconvert
 * worker that runs from the downconvert thread before the lock level
 * is lowered.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= 0,
};
212
/*
 * Ops for the superblock lock. ->l_priv is the ocfs2_super itself, so
 * no ->get_osb is needed; only the refresh behaviour is requested.
 */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};
216
/*
 * Ops for the rename lock. ->l_priv is the ocfs2_super itself and all
 * default behaviour is used.
 */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};
220
/*
 * Ops for OCFS2_LOCK_TYPE_DENTRY locks: ->l_priv is an
 * ocfs2_dentry_lock, and both a downconvert worker and a post-unlock
 * hook are provided.
 */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};
227
228static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
229{
230	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
231		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
232		lockres->l_type == OCFS2_LOCK_TYPE_RW;
233}
234
235static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
236{
237	BUG_ON(!ocfs2_is_inode_lock(lockres));
238
239	return (struct inode *) lockres->l_priv;
240}
241
242static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
243{
244	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
245
246	return (struct ocfs2_dentry_lock *)lockres->l_priv;
247}
248
249static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
250{
251	if (lockres->l_ops->get_osb)
252		return lockres->l_ops->get_osb(lockres);
253
254	return (struct ocfs2_super *)lockres->l_priv;
255}
256
/*
 * Forward declarations for the generic locking helpers defined further
 * down in this file.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
/* Per-action handlers run from ocfs2_locking_ast(). */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/*
 * Log a failed dlm call at ML_ERROR level.
 *
 * @_func:    name of the dlm function that failed (a C string)
 * @_stat:    the dlm status it returned; evaluated twice, so pass a
 *            side-effect free expression
 * @_lockres: pointer to the lock resource involved
 *
 * Arguments are parenthesized so that arbitrary expressions (e.g.
 * "&dl->dl_lockres") expand correctly.
 */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), (_func),	\
		(_lockres)->l_name, dlm_errmsg(_stat));		\
} while (0)
/* Further forward declarations for helpers defined later in this file. */
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
285
286static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
287				  u64 blkno,
288				  u32 generation,
289				  char *name)
290{
291	int len;
292
293	mlog_entry_void();
294
295	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
296
297	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
298		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
299		       (long long)blkno, generation);
300
301	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
302
303	mlog(0, "built lock resource with name: %s\n", name);
304
305	mlog_exit_void();
306}
307
/* Serializes additions to and removals from the lockres debug tracking list. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
309
310static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
311				       struct ocfs2_dlm_debug *dlm_debug)
312{
313	mlog(0, "Add tracking for lockres %s\n", res->l_name);
314
315	spin_lock(&ocfs2_dlm_tracking_lock);
316	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
317	spin_unlock(&ocfs2_dlm_tracking_lock);
318}
319
320static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
321{
322	spin_lock(&ocfs2_dlm_tracking_lock);
323	if (!list_empty(&res->l_debug_list))
324		list_del_init(&res->l_debug_list);
325	spin_unlock(&ocfs2_dlm_tracking_lock);
326}
327
328static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
329				       struct ocfs2_lock_res *res,
330				       enum ocfs2_lock_type type,
331				       struct ocfs2_lock_res_ops *ops,
332				       void *priv)
333{
334	res->l_type          = type;
335	res->l_ops           = ops;
336	res->l_priv          = priv;
337
338	res->l_level         = LKM_IVMODE;
339	res->l_requested     = LKM_IVMODE;
340	res->l_blocking      = LKM_IVMODE;
341	res->l_action        = OCFS2_AST_INVALID;
342	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
343
344	res->l_flags         = OCFS2_LOCK_INITIALIZED;
345
346	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
347}
348
349void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
350{
351	/* This also clears out the lock status block */
352	memset(res, 0, sizeof(struct ocfs2_lock_res));
353	spin_lock_init(&res->l_lock);
354	init_waitqueue_head(&res->l_event);
355	INIT_LIST_HEAD(&res->l_blocked_list);
356	INIT_LIST_HEAD(&res->l_mask_waiters);
357}
358
359void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
360			       enum ocfs2_lock_type type,
361			       unsigned int generation,
362			       struct inode *inode)
363{
364	struct ocfs2_lock_res_ops *ops;
365
366	switch(type) {
367		case OCFS2_LOCK_TYPE_RW:
368			ops = &ocfs2_inode_rw_lops;
369			break;
370		case OCFS2_LOCK_TYPE_META:
371			ops = &ocfs2_inode_meta_lops;
372			break;
373		case OCFS2_LOCK_TYPE_DATA:
374			ops = &ocfs2_inode_data_lops;
375			break;
376		default:
377			mlog_bug_on_msg(1, "type: %d\n", type);
378			ops = NULL; /* thanks, gcc */
379			break;
380	};
381
382	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
383			      generation, res->l_name);
384	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
385}
386
387static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
388{
389	struct inode *inode = ocfs2_lock_res_inode(lockres);
390
391	return OCFS2_SB(inode->i_sb);
392}
393
394static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
395{
396	__be64 inode_blkno_be;
397
398	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
399	       sizeof(__be64));
400
401	return be64_to_cpu(inode_blkno_be);
402}
403
404static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
405{
406	struct ocfs2_dentry_lock *dl = lockres->l_priv;
407
408	return OCFS2_SB(dl->dl_inode->i_sb);
409}
410
411void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
412				u64 parent, struct inode *inode)
413{
414	int len;
415	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
416	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
417	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
418
419	ocfs2_lock_res_init_once(lockres);
420
421	/*
422	 * Unfortunately, the standard lock naming scheme won't work
423	 * here because we have two 16 byte values to use. Instead,
424	 * we'll stuff the inode number as a binary value. We still
425	 * want error prints to show something without garbling the
426	 * display, so drop a null byte in there before the inode
427	 * number. A future version of OCFS2 will likely use all
428	 * binary lock names. The stringified names have been a
429	 * tremendous aid in debugging, but now that the debugfs
430	 * interface exists, we can mangle things there if need be.
431	 *
432	 * NOTE: We also drop the standard "pad" value (the total lock
433	 * name size stays the same though - the last part is all
434	 * zeros due to the memset in ocfs2_lock_res_init_once()
435	 */
436	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
437		       "%c%016llx",
438		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
439		       (long long)parent);
440
441	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
442
443	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
444	       sizeof(__be64));
445
446	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
447				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
448				   dl);
449}
450
451static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
452				      struct ocfs2_super *osb)
453{
454	/* Superblock lockres doesn't come from a slab so we call init
455	 * once on it manually.  */
456	ocfs2_lock_res_init_once(res);
457	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
458			      0, res->l_name);
459	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
460				   &ocfs2_super_lops, osb);
461}
462
463static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
464				       struct ocfs2_super *osb)
465{
466	/* Rename lockres doesn't come from a slab so we call init
467	 * once on it manually.  */
468	ocfs2_lock_res_init_once(res);
469	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
470	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
471				   &ocfs2_rename_lops, osb);
472}
473
474void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
475{
476	mlog_entry_void();
477
478	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
479		return;
480
481	ocfs2_remove_lockres_tracking(res);
482
483	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
484			"Lockres %s is on the blocked list\n",
485			res->l_name);
486	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
487			"Lockres %s has mask waiters pending\n",
488			res->l_name);
489	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
490			"Lockres %s is locked\n",
491			res->l_name);
492	mlog_bug_on_msg(res->l_ro_holders,
493			"Lockres %s has %u ro holders\n",
494			res->l_name, res->l_ro_holders);
495	mlog_bug_on_msg(res->l_ex_holders,
496			"Lockres %s has %u ex holders\n",
497			res->l_name, res->l_ex_holders);
498
499	/* Need to clear out the lock status block for the dlm */
500	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
501
502	res->l_flags = 0UL;
503	mlog_exit_void();
504}
505
506static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
507				     int level)
508{
509	mlog_entry_void();
510
511	BUG_ON(!lockres);
512
513	switch(level) {
514	case LKM_EXMODE:
515		lockres->l_ex_holders++;
516		break;
517	case LKM_PRMODE:
518		lockres->l_ro_holders++;
519		break;
520	default:
521		BUG();
522	}
523
524	mlog_exit_void();
525}
526
527static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
528				     int level)
529{
530	mlog_entry_void();
531
532	BUG_ON(!lockres);
533
534	switch(level) {
535	case LKM_EXMODE:
536		BUG_ON(!lockres->l_ex_holders);
537		lockres->l_ex_holders--;
538		break;
539	case LKM_PRMODE:
540		BUG_ON(!lockres->l_ro_holders);
541		lockres->l_ro_holders--;
542		break;
543	default:
544		BUG();
545	}
546	mlog_exit_void();
547}
548
549/* WARNING: This function lives in a world where the only three lock
550 * levels are EX, PR, and NL. It *will* have to be adjusted when more
551 * lock types are added. */
552static inline int ocfs2_highest_compat_lock_level(int level)
553{
554	int new_level = LKM_EXMODE;
555
556	if (level == LKM_EXMODE)
557		new_level = LKM_NLMODE;
558	else if (level == LKM_PRMODE)
559		new_level = LKM_PRMODE;
560	return new_level;
561}
562
563static void lockres_set_flags(struct ocfs2_lock_res *lockres,
564			      unsigned long newflags)
565{
566	struct list_head *pos, *tmp;
567	struct ocfs2_mask_waiter *mw;
568
569 	assert_spin_locked(&lockres->l_lock);
570
571	lockres->l_flags = newflags;
572
573	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
574		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
575		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
576			continue;
577
578		list_del_init(&mw->mw_item);
579		mw->mw_status = 0;
580		complete(&mw->mw_complete);
581	}
582}
583static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
584{
585	lockres_set_flags(lockres, lockres->l_flags | or);
586}
587static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
588				unsigned long clear)
589{
590	lockres_set_flags(lockres, lockres->l_flags & ~clear);
591}
592
593static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
594{
595	mlog_entry_void();
596
597	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
598	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
599	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
600	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
601
602	lockres->l_level = lockres->l_requested;
603	if (lockres->l_level <=
604	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
605		lockres->l_blocking = LKM_NLMODE;
606		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
607	}
608	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
609
610	mlog_exit_void();
611}
612
613static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
614{
615	mlog_entry_void();
616
617	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
618	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
619
620	/* Convert from RO to EX doesn't really need anything as our
621	 * information is already up to data. Convert from NL to
622	 * *anything* however should mark ourselves as needing an
623	 * update */
624	if (lockres->l_level == LKM_NLMODE &&
625	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
626		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
627
628	lockres->l_level = lockres->l_requested;
629	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
630
631	mlog_exit_void();
632}
633
634static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
635{
636	mlog_entry_void();
637
638	BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
639	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
640
641	if (lockres->l_requested > LKM_NLMODE &&
642	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
643	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
644		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
645
646	lockres->l_level = lockres->l_requested;
647	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
648	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
649
650	mlog_exit_void();
651}
652
653static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
654				     int level)
655{
656	int needs_downconvert = 0;
657	mlog_entry_void();
658
659	assert_spin_locked(&lockres->l_lock);
660
661	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
662
663	if (level > lockres->l_blocking) {
664		/* only schedule a downconvert if we haven't already scheduled
665		 * one that goes low enough to satisfy the level we're
666		 * blocking.  this also catches the case where we get
667		 * duplicate BASTs */
668		if (ocfs2_highest_compat_lock_level(level) <
669		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
670			needs_downconvert = 1;
671
672		lockres->l_blocking = level;
673	}
674
675	mlog_exit(needs_downconvert);
676	return needs_downconvert;
677}
678
679static void ocfs2_blocking_ast(void *opaque, int level)
680{
681	struct ocfs2_lock_res *lockres = opaque;
682	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
683	int needs_downconvert;
684	unsigned long flags;
685
686	BUG_ON(level <= LKM_NLMODE);
687
688	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
689	     lockres->l_name, level, lockres->l_level,
690	     ocfs2_lock_type_string(lockres->l_type));
691
692	spin_lock_irqsave(&lockres->l_lock, flags);
693	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
694	if (needs_downconvert)
695		ocfs2_schedule_blocked_lock(osb, lockres);
696	spin_unlock_irqrestore(&lockres->l_lock, flags);
697
698	wake_up(&lockres->l_event);
699
700	ocfs2_kick_vote_thread(osb);
701}
702
703static void ocfs2_locking_ast(void *opaque)
704{
705	struct ocfs2_lock_res *lockres = opaque;
706	struct dlm_lockstatus *lksb = &lockres->l_lksb;
707	unsigned long flags;
708
709	spin_lock_irqsave(&lockres->l_lock, flags);
710
711	if (lksb->status != DLM_NORMAL) {
712		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
713		     lockres->l_name, lksb->status);
714		spin_unlock_irqrestore(&lockres->l_lock, flags);
715		return;
716	}
717
718	switch(lockres->l_action) {
719	case OCFS2_AST_ATTACH:
720		ocfs2_generic_handle_attach_action(lockres);
721		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
722		break;
723	case OCFS2_AST_CONVERT:
724		ocfs2_generic_handle_convert_action(lockres);
725		break;
726	case OCFS2_AST_DOWNCONVERT:
727		ocfs2_generic_handle_downconvert_action(lockres);
728		break;
729	default:
730		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
731		     "lockres flags = 0x%lx, unlock action: %u\n",
732		     lockres->l_name, lockres->l_action, lockres->l_flags,
733		     lockres->l_unlock_action);
734		BUG();
735	}
736
737	/* set it to something invalid so if we get called again we
738	 * can catch it. */
739	lockres->l_action = OCFS2_AST_INVALID;
740
741	wake_up(&lockres->l_event);
742	spin_unlock_irqrestore(&lockres->l_lock, flags);
743}
744
745static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
746						int convert)
747{
748	unsigned long flags;
749
750	mlog_entry_void();
751	spin_lock_irqsave(&lockres->l_lock, flags);
752	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
753	if (convert)
754		lockres->l_action = OCFS2_AST_INVALID;
755	else
756		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
757	spin_unlock_irqrestore(&lockres->l_lock, flags);
758
759	wake_up(&lockres->l_event);
760	mlog_exit_void();
761}
762
763/* Note: If we detect another process working on the lock (i.e.,
764 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
765 * to do the right thing in that case.
766 */
767static int ocfs2_lock_create(struct ocfs2_super *osb,
768			     struct ocfs2_lock_res *lockres,
769			     int level,
770			     int dlm_flags)
771{
772	int ret = 0;
773	enum dlm_status status = DLM_NORMAL;
774	unsigned long flags;
775
776	mlog_entry_void();
777
778	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
779	     dlm_flags);
780
781	spin_lock_irqsave(&lockres->l_lock, flags);
782	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
783	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
784		spin_unlock_irqrestore(&lockres->l_lock, flags);
785		goto bail;
786	}
787
788	lockres->l_action = OCFS2_AST_ATTACH;
789	lockres->l_requested = level;
790	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
791	spin_unlock_irqrestore(&lockres->l_lock, flags);
792
793	status = dlmlock(osb->dlm,
794			 level,
795			 &lockres->l_lksb,
796			 dlm_flags,
797			 lockres->l_name,
798			 OCFS2_LOCK_ID_MAX_LEN - 1,
799			 ocfs2_locking_ast,
800			 lockres,
801			 ocfs2_blocking_ast);
802	if (status != DLM_NORMAL) {
803		ocfs2_log_dlm_error("dlmlock", status, lockres);
804		ret = -EINVAL;
805		ocfs2_recover_from_dlm_error(lockres, 1);
806	}
807
808	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
809
810bail:
811	mlog_exit(ret);
812	return ret;
813}
814
815static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
816					int flag)
817{
818	unsigned long flags;
819	int ret;
820
821	spin_lock_irqsave(&lockres->l_lock, flags);
822	ret = lockres->l_flags & flag;
823	spin_unlock_irqrestore(&lockres->l_lock, flags);
824
825	return ret;
826}
827
828static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
829
830{
831	wait_event(lockres->l_event,
832		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
833}
834
835static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
836
837{
838	wait_event(lockres->l_event,
839		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
840}
841
842/* predict what lock level we'll be dropping down to on behalf
843 * of another node, and return true if the currently wanted
844 * level will be compatible with it. */
845static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
846						     int wanted)
847{
848	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
849
850	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
851}
852
853static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
854{
855	INIT_LIST_HEAD(&mw->mw_item);
856	init_completion(&mw->mw_complete);
857}
858
859static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
860{
861	wait_for_completion(&mw->mw_complete);
862	/* Re-arm the completion in case we want to wait on it again */
863	INIT_COMPLETION(mw->mw_complete);
864	return mw->mw_status;
865}
866
867static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
868				    struct ocfs2_mask_waiter *mw,
869				    unsigned long mask,
870				    unsigned long goal)
871{
872	BUG_ON(!list_empty(&mw->mw_item));
873
874	assert_spin_locked(&lockres->l_lock);
875
876	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
877	mw->mw_mask = mask;
878	mw->mw_goal = goal;
879}
880
881/* returns 0 if the mw that was removed was already satisfied, -EBUSY
882 * if the mask still hadn't reached its goal */
883static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
884				      struct ocfs2_mask_waiter *mw)
885{
886	unsigned long flags;
887	int ret = 0;
888
889	spin_lock_irqsave(&lockres->l_lock, flags);
890	if (!list_empty(&mw->mw_item)) {
891		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
892			ret = -EBUSY;
893
894		list_del_init(&mw->mw_item);
895		init_completion(&mw->mw_complete);
896	}
897	spin_unlock_irqrestore(&lockres->l_lock, flags);
898
899	return ret;
900
901}
902
/*
 * Take a local hold on @lockres at @level, creating or upconverting
 * the underlying dlm lock as needed.
 *
 * @osb:       super block whose dlm domain is used
 * @lockres:   lock resource to lock
 * @level:     level wanted; ocfs2_inc_holders() only accepts
 *             LKM_PRMODE and LKM_EXMODE
 * @lkm_flags: flags passed to dlmlock() on convert (LKM_VALBLK is
 *             added for lock types with LOCK_TYPE_USES_LVB)
 * @arg_flags: OCFS2_LOCK_NONBLOCK makes the call back off instead of
 *             sleeping when the lockres is busy or blocked
 *
 * The function loops (label "again") until the lock is attached, not
 * busy, not blocked in an incompatible way and granted at a high
 * enough level, at which point the holder count is bumped.
 *
 * Returns 0 on success, -ERESTARTSYS if a signal is pending and
 * signals are being caught, -EAGAIN if LKM_NOQUEUE was given and the
 * dlm returned DLM_NOTQUEUED or if OCFS2_LOCK_NONBLOCK was given and
 * we would have had to wait, -EINVAL on other dlmlock() failures, or
 * the error from ocfs2_lock_create().
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. Create it at NL and
		 * go around again to upconvert it. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		/* Claim the lockres for the convert before dropping the
		 * spinlock around the dlm call. */
		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			/* A NOQUEUE request that could not be granted
			 * right away is not an error worth logging. */
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 *
	 * If the waiter turns out to be satisfied already, we simply retry
	 * instead of returning -EAGAIN.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		/* Sleep until the flag we queued on clears, then retry. */
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
1052
1053static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1054				 struct ocfs2_lock_res *lockres,
1055				 int level)
1056{
1057	unsigned long flags;
1058
1059	mlog_entry_void();
1060	spin_lock_irqsave(&lockres->l_lock, flags);
1061	ocfs2_dec_holders(lockres, level);
1062	ocfs2_vote_on_unlock(osb, lockres);
1063	spin_unlock_irqrestore(&lockres->l_lock, flags);
1064	mlog_exit_void();
1065}
1066
1067static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1068				 struct ocfs2_lock_res *lockres,
1069				 int ex,
1070				 int local)
1071{
1072	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1073	unsigned long flags;
1074	int lkm_flags = local ? LKM_LOCAL : 0;
1075
1076	spin_lock_irqsave(&lockres->l_lock, flags);
1077	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1078	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1079	spin_unlock_irqrestore(&lockres->l_lock, flags);
1080
1081	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1082}
1083
1084/* Grants us an EX lock on the data and metadata resources, skipping
1085 * the normal cluster directory lookup. Use this ONLY on newly created
1086 * inodes which other nodes can't possibly see, and which haven't been
1087 * hashed in the inode hash yet. This can give us a good performance
1088 * increase as it'll skip the network broadcast normally associated
1089 * with creating a new lock resource. */
1090int ocfs2_create_new_inode_locks(struct inode *inode)
1091{
1092	int ret;
1093	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1094
1095	BUG_ON(!inode);
1096	BUG_ON(!ocfs2_inode_is_new(inode));
1097
1098	mlog_entry_void();
1099
1100	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1101
1102	/* NOTE: That we don't increment any of the holder counts, nor
1103	 * do we add anything to a journal handle. Since this is
1104	 * supposed to be a new inode which the cluster doesn't know
1105	 * about yet, there is no need to.  As far as the LVB handling
1106	 * is concerned, this is basically like acquiring an EX lock
1107	 * on a resource which has an invalid one -- we'll set it
1108	 * valid when we release the EX. */
1109
1110	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1111	if (ret) {
1112		mlog_errno(ret);
1113		goto bail;
1114	}
1115
1116	/*
1117	 * We don't want to use LKM_LOCAL on a meta data lock as they
1118	 * don't use a generation in their lock names.
1119	 */
1120	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1121	if (ret) {
1122		mlog_errno(ret);
1123		goto bail;
1124	}
1125
1126	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1127	if (ret) {
1128		mlog_errno(ret);
1129		goto bail;
1130	}
1131
1132bail:
1133	mlog_exit(ret);
1134	return ret;
1135}
1136
1137int ocfs2_rw_lock(struct inode *inode, int write)
1138{
1139	int status, level;
1140	struct ocfs2_lock_res *lockres;
1141	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1142
1143	BUG_ON(!inode);
1144
1145	mlog_entry_void();
1146
1147	mlog(0, "inode %llu take %s RW lock\n",
1148	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1149	     write ? "EXMODE" : "PRMODE");
1150
1151	if (ocfs2_mount_local(osb))
1152		return 0;
1153
1154	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1155
1156	level = write ? LKM_EXMODE : LKM_PRMODE;
1157
1158	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1159				    0);
1160	if (status < 0)
1161		mlog_errno(status);
1162
1163	mlog_exit(status);
1164	return status;
1165}
1166
1167void ocfs2_rw_unlock(struct inode *inode, int write)
1168{
1169	int level = write ? LKM_EXMODE : LKM_PRMODE;
1170	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1171	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1172
1173	mlog_entry_void();
1174
1175	mlog(0, "inode %llu drop %s RW lock\n",
1176	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1177	     write ? "EXMODE" : "PRMODE");
1178
1179	if (!ocfs2_mount_local(osb))
1180		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1181
1182	mlog_exit_void();
1183}
1184
1185int ocfs2_data_lock_full(struct inode *inode,
1186			 int write,
1187			 int arg_flags)
1188{
1189	int status = 0, level;
1190	struct ocfs2_lock_res *lockres;
1191	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1192
1193	BUG_ON(!inode);
1194
1195	mlog_entry_void();
1196
1197	mlog(0, "inode %llu take %s DATA lock\n",
1198	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1199	     write ? "EXMODE" : "PRMODE");
1200
1201	/* We'll allow faking a readonly data lock for
1202	 * rodevices. */
1203	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1204		if (write) {
1205			status = -EROFS;
1206			mlog_errno(status);
1207		}
1208		goto out;
1209	}
1210
1211	if (ocfs2_mount_local(osb))
1212		goto out;
1213
1214	lockres = &OCFS2_I(inode)->ip_data_lockres;
1215
1216	level = write ? LKM_EXMODE : LKM_PRMODE;
1217
1218	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1219				    0, arg_flags);
1220	if (status < 0 && status != -EAGAIN)
1221		mlog_errno(status);
1222
1223out:
1224	mlog_exit(status);
1225	return status;
1226}
1227
1228/* see ocfs2_meta_lock_with_page() */
1229int ocfs2_data_lock_with_page(struct inode *inode,
1230			      int write,
1231			      struct page *page)
1232{
1233	int ret;
1234
1235	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1236	if (ret == -EAGAIN) {
1237		unlock_page(page);
1238		if (ocfs2_data_lock(inode, write) == 0)
1239			ocfs2_data_unlock(inode, write);
1240		ret = AOP_TRUNCATED_PAGE;
1241	}
1242
1243	return ret;
1244}
1245
1246static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1247				 struct ocfs2_lock_res *lockres)
1248{
1249	int kick = 0;
1250
1251	mlog_entry_void();
1252
1253	/* If we know that another node is waiting on our lock, kick
1254	 * the vote thread * pre-emptively when we reach a release
1255	 * condition. */
1256	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1257		switch(lockres->l_blocking) {
1258		case LKM_EXMODE:
1259			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1260				kick = 1;
1261			break;
1262		case LKM_PRMODE:
1263			if (!lockres->l_ex_holders)
1264				kick = 1;
1265			break;
1266		default:
1267			BUG();
1268		}
1269	}
1270
1271	if (kick)
1272		ocfs2_kick_vote_thread(osb);
1273
1274	mlog_exit_void();
1275}
1276
1277void ocfs2_data_unlock(struct inode *inode,
1278		       int write)
1279{
1280	int level = write ? LKM_EXMODE : LKM_PRMODE;
1281	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1282	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1283
1284	mlog_entry_void();
1285
1286	mlog(0, "inode %llu drop %s DATA lock\n",
1287	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1288	     write ? "EXMODE" : "PRMODE");
1289
1290	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1291	    !ocfs2_mount_local(osb))
1292		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1293
1294	mlog_exit_void();
1295}
1296
/*
 * Layout of a packed 64-bit timestamp: the seconds occupy the upper
 * OCFS2_SEC_BITS bits and the nanoseconds the lower OCFS2_SEC_SHIFT bits.
 * The shift is derived from OCFS2_SEC_BITS so the two cannot drift apart.
 */
#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1300
1301/* LVB only has room for 64 bits of time here so we pack it for
1302 * now. */
1303static u64 ocfs2_pack_timespec(struct timespec *spec)
1304{
1305	u64 res;
1306	u64 sec = spec->tv_sec;
1307	u32 nsec = spec->tv_nsec;
1308
1309	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1310
1311	return res;
1312}
1313
1314/* Call this with the lockres locked. I am reasonably sure we don't
1315 * need ip_lock in this function as anyone who would be changing those
1316 * values is supposed to be blocked in ocfs2_meta_lock right now. */
1317static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1318{
1319	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1320	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1321	struct ocfs2_meta_lvb *lvb;
1322
1323	mlog_entry_void();
1324
1325	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1326
1327	/*
1328	 * Invalidate the LVB of a deleted inode - this way other
1329	 * nodes are forced to go to disk and discover the new inode
1330	 * status.
1331	 */
1332	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1333		lvb->lvb_version = 0;
1334		goto out;
1335	}
1336
1337	lvb->lvb_version   = OCFS2_LVB_VERSION;
1338	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1339	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1340	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1341	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1342	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1343	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1344	lvb->lvb_iatime_packed  =
1345		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1346	lvb->lvb_ictime_packed =
1347		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1348	lvb->lvb_imtime_packed =
1349		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1350	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1351	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1352
1353out:
1354	mlog_meta_lvb(0, lockres);
1355
1356	mlog_exit_void();
1357}
1358
1359static void ocfs2_unpack_timespec(struct timespec *spec,
1360				  u64 packed_time)
1361{
1362	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1363	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1364}
1365
1366static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1367{
1368	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1369	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1370	struct ocfs2_meta_lvb *lvb;
1371
1372	mlog_entry_void();
1373
1374	mlog_meta_lvb(0, lockres);
1375
1376	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1377
1378	/* We're safe here without the lockres lock... */
1379	spin_lock(&oi->ip_lock);
1380	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1381	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1382
1383	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1384	ocfs2_set_inode_flags(inode);
1385
1386	/* fast-symlinks are a special case */
1387	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1388		inode->i_blocks = 0;
1389	else
1390		inode->i_blocks =
1391			ocfs2_align_bytes_to_sectors(i_size_read(inode));
1392
1393	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1394	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1395	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1396	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1397	ocfs2_unpack_timespec(&inode->i_atime,
1398			      be64_to_cpu(lvb->lvb_iatime_packed));
1399	ocfs2_unpack_timespec(&inode->i_mtime,
1400			      be64_to_cpu(lvb->lvb_imtime_packed));
1401	ocfs2_unpack_timespec(&inode->i_ctime,
1402			      be64_to_cpu(lvb->lvb_ictime_packed));
1403	spin_unlock(&oi->ip_lock);
1404
1405	mlog_exit_void();
1406}
1407
1408static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1409					      struct ocfs2_lock_res *lockres)
1410{
1411	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1412
1413	if (lvb->lvb_version == OCFS2_LVB_VERSION
1414	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1415		return 1;
1416	return 0;
1417}
1418
1419/* Determine whether a lock resource needs to be refreshed, and
1420 * arbitrate who gets to refresh it.
1421 *
1422 *   0 means no refresh needed.
1423 *
1424 *   > 0 means you need to refresh this and you MUST call
1425 *   ocfs2_complete_lock_res_refresh afterwards. */
1426static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1427{
1428	unsigned long flags;
1429	int status = 0;
1430
1431	mlog_entry_void();
1432
1433refresh_check:
1434	spin_lock_irqsave(&lockres->l_lock, flags);
1435	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1436		spin_unlock_irqrestore(&lockres->l_lock, flags);
1437		goto bail;
1438	}
1439
1440	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1441		spin_unlock_irqrestore(&lockres->l_lock, flags);
1442
1443		ocfs2_wait_on_refreshing_lock(lockres);
1444		goto refresh_check;
1445	}
1446
1447	/* Ok, I'll be the one to refresh this lock. */
1448	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1449	spin_unlock_irqrestore(&lockres->l_lock, flags);
1450
1451	status = 1;
1452bail:
1453	mlog_exit(status);
1454	return status;
1455}
1456
1457/* If status is non zero, I'll mark it as not being in refresh
1458 * anymroe, but i won't clear the needs refresh flag. */
1459static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1460						   int status)
1461{
1462	unsigned long flags;
1463	mlog_entry_void();
1464
1465	spin_lock_irqsave(&lockres->l_lock, flags);
1466	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1467	if (!status)
1468		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1469	spin_unlock_irqrestore(&lockres->l_lock, flags);
1470
1471	wake_up(&lockres->l_event);
1472
1473	mlog_exit_void();
1474}
1475
1476/* may or may not return a bh if it went to disk. */
1477static int ocfs2_meta_lock_update(struct inode *inode,
1478				  struct buffer_head **bh)
1479{
1480	int status = 0;
1481	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1482	struct ocfs2_lock_res *lockres = NULL;
1483	struct ocfs2_dinode *fe;
1484	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1485
1486	mlog_entry_void();
1487
1488	spin_lock(&oi->ip_lock);
1489	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1490		mlog(0, "Orphaned inode %llu was deleted while we "
1491		     "were waiting on a lock. ip_flags = 0x%x\n",
1492		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
1493		spin_unlock(&oi->ip_lock);
1494		status = -ENOENT;
1495		goto bail;
1496	}
1497	spin_unlock(&oi->ip_lock);
1498
1499	if (!ocfs2_mount_local(osb)) {
1500		lockres = &oi->ip_meta_lockres;
1501
1502		if (!ocfs2_should_refresh_lock_res(lockres))
1503			goto bail;
1504	}
1505
1506	/* This will discard any caching information we might have had
1507	 * for the inode metadata. */
1508	ocfs2_metadata_cache_purge(inode);
1509
1510	/* will do nothing for inode types that don't use the extent
1511	 * map (directories, bitmap files, etc) */
1512	ocfs2_extent_map_trunc(inode, 0);
1513
1514	if (lockres && ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1515		mlog(0, "Trusting LVB on inode %llu\n",
1516		     (unsigned long long)oi->ip_blkno);
1517		ocfs2_refresh_inode_from_lvb(inode);
1518	} else {
1519		/* Boo, we have to go to disk. */
1520		/* read bh, cast, ocfs2_refresh_inode */
1521		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1522					  bh, OCFS2_BH_CACHED, inode);
1523		if (status < 0) {
1524			mlog_errno(status);
1525			goto bail_refresh;
1526		}
1527		fe = (struct ocfs2_dinode *) (*bh)->b_data;
1528
1529		/* This is a good chance to make sure we're not
1530		 * locking an invalid object.
1531		 *
1532		 * We bug on a stale inode here because we checked
1533		 * above whether it was wiped from disk. The wiping
1534		 * node provides a guarantee that we receive that
1535		 * message and can mark the inode before dropping any
1536		 * locks associated with it. */
1537		if (!OCFS2_IS_VALID_DINODE(fe)) {
1538			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1539			status = -EIO;
1540			goto bail_refresh;
1541		}
1542		mlog_bug_on_msg(inode->i_generation !=
1543				le32_to_cpu(fe->i_generation),
1544				"Invalid dinode %llu disk generation: %u "
1545				"inode->i_generation: %u\n",
1546				(unsigned long long)oi->ip_blkno,
1547				le32_to_cpu(fe->i_generation),
1548				inode->i_generation);
1549		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1550				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1551				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
1552				(unsigned long long)oi->ip_blkno,
1553				(unsigned long long)le64_to_cpu(fe->i_dtime),
1554				le32_to_cpu(fe->i_flags));
1555
1556		ocfs2_refresh_inode(inode, fe);
1557	}
1558
1559	status = 0;
1560bail_refresh:
1561	if (lockres)
1562		ocfs2_complete_lock_res_refresh(lockres, status);
1563bail:
1564	mlog_exit(status);
1565	return status;
1566}
1567
1568static int ocfs2_assign_bh(struct inode *inode,
1569			   struct buffer_head **ret_bh,
1570			   struct buffer_head *passed_bh)
1571{
1572	int status;
1573
1574	if (passed_bh) {
1575		/* Ok, the update went to disk for us, use the
1576		 * returned bh. */
1577		*ret_bh = passed_bh;
1578		get_bh(*ret_bh);
1579
1580		return 0;
1581	}
1582
1583	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1584				  OCFS2_I(inode)->ip_blkno,
1585				  ret_bh,
1586				  OCFS2_BH_CACHED,
1587				  inode);
1588	if (status < 0)
1589		mlog_errno(status);
1590
1591	return status;
1592}
1593
1594/*
1595 * returns < 0 error if the callback will never be called, otherwise
1596 * the result of the lock will be communicated via the callback.
1597 */
1598int ocfs2_meta_lock_full(struct inode *inode,
1599			 struct buffer_head **ret_bh,
1600			 int ex,
1601			 int arg_flags)
1602{
1603	int status, level, dlm_flags, acquired;
1604	struct ocfs2_lock_res *lockres = NULL;
1605	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1606	struct buffer_head *local_bh = NULL;
1607
1608	BUG_ON(!inode);
1609
1610	mlog_entry_void();
1611
1612	mlog(0, "inode %llu, take %s META lock\n",
1613	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1614	     ex ? "EXMODE" : "PRMODE");
1615
1616	status = 0;
1617	acquired = 0;
1618	/* We'll allow faking a readonly metadata lock for
1619	 * rodevices. */
1620	if (ocfs2_is_hard_readonly(osb)) {
1621		if (ex)
1622			status = -EROFS;
1623		goto bail;
1624	}
1625
1626	if (ocfs2_mount_local(osb))
1627		goto local;
1628
1629	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1630		wait_event(osb->recovery_event,
1631			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1632
1633	acquired = 0;
1634	lockres = &OCFS2_I(inode)->ip_meta_lockres;
1635	level = ex ? LKM_EXMODE : LKM_PRMODE;
1636	dlm_flags = 0;
1637	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1638		dlm_flags |= LKM_NOQUEUE;
1639
1640	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1641	if (status < 0) {
1642		if (status != -EAGAIN && status != -EIOCBRETRY)
1643			mlog_errno(status);
1644		goto bail;
1645	}
1646
1647	/* Notify the error cleanup path to drop the cluster lock. */
1648	acquired = 1;
1649
1650	/* We wait twice because a node may have died while we were in
1651	 * the lower dlm layers. The second time though, we've
1652	 * committed to owning this lock so we don't allow signals to
1653	 * abort the operation. */
1654	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1655		wait_event(osb->recovery_event,
1656			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1657
1658local:
1659	/*
1660	 * We only see this flag if we're being called from
1661	 * ocfs2_read_locked_inode(). It means we're locking an inode
1662	 * which hasn't been populated yet, so clear the refresh flag
1663	 * and let the caller handle it.
1664	 */
1665	if (inode->i_state & I_NEW) {
1666		status = 0;
1667		if (lockres)
1668			ocfs2_complete_lock_res_refresh(lockres, 0);
1669		goto bail;
1670	}
1671
1672	/* This is fun. The caller may want a bh back, or it may
1673	 * not. ocfs2_meta_lock_update definitely wants one in, but
1674	 * may or may not read one, depending on what's in the
1675	 * LVB. The result of all of this is that we've *only* gone to
1676	 * disk if we have to, so the complexity is worthwhile. */
1677	status = ocfs2_meta_lock_update(inode, &local_bh);
1678	if (status < 0) {
1679		if (status != -ENOENT)
1680			mlog_errno(status);
1681		goto bail;
1682	}
1683
1684	if (ret_bh) {
1685		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1686		if (status < 0) {
1687			mlog_errno(status);
1688			goto bail;
1689		}
1690	}
1691
1692bail:
1693	if (status < 0) {
1694		if (ret_bh && (*ret_bh)) {
1695			brelse(*ret_bh);
1696			*ret_bh = NULL;
1697		}
1698		if (acquired)
1699			ocfs2_meta_unlock(inode, ex);
1700	}
1701
1702	if (local_bh)
1703		brelse(local_bh);
1704
1705	mlog_exit(status);
1706	return status;
1707}
1708
1709/*
1710 * This is working around a lock inversion between tasks acquiring DLM locks
1711 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1712 * while acquiring page locks.
1713 *
1714 * ** These _with_page variantes are only intended to be called from aop
1715 * methods that hold page locks and return a very specific *positive* error
1716 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1717 *
1718 * The DLM is called such that it returns -EAGAIN if it would have blocked
1719 * waiting for the vote thread.  In that case we unlock our page so the vote
1720 * thread can make progress.  Once we've done this we have to return
1721 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1722 * into the VFS who will then immediately retry the aop call.
1723 *
1724 * We do a blocking lock and immediate unlock before returning, though, so that
1725 * the lock has a great chance of being cached on this node by the time the VFS
1726 * calls back to retry the aop.    This has a potential to livelock as nodes
1727 * ping locks back and forth, but that's a risk we're willing to take to avoid
1728 * the lock inversion simply.
1729 */
1730int ocfs2_meta_lock_with_page(struct inode *inode,
1731			      struct buffer_head **ret_bh,
1732			      int ex,
1733			      struct page *page)
1734{
1735	int ret;
1736
1737	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
1738	if (ret == -EAGAIN) {
1739		unlock_page(page);
1740		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
1741			ocfs2_meta_unlock(inode, ex);
1742		ret = AOP_TRUNCATED_PAGE;
1743	}
1744
1745	return ret;
1746}
1747
1748int ocfs2_meta_lock_atime(struct inode *inode,
1749			  struct vfsmount *vfsmnt,
1750			  int *level)
1751{
1752	int ret;
1753
1754	mlog_entry_void();
1755	ret = ocfs2_meta_lock(inode, NULL, 0);
1756	if (ret < 0) {
1757		mlog_errno(ret);
1758		return ret;
1759	}
1760
1761	/*
1762	 * If we should update atime, we will get EX lock,
1763	 * otherwise we just get PR lock.
1764	 */
1765	if (ocfs2_should_update_atime(inode, vfsmnt)) {
1766		struct buffer_head *bh = NULL;
1767
1768		ocfs2_meta_unlock(inode, 0);
1769		ret = ocfs2_meta_lock(inode, &bh, 1);
1770		if (ret < 0) {
1771			mlog_errno(ret);
1772			return ret;
1773		}
1774		*level = 1;
1775		if (ocfs2_should_update_atime(inode, vfsmnt))
1776			ocfs2_update_inode_atime(inode, bh);
1777		if (bh)
1778			brelse(bh);
1779	} else
1780		*level = 0;
1781
1782	mlog_exit(ret);
1783	return ret;
1784}
1785
1786void ocfs2_meta_unlock(struct inode *inode,
1787		       int ex)
1788{
1789	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1790	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1791	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1792
1793	mlog_entry_void();
1794
1795	mlog(0, "inode %llu drop %s META lock\n",
1796	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1797	     ex ? "EXMODE" : "PRMODE");
1798
1799	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
1800	    !ocfs2_mount_local(osb))
1801		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1802
1803	mlog_exit_void();
1804}
1805
1806int ocfs2_super_lock(struct ocfs2_super *osb,
1807		     int ex)
1808{
1809	int status = 0;
1810	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1811	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1812	struct buffer_head *bh;
1813	struct ocfs2_slot_info *si = osb->slot_info;
1814
1815	mlog_entry_void();
1816
1817	if (ocfs2_is_hard_readonly(osb))
1818		return -EROFS;
1819
1820	if (ocfs2_mount_local(osb))
1821		goto bail;
1822
1823	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1824	if (status < 0) {
1825		mlog_errno(status);
1826		goto bail;
1827	}
1828
1829	/* The super block lock path is really in the best position to
1830	 * know when resources covered by the lock need to be
1831	 * refreshed, so we do it here. Of course, making sense of
1832	 * everything is up to the caller :) */
1833	status = ocfs2_should_refresh_lock_res(lockres);
1834	if (status < 0) {
1835		mlog_errno(status);
1836		goto bail;
1837	}
1838	if (status) {
1839		bh = si->si_bh;
1840		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1841					  si->si_inode);
1842		if (status == 0)
1843			ocfs2_update_slot_info(si);
1844
1845		ocfs2_complete_lock_res_refresh(lockres, status);
1846
1847		if (status < 0)
1848			mlog_errno(status);
1849	}
1850bail:
1851	mlog_exit(status);
1852	return status;
1853}
1854
1855void ocfs2_super_unlock(struct ocfs2_super *osb,
1856			int ex)
1857{
1858	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1859	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1860
1861	if (!ocfs2_mount_local(osb))
1862		ocfs2_cluster_unlock(osb, lockres, level);
1863}
1864
1865int ocfs2_rename_lock(struct ocfs2_super *osb)
1866{
1867	int status;
1868	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1869
1870	if (ocfs2_is_hard_readonly(osb))
1871		return -EROFS;
1872
1873	if (ocfs2_mount_local(osb))
1874		return 0;
1875
1876	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1877	if (status < 0)
1878		mlog_errno(status);
1879
1880	return status;
1881}
1882
1883void ocfs2_rename_unlock(struct ocfs2_super *osb)
1884{
1885	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1886
1887	if (!ocfs2_mount_local(osb))
1888		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1889}
1890
1891int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1892{
1893	int ret;
1894	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1895	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1896	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1897
1898	BUG_ON(!dl);
1899
1900	if (ocfs2_is_hard_readonly(osb))
1901		return -EROFS;
1902
1903	if (ocfs2_mount_local(osb))
1904		return 0;
1905
1906	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1907	if (ret < 0)
1908		mlog_errno(ret);
1909
1910	return ret;
1911}
1912
1913void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1914{
1915	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1916	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1917	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1918
1919	if (!ocfs2_mount_local(osb))
1920		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1921}
1922
1923/* Reference counting of the dlm debug structure. We want this because
1924 * open references on the debug inodes can live on after a mount, so
1925 * we can't rely on the ocfs2_super to always exist. */
1926static void ocfs2_dlm_debug_free(struct kref *kref)
1927{
1928	struct ocfs2_dlm_debug *dlm_debug;
1929
1930	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1931
1932	kfree(dlm_debug);
1933}
1934
1935void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1936{
1937	if (dlm_debug)
1938		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1939}
1940
1941static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
1942{
1943	kref_get(&debug->d_refcnt);
1944}
1945
1946struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1947{
1948	struct ocfs2_dlm_debug *dlm_debug;
1949
1950	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1951	if (!dlm_debug) {
1952		mlog_errno(-ENOMEM);
1953		goto out;
1954	}
1955
1956	kref_init(&dlm_debug->d_refcnt);
1957	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1958	dlm_debug->d_locking_state = NULL;
1959out:
1960	return dlm_debug;
1961}
1962
1963/* Access to this is arbitrated for us via seq_file->sem. */
1964struct ocfs2_dlm_seq_priv {
1965	struct ocfs2_dlm_debug *p_dlm_debug;
1966	struct ocfs2_lock_res p_iter_res;
1967	struct ocfs2_lock_res p_tmp_res;
1968};
1969
1970static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
1971						 struct ocfs2_dlm_seq_priv *priv)
1972{
1973	struct ocfs2_lock_res *iter, *ret = NULL;
1974	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
1975
1976	assert_spin_locked(&ocfs2_dlm_tracking_lock);
1977
1978	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
1979		/* discover the head of the list */
1980		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
1981			mlog(0, "End of list found, %p\n", ret);
1982			break;
1983		}
1984
1985		/* We track our "dummy" iteration lockres' by a NULL
1986		 * l_ops field. */
1987		if (iter->l_ops != NULL) {
1988			ret = iter;
1989			break;
1990		}
1991	}
1992
1993	return ret;
1994}
1995
1996static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1997{
1998	struct ocfs2_dlm_seq_priv *priv = m->private;
1999	struct ocfs2_lock_res *iter;
2000
2001	spin_lock(&ocfs2_dlm_tracking_lock);
2002	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2003	if (iter) {
2004		/* Since lockres' have the lifetime of their container
2005		 * (which can be inodes, ocfs2_supers, etc) we want to
2006		 * copy this out to a temporary lockres while still
2007		 * under the spinlock. Obviously after this we can't
2008		 * trust any pointers on the copy returned, but that's
2009		 * ok as the information we want isn't typically held
2010		 * in them. */
2011		priv->p_tmp_res = *iter;
2012		iter = &priv->p_tmp_res;
2013	}
2014	spin_unlock(&ocfs2_dlm_tracking_lock);
2015
2016	return iter;
2017}
2018
/* seq_file ->stop: intentionally empty, ->start leaves nothing locked. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	return;
}
2022
2023static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2024{
2025	struct ocfs2_dlm_seq_priv *priv = m->private;
2026	struct ocfs2_lock_res *iter = v;
2027	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2028
2029	spin_lock(&ocfs2_dlm_tracking_lock);
2030	iter = ocfs2_dlm_next_res(iter, priv);
2031	list_del_init(&dummy->l_debug_list);
2032	if (iter) {
2033		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2034		priv->p_tmp_res = *iter;
2035		iter = &priv->p_tmp_res;
2036	}
2037	spin_unlock(&ocfs2_dlm_tracking_lock);
2038
2039	return iter;
2040}
2041
2042/* So that debugfs.ocfs2 can determine which format is being used */
2043#define OCFS2_DLM_DEBUG_STR_VERSION 1
2044static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2045{
2046	int i;
2047	char *lvb;
2048	struct ocfs2_lock_res *lockres = v;
2049
2050	if (!lockres)
2051		return -EINVAL;
2052
2053	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2054
2055	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2056		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2057			   lockres->l_name,
2058			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2059	else
2060		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2061
2062	seq_printf(m, "%d\t"
2063		   "0x%lx\t"
2064		   "0x%x\t"
2065		   "0x%x\t"
2066		   "%u\t"
2067		   "%u\t"
2068		   "%d\t"
2069		   "%d\t",
2070		   lockres->l_level,
2071		   lockres->l_flags,
2072		   lockres->l_action,
2073		   lockres->l_unlock_action,
2074		   lockres->l_ro_holders,
2075		   lockres->l_ex_holders,
2076		   lockres->l_requested,
2077		   lockres->l_blocking);
2078
2079	/* Dump the raw LVB */
2080	lvb = lockres->l_lksb.lvb;
2081	for(i = 0; i < DLM_LVB_LEN; i++)
2082		seq_printf(m, "0x%x\t", lvb[i]);
2083
2084	/* End the line */
2085	seq_printf(m, "\n");
2086	return 0;
2087}
2088
2089static struct seq_operations ocfs2_dlm_seq_ops = {
2090	.start =	ocfs2_dlm_seq_start,
2091	.stop =		ocfs2_dlm_seq_stop,
2092	.next =		ocfs2_dlm_seq_next,
2093	.show =		ocfs2_dlm_seq_show,
2094};
2095
2096static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2097{
2098	struct seq_file *seq = (struct seq_file *) file->private_data;
2099	struct ocfs2_dlm_seq_priv *priv = seq->private;
2100	struct ocfs2_lock_res *res = &priv->p_iter_res;
2101
2102	ocfs2_remove_lockres_tracking(res);
2103	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2104	return seq_release_private(inode, file);
2105}
2106
2107static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2108{
2109	int ret;
2110	struct ocfs2_dlm_seq_priv *priv;
2111	struct seq_file *seq;
2112	struct ocfs2_super *osb;
2113
2114	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2115	if (!priv) {
2116		ret = -ENOMEM;
2117		mlog_errno(ret);
2118		goto out;
2119	}
2120	osb = inode->i_private;
2121	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2122	priv->p_dlm_debug = osb->osb_dlm_debug;
2123	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2124
2125	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2126	if (ret) {
2127		kfree(priv);
2128		mlog_errno(ret);
2129		goto out;
2130	}
2131
2132	seq = (struct seq_file *) file->private_data;
2133	seq->private = priv;
2134
2135	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2136				   priv->p_dlm_debug);
2137
2138out:
2139	return ret;
2140}
2141
2142static const struct file_operations ocfs2_dlm_debug_fops = {
2143	.open =		ocfs2_dlm_debug_open,
2144	.release =	ocfs2_dlm_debug_release,
2145	.read =		seq_read,
2146	.llseek =	seq_lseek,
2147};
2148
2149static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2150{
2151	int ret = 0;
2152	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2153
2154	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2155							 S_IFREG|S_IRUSR,
2156							 osb->osb_debug_root,
2157							 osb,
2158							 &ocfs2_dlm_debug_fops);
2159	if (!dlm_debug->d_locking_state) {
2160		ret = -EINVAL;
2161		mlog(ML_ERROR,
2162		     "Unable to create locking state debugfs file.\n");
2163		goto out;
2164	}
2165
2166	ocfs2_get_dlm_debug(dlm_debug);
2167out:
2168	return ret;
2169}
2170
2171static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2172{
2173	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2174
2175	if (dlm_debug) {
2176		debugfs_remove(dlm_debug->d_locking_state);
2177		ocfs2_put_dlm_debug(dlm_debug);
2178	}
2179}
2180
2181int ocfs2_dlm_init(struct ocfs2_super *osb)
2182{
2183	int status = 0;
2184	u32 dlm_key;
2185	struct dlm_ctxt *dlm = NULL;
2186
2187	mlog_entry_void();
2188
2189	if (ocfs2_mount_local(osb))
2190		goto local;
2191
2192	status = ocfs2_dlm_init_debug(osb);
2193	if (status < 0) {
2194		mlog_errno(status);
2195		goto bail;
2196	}
2197
2198	/* launch vote thread */
2199	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2200	if (IS_ERR(osb->vote_task)) {
2201		status = PTR_ERR(osb->vote_task);
2202		osb->vote_task = NULL;
2203		mlog_errno(status);
2204		goto bail;
2205	}
2206
2207	/* used by the dlm code to make message headers unique, each
2208	 * node in this domain must agree on this. */
2209	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2210
2211	/* for now, uuid == domain */
2212	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2213	if (IS_ERR(dlm)) {
2214		status = PTR_ERR(dlm);
2215		mlog_errno(status);
2216		goto bail;
2217	}
2218
2219	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2220
2221local:
2222	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2223	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2224
2225	osb->dlm = dlm;
2226
2227	status = 0;
2228bail:
2229	if (status < 0) {
2230		ocfs2_dlm_shutdown_debug(osb);
2231		if (osb->vote_task)
2232			kthread_stop(osb->vote_task);
2233	}
2234
2235	mlog_exit(status);
2236	return status;
2237}
2238
2239void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2240{
2241	mlog_entry_void();
2242
2243	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2244
2245	ocfs2_drop_osb_locks(osb);
2246
2247	if (osb->vote_task) {
2248		kthread_stop(osb->vote_task);
2249		osb->vote_task = NULL;
2250	}
2251
2252	ocfs2_lock_res_free(&osb->osb_super_lockres);
2253	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2254
2255	dlm_unregister_domain(osb->dlm);
2256	osb->dlm = NULL;
2257
2258	ocfs2_dlm_shutdown_debug(osb);
2259
2260	mlog_exit_void();
2261}
2262
2263static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2264{
2265	struct ocfs2_lock_res *lockres = opaque;
2266	unsigned long flags;
2267
2268	mlog_entry_void();
2269
2270	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2271	     lockres->l_unlock_action);
2272
2273	spin_lock_irqsave(&lockres->l_lock, flags);
2274	/* We tried to cancel a convert request, but it was already
2275	 * granted. All we want to do here is clear our unlock
2276	 * state. The wake_up call done at the bottom is redundant
2277	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2278	 * hurt anything anyway */
2279	if (status == DLM_CANCELGRANT &&
2280	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2281		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2282
2283		/* We don't clear the busy flag in this case as it
2284		 * should have been cleared by the ast which the dlm
2285		 * has called. */
2286		goto complete_unlock;
2287	}
2288
2289	if (status != DLM_NORMAL) {
2290		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2291		     "unlock_action %d\n", status, lockres->l_name,
2292		     lockres->l_unlock_action);
2293		spin_unlock_irqrestore(&lockres->l_lock, flags);
2294		return;
2295	}
2296
2297	switch(lockres->l_unlock_action) {
2298	case OCFS2_UNLOCK_CANCEL_CONVERT:
2299		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2300		lockres->l_action = OCFS2_AST_INVALID;
2301		break;
2302	case OCFS2_UNLOCK_DROP_LOCK:
2303		lockres->l_level = LKM_IVMODE;
2304		break;
2305	default:
2306		BUG();
2307	}
2308
2309	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2310complete_unlock:
2311	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2312	spin_unlock_irqrestore(&lockres->l_lock, flags);
2313
2314	wake_up(&lockres->l_event);
2315
2316	mlog_exit_void();
2317}
2318
2319static int ocfs2_drop_lock(struct ocfs2_super *osb,
2320			   struct ocfs2_lock_res *lockres)
2321{
2322	enum dlm_status status;
2323	unsigned long flags;
2324	int lkm_flags = 0;
2325
2326	/* We didn't get anywhere near actually using this lockres. */
2327	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2328		goto out;
2329
2330	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2331		lkm_flags |= LKM_VALBLK;
2332
2333	spin_lock_irqsave(&lockres->l_lock, flags);
2334
2335	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2336			"lockres %s, flags 0x%lx\n",
2337			lockres->l_name, lockres->l_flags);
2338
2339	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2340		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2341		     "%u, unlock_action = %u\n",
2342		     lockres->l_name, lockres->l_flags, lockres->l_action,
2343		     lockres->l_unlock_action);
2344
2345		spin_unlock_irqrestore(&lockres->l_lock, flags);
2346
2347		/* XXX: Today we just wait on any busy
2348		 * locks... Perhaps we need to cancel converts in the
2349		 * future? */
2350		ocfs2_wait_on_busy_lock(lockres);
2351
2352		spin_lock_irqsave(&lockres->l_lock, flags);
2353	}
2354
2355	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2356		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2357		    lockres->l_level == LKM_EXMODE &&
2358		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2359			lockres->l_ops->set_lvb(lockres);
2360	}
2361
2362	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2363		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2364		     lockres->l_name);
2365	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2366		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2367
2368	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2369		spin_unlock_irqrestore(&lockres->l_lock, flags);
2370		goto out;
2371	}
2372
2373	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2374
2375	/* make sure we never get here while waiting for an ast to
2376	 * fire. */
2377	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2378
2379	/* is this necessary? */
2380	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2381	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2382	spin_unlock_irqrestore(&lockres->l_lock, flags);
2383
2384	mlog(0, "lock %s\n", lockres->l_name);
2385
2386	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2387			   ocfs2_unlock_ast, lockres);
2388	if (status != DLM_NORMAL) {
2389		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2390		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2391		dlm_print_one_lock(lockres->l_lksb.lockid);
2392		BUG();
2393	}
2394	mlog(0, "lock %s, successfull return from dlmunlock\n",
2395	     lockres->l_name);
2396
2397	ocfs2_wait_on_busy_lock(lockres);
2398out:
2399	mlog_exit(0);
2400	return 0;
2401}
2402
2403/* Mark the lockres as being dropped. It will no longer be
2404 * queued if blocking, but we still may have to wait on it
2405 * being dequeued from the vote thread before we can consider
2406 * it safe to drop.
2407 *
2408 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2409void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2410{
2411	int status;
2412	struct ocfs2_mask_waiter mw;
2413	unsigned long flags;
2414
2415	ocfs2_init_mask_waiter(&mw);
2416
2417	spin_lock_irqsave(&lockres->l_lock, flags);
2418	lockres->l_flags |= OCFS2_LOCK_FREEING;
2419	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2420		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2421		spin_unlock_irqrestore(&lockres->l_lock, flags);
2422
2423		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2424
2425		status = ocfs2_wait_for_mask(&mw);
2426		if (status)
2427			mlog_errno(status);
2428
2429		spin_lock_irqsave(&lockres->l_lock, flags);
2430	}
2431	spin_unlock_irqrestore(&lockres->l_lock, flags);
2432}
2433
/*
 * Convenience wrapper: mark the lockres as freeing, then drop its DLM
 * lock. A non-zero result from the drop is logged, not returned.
 */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int err;

	ocfs2_mark_lockres_freeing(lockres);

	err = ocfs2_drop_lock(osb, lockres);
	if (err != 0)
		mlog_errno(err);
}
2444
2445static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2446{
2447	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2448	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2449}
2450
2451int ocfs2_drop_inode_locks(struct inode *inode)
2452{
2453	int status, err;
2454
2455	mlog_entry_void();
2456
2457	/* No need to call ocfs2_mark_lockres_freeing here -
2458	 * ocfs2_clear_inode has done it for us. */
2459
2460	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2461			      &OCFS2_I(inode)->ip_data_lockres);
2462	if (err < 0)
2463		mlog_errno(err);
2464
2465	status = err;
2466
2467	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2468			      &OCFS2_I(inode)->ip_meta_lockres);
2469	if (err < 0)
2470		mlog_errno(err);
2471	if (err < 0 && !status)
2472		status = err;
2473
2474	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2475			      &OCFS2_I(inode)->ip_rw_lockres);
2476	if (err < 0)
2477		mlog_errno(err);
2478	if (err < 0 && !status)
2479		status = err;
2480
2481	mlog_exit(status);
2482	return status;
2483}
2484
2485static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2486				      int new_level)
2487{
2488	assert_spin_locked(&lockres->l_lock);
2489
2490	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2491
2492	if (lockres->l_level <= new_level) {
2493		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2494		     lockres->l_level, new_level);
2495		BUG();
2496	}
2497
2498	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2499	     lockres->l_name, new_level, lockres->l_blocking);
2500
2501	lockres->l_action = OCFS2_AST_DOWNCONVERT;
2502	lockres->l_requested = new_level;
2503	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2504}
2505
2506static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2507				  struct ocfs2_lock_res *lockres,
2508				  int new_level,
2509				  int lvb)
2510{
2511	int ret, dlm_flags = LKM_CONVERT;
2512	enum dlm_status status;
2513
2514	mlog_entry_void();
2515
2516	if (lvb)
2517		dlm_flags |= LKM_VALBLK;
2518
2519	status = dlmlock(osb->dlm,
2520			 new_level,
2521			 &lockres->l_lksb,
2522			 dlm_flags,
2523			 lockres->l_name,
2524			 OCFS2_LOCK_ID_MAX_LEN - 1,
2525			 ocfs2_locking_ast,
2526			 lockres,
2527			 ocfs2_blocking_ast);
2528	if (status != DLM_NORMAL) {
2529		ocfs2_log_dlm_error("dlmlock", status, lockres);
2530		ret = -EINVAL;
2531		ocfs2_recover_from_dlm_error(lockres, 1);
2532		goto bail;
2533	}
2534
2535	ret = 0;
2536bail:
2537	mlog_exit(ret);
2538	return ret;
2539}
2540
2541/* returns 1 when the caller should unlock and call dlmunlock */
2542static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2543				        struct ocfs2_lock_res *lockres)
2544{
2545	assert_spin_locked(&lockres->l_lock);
2546
2547	mlog_entry_void();
2548	mlog(0, "lock %s\n", lockres->l_name);
2549
2550	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2551		/* If we're already trying to cancel a lock conversion
2552		 * then just drop the spinlock and allow the caller to
2553		 * requeue this lock. */
2554
2555		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2556		return 0;
2557	}
2558
2559	/* were we in a convert when we got the bast fire? */
2560	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2561	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
2562	/* set things up for the unlockast to know to just
2563	 * clear out the ast_action and unset busy, etc. */
2564	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2565
2566	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2567			"lock %s, invalid flags: 0x%lx\n",
2568			lockres->l_name, lockres->l_flags);
2569
2570	return 1;
2571}
2572
2573static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2574				struct ocfs2_lock_res *lockres)
2575{
2576	int ret;
2577	enum dlm_status status;
2578
2579	mlog_entry_void();
2580	mlog(0, "lock %s\n", lockres->l_name);
2581
2582	ret = 0;
2583	status = dlmunlock(osb->dlm,
2584			   &lockres->l_lksb,
2585			   LKM_CANCEL,
2586			   ocfs2_unlock_ast,
2587			   lockres);
2588	if (status != DLM_NORMAL) {
2589		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2590		ret = -EINVAL;
2591		ocfs2_recover_from_dlm_error(lockres, 0);
2592	}
2593
2594	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2595
2596	mlog_exit(ret);
2597	return ret;
2598}
2599
2600static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2601			      struct ocfs2_lock_res *lockres,
2602			      struct ocfs2_unblock_ctl *ctl)
2603{
2604	unsigned long flags;
2605	int blocking;
2606	int new_level;
2607	int ret = 0;
2608	int set_lvb = 0;
2609
2610	mlog_entry_void();
2611
2612	spin_lock_irqsave(&lockres->l_lock, flags);
2613
2614	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2615
2616recheck:
2617	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2618		ctl->requeue = 1;
2619		ret = ocfs2_prepare_cancel_convert(osb, lockres);
2620		spin_unlock_irqrestore(&lockres->l_lock, flags);
2621		if (ret) {
2622			ret = ocfs2_cancel_convert(osb, lockres);
2623			if (ret < 0)
2624				mlog_errno(ret);
2625		}
2626		goto leave;
2627	}
2628
2629	/* if we're blocking an exclusive and we have *any* holders,
2630	 * then requeue. */
2631	if ((lockres->l_blocking == LKM_EXMODE)
2632	    && (lockres->l_ex_holders || lockres->l_ro_holders))
2633		goto leave_requeue;
2634
2635	/* If it's a PR we're blocking, then only
2636	 * requeue if we've got any EX holders */
2637	if (lockres->l_blocking == LKM_PRMODE &&
2638	    lockres->l_ex_holders)
2639		goto leave_requeue;
2640
2641	/*
2642	 * Can we get a lock in this state if the holder counts are
2643	 * zero? The meta data unblock code used to check this.
2644	 */
2645	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2646	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2647		goto leave_requeue;
2648
2649	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2650
2651	if (lockres->l_ops->check_downconvert
2652	    && !lockres->l_ops->check_downconvert(lockres, new_level))
2653		goto leave_requeue;
2654
2655	/* If we get here, then we know that there are no more
2656	 * incompatible holders (and anyone asking for an incompatible
2657	 * lock is blocked). We can now downconvert the lock */
2658	if (!lockres->l_ops->downconvert_worker)
2659		goto downconvert;
2660
2661	/* Some lockres types want to do a bit of work before
2662	 * downconverting a lock. Allow that here. The worker function
2663	 * may sleep, so we save off a copy of what we're blocking as
2664	 * it may change while we're not holding the spin lock. */
2665	blocking = lockres->l_blocking;
2666	spin_unlock_irqrestore(&lockres->l_lock, flags);
2667
2668	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2669
2670	if (ctl->unblock_action == UNBLOCK_STOP_POST)
2671		goto leave;
2672
2673	spin_lock_irqsave(&lockres->l_lock, flags);
2674	if (blocking != lockres->l_blocking) {
2675		/* If this changed underneath us, then we can't drop
2676		 * it just yet. */
2677		goto recheck;
2678	}
2679
2680downconvert:
2681	ctl->requeue = 0;
2682
2683	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2684		if (lockres->l_level == LKM_EXMODE)
2685			set_lvb = 1;
2686
2687		/*
2688		 * We only set the lvb if the lock has been fully
2689		 * refreshed - otherwise we risk setting stale
2690		 * data. Otherwise, there's no need to actually clear
2691		 * out the lvb here as it's value is still valid.
2692		 */
2693		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2694			lockres->l_ops->set_lvb(lockres);
2695	}
2696
2697	ocfs2_prepare_downconvert(lockres, new_level);
2698	spin_unlock_irqrestore(&lockres->l_lock, flags);
2699	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2700leave:
2701	mlog_exit(ret);
2702	return ret;
2703
2704leave_requeue:
2705	spin_unlock_irqrestore(&lockres->l_lock, flags);
2706	ctl->requeue = 1;
2707
2708	mlog_exit(0);
2709	return 0;
2710}
2711
2712static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2713				     int blocking)
2714{
2715	struct inode *inode;
2716	struct address_space *mapping;
2717
2718       	inode = ocfs2_lock_res_inode(lockres);
2719	mapping = inode->i_mapping;
2720
2721	if (filemap_fdatawrite(mapping)) {
2722		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2723		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2724	}
2725	sync_mapping_buffers(mapping);
2726	if (blocking == LKM_EXMODE) {
2727		truncate_inode_pages(mapping, 0);
2728		unmap_mapping_range(mapping, 0, 0, 0);
2729	} else {
2730		/* We only need to wait on the I/O if we're not also
2731		 * truncating pages because truncate_inode_pages waits
2732		 * for us above. We don't truncate pages if we're
2733		 * blocking anything < EXMODE because we want to keep
2734		 * them around in that case. */
2735		filemap_fdatawait(mapping);
2736	}
2737
2738	return UNBLOCK_CONTINUE;
2739}
2740
2741static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2742					int new_level)
2743{
2744	struct inode *inode = ocfs2_lock_res_inode(lockres);
2745	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2746
2747	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2748	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2749
2750	if (checkpointed)
2751		return 1;
2752
2753	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2754	return 0;
2755}
2756
/* set_lvb for meta locks: fill the LVB from the inode behind lockres. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2763
/*
 * post_unlock for dentry locks: drops the final reference on our dentry
 * lock. Today this runs in the vote thread, but we could choose to
 * simplify the dlmglue API and hand these off to the ocfs2_wq in the
 * future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2775
2776/*
2777 * d_delete() matching dentries before the lock downconvert.
2778 *
2779 * At this point, any process waiting to destroy the
2780 * dentry_lock due to last ref count is stopped by the
2781 * OCFS2_LOCK_QUEUED flag.
2782 *
2783 * We have two potential problems
2784 *
2785 * 1) If we do the last reference drop on our dentry_lock (via dput)
2786 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2787 *    the downconvert to finish. Instead we take an elevated
2788 *    reference and push the drop until after we've completed our
2789 *    unblock processing.
2790 *
2791 * 2) There might be another process with a final reference,
2792 *    waiting on us to finish processing. If this is the case, we
2793 *    detect it and exit out - there's no more dentries anyway.
2794 */
2795static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2796				       int blocking)
2797{
2798	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2799	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2800	struct dentry *dentry;
2801	unsigned long flags;
2802	int extra_ref = 0;
2803
2804	/*
2805	 * This node is blocking another node from getting a read
2806	 * lock. This happens when we've renamed within a
2807	 * directory. We've forced the other nodes to d_delete(), but
2808	 * we never actually dropped our lock because it's still
2809	 * valid. The downconvert code will retain a PR for this node,
2810	 * so there's no further work to do.
2811	 */
2812	if (blocking == LKM_PRMODE)
2813		return UNBLOCK_CONTINUE;
2814
2815	/*
2816	 * Mark this inode as potentially orphaned. The code in
2817	 * ocfs2_delete_inode() will figure out whether it actually
2818	 * needs to be freed or not.
2819	 */
2820	spin_lock(&oi->ip_lock);
2821	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2822	spin_unlock(&oi->ip_lock);
2823
2824	/*
2825	 * Yuck. We need to make sure however that the check of
2826	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2827	 * respect to a reference decrement or the setting of that
2828	 * flag.
2829	 */
2830	spin_lock_irqsave(&lockres->l_lock, flags);
2831	spin_lock(&dentry_attach_lock);
2832	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2833	    && dl->dl_count) {
2834		dl->dl_count++;
2835		extra_ref = 1;
2836	}
2837	spin_unlock(&dentry_attach_lock);
2838	spin_unlock_irqrestore(&lockres->l_lock, flags);
2839
2840	mlog(0, "extra_ref = %d\n", extra_ref);
2841
2842	/*
2843	 * We have a process waiting on us in ocfs2_dentry_iput(),
2844	 * which means we can't have any more outstanding
2845	 * aliases. There's no need to do any more work.
2846	 */
2847	if (!extra_ref)
2848		return UNBLOCK_CONTINUE;
2849
2850	spin_lock(&dentry_attach_lock);
2851	while (1) {
2852		dentry = ocfs2_find_local_alias(dl->dl_inode,
2853						dl->dl_parent_blkno, 1);
2854		if (!dentry)
2855			break;
2856		spin_unlock(&dentry_attach_lock);
2857
2858		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2859		     dentry->d_name.name);
2860
2861		/*
2862		 * The following dcache calls may do an
2863		 * iput(). Normally we don't want that from the
2864		 * downconverting thread, but in this case it's ok
2865		 * because the requesting node already has an
2866		 * exclusive lock on the inode, so it can't be queued
2867		 * for a downconvert.
2868		 */
2869		d_delete(dentry);
2870		dput(dentry);
2871
2872		spin_lock(&dentry_attach_lock);
2873	}
2874	spin_unlock(&dentry_attach_lock);
2875
2876	/*
2877	 * If we are the last holder of this dentry lock, there is no
2878	 * reason to downconvert so skip straight to the unlock.
2879	 */
2880	if (dl->dl_count == 1)
2881		return UNBLOCK_STOP_POST;
2882
2883	return UNBLOCK_CONTINUE_POST;
2884}
2885
2886void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
2887				struct ocfs2_lock_res *lockres)
2888{
2889	int status;
2890	struct ocfs2_unblock_ctl ctl = {0, 0,};
2891	unsigned long flags;
2892
2893	/* Our reference to the lockres in this function can be
2894	 * considered valid until we remove the OCFS2_LOCK_QUEUED
2895	 * flag. */
2896
2897	mlog_entry_void();
2898
2899	BUG_ON(!lockres);
2900	BUG_ON(!lockres->l_ops);
2901
2902	mlog(0, "lockres %s blocked.\n", lockres->l_name);
2903
2904	/* Detect whether a lock has been marked as going away while
2905	 * the vote thread was processing other things. A lock can
2906	 * still be marked with OCFS2_LOCK_FREEING after this check,
2907	 * but short circuiting here will still save us some
2908	 * performance. */
2909	spin_lock_irqsave(&lockres->l_lock, flags);
2910	if (lockres->l_flags & OCFS2_LOCK_FREEING)
2911		goto unqueue;
2912	spin_unlock_irqrestore(&lockres->l_lock, flags);
2913
2914	status = ocfs2_unblock_lock(osb, lockres, &ctl);
2915	if (status < 0)
2916		mlog_errno(status);
2917
2918	spin_lock_irqsave(&lockres->l_lock, flags);
2919unqueue:
2920	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
2921		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
2922	} else
2923		ocfs2_schedule_blocked_lock(osb, lockres);
2924
2925	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
2926	     ctl.requeue ? "yes" : "no");
2927	spin_unlock_irqrestore(&lockres->l_lock, flags);
2928
2929	if (ctl.unblock_action != UNBLOCK_CONTINUE
2930	    && lockres->l_ops->post_unlock)
2931		lockres->l_ops->post_unlock(osb, lockres);
2932
2933	mlog_exit_void();
2934}
2935
2936static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
2937					struct ocfs2_lock_res *lockres)
2938{
2939	mlog_entry_void();
2940
2941	assert_spin_locked(&lockres->l_lock);
2942
2943	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
2944		/* Do not schedule a lock for downconvert when it's on
2945		 * the way to destruction - any nodes wanting access
2946		 * to the resource will get it soon. */
2947		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
2948		     lockres->l_name, lockres->l_flags);
2949		return;
2950	}
2951
2952	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
2953
2954	spin_lock(&osb->vote_task_lock);
2955	if (list_empty(&lockres->l_blocked_list)) {
2956		list_add_tail(&lockres->l_blocked_list,
2957			      &osb->blocked_lock_list);
2958		osb->blocked_lock_count++;
2959	}
2960	spin_unlock(&osb->vote_task_lock);
2961
2962	mlog_exit_void();
2963}
2964
2965/* This aids in debugging situations where a bad LVB might be involved. */
2966void ocfs2_dump_meta_lvb_info(u64 level,
2967			      const char *function,
2968			      unsigned int line,
2969			      struct ocfs2_lock_res *lockres)
2970{
2971	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
2972
2973	mlog(level, "LVB information for %s (called from %s:%u):\n",
2974	     lockres->l_name, function, line);
2975	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
2976	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
2977	     be32_to_cpu(lvb->lvb_igeneration));
2978	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
2979	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
2980	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
2981	     be16_to_cpu(lvb->lvb_imode));
2982	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
2983	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
2984	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
2985	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
2986	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
2987	     be32_to_cpu(lvb->lvb_iattr));
2988}
2989