dlmglue.c revision 810d5aeba18825c754cf47db59eb83814a54bb27
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/smp_lock.h>
31#include <linux/crc32.h>
32#include <linux/kthread.h>
33#include <linux/pagemap.h>
34#include <linux/debugfs.h>
35#include <linux/seq_file.h>
36
37#include <cluster/heartbeat.h>
38#include <cluster/nodemanager.h>
39#include <cluster/tcp.h>
40
41#include <dlm/dlmapi.h>
42
43#define MLOG_MASK_PREFIX ML_DLM_GLUE
44#include <cluster/masklog.h>
45
46#include "ocfs2.h"
47
48#include "alloc.h"
49#include "dcache.h"
50#include "dlmglue.h"
51#include "extent_map.h"
52#include "heartbeat.h"
53#include "inode.h"
54#include "journal.h"
55#include "slot_map.h"
56#include "super.h"
57#include "uptodate.h"
58#include "vote.h"
59
60#include "buffer_head_io.h"
61
/*
 * A waiter parked on a lockres' l_mask_waiters list. It is completed
 * from lockres_set_flags() once (l_flags & mw_mask) == mw_goal;
 * mw_status carries the wakeup result back to the sleeper.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* link on l_mask_waiters */
	int			mw_status;	/* status reported to the waiter */
	struct completion	mw_complete;	/* signalled when goal is reached */
	unsigned long		mw_mask;	/* l_flags bits to watch */
	unsigned long		mw_goal;	/* desired value of the masked bits */
};
69
70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
72
/*
 * Return value from ocfs2_convert_worker_t functions.
 *
 * These control the precise actions of ocfs2_generic_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Result of an unblock attempt: whether the lockres should be requeued
 * for another pass, plus which ocfs2_unblock_action to take.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
92
93static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
94			      struct ocfs2_unblock_ctl *ctl);
95static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
96					int new_level);
97static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
98
99static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
100			      struct ocfs2_unblock_ctl *ctl);
101static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
102				    struct ocfs2_unblock_ctl *ctl);
103static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
104				     struct ocfs2_unblock_ctl *ctl);
105static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
106				  struct ocfs2_unblock_ctl *ctl);
107
108static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
109				     struct ocfs2_lock_res *lockres);
110
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Handle a blocked lock: attempt the downconvert/cancel needed
	 * to satisfy another node. Result reported through the ctl.
	 */
	int  (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *);

	/*
	 * Optional hook fired after a downconvert completes, outside
	 * the lockres spinlock (see ocfs2_unblock_action values).
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2
170
171typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int);
172static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
173				      struct ocfs2_lock_res *lockres,
174				      struct ocfs2_unblock_ctl *ctl,
175				      ocfs2_convert_worker_t *worker);
176
/*
 * Per-type lock resource operation tables. Types without a get_osb()
 * callback store their ocfs2_super directly in l_priv (super/rename);
 * inode and dentry types store an inode / ocfs2_dentry_lock instead.
 */

/* Inode read/write serialization lock. */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_inode_lock,
	.flags		= 0,
};

/* Inode metadata lock: refreshes on first grant and ships inode
 * fields in the lock value block. */
static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_meta,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Inode data (page cache) lock. */
static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.unblock	= ocfs2_unblock_data,
	.flags		= 0,
};

/* Superblock lock: refresh recomputes cluster-wide fs state. */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Global rename serialization lock. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.unblock	= ocfs2_unblock_osb_lock,
	.flags		= 0,
};

/* Dentry lock: post_unlock drops the dentry reference after
 * the downconvert has finished. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.unblock	= ocfs2_unblock_dentry_lock,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.flags		= 0,
};
213
214static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
215{
216	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
217		lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
218		lockres->l_type == OCFS2_LOCK_TYPE_RW;
219}
220
/* Recover the inode stashed in l_priv; only valid for META/DATA/RW
 * lock types (BUGs otherwise). */
static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}
227
/* Recover the ocfs2_dentry_lock stashed in l_priv; only valid for
 * DENTRY lock types (BUGs otherwise). */
static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}
234
235static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
236{
237	if (lockres->l_ops->get_osb)
238		return lockres->l_ops->get_osb(lockres);
239
240	return (struct ocfs2_super *)lockres->l_priv;
241}
242
243static int ocfs2_lock_create(struct ocfs2_super *osb,
244			     struct ocfs2_lock_res *lockres,
245			     int level,
246			     int dlm_flags);
247static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
248						     int wanted);
249static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
250				 struct ocfs2_lock_res *lockres,
251				 int level);
252static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
253static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
254static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
255static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
256static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
257					struct ocfs2_lock_res *lockres);
258static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
259						int convert);
/* Log a failed dlm call: function name, lock name, and the dlm status
 * as both symbol and message. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
		"resource %s: %s\n", dlm_errname(_stat), _func,	\
		_lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
265static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
266				 struct ocfs2_lock_res *lockres);
267static int ocfs2_meta_lock_update(struct inode *inode,
268				  struct buffer_head **bh);
269static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
270static inline int ocfs2_highest_compat_lock_level(int level);
271static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
272						  struct ocfs2_lock_res *lockres,
273						  int new_level);
274
275static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
276				  u64 blkno,
277				  u32 generation,
278				  char *name)
279{
280	int len;
281
282	mlog_entry_void();
283
284	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
285
286	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
287		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
288		       (long long)blkno, generation);
289
290	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
291
292	mlog(0, "built lock resource with name: %s\n", name);
293
294	mlog_exit_void();
295}
296
297static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
298
/* Put @res on the per-superblock debug tracking list (exposed via
 * debugfs), under the global tracking spinlock. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
308
/* Take @res off the debug tracking list. Safe to call more than once
 * thanks to the list_empty() check / list_del_init() pairing. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
316
/*
 * Second-stage lockres initialization shared by all lock types:
 * record the type/ops/private pointer, reset every piece of dlm state
 * to its invalid value, set l_flags to exactly OCFS2_LOCK_INITIALIZED
 * and hook the lockres into debug tracking. The one-time init
 * (ocfs2_lock_res_init_once()) is expected to have run on @res first.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = LKM_IVMODE;
	res->l_requested     = LKM_IVMODE;
	res->l_blocking      = LKM_IVMODE;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
337
/*
 * One-time initialization of a lockres: zero the whole structure and
 * set up its spinlock, waitqueue and list heads. Must run before
 * ocfs2_lock_res_init_common().
 */
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}
347
348void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
349			       enum ocfs2_lock_type type,
350			       unsigned int generation,
351			       struct inode *inode)
352{
353	struct ocfs2_lock_res_ops *ops;
354
355	switch(type) {
356		case OCFS2_LOCK_TYPE_RW:
357			ops = &ocfs2_inode_rw_lops;
358			break;
359		case OCFS2_LOCK_TYPE_META:
360			ops = &ocfs2_inode_meta_lops;
361			break;
362		case OCFS2_LOCK_TYPE_DATA:
363			ops = &ocfs2_inode_data_lops;
364			break;
365		default:
366			mlog_bug_on_msg(1, "type: %d\n", type);
367			ops = NULL; /* thanks, gcc */
368			break;
369	};
370
371	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
372			      generation, res->l_name);
373	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
374}
375
376static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
377{
378	struct inode *inode = ocfs2_lock_res_inode(lockres);
379
380	return OCFS2_SB(inode->i_sb);
381}
382
/*
 * Dentry lock names embed the inode block number as a raw big-endian
 * value at offset OCFS2_DENTRY_LOCK_INO_START (see
 * ocfs2_dentry_lock_res_init()); extract it back to cpu byte order.
 */
static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}
392
/* get_osb callback for dentry locks: l_priv is an ocfs2_dentry_lock,
 * reach the superblock through its attached inode. */
static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}
399
400void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
401				u64 parent, struct inode *inode)
402{
403	int len;
404	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
405	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
406	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
407
408	ocfs2_lock_res_init_once(lockres);
409
410	/*
411	 * Unfortunately, the standard lock naming scheme won't work
412	 * here because we have two 16 byte values to use. Instead,
413	 * we'll stuff the inode number as a binary value. We still
414	 * want error prints to show something without garbling the
415	 * display, so drop a null byte in there before the inode
416	 * number. A future version of OCFS2 will likely use all
417	 * binary lock names. The stringified names have been a
418	 * tremendous aid in debugging, but now that the debugfs
419	 * interface exists, we can mangle things there if need be.
420	 *
421	 * NOTE: We also drop the standard "pad" value (the total lock
422	 * name size stays the same though - the last part is all
423	 * zeros due to the memset in ocfs2_lock_res_init_once()
424	 */
425	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
426		       "%c%016llx",
427		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
428		       (long long)parent);
429
430	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
431
432	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
433	       sizeof(__be64));
434
435	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
436				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
437				   dl);
438}
439
/* Initialize the superblock lockres, named from the fixed superblock
 * block number with generation 0; l_priv is the osb itself. */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}
451
/* Initialize the global rename lockres (block number and generation
 * are both 0); l_priv is the osb itself. */
static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
462
/*
 * Tear down a lockres: remove it from debug tracking, clear the lock
 * status block and reset l_flags. The lockres must be completely idle
 * -- any remaining holders, queued work or mask waiters is a BUG. A
 * lockres that never finished init (OCFS2_LOCK_INITIALIZED unset) is
 * silently ignored.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
494
495static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
496				     int level)
497{
498	mlog_entry_void();
499
500	BUG_ON(!lockres);
501
502	switch(level) {
503	case LKM_EXMODE:
504		lockres->l_ex_holders++;
505		break;
506	case LKM_PRMODE:
507		lockres->l_ro_holders++;
508		break;
509	default:
510		BUG();
511	}
512
513	mlog_exit_void();
514}
515
516static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
517				     int level)
518{
519	mlog_entry_void();
520
521	BUG_ON(!lockres);
522
523	switch(level) {
524	case LKM_EXMODE:
525		BUG_ON(!lockres->l_ex_holders);
526		lockres->l_ex_holders--;
527		break;
528	case LKM_PRMODE:
529		BUG_ON(!lockres->l_ro_holders);
530		lockres->l_ro_holders--;
531		break;
532	default:
533		BUG();
534	}
535	mlog_exit_void();
536}
537
538/* WARNING: This function lives in a world where the only three lock
539 * levels are EX, PR, and NL. It *will* have to be adjusted when more
540 * lock types are added. */
541static inline int ocfs2_highest_compat_lock_level(int level)
542{
543	int new_level = LKM_EXMODE;
544
545	if (level == LKM_EXMODE)
546		new_level = LKM_NLMODE;
547	else if (level == LKM_PRMODE)
548		new_level = LKM_PRMODE;
549	return new_level;
550}
551
552static void lockres_set_flags(struct ocfs2_lock_res *lockres,
553			      unsigned long newflags)
554{
555	struct list_head *pos, *tmp;
556	struct ocfs2_mask_waiter *mw;
557
558 	assert_spin_locked(&lockres->l_lock);
559
560	lockres->l_flags = newflags;
561
562	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
563		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
564		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
565			continue;
566
567		list_del_init(&mw->mw_item);
568		mw->mw_status = 0;
569		complete(&mw->mw_complete);
570	}
571}
/* Set the given l_flags bits (waking satisfied mask waiters).
 * Caller holds the lockres spinlock. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear the given l_flags bits (waking satisfied mask waiters).
 * Caller holds the lockres spinlock. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
581
/*
 * AST handler path: a downconvert we requested has been granted.
 * Drop l_level to the requested level; if that level no longer
 * conflicts with what the blocking node wants, clear BLOCKED and
 * reset l_blocking. BUSY is always cleared. Called under the lockres
 * spinlock (from ocfs2_locking_ast()).
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
601
/*
 * AST handler path: an upconvert we requested has been granted.
 * Raise l_level to the requested level and clear BUSY. Called under
 * the lockres spinlock (from ocfs2_locking_ast()).
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
622
623static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
624{
625	mlog_entry_void();
626
627	BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
628	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
629
630	if (lockres->l_requested > LKM_NLMODE &&
631	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
632	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
633		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
634
635	lockres->l_level = lockres->l_requested;
636	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
637	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
638
639	mlog_exit_void();
640}
641
/*
 * Record that another node wants this lock at a level conflicting
 * with @level. Sets BLOCKED, tracks the most restrictive blocking
 * level seen, and returns 1 when a (new) downconvert needs to be
 * scheduled. Caller must hold the lockres spinlock (asserted).
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
667
/*
 * dlm blocking AST callback: another node has requested @level on
 * this resource and we are in the way. Marks the lockres blocked
 * (under its spinlock), queues it for the vote thread to downconvert
 * when needed, and wakes l_event waiters.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}
691
/*
 * dlm AST callback: one of our own requests (attach, convert or
 * downconvert -- recorded in l_action) has completed. Dispatches to
 * the matching handler, invalidates l_action so a spurious second
 * AST is caught, and wakes l_event waiters. A non-DLM_NORMAL lksb
 * status is logged and the AST otherwise ignored, leaving l_action
 * in place.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
733
/*
 * Undo in-flight state after a failed dlmlock/dlmunlock call: clear
 * BUSY and invalidate the pending action -- l_action when @convert
 * is set (lock/convert path), l_unlock_action otherwise (unlock
 * path) -- then wake anyone waiting on l_event.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
751
752/* Note: If we detect another process working on the lock (i.e.,
753 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
754 * to do the right thing in that case.
755 */
756static int ocfs2_lock_create(struct ocfs2_super *osb,
757			     struct ocfs2_lock_res *lockres,
758			     int level,
759			     int dlm_flags)
760{
761	int ret = 0;
762	enum dlm_status status;
763	unsigned long flags;
764
765	mlog_entry_void();
766
767	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
768	     dlm_flags);
769
770	spin_lock_irqsave(&lockres->l_lock, flags);
771	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
772	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
773		spin_unlock_irqrestore(&lockres->l_lock, flags);
774		goto bail;
775	}
776
777	lockres->l_action = OCFS2_AST_ATTACH;
778	lockres->l_requested = level;
779	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
780	spin_unlock_irqrestore(&lockres->l_lock, flags);
781
782	status = dlmlock(osb->dlm,
783			 level,
784			 &lockres->l_lksb,
785			 dlm_flags,
786			 lockres->l_name,
787			 OCFS2_LOCK_ID_MAX_LEN - 1,
788			 ocfs2_locking_ast,
789			 lockres,
790			 ocfs2_blocking_ast);
791	if (status != DLM_NORMAL) {
792		ocfs2_log_dlm_error("dlmlock", status, lockres);
793		ret = -EINVAL;
794		ocfs2_recover_from_dlm_error(lockres, 1);
795	}
796
797	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
798
799bail:
800	mlog_exit(ret);
801	return ret;
802}
803
804static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
805					int flag)
806{
807	unsigned long flags;
808	int ret;
809
810	spin_lock_irqsave(&lockres->l_lock, flags);
811	ret = lockres->l_flags & flag;
812	spin_unlock_irqrestore(&lockres->l_lock, flags);
813
814	return ret;
815}
816
/* Sleep on l_event until OCFS2_LOCK_BUSY clears (a pending dlm call
 * has completed or been recovered). */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}
823
/* Sleep on l_event until OCFS2_LOCK_REFRESHING clears (another task
 * has finished refreshing the lock's protected data). */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
830
/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. Only meaningful while the
 * lockres is BLOCKED (BUGs otherwise); caller holds the spinlock. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}
841
/* Prepare a (typically stack-allocated) mask waiter for use with
 * lockres_add_mask_waiter()/ocfs2_wait_for_mask(). */
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}
847
/* Block until the waiter is completed by lockres_set_flags(), then
 * return the status it recorded (0 on a satisfied goal). */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}
855
/*
 * Queue @mw on the lockres, to be completed once (l_flags & mask) ==
 * goal. The waiter must not already be queued; caller holds the
 * lockres spinlock (asserted).
 */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}
869
/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* An empty mw_item means lockres_set_flags() already dequeued
	 * and completed us -- report success without touching it. */
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}
891
/*
 * Take a cluster lock at @level (LKM_PRMODE or LKM_EXMODE). Creates
 * the dlm lock at NL first if it was never attached, then upconverts
 * as needed, looping via mask waiters while the lockres is busy or
 * blocked on behalf of another node. On success a holder reference
 * is taken -- drop it with ocfs2_cluster_unlock().
 *
 * Returns 0 on success; -ERESTARTSYS on a pending signal (unless the
 * mount is nointr or a dlm call is already in flight); -EAGAIN when
 * LKM_NOQUEUE was refused or OCFS2_LOCK_NONBLOCK would have had to
 * block; -EINVAL on dlm error.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
1041
1042static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1043				 struct ocfs2_lock_res *lockres,
1044				 int level)
1045{
1046	unsigned long flags;
1047
1048	mlog_entry_void();
1049	spin_lock_irqsave(&lockres->l_lock, flags);
1050	ocfs2_dec_holders(lockres, level);
1051	ocfs2_vote_on_unlock(osb, lockres);
1052	spin_unlock_irqrestore(&lockres->l_lock, flags);
1053	mlog_exit_void();
1054}
1055
1056int ocfs2_create_new_lock(struct ocfs2_super *osb,
1057			  struct ocfs2_lock_res *lockres,
1058			  int ex,
1059			  int local)
1060{
1061	int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1062	unsigned long flags;
1063	int lkm_flags = local ? LKM_LOCAL : 0;
1064
1065	spin_lock_irqsave(&lockres->l_lock, flags);
1066	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1067	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1068	spin_unlock_irqrestore(&lockres->l_lock, flags);
1069
1070	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1071}
1072
1073/* Grants us an EX lock on the data and metadata resources, skipping
1074 * the normal cluster directory lookup. Use this ONLY on newly created
1075 * inodes which other nodes can't possibly see, and which haven't been
1076 * hashed in the inode hash yet. This can give us a good performance
1077 * increase as it'll skip the network broadcast normally associated
1078 * with creating a new lock resource. */
1079int ocfs2_create_new_inode_locks(struct inode *inode)
1080{
1081	int ret;
1082	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1083
1084	BUG_ON(!inode);
1085	BUG_ON(!ocfs2_inode_is_new(inode));
1086
1087	mlog_entry_void();
1088
1089	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1090
1091	/* NOTE: That we don't increment any of the holder counts, nor
1092	 * do we add anything to a journal handle. Since this is
1093	 * supposed to be a new inode which the cluster doesn't know
1094	 * about yet, there is no need to.  As far as the LVB handling
1095	 * is concerned, this is basically like acquiring an EX lock
1096	 * on a resource which has an invalid one -- we'll set it
1097	 * valid when we release the EX. */
1098
1099	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1100	if (ret) {
1101		mlog_errno(ret);
1102		goto bail;
1103	}
1104
1105	/*
1106	 * We don't want to use LKM_LOCAL on a meta data lock as they
1107	 * don't use a generation in their lock names.
1108	 */
1109	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
1110	if (ret) {
1111		mlog_errno(ret);
1112		goto bail;
1113	}
1114
1115	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
1116	if (ret) {
1117		mlog_errno(ret);
1118		goto bail;
1119	}
1120
1121bail:
1122	mlog_exit(ret);
1123	return ret;
1124}
1125
1126int ocfs2_rw_lock(struct inode *inode, int write)
1127{
1128	int status, level;
1129	struct ocfs2_lock_res *lockres;
1130
1131	BUG_ON(!inode);
1132
1133	mlog_entry_void();
1134
1135	mlog(0, "inode %llu take %s RW lock\n",
1136	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1137	     write ? "EXMODE" : "PRMODE");
1138
1139	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1140
1141	level = write ? LKM_EXMODE : LKM_PRMODE;
1142
1143	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1144				    0);
1145	if (status < 0)
1146		mlog_errno(status);
1147
1148	mlog_exit(status);
1149	return status;
1150}
1151
1152void ocfs2_rw_unlock(struct inode *inode, int write)
1153{
1154	int level = write ? LKM_EXMODE : LKM_PRMODE;
1155	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1156
1157	mlog_entry_void();
1158
1159	mlog(0, "inode %llu drop %s RW lock\n",
1160	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1161	     write ? "EXMODE" : "PRMODE");
1162
1163	ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1164
1165	mlog_exit_void();
1166}
1167
1168int ocfs2_data_lock_full(struct inode *inode,
1169			 int write,
1170			 int arg_flags)
1171{
1172	int status = 0, level;
1173	struct ocfs2_lock_res *lockres;
1174
1175	BUG_ON(!inode);
1176
1177	mlog_entry_void();
1178
1179	mlog(0, "inode %llu take %s DATA lock\n",
1180	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1181	     write ? "EXMODE" : "PRMODE");
1182
1183	/* We'll allow faking a readonly data lock for
1184	 * rodevices. */
1185	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1186		if (write) {
1187			status = -EROFS;
1188			mlog_errno(status);
1189		}
1190		goto out;
1191	}
1192
1193	lockres = &OCFS2_I(inode)->ip_data_lockres;
1194
1195	level = write ? LKM_EXMODE : LKM_PRMODE;
1196
1197	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1198				    0, arg_flags);
1199	if (status < 0 && status != -EAGAIN)
1200		mlog_errno(status);
1201
1202out:
1203	mlog_exit(status);
1204	return status;
1205}
1206
1207/* see ocfs2_meta_lock_with_page() */
1208int ocfs2_data_lock_with_page(struct inode *inode,
1209			      int write,
1210			      struct page *page)
1211{
1212	int ret;
1213
1214	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1215	if (ret == -EAGAIN) {
1216		unlock_page(page);
1217		if (ocfs2_data_lock(inode, write) == 0)
1218			ocfs2_data_unlock(inode, write);
1219		ret = AOP_TRUNCATED_PAGE;
1220	}
1221
1222	return ret;
1223}
1224
1225static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1226				 struct ocfs2_lock_res *lockres)
1227{
1228	int kick = 0;
1229
1230	mlog_entry_void();
1231
1232	/* If we know that another node is waiting on our lock, kick
1233	 * the vote thread * pre-emptively when we reach a release
1234	 * condition. */
1235	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1236		switch(lockres->l_blocking) {
1237		case LKM_EXMODE:
1238			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1239				kick = 1;
1240			break;
1241		case LKM_PRMODE:
1242			if (!lockres->l_ex_holders)
1243				kick = 1;
1244			break;
1245		default:
1246			BUG();
1247		}
1248	}
1249
1250	if (kick)
1251		ocfs2_kick_vote_thread(osb);
1252
1253	mlog_exit_void();
1254}
1255
1256void ocfs2_data_unlock(struct inode *inode,
1257		       int write)
1258{
1259	int level = write ? LKM_EXMODE : LKM_PRMODE;
1260	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1261
1262	mlog_entry_void();
1263
1264	mlog(0, "inode %llu drop %s DATA lock\n",
1265	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1266	     write ? "EXMODE" : "PRMODE");
1267
1268	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1269		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1270
1271	mlog_exit_void();
1272}
1273
/* The LVB packs each inode timestamp into a single u64: the high
 * OCFS2_SEC_BITS bits hold the seconds, the low OCFS2_SEC_SHIFT bits
 * hold the nanoseconds.  Derive the shift from the bit count instead
 * of repeating the magic 34 so the two cannot drift apart. */
#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1277
1278/* LVB only has room for 64 bits of time here so we pack it for
1279 * now. */
1280static u64 ocfs2_pack_timespec(struct timespec *spec)
1281{
1282	u64 res;
1283	u64 sec = spec->tv_sec;
1284	u32 nsec = spec->tv_nsec;
1285
1286	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1287
1288	return res;
1289}
1290
1291/* Call this with the lockres locked. I am reasonably sure we don't
1292 * need ip_lock in this function as anyone who would be changing those
1293 * values is supposed to be blocked in ocfs2_meta_lock right now. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* Multi-byte fields are stored big-endian (cpu_to_be*) so any
	 * node in the cluster can decode the LVB. */
	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	/* Timestamps are squeezed into one u64 each -- see
	 * ocfs2_pack_timespec(). */
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1335
1336static void ocfs2_unpack_timespec(struct timespec *spec,
1337				  u64 packed_time)
1338{
1339	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1340	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1341}
1342
/* Populate the struct inode from a trusted meta LVB, saving us a trip
 * to disk.  Counterpart of __ocfs2_stuff_meta_lvb(). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	/* propagate the attribute bits into the VFS-visible i_flags */
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
	/* timestamps travel packed into a u64 each -- see
	 * ocfs2_pack_timespec() */
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1384
1385static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1386					      struct ocfs2_lock_res *lockres)
1387{
1388	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1389
1390	if (lvb->lvb_version == OCFS2_LVB_VERSION
1391	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1392		return 1;
1393	return 0;
1394}
1395
1396/* Determine whether a lock resource needs to be refreshed, and
1397 * arbitrate who gets to refresh it.
1398 *
1399 *   0 means no refresh needed.
1400 *
1401 *   > 0 means you need to refresh this and you MUST call
1402 *   ocfs2_complete_lock_res_refresh afterwards. */
1403static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1404{
1405	unsigned long flags;
1406	int status = 0;
1407
1408	mlog_entry_void();
1409
1410refresh_check:
1411	spin_lock_irqsave(&lockres->l_lock, flags);
1412	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1413		spin_unlock_irqrestore(&lockres->l_lock, flags);
1414		goto bail;
1415	}
1416
1417	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1418		spin_unlock_irqrestore(&lockres->l_lock, flags);
1419
1420		ocfs2_wait_on_refreshing_lock(lockres);
1421		goto refresh_check;
1422	}
1423
1424	/* Ok, I'll be the one to refresh this lock. */
1425	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1426	spin_unlock_irqrestore(&lockres->l_lock, flags);
1427
1428	status = 1;
1429bail:
1430	mlog_exit(status);
1431	return status;
1432}
1433
/* If status is non-zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs-refresh flag. */
1436static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1437						   int status)
1438{
1439	unsigned long flags;
1440	mlog_entry_void();
1441
1442	spin_lock_irqsave(&lockres->l_lock, flags);
1443	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1444	if (!status)
1445		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1446	spin_unlock_irqrestore(&lockres->l_lock, flags);
1447
1448	wake_up(&lockres->l_event);
1449
1450	mlog_exit_void();
1451}
1452
/* Refresh in-memory inode state after the meta cluster lock has been
 * acquired, from the LVB when trustable or from disk otherwise.  May
 * or may not return a bh in *bh if it went to disk.  Returns -ENOENT
 * if the inode was deleted by another node while we waited on the
 * lock. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	/* Only one task gets to do the refresh; everyone else returns
	 * here once it has completed. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	/* drops OCFS2_LOCK_REFRESHING (and NEEDS_REFRESH on success)
	 * and wakes any waiters */
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1540
1541static int ocfs2_assign_bh(struct inode *inode,
1542			   struct buffer_head **ret_bh,
1543			   struct buffer_head *passed_bh)
1544{
1545	int status;
1546
1547	if (passed_bh) {
1548		/* Ok, the update went to disk for us, use the
1549		 * returned bh. */
1550		*ret_bh = passed_bh;
1551		get_bh(*ret_bh);
1552
1553		return 0;
1554	}
1555
1556	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1557				  OCFS2_I(inode)->ip_blkno,
1558				  ret_bh,
1559				  OCFS2_BH_CACHED,
1560				  inode);
1561	if (status < 0)
1562		mlog_errno(status);
1563
1564	return status;
1565}
1566
1567/*
1568 * returns < 0 error if the callback will never be called, otherwise
1569 * the result of the lock will be communicated via the callback.
1570 */
1571int ocfs2_meta_lock_full(struct inode *inode,
1572			 struct ocfs2_journal_handle *handle,
1573			 struct buffer_head **ret_bh,
1574			 int ex,
1575			 int arg_flags)
1576{
1577	int status, level, dlm_flags, acquired;
1578	struct ocfs2_lock_res *lockres;
1579	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1580	struct buffer_head *local_bh = NULL;
1581
1582	BUG_ON(!inode);
1583
1584	mlog_entry_void();
1585
1586	mlog(0, "inode %llu, take %s META lock\n",
1587	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1588	     ex ? "EXMODE" : "PRMODE");
1589
1590	status = 0;
1591	acquired = 0;
1592	/* We'll allow faking a readonly metadata lock for
1593	 * rodevices. */
1594	if (ocfs2_is_hard_readonly(osb)) {
1595		if (ex)
1596			status = -EROFS;
1597		goto bail;
1598	}
1599
1600	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1601		wait_event(osb->recovery_event,
1602			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1603
1604	acquired = 0;
1605	lockres = &OCFS2_I(inode)->ip_meta_lockres;
1606	level = ex ? LKM_EXMODE : LKM_PRMODE;
1607	dlm_flags = 0;
1608	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1609		dlm_flags |= LKM_NOQUEUE;
1610
1611	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1612	if (status < 0) {
1613		if (status != -EAGAIN && status != -EIOCBRETRY)
1614			mlog_errno(status);
1615		goto bail;
1616	}
1617
1618	/* Notify the error cleanup path to drop the cluster lock. */
1619	acquired = 1;
1620
1621	/* We wait twice because a node may have died while we were in
1622	 * the lower dlm layers. The second time though, we've
1623	 * committed to owning this lock so we don't allow signals to
1624	 * abort the operation. */
1625	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1626		wait_event(osb->recovery_event,
1627			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1628
1629	/*
1630	 * We only see this flag if we're being called from
1631	 * ocfs2_read_locked_inode(). It means we're locking an inode
1632	 * which hasn't been populated yet, so clear the refresh flag
1633	 * and let the caller handle it.
1634	 */
1635	if (inode->i_state & I_NEW) {
1636		status = 0;
1637		ocfs2_complete_lock_res_refresh(lockres, 0);
1638		goto bail;
1639	}
1640
1641	/* This is fun. The caller may want a bh back, or it may
1642	 * not. ocfs2_meta_lock_update definitely wants one in, but
1643	 * may or may not read one, depending on what's in the
1644	 * LVB. The result of all of this is that we've *only* gone to
1645	 * disk if we have to, so the complexity is worthwhile. */
1646	status = ocfs2_meta_lock_update(inode, &local_bh);
1647	if (status < 0) {
1648		if (status != -ENOENT)
1649			mlog_errno(status);
1650		goto bail;
1651	}
1652
1653	if (ret_bh) {
1654		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1655		if (status < 0) {
1656			mlog_errno(status);
1657			goto bail;
1658		}
1659	}
1660
1661	if (handle) {
1662		status = ocfs2_handle_add_lock(handle, inode);
1663		if (status < 0)
1664			mlog_errno(status);
1665	}
1666
1667bail:
1668	if (status < 0) {
1669		if (ret_bh && (*ret_bh)) {
1670			brelse(*ret_bh);
1671			*ret_bh = NULL;
1672		}
1673		if (acquired)
1674			ocfs2_meta_unlock(inode, ex);
1675	}
1676
1677	if (local_bh)
1678		brelse(local_bh);
1679
1680	mlog_exit(status);
1681	return status;
1682}
1683
1684/*
1685 * This is working around a lock inversion between tasks acquiring DLM locks
1686 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1687 * while acquiring page locks.
1688 *
 * ** These _with_page variants are only intended to be called from aop
1690 * methods that hold page locks and return a very specific *positive* error
1691 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1692 *
1693 * The DLM is called such that it returns -EAGAIN if it would have blocked
1694 * waiting for the vote thread.  In that case we unlock our page so the vote
1695 * thread can make progress.  Once we've done this we have to return
1696 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1697 * into the VFS who will then immediately retry the aop call.
1698 *
1699 * We do a blocking lock and immediate unlock before returning, though, so that
1700 * the lock has a great chance of being cached on this node by the time the VFS
1701 * calls back to retry the aop.    This has a potential to livelock as nodes
1702 * ping locks back and forth, but that's a risk we're willing to take to avoid
1703 * the lock inversion simply.
1704 */
1705int ocfs2_meta_lock_with_page(struct inode *inode,
1706			      struct ocfs2_journal_handle *handle,
1707			      struct buffer_head **ret_bh,
1708			      int ex,
1709			      struct page *page)
1710{
1711	int ret;
1712
1713	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1714				   OCFS2_LOCK_NONBLOCK);
1715	if (ret == -EAGAIN) {
1716		unlock_page(page);
1717		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1718			ocfs2_meta_unlock(inode, ex);
1719		ret = AOP_TRUNCATED_PAGE;
1720	}
1721
1722	return ret;
1723}
1724
1725void ocfs2_meta_unlock(struct inode *inode,
1726		       int ex)
1727{
1728	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1729	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1730
1731	mlog_entry_void();
1732
1733	mlog(0, "inode %llu drop %s META lock\n",
1734	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1735	     ex ? "EXMODE" : "PRMODE");
1736
1737	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1738		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1739
1740	mlog_exit_void();
1741}
1742
/* Take the cluster-wide superblock lock and, if we win the refresh
 * race, re-read the slot map from disk. */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	/* hard-readonly mounts never take cluster locks */
	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* We won the refresh race: re-read the slot map block.
		 * NOTE(review): bh aliases si->si_bh and &bh is passed
		 * as the output pointer with flags == 0 -- presumably
		 * this forces a re-read into the same bh; confirm
		 * against ocfs2_read_block()'s uncached semantics. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		/* drops REFRESHING (and NEEDS_REFRESH on success) */
		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1788
1789void ocfs2_super_unlock(struct ocfs2_super *osb,
1790			int ex)
1791{
1792	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1793	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
1794
1795	ocfs2_cluster_unlock(osb, lockres, level);
1796}
1797
1798int ocfs2_rename_lock(struct ocfs2_super *osb)
1799{
1800	int status;
1801	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1802
1803	if (ocfs2_is_hard_readonly(osb))
1804		return -EROFS;
1805
1806	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1807	if (status < 0)
1808		mlog_errno(status);
1809
1810	return status;
1811}
1812
1813void ocfs2_rename_unlock(struct ocfs2_super *osb)
1814{
1815	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1816
1817	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
1818}
1819
1820int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1821{
1822	int ret;
1823	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1824	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1825	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1826
1827	BUG_ON(!dl);
1828
1829	if (ocfs2_is_hard_readonly(osb))
1830		return -EROFS;
1831
1832	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1833	if (ret < 0)
1834		mlog_errno(ret);
1835
1836	return ret;
1837}
1838
1839void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
1840{
1841	int level = ex ? LKM_EXMODE : LKM_PRMODE;
1842	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1843	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1844
1845	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
1846}
1847
1848/* Reference counting of the dlm debug structure. We want this because
1849 * open references on the debug inodes can live on after a mount, so
1850 * we can't rely on the ocfs2_super to always exist. */
1851static void ocfs2_dlm_debug_free(struct kref *kref)
1852{
1853	struct ocfs2_dlm_debug *dlm_debug;
1854
1855	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1856
1857	kfree(dlm_debug);
1858}
1859
1860void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
1861{
1862	if (dlm_debug)
1863		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
1864}
1865
/* Take an extra reference; paired with ocfs2_put_dlm_debug(). */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1870
1871struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1872{
1873	struct ocfs2_dlm_debug *dlm_debug;
1874
1875	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1876	if (!dlm_debug) {
1877		mlog_errno(-ENOMEM);
1878		goto out;
1879	}
1880
1881	kref_init(&dlm_debug->d_refcnt);
1882	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1883	dlm_debug->d_locking_state = NULL;
1884out:
1885	return dlm_debug;
1886}
1887
1888/* Access to this is arbitrated for us via seq_file->sem. */
1889struct ocfs2_dlm_seq_priv {
1890	struct ocfs2_dlm_debug *p_dlm_debug;
1891	struct ocfs2_lock_res p_iter_res;
1892	struct ocfs2_lock_res p_tmp_res;
1893};
1894
/* Walk forward from 'start' (exclusive) to the next real lockres on
 * the tracking list; returns NULL when the walk wraps to the list
 * head.  Caller must hold ocfs2_dlm_tracking_lock. */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1920
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	/* NOTE(review): *pos is ignored here -- the iteration position
	 * is carried by the dummy p_iter_res linked into the tracking
	 * list instead of by offset. */
	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1943
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
	/* Nothing to tear down per pass; the dummy iteration node is
	 * only removed at release time. */
}
1947
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	/* Advance by re-linking the dummy marker right after the next
	 * live lockres, then hand back a stable copy of that lockres
	 * (see ocfs2_dlm_seq_start() for why we copy).
	 * NOTE(review): *pos is not advanced here, though seq_file
	 * normally expects next() to update it -- verify. */
	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1966
1967/* So that debugfs.ocfs2 can determine which format is being used */
1968#define OCFS2_DLM_DEBUG_STR_VERSION 1
1969static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1970{
1971	int i;
1972	char *lvb;
1973	struct ocfs2_lock_res *lockres = v;
1974
1975	if (!lockres)
1976		return -EINVAL;
1977
1978	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1979
1980	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1981		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1982			   lockres->l_name,
1983			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1984	else
1985		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1986
1987	seq_printf(m, "%d\t"
1988		   "0x%lx\t"
1989		   "0x%x\t"
1990		   "0x%x\t"
1991		   "%u\t"
1992		   "%u\t"
1993		   "%d\t"
1994		   "%d\t",
1995		   lockres->l_level,
1996		   lockres->l_flags,
1997		   lockres->l_action,
1998		   lockres->l_unlock_action,
1999		   lockres->l_ro_holders,
2000		   lockres->l_ex_holders,
2001		   lockres->l_requested,
2002		   lockres->l_blocking);
2003
2004	/* Dump the raw LVB */
2005	lvb = lockres->l_lksb.lvb;
2006	for(i = 0; i < DLM_LVB_LEN; i++)
2007		seq_printf(m, "0x%x\t", lvb[i]);
2008
2009	/* End the line */
2010	seq_printf(m, "\n");
2011	return 0;
2012}
2013
2014static struct seq_operations ocfs2_dlm_seq_ops = {
2015	.start =	ocfs2_dlm_seq_start,
2016	.stop =		ocfs2_dlm_seq_stop,
2017	.next =		ocfs2_dlm_seq_next,
2018	.show =		ocfs2_dlm_seq_show,
2019};
2020
2021static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2022{
2023	struct seq_file *seq = (struct seq_file *) file->private_data;
2024	struct ocfs2_dlm_seq_priv *priv = seq->private;
2025	struct ocfs2_lock_res *res = &priv->p_iter_res;
2026
2027	ocfs2_remove_lockres_tracking(res);
2028	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2029	return seq_release_private(inode, file);
2030}
2031
2032static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2033{
2034	int ret;
2035	struct ocfs2_dlm_seq_priv *priv;
2036	struct seq_file *seq;
2037	struct ocfs2_super *osb;
2038
2039	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2040	if (!priv) {
2041		ret = -ENOMEM;
2042		mlog_errno(ret);
2043		goto out;
2044	}
2045	osb = (struct ocfs2_super *) inode->u.generic_ip;
2046	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2047	priv->p_dlm_debug = osb->osb_dlm_debug;
2048	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2049
2050	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2051	if (ret) {
2052		kfree(priv);
2053		mlog_errno(ret);
2054		goto out;
2055	}
2056
2057	seq = (struct seq_file *) file->private_data;
2058	seq->private = priv;
2059
2060	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2061				   priv->p_dlm_debug);
2062
2063out:
2064	return ret;
2065}
2066
2067static const struct file_operations ocfs2_dlm_debug_fops = {
2068	.open =		ocfs2_dlm_debug_open,
2069	.release =	ocfs2_dlm_debug_release,
2070	.read =		seq_read,
2071	.llseek =	seq_lseek,
2072};
2073
2074static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2075{
2076	int ret = 0;
2077	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2078
2079	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2080							 S_IFREG|S_IRUSR,
2081							 osb->osb_debug_root,
2082							 osb,
2083							 &ocfs2_dlm_debug_fops);
2084	if (!dlm_debug->d_locking_state) {
2085		ret = -EINVAL;
2086		mlog(ML_ERROR,
2087		     "Unable to create locking state debugfs file.\n");
2088		goto out;
2089	}
2090
2091	ocfs2_get_dlm_debug(dlm_debug);
2092out:
2093	return ret;
2094}
2095
2096static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2097{
2098	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2099
2100	if (dlm_debug) {
2101		debugfs_remove(dlm_debug->d_locking_state);
2102		ocfs2_put_dlm_debug(dlm_debug);
2103	}
2104}
2105
/*
 * Mount-time dlmglue bringup: debugfs file, vote thread, dlm domain
 * registration, and the two osb-global lock resources. On any failure
 * the pieces initialized so far are unwound in the bail path.
 * Returns 0 or a negative errno.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL the pointer so the bail path knows there is no
		 * thread to stop. */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	/* Publish the domain only once everything else is in place. */
	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2159
/*
 * Unmount-time teardown, in the reverse order of ocfs2_dlm_init():
 * stop eviction callbacks, drop the osb-global locks, stop the vote
 * thread, free the lock resources, leave the dlm domain and finally
 * remove the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2183
/*
 * Completion callback the dlm fires when a dlmunlock (drop or cancel)
 * finishes. Runs the small unlock state machine under the lockres
 * spinlock and wakes any waiters on l_event.
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	/* Any other non-normal status is unexpected; leave the lockres
	 * state untouched and bail. */
	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		/* Cancel succeeded: forget the convert that was pending. */
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* Full unlock: the lock no longer has a granted mode. */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2239
/* Hook called by ocfs2_drop_lock() just before a lockres is torn down,
 * while the lockres spinlock is held -- lets a lock type do last-minute
 * work (e.g. the meta lock stuffing its LVB). */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

/* Pairs a pre-drop callback with its private data argument. */
struct drop_lock_cb {
	ocfs2_pre_drop_cb_t	*drop_func;
	void			*drop_data;
};
2246
2247static int ocfs2_drop_lock(struct ocfs2_super *osb,
2248			   struct ocfs2_lock_res *lockres,
2249			   struct drop_lock_cb *dcb)
2250{
2251	enum dlm_status status;
2252	unsigned long flags;
2253	int lkm_flags = 0;
2254
2255	/* We didn't get anywhere near actually using this lockres. */
2256	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2257		goto out;
2258
2259	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2260		lkm_flags |= LKM_VALBLK;
2261
2262	spin_lock_irqsave(&lockres->l_lock, flags);
2263
2264	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2265			"lockres %s, flags 0x%lx\n",
2266			lockres->l_name, lockres->l_flags);
2267
2268	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2269		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2270		     "%u, unlock_action = %u\n",
2271		     lockres->l_name, lockres->l_flags, lockres->l_action,
2272		     lockres->l_unlock_action);
2273
2274		spin_unlock_irqrestore(&lockres->l_lock, flags);
2275
2276		/* XXX: Today we just wait on any busy
2277		 * locks... Perhaps we need to cancel converts in the
2278		 * future? */
2279		ocfs2_wait_on_busy_lock(lockres);
2280
2281		spin_lock_irqsave(&lockres->l_lock, flags);
2282	}
2283
2284	if (dcb)
2285		dcb->drop_func(lockres, dcb->drop_data);
2286
2287	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2288		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2289		     lockres->l_name);
2290	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2291		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2292
2293	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2294		spin_unlock_irqrestore(&lockres->l_lock, flags);
2295		goto out;
2296	}
2297
2298	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2299
2300	/* make sure we never get here while waiting for an ast to
2301	 * fire. */
2302	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2303
2304	/* is this necessary? */
2305	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2306	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2307	spin_unlock_irqrestore(&lockres->l_lock, flags);
2308
2309	mlog(0, "lock %s\n", lockres->l_name);
2310
2311	status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2312			   ocfs2_unlock_ast, lockres);
2313	if (status != DLM_NORMAL) {
2314		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2315		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2316		dlm_print_one_lock(lockres->l_lksb.lockid);
2317		BUG();
2318	}
2319	mlog(0, "lock %s, successfull return from dlmunlock\n",
2320	     lockres->l_name);
2321
2322	ocfs2_wait_on_busy_lock(lockres);
2323out:
2324	mlog_exit(0);
2325	return 0;
2326}
2327
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Wait until the vote thread has dequeued this lockres, i.e.
	 * until OCFS2_LOCK_QUEUED is clear. The mask waiter fires when
	 * (l_flags & QUEUED) == 0. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		/* Retake the lock and re-check -- the flag may have
		 * been set again in the meantime. */
		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2358
2359void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2360			       struct ocfs2_lock_res *lockres)
2361{
2362	int ret;
2363
2364	ocfs2_mark_lockres_freeing(lockres);
2365	ret = ocfs2_drop_lock(osb, lockres, NULL);
2366	if (ret)
2367		mlog_errno(ret);
2368}
2369
/* Drop the two osb-global lock resources at unmount time. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
2375
2376static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2377{
2378	struct inode *inode = data;
2379
2380	/* the metadata lock requires a bit more work as we have an
2381	 * LVB to worry about. */
2382	if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2383	    lockres->l_level == LKM_EXMODE &&
2384	    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2385		__ocfs2_stuff_meta_lvb(inode);
2386}
2387
2388int ocfs2_drop_inode_locks(struct inode *inode)
2389{
2390	int status, err;
2391	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };
2392
2393	mlog_entry_void();
2394
2395	/* No need to call ocfs2_mark_lockres_freeing here -
2396	 * ocfs2_clear_inode has done it for us. */
2397
2398	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2399			      &OCFS2_I(inode)->ip_data_lockres,
2400			      NULL);
2401	if (err < 0)
2402		mlog_errno(err);
2403
2404	status = err;
2405
2406	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2407			      &OCFS2_I(inode)->ip_meta_lockres,
2408			      &meta_dcb);
2409	if (err < 0)
2410		mlog_errno(err);
2411	if (err < 0 && !status)
2412		status = err;
2413
2414	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2415			      &OCFS2_I(inode)->ip_rw_lockres,
2416			      NULL);
2417	if (err < 0)
2418		mlog_errno(err);
2419	if (err < 0 && !status)
2420		status = err;
2421
2422	mlog_exit(status);
2423	return status;
2424}
2425
/* Set up lockres state for a downconvert to new_level. Caller must
 * hold the lockres spinlock and follow up with
 * ocfs2_downconvert_lock(). new_level must be strictly lower than the
 * current level. */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	/* Nobody should be blocking us below PR/EX. */
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2446
2447static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2448				  struct ocfs2_lock_res *lockres,
2449				  int new_level,
2450				  int lvb)
2451{
2452	int ret, dlm_flags = LKM_CONVERT;
2453	enum dlm_status status;
2454
2455	mlog_entry_void();
2456
2457	if (lvb)
2458		dlm_flags |= LKM_VALBLK;
2459
2460	status = dlmlock(osb->dlm,
2461			 new_level,
2462			 &lockres->l_lksb,
2463			 dlm_flags,
2464			 lockres->l_name,
2465			 OCFS2_LOCK_ID_MAX_LEN - 1,
2466			 ocfs2_locking_ast,
2467			 lockres,
2468			 ocfs2_blocking_ast);
2469	if (status != DLM_NORMAL) {
2470		ocfs2_log_dlm_error("dlmlock", status, lockres);
2471		ret = -EINVAL;
2472		ocfs2_recover_from_dlm_error(lockres, 1);
2473		goto bail;
2474	}
2475
2476	ret = 0;
2477bail:
2478	mlog_exit(ret);
2479	return ret;
2480}
2481
/* returns 1 when the caller should unlock and call dlmunlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres)
{
	/* Caller holds the lockres spinlock in both outcomes; we only
	 * update state here, the dlmunlock(LKM_CANCEL) happens later
	 * in ocfs2_cancel_convert(). */
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2513
2514static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2515				struct ocfs2_lock_res *lockres)
2516{
2517	int ret;
2518	enum dlm_status status;
2519
2520	mlog_entry_void();
2521	mlog(0, "lock %s\n", lockres->l_name);
2522
2523	ret = 0;
2524	status = dlmunlock(osb->dlm,
2525			   &lockres->l_lksb,
2526			   LKM_CANCEL,
2527			   ocfs2_unlock_ast,
2528			   lockres);
2529	if (status != DLM_NORMAL) {
2530		ocfs2_log_dlm_error("dlmunlock", status, lockres);
2531		ret = -EINVAL;
2532		ocfs2_recover_from_dlm_error(lockres, 0);
2533	}
2534
2535	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2536
2537	mlog_exit(ret);
2538	return ret;
2539}
2540
2541static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
2542						  struct ocfs2_lock_res *lockres,
2543						  int new_level)
2544{
2545	int ret;
2546
2547	mlog_entry_void();
2548
2549	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2550
2551	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2552		ret = 0;
2553		mlog(0, "lockres %s currently being refreshed -- backing "
2554		     "off!\n", lockres->l_name);
2555	} else if (new_level == LKM_PRMODE)
2556		ret = !lockres->l_ex_holders &&
2557			ocfs2_inode_fully_checkpointed(inode);
2558	else /* Must be NLMODE we're converting to. */
2559		ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
2560			ocfs2_inode_fully_checkpointed(inode);
2561
2562	mlog_exit(ret);
2563	return ret;
2564}
2565
/*
 * Unblock handler for the metadata lock. Either cancels an in-flight
 * convert, downconverts (stuffing the LVB when dropping from EX), or
 * kicks a journal checkpoint and asks to be requeued. *requeue is set
 * when the vote thread should revisit this lockres later.
 */
static int ocfs2_do_unblock_meta(struct inode *inode,
				 int *requeue)
{
	int new_level;
	int set_lvb = 0;
	int ret = 0;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
	unsigned long flags;

	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
	     lockres->l_blocking);

	BUG_ON(lockres->l_level != LKM_EXMODE &&
	       lockres->l_level != LKM_PRMODE);

	/* A convert is already in flight: try to cancel it rather than
	 * waiting it out. ocfs2_prepare_cancel_convert returns 1 when
	 * we must issue the actual dlm cancel. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		*requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
	     lockres->l_level, lockres->l_blocking, new_level);

	if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
		/* Dropping from EX means other nodes may read the LVB:
		 * publish our inode state into it first. */
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/* If the lock hasn't been refreshed yet (rare), then
		 * our memory inode values are old and we skip
		 * stuffing the lvb. There's no need to actually clear
		 * out the lvb here as it's value is still valid. */
		if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
			if (set_lvb)
				__ocfs2_stuff_meta_lvb(inode);
		} else
			mlog(0, "lockres %s: downconverting stale lock!\n",
			     lockres->l_name);

		mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, "
		     "l_blocking=%d, new_level=%d\n",
		     lockres->l_level, lockres->l_blocking, new_level);

		ocfs2_prepare_downconvert(lockres, new_level);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
		goto leave;
	}
	/* Can't downconvert yet -- make sure a checkpoint is under way
	 * and requeue ourselves for another pass. */
	if (!ocfs2_inode_fully_checkpointed(inode))
		ocfs2_start_checkpoint(osb);

	*requeue = 1;
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = 0;
leave:
	mlog_exit(ret);
	return ret;
}
2640
/*
 * Generic unblock path shared by most lock types. Cancels an in-flight
 * convert, requeues while incompatible holders remain, optionally runs
 * a per-type worker (which may sleep, forcing a recheck), then
 * downconverts to the highest mode compatible with what's blocking us.
 * Results are reported through ctl (requeue / unblock_action).
 */
static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb,
				      struct ocfs2_lock_res *lockres,
				      struct ocfs2_unblock_ctl *ctl,
				      ocfs2_convert_worker_t *worker)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	/* A convert is already in flight: cancel it instead of
	 * waiting. prepare returns 1 when the dlm cancel must be
	 * issued (after dropping the spinlock). */
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a veto (e.g. meta waits for checkpoint). */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = worker(lockres, blocking);

	/* The worker decided no downconvert should happen (caller may
	 * still run the post_unlock hook). */
	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
2753
/*
 * Pre-downconvert worker for the data lock: flush dirty pages to disk
 * before another node is allowed in. If we're giving up everything
 * (blocking == EX) the page cache is truncated and mappings torn down;
 * otherwise we just wait for the writeback to complete. May sleep.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

       	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == LKM_EXMODE) {
		truncate_inode_pages(mapping, 0);
		unmap_mapping_range(mapping, 0, 0, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

	return UNBLOCK_CONTINUE;
}
2782
2783int ocfs2_unblock_data(struct ocfs2_lock_res *lockres,
2784		       struct ocfs2_unblock_ctl *ctl)
2785{
2786	int status;
2787	struct inode *inode;
2788	struct ocfs2_super *osb;
2789
2790	mlog_entry_void();
2791
2792	inode = ocfs2_lock_res_inode(lockres);
2793	osb = OCFS2_SB(inode->i_sb);
2794
2795	mlog(0, "unblock inode %llu\n",
2796	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2797
2798	status = ocfs2_generic_unblock_lock(osb, lockres, ctl,
2799					    ocfs2_data_convert_worker);
2800	if (status < 0)
2801		mlog_errno(status);
2802
2803	mlog(0, "inode %llu, requeue = %d\n",
2804	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2805
2806	mlog_exit(status);
2807	return status;
2808}
2809
2810static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres,
2811				    struct ocfs2_unblock_ctl *ctl)
2812{
2813	int status;
2814	struct inode *inode;
2815
2816	mlog_entry_void();
2817
2818	mlog(0, "Unblock lockres %s\n", lockres->l_name);
2819
2820	inode  = ocfs2_lock_res_inode(lockres);
2821
2822	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2823					    lockres, ctl, NULL);
2824	if (status < 0)
2825		mlog_errno(status);
2826
2827	mlog_exit(status);
2828	return status;
2829}
2830
2831static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2832					int new_level)
2833{
2834	struct inode *inode = ocfs2_lock_res_inode(lockres);
2835	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2836
2837	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2838	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2839
2840	if (checkpointed)
2841		return 1;
2842
2843	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2844	return 0;
2845}
2846
/* set_lvb hook for the metadata lock: publish the in-memory inode
 * state into the lock value block. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2853
2854static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres,
2855			      struct ocfs2_unblock_ctl *ctl)
2856{
2857	int status;
2858	struct inode *inode;
2859
2860	mlog_entry_void();
2861
2862       	inode = ocfs2_lock_res_inode(lockres);
2863
2864	mlog(0, "unblock inode %llu\n",
2865	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
2866
2867	status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb),
2868					    lockres, ctl, NULL);
2869	if (status < 0)
2870		mlog_errno(status);
2871
2872	mlog(0, "inode %llu, requeue = %d\n",
2873	     (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue);
2874
2875	mlog_exit(status);
2876	return status;
2877}
2878
2879/*
2880 * Does the final reference drop on our dentry lock. Right now this
2881 * happens in the vote thread, but we could choose to simplify the
2882 * dlmglue API and push these off to the ocfs2_wq in the future.
2883 */
2884static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
2885				     struct ocfs2_lock_res *lockres)
2886{
2887	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2888	ocfs2_dentry_lock_put(osb, dl);
2889}
2890
2891/*
2892 * d_delete() matching dentries before the lock downconvert.
2893 *
2894 * At this point, any process waiting to destroy the
2895 * dentry_lock due to last ref count is stopped by the
2896 * OCFS2_LOCK_QUEUED flag.
2897 *
2898 * We have two potential problems
2899 *
2900 * 1) If we do the last reference drop on our dentry_lock (via dput)
2901 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
2902 *    the downconvert to finish. Instead we take an elevated
2903 *    reference and push the drop until after we've completed our
2904 *    unblock processing.
2905 *
2906 * 2) There might be another process with a final reference,
2907 *    waiting on us to finish processing. If this is the case, we
2908 *    detect it and exit out - there's no more dentries anyway.
2909 */
2910static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
2911				       int blocking)
2912{
2913	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
2914	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
2915	struct dentry *dentry;
2916	unsigned long flags;
2917	int extra_ref = 0;
2918
2919	/*
2920	 * This node is blocking another node from getting a read
2921	 * lock. This happens when we've renamed within a
2922	 * directory. We've forced the other nodes to d_delete(), but
2923	 * we never actually dropped our lock because it's still
2924	 * valid. The downconvert code will retain a PR for this node,
2925	 * so there's no further work to do.
2926	 */
2927	if (blocking == LKM_PRMODE)
2928		return UNBLOCK_CONTINUE;
2929
2930	/*
2931	 * Mark this inode as potentially orphaned. The code in
2932	 * ocfs2_delete_inode() will figure out whether it actually
2933	 * needs to be freed or not.
2934	 */
2935	spin_lock(&oi->ip_lock);
2936	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
2937	spin_unlock(&oi->ip_lock);
2938
2939	/*
2940	 * Yuck. We need to make sure however that the check of
2941	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
2942	 * respect to a reference decrement or the setting of that
2943	 * flag.
2944	 */
2945	spin_lock_irqsave(&lockres->l_lock, flags);
2946	spin_lock(&dentry_attach_lock);
2947	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
2948	    && dl->dl_count) {
2949		dl->dl_count++;
2950		extra_ref = 1;
2951	}
2952	spin_unlock(&dentry_attach_lock);
2953	spin_unlock_irqrestore(&lockres->l_lock, flags);
2954
2955	mlog(0, "extra_ref = %d\n", extra_ref);
2956
2957	/*
2958	 * We have a process waiting on us in ocfs2_dentry_iput(),
2959	 * which means we can't have any more outstanding
2960	 * aliases. There's no need to do any more work.
2961	 */
2962	if (!extra_ref)
2963		return UNBLOCK_CONTINUE;
2964
2965	spin_lock(&dentry_attach_lock);
2966	while (1) {
2967		dentry = ocfs2_find_local_alias(dl->dl_inode,
2968						dl->dl_parent_blkno, 1);
2969		if (!dentry)
2970			break;
2971		spin_unlock(&dentry_attach_lock);
2972
2973		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
2974		     dentry->d_name.name);
2975
2976		/*
2977		 * The following dcache calls may do an
2978		 * iput(). Normally we don't want that from the
2979		 * downconverting thread, but in this case it's ok
2980		 * because the requesting node already has an
2981		 * exclusive lock on the inode, so it can't be queued
2982		 * for a downconvert.
2983		 */
2984		d_delete(dentry);
2985		dput(dentry);
2986
2987		spin_lock(&dentry_attach_lock);
2988	}
2989	spin_unlock(&dentry_attach_lock);
2990
2991	/*
2992	 * If we are the last holder of this dentry lock, there is no
2993	 * reason to downconvert so skip straight to the unlock.
2994	 */
2995	if (dl->dl_count == 1)
2996		return UNBLOCK_STOP_POST;
2997
2998	return UNBLOCK_CONTINUE_POST;
2999}
3000
3001static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres,
3002				     struct ocfs2_unblock_ctl *ctl)
3003{
3004	int ret;
3005	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3006	struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb);
3007
3008	mlog(0, "unblock dentry lock: %llu\n",
3009	     (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno);
3010
3011	ret = ocfs2_generic_unblock_lock(osb,
3012					 lockres,
3013					 ctl,
3014					 ocfs2_dentry_convert_worker);
3015	if (ret < 0)
3016		mlog_errno(ret);
3017
3018	mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action);
3019
3020	return ret;
3021}
3022
3023/* Generic unblock function for any lockres whose private data is an
3024 * ocfs2_super pointer. */
3025static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres,
3026				  struct ocfs2_unblock_ctl *ctl)
3027{
3028	int status;
3029	struct ocfs2_super *osb;
3030
3031	mlog_entry_void();
3032
3033	mlog(0, "Unblock lockres %s\n", lockres->l_name);
3034
3035	osb = ocfs2_get_lockres_osb(lockres);
3036
3037	status = ocfs2_generic_unblock_lock(osb,
3038					    lockres,
3039					    ctl,
3040					    NULL);
3041	if (status < 0)
3042		mlog_errno(status);
3043
3044	mlog_exit(status);
3045	return status;
3046}
3047
/*
 * Called by the vote thread for each queued blocked lockres: runs the
 * lock type's unblock handler, then either clears QUEUED or requeues
 * the lockres, and finally fires the optional post_unlock hook.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);
	BUG_ON(!lockres->l_ops->unblock);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = lockres->l_ops->unblock(lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* Done with this lockres (or it's being freed): drop QUEUED,
	 * which also releases our implicit reference. Otherwise put it
	 * back on the blocked list for another pass. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
3098
/*
 * Queue a lockres for the vote thread's blocked-lock processing.
 * Caller must hold the lockres spinlock. No-op for lockres being
 * freed or already on the list.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	/* vote_task_lock protects the blocked list; the list_empty
	 * check keeps an already-queued lockres from being added
	 * twice. */
	spin_lock(&osb->vote_task_lock);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}
3127
/* This aids in debugging situations where a bad LVB might be involved. */
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	/* Interpret the raw lksb value block as the metadata LVB layout
	 * (big-endian on-wire fields). */
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}
3152