dlmglue.c revision 08280f11de91beac2f5234ce5fc2ed246dfe6a86
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/smp_lock.h>
31#include <linux/crc32.h>
32#include <linux/kthread.h>
33#include <linux/pagemap.h>
34#include <linux/debugfs.h>
35#include <linux/seq_file.h>
36
37#include <cluster/heartbeat.h>
38#include <cluster/nodemanager.h>
39#include <cluster/tcp.h>
40
41#include <dlm/dlmapi.h>
42
43#define MLOG_MASK_PREFIX ML_DLM_GLUE
44#include <cluster/masklog.h>
45
46#include "ocfs2.h"
47
48#include "alloc.h"
49#include "dcache.h"
50#include "dlmglue.h"
51#include "extent_map.h"
52#include "heartbeat.h"
53#include "inode.h"
54#include "journal.h"
55#include "slot_map.h"
56#include "super.h"
57#include "uptodate.h"
58#include "vote.h"
59
60#include "buffer_head_io.h"
61
/*
 * One caller waiting for a lockres' l_flags to reach a given state.
 * A waiter is queued on lockres->l_mask_waiters and is completed by
 * lockres_set_flags() once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* link on lockres->l_mask_waiters */
	int			mw_status;	/* value handed back by ocfs2_wait_for_mask() */
	struct completion	mw_complete;	/* completed when the goal is reached */
	unsigned long		mw_mask;	/* l_flags bits being watched */
	unsigned long		mw_goal;	/* required value of the watched bits */
};
69
70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock().
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock() callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};
87
/*
 * Control block handed to the ->unblock() lock type callbacks.
 * NOTE(review): the code that consumes these fields is outside this
 * part of the file; the field notes below are inferred from the names
 * and should be confirmed against ocfs2_process_blocked_lock().
 */
struct ocfs2_unblock_ctl {
	int requeue;	/* presumably nonzero to retry the unblock later */
	enum ocfs2_unblock_action unblock_action;	/* see enum above */
};
92
93static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
94			      struct ocfs2_unblock_ctl *ctl);
95static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
96					int new_level);
97static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
98
99static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
100			      struct ocfs2_unblock_ctl *ctl);
101static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
102				    struct ocfs2_unblock_ctl *ctl);
103static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
104				     struct ocfs2_unblock_ctl *ctl);
105static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
106				  struct ocfs2_unblock_ctl *ctl);
107
108static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
109				     struct ocfs2_lock_res *lockres);
110
/*
 * OCFS2 Lock Resource Operations
 *
 * Per lock type callbacks and flags which fine tune the behavior of
 * the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer.
	 */
	struct ocfs2_super *(*get_osb)(struct ocfs2_lock_res *lockres);

	/*
	 * Lock type specific unblock handler. It is handed an
	 * ocfs2_unblock_ctl to report its decision through.
	 */
	int (*unblock)(struct ocfs2_lock_res *lockres,
		       struct ocfs2_unblock_ctl *ctl);

	/*
	 * Optional callback fired for the UNBLOCK_CONTINUE_POST and
	 * UNBLOCK_STOP_POST unblock actions.
	 */
	void (*post_unlock)(struct ocfs2_super *osb,
			    struct ocfs2_lock_res *lockres);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *lockres,
				 int new_level);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *lockres);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow
	 * this structure.
	 */
	int flags;
};
154
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined. ocfs2_cluster_lock()
 * adds LKM_VALBLK to the dlm flags for lock types carrying this flag.
 */
#define LOCK_TYPE_USES_LVB		0x2
170
171typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
172static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
173				      struct ocfs2_lock_res *lockres,
174				      struct ocfs2_unblock_ctl *ctl,
175				      ocfs2_convert_worker_t *worker);
176
177static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
178	.get_osb	= ocfs2_get_inode_osb,
179	.unblock	= ocfs2_unblock_inode_lock,
180	.flags		= 0,
181};
182
183static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
184	.get_osb	= ocfs2_get_inode_osb,
185	.unblock	= ocfs2_unblock_meta,
186	.check_downconvert = ocfs2_check_meta_downconvert,
187	.set_lvb	= ocfs2_set_meta_lvb,
188	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
189};
190
191static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
192	.get_osb	= ocfs2_get_inode_osb,
193	.unblock	= ocfs2_unblock_data,
194	.flags		= 0,
195};
196
197static struct ocfs2_lock_res_ops ocfs2_super_lops = {
198	.unblock	= ocfs2_unblock_osb_lock,
199	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
200};
201
202static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
203	.unblock	= ocfs2_unblock_osb_lock,
204	.flags		= 0,
205};
206
207static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
208	.get_osb	= ocfs2_get_dentry_osb,
209	.unblock	= ocfs2_unblock_dentry_lock,
210	.post_unlock	= ocfs2_dentry_post_unlock,
211	.flags		= 0,
212};
213
214static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
215{
216	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
217		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
218		lockres->l_type == OCFS2_LOCK_TYPE_RW;
219}
220
221static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
222{
223	BUG_ON(!ocfs2_is_inode_lock(lockres));
224
225	return (struct inode *) lockres->l_priv;
226}
227
228static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
229{
230	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
231
232	return (struct ocfs2_dentry_lock *)lockres->l_priv;
233}
234
235static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
236{
237	if (lockres->l_ops->get_osb)
238		return lockres->l_ops->get_osb(lockres);
239
240	return (struct ocfs2_super *)lockres->l_priv;
241}
242
243static int ocfs2_lock_create(struct ocfs2_super *osb,
244			     struct ocfs2_lock_res *lockres,
245			     int level,
246			     int dlm_flags);
247static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
248						     int wanted);
249static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
250				 struct ocfs2_lock_res *lockres,
251				 int level);
252static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
253static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
254static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
255static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
256static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
257					struct ocfs2_lock_res *lockres);
258static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
259						int convert);
/*
 * Log a dlm failure at ML_ERROR.
 *
 * _func:    name (string) of the dlm call that failed
 * _stat:    the dlm status it returned (evaluated twice)
 * _lockres: pointer to the lock resource involved
 *
 * Arguments are parenthesized so that expression arguments (e.g.
 * &dl->dl_lockres) expand correctly.
 */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {		\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "		\
		"resource %s: %s\n", dlm_errname(_stat), (_func),	\
		(_lockres)->l_name, dlm_errmsg(_stat));			\
} while (0)
265static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
266				 struct ocfs2_lock_res *lockres);
267static int ocfs2_meta_lock_update(struct inode *inode,
268				  struct buffer_head **bh);
269static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
270static inline int ocfs2_highest_compat_lock_level(int level);
271
272static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
273				  u64 blkno,
274				  u32 generation,
275				  char *name)
276{
277	int len;
278
279	mlog_entry_void();
280
281	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
282
283	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
284		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
285		       (long long)blkno, generation);
286
287	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
288
289	mlog(0, "built lock resource with name: %s\n", name);
290
291	mlog_exit_void();
292}
293
/*
 * Taken around every addition to / removal from a d_lockres_tracking
 * debug list (see ocfs2_add_lockres_tracking() and
 * ocfs2_remove_lockres_tracking()).
 */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
295
296static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
297				       struct ocfs2_dlm_debug *dlm_debug)
298{
299	mlog(0, "Add tracking for lockres %s\n", res->l_name);
300
301	spin_lock(&ocfs2_dlm_tracking_lock);
302	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
303	spin_unlock(&ocfs2_dlm_tracking_lock);
304}
305
306static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
307{
308	spin_lock(&ocfs2_dlm_tracking_lock);
309	if (!list_empty(&res->l_debug_list))
310		list_del_init(&res->l_debug_list);
311	spin_unlock(&ocfs2_dlm_tracking_lock);
312}
313
314static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
315				       struct ocfs2_lock_res *res,
316				       enum ocfs2_lock_type type,
317				       struct ocfs2_lock_res_ops *ops,
318				       void *priv)
319{
320	res->l_type          = type;
321	res->l_ops           = ops;
322	res->l_priv          = priv;
323
324	res->l_level         = LKM_IVMODE;
325	res->l_requested     = LKM_IVMODE;
326	res->l_blocking      = LKM_IVMODE;
327	res->l_action        = OCFS2_AST_INVALID;
328	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
329
330	res->l_flags         = OCFS2_LOCK_INITIALIZED;
331
332	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
333}
334
335void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
336{
337	/* This also clears out the lock status block */
338	memset(res, 0, sizeof(struct ocfs2_lock_res));
339	spin_lock_init(&res->l_lock);
340	init_waitqueue_head(&res->l_event);
341	INIT_LIST_HEAD(&res->l_blocked_list);
342	INIT_LIST_HEAD(&res->l_mask_waiters);
343}
344
345void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
346			       enum ocfs2_lock_type type,
347			       unsigned int generation,
348			       struct inode *inode)
349{
350	struct ocfs2_lock_res_ops *ops;
351
352	switch(type) {
353		case OCFS2_LOCK_TYPE_RW:
354			ops = &ocfs2_inode_rw_lops;
355			break;
356		case OCFS2_LOCK_TYPE_META:
357			ops = &ocfs2_inode_meta_lops;
358			break;
359		case OCFS2_LOCK_TYPE_DATA:
360			ops = &ocfs2_inode_data_lops;
361			break;
362		default:
363			mlog_bug_on_msg(1, "type: %d\n", type);
364			ops = NULL; /* thanks, gcc */
365			break;
366	};
367
368	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
369			      generation, res->l_name);
370	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
371}
372
373static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
374{
375	struct inode *inode = ocfs2_lock_res_inode(lockres);
376
377	return OCFS2_SB(inode->i_sb);
378}
379
380static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
381{
382	__be64 inode_blkno_be;
383
384	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
385	       sizeof(__be64));
386
387	return be64_to_cpu(inode_blkno_be);
388}
389
390static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
391{
392	struct ocfs2_dentry_lock *dl = lockres->l_priv;
393
394	return OCFS2_SB(dl->dl_inode->i_sb);
395}
396
397void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
398				u64 parent, struct inode *inode)
399{
400	int len;
401	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
402	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
403	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
404
405	ocfs2_lock_res_init_once(lockres);
406
407	/*
408	 * Unfortunately, the standard lock naming scheme won't work
409	 * here because we have two 16 byte values to use. Instead,
410	 * we'll stuff the inode number as a binary value. We still
411	 * want error prints to show something without garbling the
412	 * display, so drop a null byte in there before the inode
413	 * number. A future version of OCFS2 will likely use all
414	 * binary lock names. The stringified names have been a
415	 * tremendous aid in debugging, but now that the debugfs
416	 * interface exists, we can mangle things there if need be.
417	 *
418	 * NOTE: We also drop the standard "pad" value (the total lock
419	 * name size stays the same though - the last part is all
420	 * zeros due to the memset in ocfs2_lock_res_init_once()
421	 */
422	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
423		       "%c%016llx",
424		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
425		       (long long)parent);
426
427	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
428
429	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
430	       sizeof(__be64));
431
432	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
433				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
434				   dl);
435}
436
437static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
438				      struct ocfs2_super *osb)
439{
440	/* Superblock lockres doesn't come from a slab so we call init
441	 * once on it manually.  */
442	ocfs2_lock_res_init_once(res);
443	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
444			      0, res->l_name);
445	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
446				   &ocfs2_super_lops, osb);
447}
448
449static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
450				       struct ocfs2_super *osb)
451{
452	/* Rename lockres doesn't come from a slab so we call init
453	 * once on it manually.  */
454	ocfs2_lock_res_init_once(res);
455	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
456	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
457				   &ocfs2_rename_lops, osb);
458}
459
460void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
461{
462	mlog_entry_void();
463
464	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
465		return;
466
467	ocfs2_remove_lockres_tracking(res);
468
469	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
470			"Lockres %s is on the blocked list\n",
471			res->l_name);
472	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
473			"Lockres %s has mask waiters pending\n",
474			res->l_name);
475	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
476			"Lockres %s is locked\n",
477			res->l_name);
478	mlog_bug_on_msg(res->l_ro_holders,
479			"Lockres %s has %u ro holders\n",
480			res->l_name, res->l_ro_holders);
481	mlog_bug_on_msg(res->l_ex_holders,
482			"Lockres %s has %u ex holders\n",
483			res->l_name, res->l_ex_holders);
484
485	/* Need to clear out the lock status block for the dlm */
486	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
487
488	res->l_flags = 0UL;
489	mlog_exit_void();
490}
491
492static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
493				     int level)
494{
495	mlog_entry_void();
496
497	BUG_ON(!lockres);
498
499	switch(level) {
500	case LKM_EXMODE:
501		lockres->l_ex_holders++;
502		break;
503	case LKM_PRMODE:
504		lockres->l_ro_holders++;
505		break;
506	default:
507		BUG();
508	}
509
510	mlog_exit_void();
511}
512
513static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
514				     int level)
515{
516	mlog_entry_void();
517
518	BUG_ON(!lockres);
519
520	switch(level) {
521	case LKM_EXMODE:
522		BUG_ON(!lockres->l_ex_holders);
523		lockres->l_ex_holders--;
524		break;
525	case LKM_PRMODE:
526		BUG_ON(!lockres->l_ro_holders);
527		lockres->l_ro_holders--;
528		break;
529	default:
530		BUG();
531	}
532	mlog_exit_void();
533}
534
535/* WARNING: This function lives in a world where the only three lock
536 * levels are EX, PR, and NL. It *will* have to be adjusted when more
537 * lock types are added. */
538static inline int ocfs2_highest_compat_lock_level(int level)
539{
540	int new_level = LKM_EXMODE;
541
542	if (level == LKM_EXMODE)
543		new_level = LKM_NLMODE;
544	else if (level == LKM_PRMODE)
545		new_level = LKM_PRMODE;
546	return new_level;
547}
548
549static void lockres_set_flags(struct ocfs2_lock_res *lockres,
550			      unsigned long newflags)
551{
552	struct list_head *pos, *tmp;
553	struct ocfs2_mask_waiter *mw;
554
555 	assert_spin_locked(&lockres->l_lock);
556
557	lockres->l_flags = newflags;
558
559	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
560		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
561		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
562			continue;
563
564		list_del_init(&mw->mw_item);
565		mw->mw_status = 0;
566		complete(&mw->mw_complete);
567	}
568}
569static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
570{
571	lockres_set_flags(lockres, lockres->l_flags | or);
572}
573static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
574				unsigned long clear)
575{
576	lockres_set_flags(lockres, lockres->l_flags & ~clear);
577}
578
579static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
580{
581	mlog_entry_void();
582
583	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
584	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
585	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
586	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
587
588	lockres->l_level = lockres->l_requested;
589	if (lockres->l_level <=
590	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
591		lockres->l_blocking = LKM_NLMODE;
592		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
593	}
594	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
595
596	mlog_exit_void();
597}
598
599static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
600{
601	mlog_entry_void();
602
603	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
604	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
605
606	/* Convert from RO to EX doesn't really need anything as our
607	 * information is already up to data. Convert from NL to
608	 * *anything* however should mark ourselves as needing an
609	 * update */
610	if (lockres->l_level == LKM_NLMODE &&
611	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
612		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
613
614	lockres->l_level = lockres->l_requested;
615	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
616
617	mlog_exit_void();
618}
619
620static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
621{
622	mlog_entry_void();
623
624	BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
625	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
626
627	if (lockres->l_requested > LKM_NLMODE &&
628	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
629	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
630		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
631
632	lockres->l_level = lockres->l_requested;
633	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
634	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
635
636	mlog_exit_void();
637}
638
639static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
640				     int level)
641{
642	int needs_downconvert = 0;
643	mlog_entry_void();
644
645	assert_spin_locked(&lockres->l_lock);
646
647	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
648
649	if (level > lockres->l_blocking) {
650		/* only schedule a downconvert if we haven't already scheduled
651		 * one that goes low enough to satisfy the level we're
652		 * blocking.  this also catches the case where we get
653		 * duplicate BASTs */
654		if (ocfs2_highest_compat_lock_level(level) <
655		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
656			needs_downconvert = 1;
657
658		lockres->l_blocking = level;
659	}
660
661	mlog_exit(needs_downconvert);
662	return needs_downconvert;
663}
664
665static void ocfs2_blocking_ast(void *opaque, int level)
666{
667	struct ocfs2_lock_res *lockres = opaque;
668	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
669	int needs_downconvert;
670	unsigned long flags;
671
672	BUG_ON(level <= LKM_NLMODE);
673
674	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
675	     lockres->l_name, level, lockres->l_level,
676	     ocfs2_lock_type_string(lockres->l_type));
677
678	spin_lock_irqsave(&lockres->l_lock, flags);
679	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
680	if (needs_downconvert)
681		ocfs2_schedule_blocked_lock(osb, lockres);
682	spin_unlock_irqrestore(&lockres->l_lock, flags);
683
684	wake_up(&lockres->l_event);
685
686	ocfs2_kick_vote_thread(osb);
687}
688
689static void ocfs2_locking_ast(void *opaque)
690{
691	struct ocfs2_lock_res *lockres = opaque;
692	struct dlm_lockstatus *lksb = &lockres->l_lksb;
693	unsigned long flags;
694
695	spin_lock_irqsave(&lockres->l_lock, flags);
696
697	if (lksb->status != DLM_NORMAL) {
698		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
699		     lockres->l_name, lksb->status);
700		spin_unlock_irqrestore(&lockres->l_lock, flags);
701		return;
702	}
703
704	switch(lockres->l_action) {
705	case OCFS2_AST_ATTACH:
706		ocfs2_generic_handle_attach_action(lockres);
707		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
708		break;
709	case OCFS2_AST_CONVERT:
710		ocfs2_generic_handle_convert_action(lockres);
711		break;
712	case OCFS2_AST_DOWNCONVERT:
713		ocfs2_generic_handle_downconvert_action(lockres);
714		break;
715	default:
716		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
717		     "lockres flags = 0x%lx, unlock action: %u\n",
718		     lockres->l_name, lockres->l_action, lockres->l_flags,
719		     lockres->l_unlock_action);
720		BUG();
721	}
722
723	/* set it to something invalid so if we get called again we
724	 * can catch it. */
725	lockres->l_action = OCFS2_AST_INVALID;
726
727	wake_up(&lockres->l_event);
728	spin_unlock_irqrestore(&lockres->l_lock, flags);
729}
730
731static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
732						int convert)
733{
734	unsigned long flags;
735
736	mlog_entry_void();
737	spin_lock_irqsave(&lockres->l_lock, flags);
738	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
739	if (convert)
740		lockres->l_action = OCFS2_AST_INVALID;
741	else
742		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
743	spin_unlock_irqrestore(&lockres->l_lock, flags);
744
745	wake_up(&lockres->l_event);
746	mlog_exit_void();
747}
748
749/* Note: If we detect another process working on the lock (i.e.,
750 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
751 * to do the right thing in that case.
752 */
753static int ocfs2_lock_create(struct ocfs2_super *osb,
754			     struct ocfs2_lock_res *lockres,
755			     int level,
756			     int dlm_flags)
757{
758	int ret = 0;
759	enum dlm_status status;
760	unsigned long flags;
761
762	mlog_entry_void();
763
764	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
765	     dlm_flags);
766
767	spin_lock_irqsave(&lockres->l_lock, flags);
768	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
769	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
770		spin_unlock_irqrestore(&lockres->l_lock, flags);
771		goto bail;
772	}
773
774	lockres->l_action = OCFS2_AST_ATTACH;
775	lockres->l_requested = level;
776	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
777	spin_unlock_irqrestore(&lockres->l_lock, flags);
778
779	status = dlmlock(osb->dlm,
780			 level,
781			 &lockres->l_lksb,
782			 dlm_flags,
783			 lockres->l_name,
784			 OCFS2_LOCK_ID_MAX_LEN - 1,
785			 ocfs2_locking_ast,
786			 lockres,
787			 ocfs2_blocking_ast);
788	if (status != DLM_NORMAL) {
789		ocfs2_log_dlm_error("dlmlock", status, lockres);
790		ret = -EINVAL;
791		ocfs2_recover_from_dlm_error(lockres, 1);
792	}
793
794	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
795
796bail:
797	mlog_exit(ret);
798	return ret;
799}
800
801static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
802					int flag)
803{
804	unsigned long flags;
805	int ret;
806
807	spin_lock_irqsave(&lockres->l_lock, flags);
808	ret = lockres->l_flags & flag;
809	spin_unlock_irqrestore(&lockres->l_lock, flags);
810
811	return ret;
812}
813
814static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
815
816{
817	wait_event(lockres->l_event,
818		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
819}
820
821static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
822
823{
824	wait_event(lockres->l_event,
825		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
826}
827
828/* predict what lock level we'll be dropping down to on behalf
829 * of another node, and return true if the currently wanted
830 * level will be compatible with it. */
831static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
832						     int wanted)
833{
834	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
835
836	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
837}
838
839static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
840{
841	INIT_LIST_HEAD(&mw->mw_item);
842	init_completion(&mw->mw_complete);
843}
844
845static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
846{
847	wait_for_completion(&mw->mw_complete);
848	/* Re-arm the completion in case we want to wait on it again */
849	INIT_COMPLETION(mw->mw_complete);
850	return mw->mw_status;
851}
852
853static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
854				    struct ocfs2_mask_waiter *mw,
855				    unsigned long mask,
856				    unsigned long goal)
857{
858	BUG_ON(!list_empty(&mw->mw_item));
859
860	assert_spin_locked(&lockres->l_lock);
861
862	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
863	mw->mw_mask = mask;
864	mw->mw_goal = goal;
865}
866
867/* returns 0 if the mw that was removed was already satisfied, -EBUSY
868 * if the mask still hadn't reached its goal */
869static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
870				      struct ocfs2_mask_waiter *mw)
871{
872	unsigned long flags;
873	int ret = 0;
874
875	spin_lock_irqsave(&lockres->l_lock, flags);
876	if (!list_empty(&mw->mw_item)) {
877		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
878			ret = -EBUSY;
879
880		list_del_init(&mw->mw_item);
881		init_completion(&mw->mw_complete);
882	}
883	spin_unlock_irqrestore(&lockres->l_lock, flags);
884
885	return ret;
886
887}
888
889static int ocfs2_cluster_lock(struct ocfs2_super *osb,
890			      struct ocfs2_lock_res *lockres,
891			      int level,
892			      int lkm_flags,
893			      int arg_flags)
894{
895	struct ocfs2_mask_waiter mw;
896	enum dlm_status status;
897	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
898	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
899	unsigned long flags;
900
901	mlog_entry_void();
902
903	ocfs2_init_mask_waiter(&mw);
904
905	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
906		lkm_flags |= LKM_VALBLK;
907
908again:
909	wait = 0;
910
911	if (catch_signals && signal_pending(current)) {
912		ret = -ERESTARTSYS;
913		goto out;
914	}
915
916	spin_lock_irqsave(&lockres->l_lock, flags);
917
918	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
919			"Cluster lock called on freeing lockres %s! flags "
920			"0x%lx\n", lockres->l_name, lockres->l_flags);
921
922	/* We only compare against the currently granted level
923	 * here. If the lock is blocked waiting on a downconvert,
924	 * we'll get caught below. */
925	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
926	    level > lockres->l_level) {
927		/* is someone sitting in dlm_lock? If so, wait on
928		 * them. */
929		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
930		wait = 1;
931		goto unlock;
932	}
933
934	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
935		/* lock has not been created yet. */
936		spin_unlock_irqrestore(&lockres->l_lock, flags);
937
938		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
939		if (ret < 0) {
940			mlog_errno(ret);
941			goto out;
942		}
943		goto again;
944	}
945
946	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
947	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
948		/* is the lock is currently blocked on behalf of
949		 * another node */
950		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
951		wait = 1;
952		goto unlock;
953	}
954
955	if (level > lockres->l_level) {
956		if (lockres->l_action != OCFS2_AST_INVALID)
957			mlog(ML_ERROR, "lockres %s has action %u pending\n",
958			     lockres->l_name, lockres->l_action);
959
960		lockres->l_action = OCFS2_AST_CONVERT;
961		lockres->l_requested = level;
962		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
963		spin_unlock_irqrestore(&lockres->l_lock, flags);
964
965		BUG_ON(level == LKM_IVMODE);
966		BUG_ON(level == LKM_NLMODE);
967
968		mlog(0, "lock %s, convert from %d to level = %d\n",
969		     lockres->l_name, lockres->l_level, level);
970
971		/* call dlm_lock to upgrade lock now */
972		status = dlmlock(osb->dlm,
973				 level,
974				 &lockres->l_lksb,
975				 lkm_flags|LKM_CONVERT,
976				 lockres->l_name,
977				 OCFS2_LOCK_ID_MAX_LEN - 1,
978				 ocfs2_locking_ast,
979				 lockres,
980				 ocfs2_blocking_ast);
981		if (status != DLM_NORMAL) {
982			if ((lkm_flags & LKM_NOQUEUE) &&
983			    (status == DLM_NOTQUEUED))
984				ret = -EAGAIN;
985			else {
986				ocfs2_log_dlm_error("dlmlock", status,
987						    lockres);
988				ret = -EINVAL;
989			}
990			ocfs2_recover_from_dlm_error(lockres, 1);
991			goto out;
992		}
993
994		mlog(0, "lock %s, successfull return from dlmlock\n",
995		     lockres->l_name);
996
997		/* At this point we've gone inside the dlm and need to
998		 * complete our work regardless. */
999		catch_signals = 0;
1000
1001		/* wait for busy to clear and carry on */
1002		goto again;
1003	}
1004
1005	/* Ok, if we get here then we're good to go. */
1006	ocfs2_inc_holders(lockres, level);
1007
1008	ret = 0;
1009unlock:
1010	spin_unlock_irqrestore(&lockres->l_lock, flags);
1011out:
1012	/*
1013	 * This is helping work around a lock inversion between the page lock
1014	 * and dlm locks.  One path holds the page lock while calling aops
1015	 * which block acquiring dlm locks.  The voting thread holds dlm
1016	 * locks while acquiring page locks while down converting data locks.
1017	 * This block is helping an aop path notice the inversion and back
1018	 * off to unlock its page lock before trying the dlm lock again.
1019	 */
1020	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1021	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1022		wait = 0;
1023		if (lockres_remove_mask_waiter(lockres, &mw))
1024			ret = -EAGAIN;
1025		else
1026			goto again;
1027	}
1028	if (wait) {
1029		ret = ocfs2_wait_for_mask(&mw);
1030		if (ret == 0)
1031			goto again;
1032		mlog_errno(ret);
1033	}
1034
1035	mlog_exit(ret);
1036	return ret;
1037}
1038
1039static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1040				 struct ocfs2_lock_res *lockres,
1041				 int level)
1042{
1043	unsigned long flags;
1044
1045	mlog_entry_void();
1046	spin_lock_irqsave(&lockres->l_lock, flags);
1047	ocfs2_dec_holders(lockres, level);
1048	ocfs2_vote_on_unlock(osb, lockres);
1049	spin_unlock_irqrestore(&lockres->l_lock, flags);
1050	mlog_exit_void();
1051}
1052
1053int ocfs2_create_new_lock(struct ocfs2_super *osb,
1054			  struct ocfs2_lock_res *lockres,
1055			  int ex,
1056			  int local)
1057{
1058	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1059	unsigned long flags;
1060	int lkm_flags = local ? LKM_LOCAL : 0;
1061
1062	spin_lock_irqsave(&lockres->l_lock, flags);
1063	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1064	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1065	spin_unlock_irqrestore(&lockres->l_lock, flags);
1066
1067	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1068}
1069
1070/* Grants us an EX lock on the data and metadata resources, skipping
1071 * the normal cluster directory lookup. Use this ONLY on newly created
1072 * inodes which other nodes can't possibly see, and which haven't been
1073 * hashed in the inode hash yet. This can give us a good performance
1074 * increase as it'll skip the network broadcast normally associated
1075 * with creating a new lock resource. */
1076int ocfs2_create_new_inode_locks(struct inode *inode)
1077{
1078	int ret;
1079	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1080
1081	BUG_ON(!inode);
1082	BUG_ON(!ocfs2_inode_is_new(inode));
1083
1084	mlog_entry_void();
1085
1086	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1087
1088	/* NOTE: That we don't increment any of the holder counts, nor
1089	 * do we add anything to a journal handle. Since this is
1090	 * supposed to be a new inode which the cluster doesn't know
1091	 * about yet, there is no need to.  As far as the LVB handling
1092	 * is concerned, this is basically like acquiring an EX lock
1093	 * on a resource which has an invalid one -- we'll set it
1094	 * valid when we release the EX. */
1095
1096	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1097	if (ret) {
1098		mlog_errno(ret);
1099		goto bail;
1100	}
1101
1102	/*
1103	 * We don't want to use LKM_LOCAL on a meta data lock as they
1104	 * don't use a generation in their lock names.
1105	 */
1106	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1107	if (ret) {
1108		mlog_errno(ret);
1109		goto bail;
1110	}
1111
1112	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1113	if (ret) {
1114		mlog_errno(ret);
1115		goto bail;
1116	}
1117
1118bail:
1119	mlog_exit(ret);
1120	return ret;
1121}
1122
1123int ocfs2_rw_lock(struct inode *inode, int write)
1124{
1125	int status, level;
1126	struct ocfs2_lock_res *lockres;
1127
1128	BUG_ON(!inode);
1129
1130	mlog_entry_void();
1131
1132	mlog(0, "inode %llu take %s RW lock\n",
1133	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1134	     write ? "EXMODE" : "PRMODE");
1135
1136	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1137
1138	level = write ? LKM_EXMODE : LKM_PRMODE;
1139
1140	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1141				    0);
1142	if (status < 0)
1143		mlog_errno(status);
1144
1145	mlog_exit(status);
1146	return status;
1147}
1148
1149void ocfs2_rw_unlock(struct inode *inode, int write)
1150{
1151	int level = write ? LKM_EXMODE : LKM_PRMODE;
1152	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1153
1154	mlog_entry_void();
1155
1156	mlog(0, "inode %llu drop %s RW lock\n",
1157	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1158	     write ? "EXMODE" : "PRMODE");
1159
1160	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1161
1162	mlog_exit_void();
1163}
1164
1165int ocfs2_data_lock_full(struct inode *inode,
1166			 int write,
1167			 int arg_flags)
1168{
1169	int status = 0, level;
1170	struct ocfs2_lock_res *lockres;
1171
1172	BUG_ON(!inode);
1173
1174	mlog_entry_void();
1175
1176	mlog(0, "inode %llu take %s DATA lock\n",
1177	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1178	     write ? "EXMODE" : "PRMODE");
1179
1180	/* We'll allow faking a readonly data lock for
1181	 * rodevices. */
1182	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1183		if (write) {
1184			status = -EROFS;
1185			mlog_errno(status);
1186		}
1187		goto out;
1188	}
1189
1190	lockres = &OCFS2_I(inode)->ip_data_lockres;
1191
1192	level = write ? LKM_EXMODE : LKM_PRMODE;
1193
1194	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1195				    0, arg_flags);
1196	if (status < 0 && status != -EAGAIN)
1197		mlog_errno(status);
1198
1199out:
1200	mlog_exit(status);
1201	return status;
1202}
1203
1204/* see ocfs2_meta_lock_with_page() */
1205int ocfs2_data_lock_with_page(struct inode *inode,
1206			      int write,
1207			      struct page *page)
1208{
1209	int ret;
1210
1211	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1212	if (ret == -EAGAIN) {
1213		unlock_page(page);
1214		if (ocfs2_data_lock(inode, write) == 0)
1215			ocfs2_data_unlock(inode, write);
1216		ret = AOP_TRUNCATED_PAGE;
1217	}
1218
1219	return ret;
1220}
1221
1222static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1223				 struct ocfs2_lock_res *lockres)
1224{
1225	int kick = 0;
1226
1227	mlog_entry_void();
1228
1229	/* If we know that another node is waiting on our lock, kick
1230	 * the vote thread * pre-emptively when we reach a release
1231	 * condition. */
1232	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1233		switch(lockres->l_blocking) {
1234		case LKM_EXMODE:
1235			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1236				kick = 1;
1237			break;
1238		case LKM_PRMODE:
1239			if (!lockres->l_ex_holders)
1240				kick = 1;
1241			break;
1242		default:
1243			BUG();
1244		}
1245	}
1246
1247	if (kick)
1248		ocfs2_kick_vote_thread(osb);
1249
1250	mlog_exit_void();
1251}
1252
1253void ocfs2_data_unlock(struct inode *inode,
1254		       int write)
1255{
1256	int level = write ? LKM_EXMODE : LKM_PRMODE;
1257	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1258
1259	mlog_entry_void();
1260
1261	mlog(0, "inode %llu drop %s DATA lock\n",
1262	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1263	     write ? "EXMODE" : "PRMODE");
1264
1265	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1266		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1267
1268	mlog_exit_void();
1269}
1270
1271#define OCFS2_SEC_BITS   34
1272#define OCFS2_SEC_SHIFT  (64 - 34)
1273#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1274
1275/* LVB only has room for 64 bits of time here so we pack it for
1276 * now. */
1277static u64 ocfs2_pack_timespec(struct timespec *spec)
1278{
1279	u64 res;
1280	u64 sec = spec->tv_sec;
1281	u32 nsec = spec->tv_nsec;
1282
1283	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1284
1285	return res;
1286}
1287
1288/* Call this with the lockres locked. I am reasonably sure we don't
1289 * need ip_lock in this function as anyone who would be changing those
1290 * values is supposed to be blocked in ocfs2_meta_lock right now. */
1291static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1292{
1293	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1294	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1295	struct ocfs2_meta_lvb *lvb;
1296
1297	mlog_entry_void();
1298
1299	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1300
1301	/*
1302	 * Invalidate the LVB of a deleted inode - this way other
1303	 * nodes are forced to go to disk and discover the new inode
1304	 * status.
1305	 */
1306	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1307		lvb->lvb_version = 0;
1308		goto out;
1309	}
1310
1311	lvb->lvb_version   = OCFS2_LVB_VERSION;
1312	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1313	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1314	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1315	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1316	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1317	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1318	lvb->lvb_iatime_packed  =
1319		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1320	lvb->lvb_ictime_packed =
1321		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1322	lvb->lvb_imtime_packed =
1323		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1324	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1325	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1326
1327out:
1328	mlog_meta_lvb(0, lockres);
1329
1330	mlog_exit_void();
1331}
1332
1333static void ocfs2_unpack_timespec(struct timespec *spec,
1334				  u64 packed_time)
1335{
1336	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1337	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1338}
1339
1340static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1341{
1342	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1343	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
1344	struct ocfs2_meta_lvb *lvb;
1345
1346	mlog_entry_void();
1347
1348	mlog_meta_lvb(0, lockres);
1349
1350	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1351
1352	/* We're safe here without the lockres lock... */
1353	spin_lock(&oi->ip_lock);
1354	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1355	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1356
1357	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1358	ocfs2_set_inode_flags(inode);
1359
1360	/* fast-symlinks are a special case */
1361	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1362		inode->i_blocks = 0;
1363	else
1364		inode->i_blocks =
1365			ocfs2_align_bytes_to_sectors(i_size_read(inode));
1366
1367	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1368	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1369	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1370	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1371	ocfs2_unpack_timespec(&inode->i_atime,
1372			      be64_to_cpu(lvb->lvb_iatime_packed));
1373	ocfs2_unpack_timespec(&inode->i_mtime,
1374			      be64_to_cpu(lvb->lvb_imtime_packed));
1375	ocfs2_unpack_timespec(&inode->i_ctime,
1376			      be64_to_cpu(lvb->lvb_ictime_packed));
1377	spin_unlock(&oi->ip_lock);
1378
1379	mlog_exit_void();
1380}
1381
1382static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1383					      struct ocfs2_lock_res *lockres)
1384{
1385	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1386
1387	if (lvb->lvb_version == OCFS2_LVB_VERSION
1388	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1389		return 1;
1390	return 0;
1391}
1392
1393/* Determine whether a lock resource needs to be refreshed, and
1394 * arbitrate who gets to refresh it.
1395 *
1396 *   0 means no refresh needed.
1397 *
1398 *   > 0 means you need to refresh this and you MUST call
1399 *   ocfs2_complete_lock_res_refresh afterwards. */
1400static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1401{
1402	unsigned long flags;
1403	int status = 0;
1404
1405	mlog_entry_void();
1406
1407refresh_check:
1408	spin_lock_irqsave(&lockres->l_lock, flags);
1409	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1410		spin_unlock_irqrestore(&lockres->l_lock, flags);
1411		goto bail;
1412	}
1413
1414	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1415		spin_unlock_irqrestore(&lockres->l_lock, flags);
1416
1417		ocfs2_wait_on_refreshing_lock(lockres);
1418		goto refresh_check;
1419	}
1420
1421	/* Ok, I'll be the one to refresh this lock. */
1422	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1423	spin_unlock_irqrestore(&lockres->l_lock, flags);
1424
1425	status = 1;
1426bail:
1427	mlog_exit(status);
1428	return status;
1429}
1430
1431/* If status is non zero, I'll mark it as not being in refresh
1432 * anymroe, but i won't clear the needs refresh flag. */
1433static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1434						   int status)
1435{
1436	unsigned long flags;
1437	mlog_entry_void();
1438
1439	spin_lock_irqsave(&lockres->l_lock, flags);
1440	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1441	if (!status)
1442		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1443	spin_unlock_irqrestore(&lockres->l_lock, flags);
1444
1445	wake_up(&lockres->l_event);
1446
1447	mlog_exit_void();
1448}
1449
1450/* may or may not return a bh if it went to disk. */
1451static int ocfs2_meta_lock_update(struct inode *inode,
1452				  struct buffer_head **bh)
1453{
1454	int status = 0;
1455	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1456	struct ocfs2_lock_res *lockres;
1457	struct ocfs2_dinode *fe;
1458
1459	mlog_entry_void();
1460
1461	spin_lock(&oi->ip_lock);
1462	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1463		mlog(0, "Orphaned inode %llu was deleted while we "
1464		     "were waiting on a lock. ip_flags = 0x%x\n",
1465		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
1466		spin_unlock(&oi->ip_lock);
1467		status = -ENOENT;
1468		goto bail;
1469	}
1470	spin_unlock(&oi->ip_lock);
1471
1472	lockres = &oi->ip_meta_lockres;
1473
1474	if (!ocfs2_should_refresh_lock_res(lockres))
1475		goto bail;
1476
1477	/* This will discard any caching information we might have had
1478	 * for the inode metadata. */
1479	ocfs2_metadata_cache_purge(inode);
1480
1481	/* will do nothing for inode types that don't use the extent
1482	 * map (directories, bitmap files, etc) */
1483	ocfs2_extent_map_trunc(inode, 0);
1484
1485	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1486		mlog(0, "Trusting LVB on inode %llu\n",
1487		     (unsigned long long)oi->ip_blkno);
1488		ocfs2_refresh_inode_from_lvb(inode);
1489	} else {
1490		/* Boo, we have to go to disk. */
1491		/* read bh, cast, ocfs2_refresh_inode */
1492		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1493					  bh, OCFS2_BH_CACHED, inode);
1494		if (status < 0) {
1495			mlog_errno(status);
1496			goto bail_refresh;
1497		}
1498		fe = (struct ocfs2_dinode *) (*bh)->b_data;
1499
1500		/* This is a good chance to make sure we're not
1501		 * locking an invalid object.
1502		 *
1503		 * We bug on a stale inode here because we checked
1504		 * above whether it was wiped from disk. The wiping
1505		 * node provides a guarantee that we receive that
1506		 * message and can mark the inode before dropping any
1507		 * locks associated with it. */
1508		if (!OCFS2_IS_VALID_DINODE(fe)) {
1509			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1510			status = -EIO;
1511			goto bail_refresh;
1512		}
1513		mlog_bug_on_msg(inode->i_generation !=
1514				le32_to_cpu(fe->i_generation),
1515				"Invalid dinode %llu disk generation: %u "
1516				"inode->i_generation: %u\n",
1517				(unsigned long long)oi->ip_blkno,
1518				le32_to_cpu(fe->i_generation),
1519				inode->i_generation);
1520		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1521				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1522				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
1523				(unsigned long long)oi->ip_blkno,
1524				(unsigned long long)le64_to_cpu(fe->i_dtime),
1525				le32_to_cpu(fe->i_flags));
1526
1527		ocfs2_refresh_inode(inode, fe);
1528	}
1529
1530	status = 0;
1531bail_refresh:
1532	ocfs2_complete_lock_res_refresh(lockres, status);
1533bail:
1534	mlog_exit(status);
1535	return status;
1536}
1537
1538static int ocfs2_assign_bh(struct inode *inode,
1539			   struct buffer_head **ret_bh,
1540			   struct buffer_head *passed_bh)
1541{
1542	int status;
1543
1544	if (passed_bh) {
1545		/* Ok, the update went to disk for us, use the
1546		 * returned bh. */
1547		*ret_bh = passed_bh;
1548		get_bh(*ret_bh);
1549
1550		return 0;
1551	}
1552
1553	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1554				  OCFS2_I(inode)->ip_blkno,
1555				  ret_bh,
1556				  OCFS2_BH_CACHED,
1557				  inode);
1558	if (status < 0)
1559		mlog_errno(status);
1560
1561	return status;
1562}
1563
1564/*
1565 * returns < 0 error if the callback will never be called, otherwise
1566 * the result of the lock will be communicated via the callback.
1567 */
1568int ocfs2_meta_lock_full(struct inode *inode,
1569			 struct ocfs2_journal_handle *handle,
1570			 struct buffer_head **ret_bh,
1571			 int ex,
1572			 int arg_flags)
1573{
1574	int status, level, dlm_flags, acquired;
1575	struct ocfs2_lock_res *lockres;
1576	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1577	struct buffer_head *local_bh = NULL;
1578
1579	BUG_ON(!inode);
1580
1581	mlog_entry_void();
1582
1583	mlog(0, "inode %llu, take %s META lock\n",
1584	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1585	     ex ? "EXMODE" : "PRMODE");
1586
1587	status = 0;
1588	acquired = 0;
1589	/* We'll allow faking a readonly metadata lock for
1590	 * rodevices. */
1591	if (ocfs2_is_hard_readonly(osb)) {
1592		if (ex)
1593			status = -EROFS;
1594		goto bail;
1595	}
1596
1597	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1598		wait_event(osb->recovery_event,
1599			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1600
1601	acquired = 0;
1602	lockres = &OCFS2_I(inode)->ip_meta_lockres;
1603	level = ex ? LKM_EXMODE : LKM_PRMODE;
1604	dlm_flags = 0;
1605	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1606		dlm_flags |= LKM_NOQUEUE;
1607
1608	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1609	if (status < 0) {
1610		if (status != -EAGAIN && status != -EIOCBRETRY)
1611			mlog_errno(status);
1612		goto bail;
1613	}
1614
1615	/* Notify the error cleanup path to drop the cluster lock. */
1616	acquired = 1;
1617
1618	/* We wait twice because a node may have died while we were in
1619	 * the lower dlm layers. The second time though, we've
1620	 * committed to owning this lock so we don't allow signals to
1621	 * abort the operation. */
1622	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1623		wait_event(osb->recovery_event,
1624			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1625
1626	/*
1627	 * We only see this flag if we're being called from
1628	 * ocfs2_read_locked_inode(). It means we're locking an inode
1629	 * which hasn't been populated yet, so clear the refresh flag
1630	 * and let the caller handle it.
1631	 */
1632	if (inode->i_state & I_NEW) {
1633		status = 0;
1634		ocfs2_complete_lock_res_refresh(lockres, 0);
1635		goto bail;
1636	}
1637
1638	/* This is fun. The caller may want a bh back, or it may
1639	 * not. ocfs2_meta_lock_update definitely wants one in, but
1640	 * may or may not read one, depending on what's in the
1641	 * LVB. The result of all of this is that we've *only* gone to
1642	 * disk if we have to, so the complexity is worthwhile. */
1643	status = ocfs2_meta_lock_update(inode, &local_bh);
1644	if (status < 0) {
1645		if (status != -ENOENT)
1646			mlog_errno(status);
1647		goto bail;
1648	}
1649
1650	if (ret_bh) {
1651		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1652		if (status < 0) {
1653			mlog_errno(status);
1654			goto bail;
1655		}
1656	}
1657
1658	if (handle) {
1659		status = ocfs2_handle_add_lock(handle, inode);
1660		if (status < 0)
1661			mlog_errno(status);
1662	}
1663
1664bail:
1665	if (status < 0) {
1666		if (ret_bh && (*ret_bh)) {
1667			brelse(*ret_bh);
1668			*ret_bh = NULL;
1669		}
1670		if (acquired)
1671			ocfs2_meta_unlock(inode, ex);
1672	}
1673
1674	if (local_bh)
1675		brelse(local_bh);
1676
1677	mlog_exit(status);
1678	return status;
1679}
1680
1681/*
1682 * This is working around a lock inversion between tasks acquiring DLM locks
1683 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1684 * while acquiring page locks.
1685 *
1686 * ** These _with_page variantes are only intended to be called from aop
1687 * methods that hold page locks and return a very specific *positive* error
1688 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1689 *
1690 * The DLM is called such that it returns -EAGAIN if it would have blocked
1691 * waiting for the vote thread.  In that case we unlock our page so the vote
1692 * thread can make progress.  Once we've done this we have to return
1693 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1694 * into the VFS who will then immediately retry the aop call.
1695 *
1696 * We do a blocking lock and immediate unlock before returning, though, so that
1697 * the lock has a great chance of being cached on this node by the time the VFS
1698 * calls back to retry the aop.    This has a potential to livelock as nodes
1699 * ping locks back and forth, but that's a risk we're willing to take to avoid
1700 * the lock inversion simply.
1701 */
1702int ocfs2_meta_lock_with_page(struct inode *inode,
1703			      struct ocfs2_journal_handle *handle,
1704			      struct buffer_head **ret_bh,
1705			      int ex,
1706			      struct page *page)
1707{
1708	int ret;
1709
1710	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1711				   OCFS2_LOCK_NONBLOCK);
1712	if (ret == -EAGAIN) {
1713		unlock_page(page);
1714		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1715			ocfs2_meta_unlock(inode, ex);
1716		ret = AOP_TRUNCATED_PAGE;
1717	}
1718
1719	return ret;
1720}
1721
1722void ocfs2_meta_unlock(struct inode *inode,
1723		       int ex)
1724{
1725	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1726	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1727
1728	mlog_entry_void();
1729
1730	mlog(0, "inode %llu drop %s META lock\n",
1731	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1732	     ex ? "EXMODE" : "PRMODE");
1733
1734	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1735		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1736
1737	mlog_exit_void();
1738}
1739
1740int ocfs2_super_lock(struct ocfs2_super *osb,
1741		     int ex)
1742{
1743	int status;
1744	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1745	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1746	struct buffer_head *bh;
1747	struct ocfs2_slot_info *si = osb->slot_info;
1748
1749	mlog_entry_void();
1750
1751	if (ocfs2_is_hard_readonly(osb))
1752		return -EROFS;
1753
1754	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1755	if (status < 0) {
1756		mlog_errno(status);
1757		goto bail;
1758	}
1759
1760	/* The super block lock path is really in the best position to
1761	 * know when resources covered by the lock need to be
1762	 * refreshed, so we do it here. Of course, making sense of
1763	 * everything is up to the caller :) */
1764	status = ocfs2_should_refresh_lock_res(lockres);
1765	if (status < 0) {
1766		mlog_errno(status);
1767		goto bail;
1768	}
1769	if (status) {
1770		bh = si->si_bh;
1771		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
1772					  si->si_inode);
1773		if (status == 0)
1774			ocfs2_update_slot_info(si);
1775
1776		ocfs2_complete_lock_res_refresh(lockres, status);
1777
1778		if (status < 0)
1779			mlog_errno(status);
1780	}
1781bail:
1782	mlog_exit(status);
1783	return status;
1784}
1785
1786void ocfs2_super_unlock(struct ocfs2_super *osb,
1787			int ex)
1788{
1789	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1790	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1791
1792	ocfs2_cluster_unlock(osb, lockres, level);
1793}
1794
1795int ocfs2_rename_lock(struct ocfs2_super *osb)
1796{
1797	int status;
1798	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1799
1800	if (ocfs2_is_hard_readonly(osb))
1801		return -EROFS;
1802
1803	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1804	if (status < 0)
1805		mlog_errno(status);
1806
1807	return status;
1808}
1809
1810void ocfs2_rename_unlock(struct ocfs2_super *osb)
1811{
1812	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1813
1814	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1815}
1816
1817int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1818{
1819	int ret;
1820	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1821	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1822	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1823
1824	BUG_ON(!dl);
1825
1826	if (ocfs2_is_hard_readonly(osb))
1827		return -EROFS;
1828
1829	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1830	if (ret < 0)
1831		mlog_errno(ret);
1832
1833	return ret;
1834}
1835
1836void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1837{
1838	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1839	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1840	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1841
1842	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1843}
1844
1845/* Reference counting of the dlm debug structure. We want this because
1846 * open references on the debug inodes can live on after a mount, so
1847 * we can't rely on the ocfs2_super to always exist. */
1848static void ocfs2_dlm_debug_free(struct kref *kref)
1849{
1850	struct ocfs2_dlm_debug *dlm_debug;
1851
1852	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1853
1854	kfree(dlm_debug);
1855}
1856
1857void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1858{
1859	if (dlm_debug)
1860		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1861}
1862
1863static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
1864{
1865	kref_get(&debug->d_refcnt);
1866}
1867
1868struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1869{
1870	struct ocfs2_dlm_debug *dlm_debug;
1871
1872	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1873	if (!dlm_debug) {
1874		mlog_errno(-ENOMEM);
1875		goto out;
1876	}
1877
1878	kref_init(&dlm_debug->d_refcnt);
1879	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1880	dlm_debug->d_locking_state = NULL;
1881out:
1882	return dlm_debug;
1883}
1884
1885/* Access to this is arbitrated for us via seq_file->sem. */
1886struct ocfs2_dlm_seq_priv {
1887	struct ocfs2_dlm_debug *p_dlm_debug;
1888	struct ocfs2_lock_res p_iter_res;
1889	struct ocfs2_lock_res p_tmp_res;
1890};
1891
1892static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
1893						 struct ocfs2_dlm_seq_priv *priv)
1894{
1895	struct ocfs2_lock_res *iter, *ret = NULL;
1896	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
1897
1898	assert_spin_locked(&ocfs2_dlm_tracking_lock);
1899
1900	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
1901		/* discover the head of the list */
1902		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
1903			mlog(0, "End of list found, %p\n", ret);
1904			break;
1905		}
1906
1907		/* We track our "dummy" iteration lockres' by a NULL
1908		 * l_ops field. */
1909		if (iter->l_ops != NULL) {
1910			ret = iter;
1911			break;
1912		}
1913	}
1914
1915	return ret;
1916}
1917
1918static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
1919{
1920	struct ocfs2_dlm_seq_priv *priv = m->private;
1921	struct ocfs2_lock_res *iter;
1922
1923	spin_lock(&ocfs2_dlm_tracking_lock);
1924	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
1925	if (iter) {
1926		/* Since lockres' have the lifetime of their container
1927		 * (which can be inodes, ocfs2_supers, etc) we want to
1928		 * copy this out to a temporary lockres while still
1929		 * under the spinlock. Obviously after this we can't
1930		 * trust any pointers on the copy returned, but that's
1931		 * ok as the information we want isn't typically held
1932		 * in them. */
1933		priv->p_tmp_res = *iter;
1934		iter = &priv->p_tmp_res;
1935	}
1936	spin_unlock(&ocfs2_dlm_tracking_lock);
1937
1938	return iter;
1939}
1940
/*
 * seq_file stop callback.  Intentionally empty: start/next drop the
 * tracking lock themselves before returning.
 */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	/* nothing to do */
}
1944
1945static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
1946{
1947	struct ocfs2_dlm_seq_priv *priv = m->private;
1948	struct ocfs2_lock_res *iter = v;
1949	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
1950
1951	spin_lock(&ocfs2_dlm_tracking_lock);
1952	iter = ocfs2_dlm_next_res(iter, priv);
1953	list_del_init(&dummy->l_debug_list);
1954	if (iter) {
1955		list_add(&dummy->l_debug_list, &iter->l_debug_list);
1956		priv->p_tmp_res = *iter;
1957		iter = &priv->p_tmp_res;
1958	}
1959	spin_unlock(&ocfs2_dlm_tracking_lock);
1960
1961	return iter;
1962}
1963
1964/* So that debugfs.ocfs2 can determine which format is being used */
1965#define OCFS2_DLM_DEBUG_STR_VERSION 1
1966static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1967{
1968	int i;
1969	char *lvb;
1970	struct ocfs2_lock_res *lockres = v;
1971
1972	if (!lockres)
1973		return -EINVAL;
1974
1975	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1976
1977	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1978		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1979			   lockres->l_name,
1980			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1981	else
1982		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1983
1984	seq_printf(m, "%d\t"
1985		   "0x%lx\t"
1986		   "0x%x\t"
1987		   "0x%x\t"
1988		   "%u\t"
1989		   "%u\t"
1990		   "%d\t"
1991		   "%d\t",
1992		   lockres->l_level,
1993		   lockres->l_flags,
1994		   lockres->l_action,
1995		   lockres->l_unlock_action,
1996		   lockres->l_ro_holders,
1997		   lockres->l_ex_holders,
1998		   lockres->l_requested,
1999		   lockres->l_blocking);
2000
2001	/* Dump the raw LVB */
2002	lvb = lockres->l_lksb.lvb;
2003	for(i = 0; i < DLM_LVB_LEN; i++)
2004		seq_printf(m, "0x%x\t", lvb[i]);
2005
2006	/* End the line */
2007	seq_printf(m, "\n");
2008	return 0;
2009}
2010
2011static struct seq_operations ocfs2_dlm_seq_ops = {
2012	.start =	ocfs2_dlm_seq_start,
2013	.stop =		ocfs2_dlm_seq_stop,
2014	.next =		ocfs2_dlm_seq_next,
2015	.show =		ocfs2_dlm_seq_show,
2016};
2017
2018static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2019{
2020	struct seq_file *seq = (struct seq_file *) file->private_data;
2021	struct ocfs2_dlm_seq_priv *priv = seq->private;
2022	struct ocfs2_lock_res *res = &priv->p_iter_res;
2023
2024	ocfs2_remove_lockres_tracking(res);
2025	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2026	return seq_release_private(inode, file);
2027}
2028
2029static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2030{
2031	int ret;
2032	struct ocfs2_dlm_seq_priv *priv;
2033	struct seq_file *seq;
2034	struct ocfs2_super *osb;
2035
2036	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2037	if (!priv) {
2038		ret = -ENOMEM;
2039		mlog_errno(ret);
2040		goto out;
2041	}
2042	osb = (struct ocfs2_super *) inode->u.generic_ip;
2043	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2044	priv->p_dlm_debug = osb->osb_dlm_debug;
2045	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2046
2047	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2048	if (ret) {
2049		kfree(priv);
2050		mlog_errno(ret);
2051		goto out;
2052	}
2053
2054	seq = (struct seq_file *) file->private_data;
2055	seq->private = priv;
2056
2057	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2058				   priv->p_dlm_debug);
2059
2060out:
2061	return ret;
2062}
2063
2064static const struct file_operations ocfs2_dlm_debug_fops = {
2065	.open =		ocfs2_dlm_debug_open,
2066	.release =	ocfs2_dlm_debug_release,
2067	.read =		seq_read,
2068	.llseek =	seq_lseek,
2069};
2070
2071static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2072{
2073	int ret = 0;
2074	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2075
2076	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2077							 S_IFREG|S_IRUSR,
2078							 osb->osb_debug_root,
2079							 osb,
2080							 &ocfs2_dlm_debug_fops);
2081	if (!dlm_debug->d_locking_state) {
2082		ret = -EINVAL;
2083		mlog(ML_ERROR,
2084		     "Unable to create locking state debugfs file.\n");
2085		goto out;
2086	}
2087
2088	ocfs2_get_dlm_debug(dlm_debug);
2089out:
2090	return ret;
2091}
2092
2093static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2094{
2095	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2096
2097	if (dlm_debug) {
2098		debugfs_remove(dlm_debug->d_locking_state);
2099		ocfs2_put_dlm_debug(dlm_debug);
2100	}
2101}
2102
2103int ocfs2_dlm_init(struct ocfs2_super *osb)
2104{
2105	int status;
2106	u32 dlm_key;
2107	struct dlm_ctxt *dlm;
2108
2109	mlog_entry_void();
2110
2111	status = ocfs2_dlm_init_debug(osb);
2112	if (status < 0) {
2113		mlog_errno(status);
2114		goto bail;
2115	}
2116
2117	/* launch vote thread */
2118	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
2119	if (IS_ERR(osb->vote_task)) {
2120		status = PTR_ERR(osb->vote_task);
2121		osb->vote_task = NULL;
2122		mlog_errno(status);
2123		goto bail;
2124	}
2125
2126	/* used by the dlm code to make message headers unique, each
2127	 * node in this domain must agree on this. */
2128	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2129
2130	/* for now, uuid == domain */
2131	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2132	if (IS_ERR(dlm)) {
2133		status = PTR_ERR(dlm);
2134		mlog_errno(status);
2135		goto bail;
2136	}
2137
2138	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2139	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2140
2141	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2142
2143	osb->dlm = dlm;
2144
2145	status = 0;
2146bail:
2147	if (status < 0) {
2148		ocfs2_dlm_shutdown_debug(osb);
2149		if (osb->vote_task)
2150			kthread_stop(osb->vote_task);
2151	}
2152
2153	mlog_exit(status);
2154	return status;
2155}
2156
/*
 * Tear down the DLM state of a mount, in this order: unregister the
 * eviction callback, drop the super and rename locks, stop the vote
 * thread if one is running, free both lock resources, unregister from
 * the DLM domain and finally shut down the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	/* vote_task is NULL if the thread was never started; clear it
	 * after stopping so it is not stopped a second time. */
	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2180
2181static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2182{
2183	struct ocfs2_lock_res *lockres = opaque;
2184	unsigned long flags;
2185
2186	mlog_entry_void();
2187
2188	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2189	     lockres->l_unlock_action);
2190
2191	spin_lock_irqsave(&lockres->l_lock, flags);
2192	/* We tried to cancel a convert request, but it was already
2193	 * granted. All we want to do here is clear our unlock
2194	 * state. The wake_up call done at the bottom is redundant
2195	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2196	 * hurt anything anyway */
2197	if (status == DLM_CANCELGRANT &&
2198	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2199		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2200
2201		/* We don't clear the busy flag in this case as it
2202		 * should have been cleared by the ast which the dlm
2203		 * has called. */
2204		goto complete_unlock;
2205	}
2206
2207	if (status != DLM_NORMAL) {
2208		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2209		     "unlock_action %d\n", status, lockres->l_name,
2210		     lockres->l_unlock_action);
2211		spin_unlock_irqrestore(&lockres->l_lock, flags);
2212		return;
2213	}
2214
2215	switch(lockres->l_unlock_action) {
2216	case OCFS2_UNLOCK_CANCEL_CONVERT:
2217		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2218		lockres->l_action = OCFS2_AST_INVALID;
2219		break;
2220	case OCFS2_UNLOCK_DROP_LOCK:
2221		lockres->l_level = LKM_IVMODE;
2222		break;
2223	default:
2224		BUG();
2225	}
2226
2227	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2228complete_unlock:
2229	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2230	spin_unlock_irqrestore(&lockres->l_lock, flags);
2231
2232	wake_up(&lockres->l_event);
2233
2234	mlog_exit_void();
2235}
2236
/* Hook that ocfs2_drop_lock() runs on a lockres before the lock is
 * dropped; the second argument is the caller supplied drop_data. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

/* Optional pre-drop hook and its argument, as passed to ocfs2_drop_lock(). */
struct drop_lock_cb {
	ocfs2_pre_drop_cb_t	*drop_func;	/* invoked with l_lock held */
	void			*drop_data;	/* handed to drop_func as-is */
};
2243
/*
 * Drop the dlm lock behind a lockres which is being torn down.
 *
 * osb:     super block whose dlm context (osb->dlm) the lock lives in
 * lockres: lock resource to drop. It must already be flagged
 *          OCFS2_LOCK_FREEING unless it was never initialized.
 * dcb:     optional hook, run under l_lock once the lockres is no
 *          longer busy and before the lock is dropped. May be NULL.
 *
 * Waits for the lockres to stop being busy, then - if a lock is
 * attached - calls dlmunlock() and waits again until the busy flag set
 * here has been cleared. A dlmunlock() failure is fatal (BUG).
 *
 * Always returns 0.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres,
			   struct drop_lock_cb *dcb)
{
	enum dlm_status status;
	unsigned long flags;
	int lkm_flags = 0;

	/* We didn't get anywhere near actually using this lockres. */
	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
		goto out;

	/* Lock types which carry an LVB pass LKM_VALBLK to dlmunlock(). */
	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
			"lockres %s, flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* The spinlock is dropped around each wait and the flag is
	 * rechecked once it has been retaken. */
	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
		     "%u, unlock_action = %u\n",
		     lockres->l_name, lockres->l_flags, lockres->l_action,
		     lockres->l_unlock_action);

		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* XXX: Today we just wait on any busy
		 * locks... Perhaps we need to cancel converts in the
		 * future? */
		ocfs2_wait_on_busy_lock(lockres);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	/* Give the caller its last chance to touch the lockres; this
	 * runs with l_lock held. */
	if (dcb)
		dcb->drop_func(lockres, dcb->drop_data);

	if (lockres->l_flags & OCFS2_LOCK_BUSY)
		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
		     lockres->l_name);
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

	/* No lock attached - nothing to hand back to the dlm. */
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto out;
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

	/* make sure we never get here while waiting for an ast to
	 * fire. */
	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

	/* is this necessary? */
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	/* Tells ocfs2_unlock_ast() that this unlock is a full drop. */
	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "lock %s\n", lockres->l_name);

	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
			   ocfs2_unlock_ast, lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
		dlm_print_one_lock(lockres->l_lksb.lockid);
		BUG();
	}
	mlog(0, "lock %s, successfull return from dlmunlock\n",
	     lockres->l_name);

	/* ocfs2_unlock_ast() clears the busy flag set above and wakes us. */
	ocfs2_wait_on_busy_lock(lockres);
out:
	mlog_exit(0);
	return 0;
}
2324
2325/* Mark the lockres as being dropped. It will no longer be
2326 * queued if blocking, but we still may have to wait on it
2327 * being dequeued from the vote thread before we can consider
2328 * it safe to drop.
2329 *
2330 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2331void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2332{
2333	int status;
2334	struct ocfs2_mask_waiter mw;
2335	unsigned long flags;
2336
2337	ocfs2_init_mask_waiter(&mw);
2338
2339	spin_lock_irqsave(&lockres->l_lock, flags);
2340	lockres->l_flags |= OCFS2_LOCK_FREEING;
2341	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2342		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2343		spin_unlock_irqrestore(&lockres->l_lock, flags);
2344
2345		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2346
2347		status = ocfs2_wait_for_mask(&mw);
2348		if (status)
2349			mlog_errno(status);
2350
2351		spin_lock_irqsave(&lockres->l_lock, flags);
2352	}
2353	spin_unlock_irqrestore(&lockres->l_lock, flags);
2354}
2355
2356void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2357			       struct ocfs2_lock_res *lockres)
2358{
2359	int ret;
2360
2361	ocfs2_mark_lockres_freeing(lockres);
2362	ret = ocfs2_drop_lock(osb, lockres, NULL);
2363	if (ret)
2364		mlog_errno(ret);
2365}
2366
2367static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2368{
2369	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2370	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2371}
2372
2373static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2374{
2375	struct inode *inode = data;
2376
2377	/* the metadata lock requires a bit more work as we have an
2378	 * LVB to worry about. */
2379	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2380	    lockres->l_level == LKM_EXMODE &&
2381	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2382		__ocfs2_stuff_meta_lvb(inode);
2383}
2384
2385int ocfs2_drop_inode_locks(struct inode *inode)
2386{
2387	int status, err;
2388	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2389
2390	mlog_entry_void();
2391
2392	/* No need to call ocfs2_mark_lockres_freeing here -
2393	 * ocfs2_clear_inode has done it for us. */
2394
2395	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2396			      &OCFS2_I(inode)->ip_data_lockres,
2397			      NULL);
2398	if (err < 0)
2399		mlog_errno(err);
2400
2401	status = err;
2402
2403	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2404			      &OCFS2_I(inode)->ip_meta_lockres,
2405			      &meta_dcb);
2406	if (err < 0)
2407		mlog_errno(err);
2408	if (err < 0 && !status)
2409		status = err;
2410
2411	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2412			      &OCFS2_I(inode)->ip_rw_lockres,
2413			      NULL);
2414	if (err < 0)
2415		mlog_errno(err);
2416	if (err < 0 && !status)
2417		status = err;
2418
2419	mlog_exit(status);
2420	return status;
2421}
2422
2423static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2424				      int new_level)
2425{
2426	assert_spin_locked(&lockres->l_lock);
2427
2428	BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2429
2430	if (lockres->l_level <= new_level) {
2431		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2432		     lockres->l_level, new_level);
2433		BUG();
2434	}
2435
2436	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2437	     lockres->l_name, new_level, lockres->l_blocking);
2438
2439	lockres->l_action = OCFS2_AST_DOWNCONVERT;
2440	lockres->l_requested = new_level;
2441	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2442}
2443
2444static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2445				  struct ocfs2_lock_res *lockres,
2446				  int new_level,
2447				  int lvb)
2448{
2449	int ret, dlm_flags = LKM_CONVERT;
2450	enum dlm_status status;
2451
2452	mlog_entry_void();
2453
2454	if (lvb)
2455		dlm_flags |= LKM_VALBLK;
2456
2457	status = dlmlock(osb->dlm,
2458			 new_level,
2459			 &lockres->l_lksb,
2460			 dlm_flags,
2461			 lockres->l_name,
2462			 OCFS2_LOCK_ID_MAX_LEN - 1,
2463			 ocfs2_locking_ast,
2464			 lockres,
2465			 ocfs2_blocking_ast);
2466	if (status != DLM_NORMAL) {
2467		ocfs2_log_dlm_error("dlmlock", status, lockres);
2468		ret = -EINVAL;
2469		ocfs2_recover_from_dlm_error(lockres, 1);
2470		goto bail;
2471	}
2472
2473	ret = 0;
2474bail:
2475	mlog_exit(ret);
2476	return ret;
2477}
2478
2479/* returns 1 when the caller should unlock and call dlmunlock */
2480static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2481				        struct ocfs2_lock_res *lockres)
2482{
2483	assert_spin_locked(&lockres->l_lock);
2484
2485	mlog_entry_void();
2486	mlog(0, "lock %s\n", lockres->l_name);
2487
2488	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2489		/* If we're already trying to cancel a lock conversion
2490		 * then just drop the spinlock and allow the caller to
2491		 * requeue this lock. */
2492
2493		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2494		return 0;
2495	}
2496
2497	/* were we in a convert when we got the bast fire? */
2498	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2499	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
2500	/* set things up for the unlockast to know to just
2501	 * clear out the ast_action and unset busy, etc. */
2502	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2503
2504	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2505			"lock %s, invalid flags: 0x%lx\n",
2506			lockres->l_name, lockres->l_flags);
2507
2508	return 1;
2509}
2510
2511static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2512				struct ocfs2_lock_res *lockres)
2513{
2514	int ret;
2515	enum dlm_status status;
2516
2517	mlog_entry_void();
2518	mlog(0, "lock %s\n", lockres->l_name);
2519
2520	ret = 0;
2521	status = dlmunlock(osb->dlm,
2522			   &lockres->l_lksb,
2523			   LKM_CANCEL,
2524			   ocfs2_unlock_ast,
2525			   lockres);
2526	if (status != DLM_NORMAL) {
2527		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2528		ret = -EINVAL;
2529		ocfs2_recover_from_dlm_error(lockres, 0);
2530	}
2531
2532	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2533
2534	mlog_exit(ret);
2535	return ret;
2536}
2537
/*
 * Common unblock logic: try to downconvert a lock which is blocking
 * another request.
 *
 * osb:     super block the lock belongs to
 * lockres: the blocked lock; OCFS2_LOCK_BLOCKED must be set
 * ctl:     out parameter. ctl->requeue is set to 1 when the caller
 *          should queue the lockres again and to 0 once a downconvert
 *          is being issued. ctl->unblock_action receives the worker's
 *          return value when a worker was run.
 * worker:  optional per lock type callback, run without l_lock held
 *          just before the downconvert. May be NULL.
 *
 * Returns 0, or a negative errno from cancelling a convert or from
 * requesting the downconvert.
 */
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	/* An operation is already in flight on this lock: try to cancel
	 * it and have the caller come back to us later. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		/* Non-zero means a cancel has to be sent; ret is then
		 * replaced by the result of sending it. */
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Lock types may veto the downconvert for now; a zero return
	 * from check_downconvert means "requeue and try again". */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = worker(lockres, blocking);

	/* The worker asked us to stop; we leave here without taking
	 * l_lock again and without downconverting. */
	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	/* Mark the lockres busy under l_lock, then send the request to
	 * the dlm with the spinlock dropped. */
	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
2650
2651static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
2652				     int blocking)
2653{
2654	struct inode *inode;
2655	struct address_space *mapping;
2656
2657       	inode = ocfs2_lock_res_inode(lockres);
2658	mapping = inode->i_mapping;
2659
2660	if (filemap_fdatawrite(mapping)) {
2661		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
2662		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2663	}
2664	sync_mapping_buffers(mapping);
2665	if (blocking == LKM_EXMODE) {
2666		truncate_inode_pages(mapping, 0);
2667		unmap_mapping_range(mapping, 0, 0, 0);
2668	} else {
2669		/* We only need to wait on the I/O if we're not also
2670		 * truncating pages because truncate_inode_pages waits
2671		 * for us above. We don't truncate pages if we're
2672		 * blocking anything < EXMODE because we want to keep
2673		 * them around in that case. */
2674		filemap_fdatawait(mapping);
2675	}
2676
2677	return UNBLOCK_CONTINUE;
2678}
2679
2680int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2681		       struct ocfs2_unblock_ctl *ctl)
2682{
2683	int status;
2684	struct inode *inode;
2685	struct ocfs2_super *osb;
2686
2687	mlog_entry_void();
2688
2689	inode = ocfs2_lock_res_inode(lockres);
2690	osb = OCFS2_SB(inode->i_sb);
2691
2692	mlog(0, "unblock inode %llu\n",
2693	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2694
2695	status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2696					    ocfs2_data_convert_worker);
2697	if (status < 0)
2698		mlog_errno(status);
2699
2700	mlog(0, "inode %llu, requeue = %d\n",
2701	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2702
2703	mlog_exit(status);
2704	return status;
2705}
2706
2707static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2708				    struct ocfs2_unblock_ctl *ctl)
2709{
2710	int status;
2711	struct inode *inode;
2712
2713	mlog_entry_void();
2714
2715	mlog(0, "Unblock lockres %s\n", lockres->l_name);
2716
2717	inode  = ocfs2_lock_res_inode(lockres);
2718
2719	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2720					    lockres, ctl, NULL);
2721	if (status < 0)
2722		mlog_errno(status);
2723
2724	mlog_exit(status);
2725	return status;
2726}
2727
2728static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2729					int new_level)
2730{
2731	struct inode *inode = ocfs2_lock_res_inode(lockres);
2732	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2733
2734	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2735	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2736
2737	if (checkpointed)
2738		return 1;
2739
2740	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2741	return 0;
2742}
2743
/* set_lvb hook for the metadata lock: stuff the LVB from the inode the
 * lockres belongs to. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2750
2751static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2752			      struct ocfs2_unblock_ctl *ctl)
2753{
2754	int status;
2755	struct inode *inode;
2756
2757	mlog_entry_void();
2758
2759       	inode = ocfs2_lock_res_inode(lockres);
2760
2761	mlog(0, "unblock inode %llu\n",
2762	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2763
2764	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2765					    lockres, ctl, NULL);
2766	if (status < 0)
2767		mlog_errno(status);
2768
2769	mlog(0, "inode %llu, requeue = %d\n",
2770	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2771
2772	mlog_exit(status);
2773	return status;
2774}
2775
/*
 * post_unlock hook for dentry locks: performs the final reference drop
 * on our dentry lock. Today that happens in the vote thread, but the
 * dlmglue API could be simplified by pushing these drops off to the
 * ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2787
/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there are no more dentries anyway.
 *
 * Returns UNBLOCK_CONTINUE when no extra reference was taken,
 * UNBLOCK_STOP_POST when the extra reference is the only one left, and
 * UNBLOCK_CONTINUE_POST otherwise.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* Delete local aliases one at a time; dentry_attach_lock is
	 * held for each lookup and dropped around the dcache calls. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
2897
2898static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
2899				     struct ocfs2_unblock_ctl *ctl)
2900{
2901	int ret;
2902	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2903	struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
2904
2905	mlog(0, "unblock dentry lock: %llu\n",
2906	     (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
2907
2908	ret = ocfs2_generic_unblock_lock(osb,
2909					 lockres,
2910					 ctl,
2911					 ocfs2_dentry_convert_worker);
2912	if (ret < 0)
2913		mlog_errno(ret);
2914
2915	mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
2916
2917	return ret;
2918}
2919
2920/* Generic unblock function for any lockres whose private data is an
2921 * ocfs2_super pointer. */
2922static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
2923				  struct ocfs2_unblock_ctl *ctl)
2924{
2925	int status;
2926	struct ocfs2_super *osb;
2927
2928	mlog_entry_void();
2929
2930	mlog(0, "Unblock lockres %s\n", lockres->l_name);
2931
2932	osb = ocfs2_get_lockres_osb(lockres);
2933
2934	status = ocfs2_generic_unblock_lock(osb,
2935					    lockres,
2936					    ctl,
2937					    NULL);
2938	if (status < 0)
2939		mlog_errno(status);
2940
2941	mlog_exit(status);
2942	return status;
2943}
2944
/*
 * Process one blocked lockres: run its type specific unblock handler,
 * then either clear OCFS2_LOCK_QUEUED or schedule the lockres again,
 * and finally run the type's post_unlock hook if the handler asked for
 * one.
 *
 * A lockres already marked OCFS2_LOCK_FREEING is not unblocked at all;
 * it is only taken off the queue.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	/* Note: this jump is taken with l_lock still held. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* l_lock is held on both ways into this label. A freeing lock
	 * is never requeued, whatever the handler asked for. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Only handlers which returned something other than
	 * UNBLOCK_CONTINUE get their post_unlock hook called. */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
2995
2996static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
2997					struct ocfs2_lock_res *lockres)
2998{
2999	mlog_entry_void();
3000
3001	assert_spin_locked(&lockres->l_lock);
3002
3003	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3004		/* Do not schedule a lock for downconvert when it's on
3005		 * the way to destruction - any nodes wanting access
3006		 * to the resource will get it soon. */
3007		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3008		     lockres->l_name, lockres->l_flags);
3009		return;
3010	}
3011
3012	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3013
3014	spin_lock(&osb->vote_task_lock);
3015	if (list_empty(&lockres->l_blocked_list)) {
3016		list_add_tail(&lockres->l_blocked_list,
3017			      &osb->blocked_lock_list);
3018		osb->blocked_lock_count++;
3019	}
3020	spin_unlock(&osb->vote_task_lock);
3021
3022	mlog_exit_void();
3023}
3024
3025/* This aids in debugging situations where a bad LVB might be involved. */
3026void ocfs2_dump_meta_lvb_info(u64 level,
3027			      const char *function,
3028			      unsigned int line,
3029			      struct ocfs2_lock_res *lockres)
3030{
3031	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
3032
3033	mlog(level, "LVB information for %s (called from %s:%u):\n",
3034	     lockres->l_name, function, line);
3035	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
3036	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
3037	     be32_to_cpu(lvb->lvb_igeneration));
3038	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
3039	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
3040	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
3041	     be16_to_cpu(lvb->lvb_imode));
3042	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
3043	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
3044	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
3045	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
3046	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
3047	     be32_to_cpu(lvb->lvb_iattr));
3048}
3049