dlmglue.c revision d92bc5127b27f315ef0ef2c1e1829fd6a5cba54a
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	unsigned long long	mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
					    int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
					 int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx, iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}

/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
	.set_lvb	= ocfs2_set_qinfo_lvb,
	.get_osb	= ocfs2_get_qinfo_osb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
	.check_downconvert = ocfs2_check_refcount_downconvert,
	.downconvert_worker = ocfs2_refcount_convert_worker,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres,
					int level)
{
	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
		     _err, _func, _lockres->l_name);					\
	else										\
		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_num_prmode = 0;
	res->l_lock_num_prmode_failed = 0;
	res->l_lock_total_prmode = 0;
	res->l_lock_max_prmode = 0;
	res->l_lock_num_exmode = 0;
	res->l_lock_num_exmode_failed = 0;
	res->l_lock_total_exmode = 0;
	res->l_lock_max_exmode = 0;
	res->l_lock_refresh = 0;
}

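/*
 * Fold one acquisition into the per-lockres statistics: the elapsed
 * wait time (in nanoseconds, measured from ocfs2_init_start_time())
 * is added to the PR or EX bucket, and a nonzero status bumps the
 * matching "failed" counter.
 */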
static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	unsigned long long *num, *sum;
	unsigned int *max, *failed;
	struct timespec ts = current_kernel_time();
	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;

	if (level == LKM_PRMODE) {
		num = &res->l_lock_num_prmode;
		sum = &res->l_lock_total_prmode;
		max = &res->l_lock_max_prmode;
		failed = &res->l_lock_num_prmode_failed;
	} else if (level == LKM_EXMODE) {
		num = &res->l_lock_num_exmode;
		sum = &res->l_lock_total_exmode;
		max = &res->l_lock_max_exmode;
		failed = &res->l_lock_num_exmode_failed;
	} else
		return;

	(*num)++;
	(*sum) += time;
	if (time > *max)
		*max = time;
	if (ret)
		(*failed)++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	struct timespec ts = current_kernel_time();
	mw->mw_lock_start = timespec_to_ns(&ts);
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
			   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (type != OCFS2_LOCK_TYPE_OPEN)
		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
				 &lockdep_keys[type], 0);
	else
		res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_inode_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_mem_dqinfo *info = lockres->l_priv;

	return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

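/*
 * Dentry lock names embed the inode block number as raw binary at
 * offset OCFS2_DENTRY_LOCK_INO_START; pull it back out here. See
 * ocfs2_dentry_lock_res_init() below for the name layout.
 */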
static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once())
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
					 struct ocfs2_super *osb)
{
	/* nfs_sync lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
				   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
					    struct ocfs2_super *osb)
{
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
				   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
			       struct ocfs2_mem_dqinfo *info)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
			      0, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
				   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
				  struct ocfs2_super *osb, u64 ref_blkno,
				  unsigned int generation)
{
	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
			      generation, lockres->l_name);
	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
				   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

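/*
 * The ro/ex holder counts track how many local users currently depend
 * on the granted level; the downconvert checks refuse to drop a level
 * while incompatible holders remain.
 */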
static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

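/*
 * Replace l_flags wholesale and complete any mask waiters whose
 * (mask, goal) condition is now satisfied. Most l_flags updates
 * funnel through here (via lockres_or_flags()/lockres_clear_flags())
 * so waiters don't miss a transition.
 */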
static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* A convert from PR to EX doesn't really need anything as our
	 * information is already up to date. A convert from NL to
	 * *anything* else, however, should mark us as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race of its own.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (e.g. the downconvert thread) has just started
 * a new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}

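/*
 * Fired by the DLM when another node requests a level incompatible
 * with one we hold. Record what is blocking us and, if a downconvert
 * is actually required, hand the lockres to the downconvert thread.
 */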
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

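/*
 * Fired by the DLM when an attach or convert we requested completes.
 * Dispatch on l_action to the matching handler, then clear PENDING
 * under the same hold of l_lock that cleared BUSY.
 */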
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1,
			     lockres);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

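/*
 * Snapshot one flag under l_lock; used by the wait_event() helpers
 * below so the flag test is race-free.
 */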
static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

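/*
 * Queue a mask waiter on the lockres. The waiter is completed by
 * lockres_set_flags() once (l_flags & mask) == goal. Caller must hold
 * l_lock.
 */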
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return ret;
}

static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres,
				int level,
				u32 lkm_flags,
				int arg_flags,
				int l_subclass,
				unsigned long caller_ip)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock currently blocked on behalf of
		 * another node? */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1,
				     lockres);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The downconvert thread holds dlm
	 * locks while acquiring page locks while downconverting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (!ret && lockres->l_lockdep_map.key != NULL) {
		if (level == DLM_LOCK_PR)
			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
		else
			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
				caller_ip);
	}
#endif
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres,
				     int level,
				     u32 lkm_flags,
				     int arg_flags)
{
	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
				    0, _RET_IP_);
}

static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
				   struct ocfs2_lock_res *lockres,
				   int level,
				   unsigned long caller_ip)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
	if (lockres->l_lockdep_map.key != NULL)
		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
#endif
	mlog_exit_void();
}

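/*
 * First attach for a freshly initialized lockres. OCFS2_LOCK_LOCAL
 * tells the ast handler not to request a refresh, and DLM_LKF_LOCAL
 * lets the DLM skip the cluster-wide work for a resource no other
 * node can know about yet (see ocfs2_create_new_inode_locks() below).
 */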
static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: We don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

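/*
 * The per-inode RW lock: PR for readers, EX for writers. Local mounts
 * have no other nodes to contend with, so both lock and unlock are
 * no-ops there.
 */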
int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb)) {
		mlog_exit(0);
		return 0;
	}

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/*
 * ocfs2_open_lock always gets a PR mode lock.
 */
int ocfs2_open_lock(struct inode *inode)
{
	int status = 0;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take PRMODE open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    DLM_LOCK_PR, 0, 0);
	if (status < 0)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

int ocfs2_try_open_lock(struct inode *inode, int write)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu try to take %s open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	/*
	 * The file system may already be holding a PRMODE/EXMODE open
	 * lock. Since we pass DLM_LKF_NOQUEUE, the request won't block
	 * waiting on other nodes, and -EAGAIN will indicate to the
	 * caller that this inode is still in use.
	 */
	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    level, DLM_LKF_NOQUEUE, 0);

out:
	mlog_exit(status);
	return status;
}

/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
 */
void ocfs2_open_unlock(struct inode *inode)
{
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	if (lockres->l_ro_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     DLM_LOCK_PR);
	if (lockres->l_ex_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     DLM_LOCK_EX);

out:
	mlog_exit_void();
}

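/*
 * A signal arrived while an flock() request was in flight. Cancel the
 * in-progress convert (retrying until the DLM quiesces), then report
 * 0 if the lock was granted anyway or -ERESTARTSYS if not.
 */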
static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
				     int level)
{
	int ret;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

retry_cancel:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		if (ret) {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0) {
				mlog_errno(ret);
				goto out;
			}
			goto retry_cancel;
		}
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_for_mask(&mw);
		goto retry_cancel;
	}

	ret = -ERESTARTSYS;
	/*
	 * We may still have gotten the lock, in which case there's no
	 * point to restarting the syscall.
	 */
	if (lockres->l_level == level)
		ret = 0;

	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
	     lockres->l_flags, lockres->l_level, lockres->l_action);

	spin_unlock_irqrestore(&lockres->l_lock, flags);

out:
	return ret;
}

/*
 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 * flock() calls. The locking approach this requires is sufficiently
 * different from all other cluster lock types that we implement a
 * separate path to the "low-level" dlm calls. In particular:
 *
 * - No optimization of lock levels is done - we take exactly
 *   what's been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 *   no-lock at unlock time. This also means flock locks never go on
 *   the blocking list.
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 *   sure to allow cancellation of a misbehaving application's flock()
 *   request.
 *
 * - Access to any flock lockres doesn't require concurrency, so we
 *   can simplify the code by requiring the caller to guarantee
 *   serialization of dlmglue flock calls.
 */
1777int ocfs2_file_lock(struct file *file, int ex, int trylock)
1778{
1779	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1780	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1781	unsigned long flags;
1782	struct ocfs2_file_private *fp = file->private_data;
1783	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1784	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1785	struct ocfs2_mask_waiter mw;
1786
1787	ocfs2_init_mask_waiter(&mw);
1788
1789	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1790	    (lockres->l_level > DLM_LOCK_NL)) {
1791		mlog(ML_ERROR,
1792		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1793		     "level: %u\n", lockres->l_name, lockres->l_flags,
1794		     lockres->l_level);
1795		return -EINVAL;
1796	}
1797
1798	spin_lock_irqsave(&lockres->l_lock, flags);
1799	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1800		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1801		spin_unlock_irqrestore(&lockres->l_lock, flags);
1802
1803		/*
1804		 * Get the lock at NLMODE to start - that way we
1805		 * can cancel the upconvert request if need be.
1806		 */
1807		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1808		if (ret < 0) {
1809			mlog_errno(ret);
1810			goto out;
1811		}
1812
1813		ret = ocfs2_wait_for_mask(&mw);
1814		if (ret) {
1815			mlog_errno(ret);
1816			goto out;
1817		}
1818		spin_lock_irqsave(&lockres->l_lock, flags);
1819	}
1820
1821	lockres->l_action = OCFS2_AST_CONVERT;
1822	lkm_flags |= DLM_LKF_CONVERT;
1823	lockres->l_requested = level;
1824	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1825
1826	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1827	spin_unlock_irqrestore(&lockres->l_lock, flags);
1828
1829	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
1830			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1831			     lockres);
1832	if (ret) {
1833		if (!trylock || (ret != -EAGAIN)) {
1834			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1835			ret = -EINVAL;
1836		}
1837
1838		ocfs2_recover_from_dlm_error(lockres, 1);
1839		lockres_remove_mask_waiter(lockres, &mw);
1840		goto out;
1841	}
1842
1843	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1844	if (ret == -ERESTARTSYS) {
1845		/*
1846		 * Userspace can deadlock itself with
1847		 * flock(). The local behavior is to allow the
1848		 * deadlock, but abort the system call if a signal is
1849		 * received. We follow this example, otherwise a
1850		 * poorly written program could sit in the kernel until
1851		 * reboot.
1852		 *
1853		 * Handling this is a bit more complicated for Ocfs2
1854		 * though. We can't exit this function with an
1855		 * outstanding lock request, so a cancel convert is
1856		 * required. We intentionally overwrite 'ret' - if the
1857		 * cancel fails and the lock was granted, it's easier
1858		 * to just bubble success back up to the user.
1859		 */
1860		ret = ocfs2_flock_handle_signal(lockres, level);
1861	} else if (!ret && (level > lockres->l_level)) {
1862		/* Trylock failed asynchronously */
1863		BUG_ON(!trylock);
1864		ret = -EAGAIN;
1865	}
1866
1867out:
1868
1869	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1870	     lockres->l_name, ex, trylock, ret);
1871	return ret;
1872}
1873
1874void ocfs2_file_unlock(struct file *file)
1875{
1876	int ret;
1877	unsigned int gen;
1878	unsigned long flags;
1879	struct ocfs2_file_private *fp = file->private_data;
1880	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1881	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1882	struct ocfs2_mask_waiter mw;
1883
1884	ocfs2_init_mask_waiter(&mw);
1885
1886	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1887		return;
1888
1889	if (lockres->l_level == DLM_LOCK_NL)
1890		return;
1891
1892	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1893	     lockres->l_name, lockres->l_flags, lockres->l_level,
1894	     lockres->l_action);
1895
1896	spin_lock_irqsave(&lockres->l_lock, flags);
1897	/*
1898	 * Fake a blocking ast for the downconvert code.
1899	 */
1900	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1901	lockres->l_blocking = DLM_LOCK_EX;
1902
1903	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
1904	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1905	spin_unlock_irqrestore(&lockres->l_lock, flags);
1906
1907	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
1908	if (ret) {
1909		mlog_errno(ret);
1910		return;
1911	}
1912
1913	ret = ocfs2_wait_for_mask(&mw);
1914	if (ret)
1915		mlog_errno(ret);
1916}
1917
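/*
 * A hedged usage sketch of the pair above (editor's illustration, not
 * original code - the real caller lives in the flock path of file.c
 * and, as noted before ocfs2_file_lock(), must serialize these calls):
 */
static int example_flock_critical_section(struct file *file, int ex,
					  int trylock)
{
	int ret = ocfs2_file_lock(file, ex, trylock);

	if (ret < 0)
		return ret;	/* -EAGAIN here means a trylock raced */

	/* ... the flock is held here ... */

	ocfs2_file_unlock(file);
	return 0;
}
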
1918static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1919					struct ocfs2_lock_res *lockres)
1920{
1921	int kick = 0;
1922
1923	mlog_entry_void();
1924
1925	/* If we know that another node is waiting on our lock, kick
1926	 * the downconvert thread pre-emptively when we reach a release
1927	 * condition. */
1928	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1929		switch (lockres->l_blocking) {
1930		case DLM_LOCK_EX:
1931			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1932				kick = 1;
1933			break;
1934		case DLM_LOCK_PR:
1935			if (!lockres->l_ex_holders)
1936				kick = 1;
1937			break;
1938		default:
1939			BUG();
1940		}
1941	}
1942
1943	if (kick)
1944		ocfs2_wake_downconvert_thread(osb);
1945
1946	mlog_exit_void();
1947}
1948
1949#define OCFS2_SEC_BITS   34
1950#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
1951#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1952
1953/* LVB only has room for 64 bits of time here so we pack it for
1954 * now. */
1955static u64 ocfs2_pack_timespec(struct timespec *spec)
1956{
1957	u64 res;
1958	u64 sec = spec->tv_sec;
1959	u32 nsec = spec->tv_nsec;
1960
1961	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1962
1963	return res;
1964}
1965
1966/* Call this with the lockres locked. I am reasonably sure we don't
1967 * need ip_lock in this function as anyone who would be changing those
1968 * values is supposed to be blocked in ocfs2_inode_lock right now. */
1969static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1970{
1971	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1972	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1973	struct ocfs2_meta_lvb *lvb;
1974
1975	mlog_entry_void();
1976
1977	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
1978
1979	/*
1980	 * Invalidate the LVB of a deleted inode - this way other
1981	 * nodes are forced to go to disk and discover the new inode
1982	 * status.
1983	 */
1984	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1985		lvb->lvb_version = 0;
1986		goto out;
1987	}
1988
1989	lvb->lvb_version   = OCFS2_LVB_VERSION;
1990	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1991	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1992	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1993	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1994	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1995	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1996	lvb->lvb_iatime_packed  =
1997		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1998	lvb->lvb_ictime_packed =
1999		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
2000	lvb->lvb_imtime_packed =
2001		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
2002	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2003	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2004	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2005
2006out:
2007	mlog_meta_lvb(0, lockres);
2008
2009	mlog_exit_void();
2010}
2011
2012static void ocfs2_unpack_timespec(struct timespec *spec,
2013				  u64 packed_time)
2014{
2015	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2016	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2017}
2018
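/*
 * A minimal sketch (editor's illustration, not original code) of the
 * round trip through the two helpers above: it holds because
 * tv_nsec < 10^9 fits in the low OCFS2_SEC_SHIFT bits and tv_sec
 * fits in OCFS2_SEC_BITS for several centuries to come.
 */
static inline int ocfs2_packed_timespec_matches(struct timespec *spec)
{
	struct timespec check;

	ocfs2_unpack_timespec(&check, ocfs2_pack_timespec(spec));
	return check.tv_sec == spec->tv_sec &&
	       check.tv_nsec == spec->tv_nsec;
}
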
2019static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2020{
2021	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2022	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2023	struct ocfs2_meta_lvb *lvb;
2024
2025	mlog_entry_void();
2026
2027	mlog_meta_lvb(0, lockres);
2028
2029	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2030
2031	/* We're safe here without the lockres lock... */
2032	spin_lock(&oi->ip_lock);
2033	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2034	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2035
2036	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2037	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2038	ocfs2_set_inode_flags(inode);
2039
2040	/* fast-symlinks are a special case */
2041	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2042		inode->i_blocks = 0;
2043	else
2044		inode->i_blocks = ocfs2_inode_sector_count(inode);
2045
2046	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
2047	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
2048	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2049	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
2050	ocfs2_unpack_timespec(&inode->i_atime,
2051			      be64_to_cpu(lvb->lvb_iatime_packed));
2052	ocfs2_unpack_timespec(&inode->i_mtime,
2053			      be64_to_cpu(lvb->lvb_imtime_packed));
2054	ocfs2_unpack_timespec(&inode->i_ctime,
2055			      be64_to_cpu(lvb->lvb_ictime_packed));
2056	spin_unlock(&oi->ip_lock);
2057
2058	mlog_exit_void();
2059}
2060
2061static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2062					      struct ocfs2_lock_res *lockres)
2063{
2064	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2065
2066	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2067	    && lvb->lvb_version == OCFS2_LVB_VERSION
2068	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2069		return 1;
2070	return 0;
2071}
2072
2073/* Determine whether a lock resource needs to be refreshed, and
2074 * arbitrate who gets to refresh it.
2075 *
2076 *   0 means no refresh needed.
2077 *
2078 *   > 0 means you need to refresh this and you MUST call
2079 *   ocfs2_complete_lock_res_refresh afterwards. */
2080static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2081{
2082	unsigned long flags;
2083	int status = 0;
2084
2085	mlog_entry_void();
2086
2087refresh_check:
2088	spin_lock_irqsave(&lockres->l_lock, flags);
2089	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2090		spin_unlock_irqrestore(&lockres->l_lock, flags);
2091		goto bail;
2092	}
2093
2094	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2095		spin_unlock_irqrestore(&lockres->l_lock, flags);
2096
2097		ocfs2_wait_on_refreshing_lock(lockres);
2098		goto refresh_check;
2099	}
2100
2101	/* Ok, I'll be the one to refresh this lock. */
2102	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2103	spin_unlock_irqrestore(&lockres->l_lock, flags);
2104
2105	status = 1;
2106bail:
2107	mlog_exit(status);
2108	return status;
2109}
2110
2111/* If status is non-zero, I'll mark it as not being in refresh
2112 * anymore, but I won't clear the needs-refresh flag. */
2113static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2114						   int status)
2115{
2116	unsigned long flags;
2117	mlog_entry_void();
2118
2119	spin_lock_irqsave(&lockres->l_lock, flags);
2120	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2121	if (!status)
2122		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2123	spin_unlock_irqrestore(&lockres->l_lock, flags);
2124
2125	wake_up(&lockres->l_event);
2126
2127	mlog_exit_void();
2128}
2129
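/*
 * A hedged sketch of the arbitration protocol the two helpers above
 * implement. do_refresh_work() is a hypothetical stand-in for the
 * caller's refresh work; ocfs2_inode_lock_update() below and
 * ocfs2_super_lock() are real users. The complete call is mandatory
 * after a nonzero return from ocfs2_should_refresh_lock_res():
 *
 *	if (ocfs2_should_refresh_lock_res(lockres)) {
 *		status = do_refresh_work();
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 */
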
2130/* may or may not return a bh if it went to disk. */
2131static int ocfs2_inode_lock_update(struct inode *inode,
2132				  struct buffer_head **bh)
2133{
2134	int status = 0;
2135	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2136	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2137	struct ocfs2_dinode *fe;
2138	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2139
2140	mlog_entry_void();
2141
2142	if (ocfs2_mount_local(osb))
2143		goto bail;
2144
2145	spin_lock(&oi->ip_lock);
2146	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2147		mlog(0, "Orphaned inode %llu was deleted while we "
2148		     "were waiting on a lock. ip_flags = 0x%x\n",
2149		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2150		spin_unlock(&oi->ip_lock);
2151		status = -ENOENT;
2152		goto bail;
2153	}
2154	spin_unlock(&oi->ip_lock);
2155
2156	if (!ocfs2_should_refresh_lock_res(lockres))
2157		goto bail;
2158
2159	/* This will discard any caching information we might have had
2160	 * for the inode metadata. */
2161	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2162
2163	ocfs2_extent_map_trunc(inode, 0);
2164
2165	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2166		mlog(0, "Trusting LVB on inode %llu\n",
2167		     (unsigned long long)oi->ip_blkno);
2168		ocfs2_refresh_inode_from_lvb(inode);
2169	} else {
2170		/* Boo, we have to go to disk. */
2171		/* read bh, cast, ocfs2_refresh_inode */
2172		status = ocfs2_read_inode_block(inode, bh);
2173		if (status < 0) {
2174			mlog_errno(status);
2175			goto bail_refresh;
2176		}
2177		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2178
2179		/* This is a good chance to make sure we're not
2180		 * locking an invalid object.  ocfs2_read_inode_block()
2181		 * already checked that the inode block is sane.
2182		 *
2183		 * We bug on a stale inode here because we checked
2184		 * above whether it was wiped from disk. The wiping
2185		 * node provides a guarantee that we receive that
2186		 * message and can mark the inode before dropping any
2187		 * locks associated with it. */
2188		mlog_bug_on_msg(inode->i_generation !=
2189				le32_to_cpu(fe->i_generation),
2190				"Invalid dinode %llu disk generation: %u "
2191				"inode->i_generation: %u\n",
2192				(unsigned long long)oi->ip_blkno,
2193				le32_to_cpu(fe->i_generation),
2194				inode->i_generation);
2195		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2196				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2197				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2198				(unsigned long long)oi->ip_blkno,
2199				(unsigned long long)le64_to_cpu(fe->i_dtime),
2200				le32_to_cpu(fe->i_flags));
2201
2202		ocfs2_refresh_inode(inode, fe);
2203		ocfs2_track_lock_refresh(lockres);
2204	}
2205
2206	status = 0;
2207bail_refresh:
2208	ocfs2_complete_lock_res_refresh(lockres, status);
2209bail:
2210	mlog_exit(status);
2211	return status;
2212}
2213
2214static int ocfs2_assign_bh(struct inode *inode,
2215			   struct buffer_head **ret_bh,
2216			   struct buffer_head *passed_bh)
2217{
2218	int status;
2219
2220	if (passed_bh) {
2221		/* Ok, the update went to disk for us, use the
2222		 * returned bh. */
2223		*ret_bh = passed_bh;
2224		get_bh(*ret_bh);
2225
2226		return 0;
2227	}
2228
2229	status = ocfs2_read_inode_block(inode, ret_bh);
2230	if (status < 0)
2231		mlog_errno(status);
2232
2233	return status;
2234}
2235
2236/*
2237 * returns < 0 on error if the callback will never be called, otherwise
2238 * the result of the lock will be communicated via the callback.
2239 */
2240int ocfs2_inode_lock_full_nested(struct inode *inode,
2241				 struct buffer_head **ret_bh,
2242				 int ex,
2243				 int arg_flags,
2244				 int subclass)
2245{
2246	int status, level, acquired;
2247	u32 dlm_flags;
2248	struct ocfs2_lock_res *lockres = NULL;
2249	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2250	struct buffer_head *local_bh = NULL;
2251
2252	BUG_ON(!inode);
2253
2254	mlog_entry_void();
2255
2256	mlog(0, "inode %llu, take %s META lock\n",
2257	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2258	     ex ? "EXMODE" : "PRMODE");
2259
2260	status = 0;
2261	acquired = 0;
2262	/* We'll allow faking a readonly metadata lock for
2263	 * read-only devices. */
2264	if (ocfs2_is_hard_readonly(osb)) {
2265		if (ex)
2266			status = -EROFS;
2267		goto bail;
2268	}
2269
2270	if (ocfs2_mount_local(osb))
2271		goto local;
2272
2273	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2274		ocfs2_wait_for_recovery(osb);
2275
2276	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2277	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2278	dlm_flags = 0;
2279	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2280		dlm_flags |= DLM_LKF_NOQUEUE;
2281
2282	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2283				      arg_flags, subclass, _RET_IP_);
2284	if (status < 0) {
2285		if (status != -EAGAIN && status != -EIOCBRETRY)
2286			mlog_errno(status);
2287		goto bail;
2288	}
2289
2290	/* Notify the error cleanup path to drop the cluster lock. */
2291	acquired = 1;
2292
2293	/* We wait twice because a node may have died while we were in
2294	 * the lower dlm layers. The second time though, we've
2295	 * committed to owning this lock so we don't allow signals to
2296	 * abort the operation. */
2297	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2298		ocfs2_wait_for_recovery(osb);
2299
2300local:
2301	/*
2302	 * We only see this flag if we're being called from
2303	 * ocfs2_read_locked_inode(). It means we're locking an inode
2304	 * which hasn't been populated yet, so clear the refresh flag
2305	 * and let the caller handle it.
2306	 */
2307	if (inode->i_state & I_NEW) {
2308		status = 0;
2309		if (lockres)
2310			ocfs2_complete_lock_res_refresh(lockres, 0);
2311		goto bail;
2312	}
2313
2314	/* This is fun. The caller may want a bh back, or it may
2315	 * not. ocfs2_inode_lock_update definitely wants one in, but
2316	 * may or may not read one, depending on what's in the
2317	 * LVB. The result of all of this is that we've *only* gone to
2318	 * disk if we have to, so the complexity is worthwhile. */
2319	status = ocfs2_inode_lock_update(inode, &local_bh);
2320	if (status < 0) {
2321		if (status != -ENOENT)
2322			mlog_errno(status);
2323		goto bail;
2324	}
2325
2326	if (ret_bh) {
2327		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2328		if (status < 0) {
2329			mlog_errno(status);
2330			goto bail;
2331		}
2332	}
2333
2334bail:
2335	if (status < 0) {
2336		if (ret_bh && (*ret_bh)) {
2337			brelse(*ret_bh);
2338			*ret_bh = NULL;
2339		}
2340		if (acquired)
2341			ocfs2_inode_unlock(inode, ex);
2342	}
2343
2344	if (local_bh)
2345		brelse(local_bh);
2346
2347	mlog_exit(status);
2348	return status;
2349}
2350
2351/*
2352 * This is working around a lock inversion between tasks acquiring DLM
2353 * locks while holding a page lock and the downconvert thread which
2354 * blocks dlm lock acquiry while acquiring page locks.
2355 *
2356 * ** These _with_page variants are only intended to be called from aop
2357 * methods that hold page locks and return a very specific *positive* error
2358 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2359 *
2360 * The DLM is called such that it returns -EAGAIN if it would have
2361 * blocked waiting for the downconvert thread.  In that case we unlock
2362 * our page so the downconvert thread can make progress.  Once we've
2363 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2364 * that called us can bubble that back up into the VFS who will then
2365 * immediately retry the aop call.
2366 *
2367 * We do a blocking lock and immediate unlock before returning, though, so that
2368 * the lock has a great chance of being cached on this node by the time the VFS
2369 * calls back to retry the aop. This has the potential to livelock as nodes
2370 * ping locks back and forth, but that's a risk we're willing to take in
2371 * exchange for a simple fix to the lock inversion.
2372 */
2373int ocfs2_inode_lock_with_page(struct inode *inode,
2374			      struct buffer_head **ret_bh,
2375			      int ex,
2376			      struct page *page)
2377{
2378	int ret;
2379
2380	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2381	if (ret == -EAGAIN) {
2382		unlock_page(page);
2383		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2384			ocfs2_inode_unlock(inode, ex);
2385		ret = AOP_TRUNCATED_PAGE;
2386	}
2387
2388	return ret;
2389}
2390
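/*
 * A hedged sketch of an aop-style caller (editor's illustration; the
 * real callers live in aops.c, and page cleanup on the success path
 * is elided). Note the "!= 0" test described above: AOP_TRUNCATED_PAGE
 * is positive and must go straight back to the VFS, which retries.
 */
static int example_readpage(struct file *file, struct page *page)
{
	struct inode *inode = page->mapping->host;
	int ret;

	ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
	if (ret != 0)
		return ret;	/* may be AOP_TRUNCATED_PAGE, not an errno */

	/* ... fill the page while holding the cluster lock ... */

	ocfs2_inode_unlock(inode, 0);
	return 0;
}
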
2391int ocfs2_inode_lock_atime(struct inode *inode,
2392			  struct vfsmount *vfsmnt,
2393			  int *level)
2394{
2395	int ret;
2396
2397	mlog_entry_void();
2398	ret = ocfs2_inode_lock(inode, NULL, 0);
2399	if (ret < 0) {
2400		mlog_errno(ret);
2401		return ret;
2402	}
2403
2404	/*
2405	 * If we should update atime, we will get EX lock,
2406	 * otherwise we just get PR lock.
2407	 */
2408	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2409		struct buffer_head *bh = NULL;
2410
2411		ocfs2_inode_unlock(inode, 0);
2412		ret = ocfs2_inode_lock(inode, &bh, 1);
2413		if (ret < 0) {
2414			mlog_errno(ret);
2415			return ret;
2416		}
2417		*level = 1;
2418		if (ocfs2_should_update_atime(inode, vfsmnt))
2419			ocfs2_update_inode_atime(inode, bh);
2420		if (bh)
2421			brelse(bh);
2422	} else
2423		*level = 0;
2424
2425	mlog_exit(ret);
2426	return ret;
2427}
2428
2429void ocfs2_inode_unlock(struct inode *inode,
2430		       int ex)
2431{
2432	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2433	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2434	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2435
2436	mlog_entry_void();
2437
2438	mlog(0, "inode %llu drop %s META lock\n",
2439	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2440	     ex ? "EXMODE" : "PRMODE");
2441
2442	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2443	    !ocfs2_mount_local(osb))
2444		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2445
2446	mlog_exit_void();
2447}
2448
2449int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2450{
2451	struct ocfs2_lock_res *lockres;
2452	struct ocfs2_orphan_scan_lvb *lvb;
2453	int status = 0;
2454
2455	if (ocfs2_is_hard_readonly(osb))
2456		return -EROFS;
2457
2458	if (ocfs2_mount_local(osb))
2459		return 0;
2460
2461	lockres = &osb->osb_orphan_scan.os_lockres;
2462	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2463	if (status < 0)
2464		return status;
2465
2466	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2467	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2468	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2469		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2470	else
2471		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2472
2473	return status;
2474}
2475
2476void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2477{
2478	struct ocfs2_lock_res *lockres;
2479	struct ocfs2_orphan_scan_lvb *lvb;
2480
2481	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2482		lockres = &osb->osb_orphan_scan.os_lockres;
2483		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2484		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2485		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2486		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2487	}
2488}
2489
2490int ocfs2_super_lock(struct ocfs2_super *osb,
2491		     int ex)
2492{
2493	int status = 0;
2494	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2495	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2496
2497	mlog_entry_void();
2498
2499	if (ocfs2_is_hard_readonly(osb))
2500		return -EROFS;
2501
2502	if (ocfs2_mount_local(osb))
2503		goto bail;
2504
2505	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2506	if (status < 0) {
2507		mlog_errno(status);
2508		goto bail;
2509	}
2510
2511	/* The super block lock path is really in the best position to
2512	 * know when resources covered by the lock need to be
2513	 * refreshed, so we do it here. Of course, making sense of
2514	 * everything is up to the caller :) */
2515	status = ocfs2_should_refresh_lock_res(lockres);
2516	if (status < 0) {
2517		mlog_errno(status);
2518		goto bail;
2519	}
2520	if (status) {
2521		status = ocfs2_refresh_slot_info(osb);
2522
2523		ocfs2_complete_lock_res_refresh(lockres, status);
2524
2525		if (status < 0)
2526			mlog_errno(status);
2527		ocfs2_track_lock_refresh(lockres);
2528	}
2529bail:
2530	mlog_exit(status);
2531	return status;
2532}
2533
2534void ocfs2_super_unlock(struct ocfs2_super *osb,
2535			int ex)
2536{
2537	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2538	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2539
2540	if (!ocfs2_mount_local(osb))
2541		ocfs2_cluster_unlock(osb, lockres, level);
2542}
2543
2544int ocfs2_rename_lock(struct ocfs2_super *osb)
2545{
2546	int status;
2547	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2548
2549	if (ocfs2_is_hard_readonly(osb))
2550		return -EROFS;
2551
2552	if (ocfs2_mount_local(osb))
2553		return 0;
2554
2555	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2556	if (status < 0)
2557		mlog_errno(status);
2558
2559	return status;
2560}
2561
2562void ocfs2_rename_unlock(struct ocfs2_super *osb)
2563{
2564	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2565
2566	if (!ocfs2_mount_local(osb))
2567		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2568}
2569
2570int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2571{
2572	int status;
2573	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2574
2575	if (ocfs2_is_hard_readonly(osb))
2576		return -EROFS;
2577
2578	if (ocfs2_mount_local(osb))
2579		return 0;
2580
2581	status = ocfs2_cluster_lock(osb, lockres, ex ? DLM_LOCK_EX : DLM_LOCK_PR,
2582				    0, 0);
2583	if (status < 0)
2584		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2585
2586	return status;
2587}
2588
2589void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2590{
2591	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2592
2593	if (!ocfs2_mount_local(osb))
2594		ocfs2_cluster_unlock(osb, lockres,
2595				     ex ? DLM_LOCK_EX : DLM_LOCK_PR);
2596}
2597
2598int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2599{
2600	int ret;
2601	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2602	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2603	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2604
2605	BUG_ON(!dl);
2606
2607	if (ocfs2_is_hard_readonly(osb))
2608		return -EROFS;
2609
2610	if (ocfs2_mount_local(osb))
2611		return 0;
2612
2613	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2614	if (ret < 0)
2615		mlog_errno(ret);
2616
2617	return ret;
2618}
2619
2620void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2621{
2622	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2623	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2624	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2625
2626	if (!ocfs2_mount_local(osb))
2627		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2628}
2629
2630/* Reference counting of the dlm debug structure. We want this because
2631 * open references on the debug inodes can outlive the mount, so
2632 * we can't rely on the ocfs2_super to always exist. */
2633static void ocfs2_dlm_debug_free(struct kref *kref)
2634{
2635	struct ocfs2_dlm_debug *dlm_debug;
2636
2637	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2638
2639	kfree(dlm_debug);
2640}
2641
2642void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2643{
2644	if (dlm_debug)
2645		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2646}
2647
2648static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2649{
2650	kref_get(&debug->d_refcnt);
2651}
2652
2653struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2654{
2655	struct ocfs2_dlm_debug *dlm_debug;
2656
2657	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2658	if (!dlm_debug) {
2659		mlog_errno(-ENOMEM);
2660		goto out;
2661	}
2662
2663	kref_init(&dlm_debug->d_refcnt);
2664	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2665	dlm_debug->d_locking_state = NULL;
2666out:
2667	return dlm_debug;
2668}
2669
2670/* Access to this is arbitrated for us via seq_file->sem. */
2671struct ocfs2_dlm_seq_priv {
2672	struct ocfs2_dlm_debug *p_dlm_debug;
2673	struct ocfs2_lock_res p_iter_res;
2674	struct ocfs2_lock_res p_tmp_res;
2675};
2676
2677static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2678						 struct ocfs2_dlm_seq_priv *priv)
2679{
2680	struct ocfs2_lock_res *iter, *ret = NULL;
2681	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2682
2683	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2684
2685	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2686		/* discover the head of the list */
2687		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2688			mlog(0, "End of list found, %p\n", ret);
2689			break;
2690		}
2691
2692		/* We track our "dummy" iteration lockres' by a NULL
2693		 * l_ops field. */
2694		if (iter->l_ops != NULL) {
2695			ret = iter;
2696			break;
2697		}
2698	}
2699
2700	return ret;
2701}
2702
2703static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2704{
2705	struct ocfs2_dlm_seq_priv *priv = m->private;
2706	struct ocfs2_lock_res *iter;
2707
2708	spin_lock(&ocfs2_dlm_tracking_lock);
2709	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2710	if (iter) {
2711		/* Since lockres' have the lifetime of their container
2712		 * (which can be inodes, ocfs2_supers, etc) we want to
2713		 * copy this out to a temporary lockres while still
2714		 * under the spinlock. Obviously after this we can't
2715		 * trust any pointers on the copy returned, but that's
2716		 * ok as the information we want isn't typically held
2717		 * in them. */
2718		priv->p_tmp_res = *iter;
2719		iter = &priv->p_tmp_res;
2720	}
2721	spin_unlock(&ocfs2_dlm_tracking_lock);
2722
2723	return iter;
2724}
2725
2726static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2727{
2728}
2729
2730static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2731{
2732	struct ocfs2_dlm_seq_priv *priv = m->private;
2733	struct ocfs2_lock_res *iter = v;
2734	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2735
2736	spin_lock(&ocfs2_dlm_tracking_lock);
2737	iter = ocfs2_dlm_next_res(iter, priv);
2738	list_del_init(&dummy->l_debug_list);
2739	if (iter) {
2740		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2741		priv->p_tmp_res = *iter;
2742		iter = &priv->p_tmp_res;
2743	}
2744	spin_unlock(&ocfs2_dlm_tracking_lock);
2745
2746	return iter;
2747}
2748
2749/* So that debugfs.ocfs2 can determine which format is being used */
2750#define OCFS2_DLM_DEBUG_STR_VERSION 2
2751static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2752{
2753	int i;
2754	char *lvb;
2755	struct ocfs2_lock_res *lockres = v;
2756
2757	if (!lockres)
2758		return -EINVAL;
2759
2760	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2761
2762	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2763		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2764			   lockres->l_name,
2765			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2766	else
2767		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2768
2769	seq_printf(m, "%d\t"
2770		   "0x%lx\t"
2771		   "0x%x\t"
2772		   "0x%x\t"
2773		   "%u\t"
2774		   "%u\t"
2775		   "%d\t"
2776		   "%d\t",
2777		   lockres->l_level,
2778		   lockres->l_flags,
2779		   lockres->l_action,
2780		   lockres->l_unlock_action,
2781		   lockres->l_ro_holders,
2782		   lockres->l_ex_holders,
2783		   lockres->l_requested,
2784		   lockres->l_blocking);
2785
2786	/* Dump the raw LVB */
2787	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2788	for (i = 0; i < DLM_LVB_LEN; i++)
2789		seq_printf(m, "0x%x\t", lvb[i]);
2790
2791#ifdef CONFIG_OCFS2_FS_STATS
2792# define lock_num_prmode(_l)		(_l)->l_lock_num_prmode
2793# define lock_num_exmode(_l)		(_l)->l_lock_num_exmode
2794# define lock_num_prmode_failed(_l)	(_l)->l_lock_num_prmode_failed
2795# define lock_num_exmode_failed(_l)	(_l)->l_lock_num_exmode_failed
2796# define lock_total_prmode(_l)		(_l)->l_lock_total_prmode
2797# define lock_total_exmode(_l)		(_l)->l_lock_total_exmode
2798# define lock_max_prmode(_l)		(_l)->l_lock_max_prmode
2799# define lock_max_exmode(_l)		(_l)->l_lock_max_exmode
2800# define lock_refresh(_l)		(_l)->l_lock_refresh
2801#else
2802# define lock_num_prmode(_l)		(0ULL)
2803# define lock_num_exmode(_l)		(0ULL)
2804# define lock_num_prmode_failed(_l)	(0)
2805# define lock_num_exmode_failed(_l)	(0)
2806# define lock_total_prmode(_l)		(0ULL)
2807# define lock_total_exmode(_l)		(0ULL)
2808# define lock_max_prmode(_l)		(0)
2809# define lock_max_exmode(_l)		(0)
2810# define lock_refresh(_l)		(0)
2811#endif
2812	/* The following seq_print was added in version 2 of this output */
2813	seq_printf(m, "%llu\t"
2814		   "%llu\t"
2815		   "%u\t"
2816		   "%u\t"
2817		   "%llu\t"
2818		   "%llu\t"
2819		   "%u\t"
2820		   "%u\t"
2821		   "%u\t",
2822		   lock_num_prmode(lockres),
2823		   lock_num_exmode(lockres),
2824		   lock_num_prmode_failed(lockres),
2825		   lock_num_exmode_failed(lockres),
2826		   lock_total_prmode(lockres),
2827		   lock_total_exmode(lockres),
2828		   lock_max_prmode(lockres),
2829		   lock_max_exmode(lockres),
2830		   lock_refresh(lockres));
2831
2832	/* End the line */
2833	seq_printf(m, "\n");
2834	return 0;
2835}
2836
2837static const struct seq_operations ocfs2_dlm_seq_ops = {
2838	.start =	ocfs2_dlm_seq_start,
2839	.stop =		ocfs2_dlm_seq_stop,
2840	.next =		ocfs2_dlm_seq_next,
2841	.show =		ocfs2_dlm_seq_show,
2842};
2843
2844static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2845{
2846	struct seq_file *seq = (struct seq_file *) file->private_data;
2847	struct ocfs2_dlm_seq_priv *priv = seq->private;
2848	struct ocfs2_lock_res *res = &priv->p_iter_res;
2849
2850	ocfs2_remove_lockres_tracking(res);
2851	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2852	return seq_release_private(inode, file);
2853}
2854
2855static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2856{
2857	int ret;
2858	struct ocfs2_dlm_seq_priv *priv;
2859	struct seq_file *seq;
2860	struct ocfs2_super *osb;
2861
2862	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2863	if (!priv) {
2864		ret = -ENOMEM;
2865		mlog_errno(ret);
2866		goto out;
2867	}
2868	osb = inode->i_private;
2869	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2870	priv->p_dlm_debug = osb->osb_dlm_debug;
2871	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2872
2873	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2874	if (ret) {
2875		kfree(priv);
2876		mlog_errno(ret);
2877		goto out;
2878	}
2879
2880	seq = (struct seq_file *) file->private_data;
2881	seq->private = priv;
2882
2883	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2884				   priv->p_dlm_debug);
2885
2886out:
2887	return ret;
2888}
2889
2890static const struct file_operations ocfs2_dlm_debug_fops = {
2891	.open =		ocfs2_dlm_debug_open,
2892	.release =	ocfs2_dlm_debug_release,
2893	.read =		seq_read,
2894	.llseek =	seq_lseek,
2895};
2896
2897static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2898{
2899	int ret = 0;
2900	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2901
2902	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2903							 S_IFREG|S_IRUSR,
2904							 osb->osb_debug_root,
2905							 osb,
2906							 &ocfs2_dlm_debug_fops);
2907	if (!dlm_debug->d_locking_state) {
2908		ret = -EINVAL;
2909		mlog(ML_ERROR,
2910		     "Unable to create locking state debugfs file.\n");
2911		goto out;
2912	}
2913
2914	ocfs2_get_dlm_debug(dlm_debug);
2915out:
2916	return ret;
2917}
2918
2919static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2920{
2921	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2922
2923	if (dlm_debug) {
2924		debugfs_remove(dlm_debug->d_locking_state);
2925		ocfs2_put_dlm_debug(dlm_debug);
2926	}
2927}
2928
2929int ocfs2_dlm_init(struct ocfs2_super *osb)
2930{
2931	int status = 0;
2932	struct ocfs2_cluster_connection *conn = NULL;
2933
2934	mlog_entry_void();
2935
2936	if (ocfs2_mount_local(osb)) {
2937		osb->node_num = 0;
2938		goto local;
2939	}
2940
2941	status = ocfs2_dlm_init_debug(osb);
2942	if (status < 0) {
2943		mlog_errno(status);
2944		goto bail;
2945	}
2946
2947	/* launch downconvert thread */
2948	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2949	if (IS_ERR(osb->dc_task)) {
2950		status = PTR_ERR(osb->dc_task);
2951		osb->dc_task = NULL;
2952		mlog_errno(status);
2953		goto bail;
2954	}
2955
2956	/* for now, uuid == domain */
2957	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2958				       osb->uuid_str,
2959				       strlen(osb->uuid_str),
2960				       ocfs2_do_node_down, osb,
2961				       &conn);
2962	if (status) {
2963		mlog_errno(status);
2964		goto bail;
2965	}
2966
2967	status = ocfs2_cluster_this_node(&osb->node_num);
2968	if (status < 0) {
2969		mlog_errno(status);
2970		mlog(ML_ERROR,
2971		     "could not find this host's node number\n");
2972		ocfs2_cluster_disconnect(conn, 0);
2973		goto bail;
2974	}
2975
2976local:
2977	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2978	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2979	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
2980	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
2981
2982	osb->cconn = conn;
2983
2984	status = 0;
2985bail:
2986	if (status < 0) {
2987		ocfs2_dlm_shutdown_debug(osb);
2988		if (osb->dc_task)
2989			kthread_stop(osb->dc_task);
2990	}
2991
2992	mlog_exit(status);
2993	return status;
2994}
2995
2996void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2997			int hangup_pending)
2998{
2999	mlog_entry_void();
3000
3001	ocfs2_drop_osb_locks(osb);
3002
3003	/*
3004	 * Now that we have dropped all locks and ocfs2_dismount_volume()
3005	 * has disabled recovery, the DLM won't be talking to us.  It's
3006	 * safe to tear things down before disconnecting the cluster.
3007	 */
3008
3009	if (osb->dc_task) {
3010		kthread_stop(osb->dc_task);
3011		osb->dc_task = NULL;
3012	}
3013
3014	ocfs2_lock_res_free(&osb->osb_super_lockres);
3015	ocfs2_lock_res_free(&osb->osb_rename_lockres);
3016	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3017	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3018
3019	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3020	osb->cconn = NULL;
3021
3022	ocfs2_dlm_shutdown_debug(osb);
3023
3024	mlog_exit_void();
3025}
3026
3027static void ocfs2_unlock_ast(void *opaque, int error)
3028{
3029	struct ocfs2_lock_res *lockres = opaque;
3030	unsigned long flags;
3031
3032	mlog_entry_void();
3033
3034	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
3035	     lockres->l_unlock_action);
3036
3037	spin_lock_irqsave(&lockres->l_lock, flags);
3038	if (error) {
3039		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
3040		     "unlock_action %d\n", error, lockres->l_name,
3041		     lockres->l_unlock_action);
3042		spin_unlock_irqrestore(&lockres->l_lock, flags);
3043		mlog_exit_void();
3044		return;
3045	}
3046
3047	switch (lockres->l_unlock_action) {
3048	case OCFS2_UNLOCK_CANCEL_CONVERT:
3049		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
3050		lockres->l_action = OCFS2_AST_INVALID;
3051		/* Downconvert thread may have requeued this lock, we
3052		 * need to wake it. */
3053		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3054			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
3055		break;
3056	case OCFS2_UNLOCK_DROP_LOCK:
3057		lockres->l_level = DLM_LOCK_IV;
3058		break;
3059	default:
3060		BUG();
3061	}
3062
3063	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
3064	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
3065	wake_up(&lockres->l_event);
3066	spin_unlock_irqrestore(&lockres->l_lock, flags);
3067
3068	mlog_exit_void();
3069}
3070
3071static int ocfs2_drop_lock(struct ocfs2_super *osb,
3072			   struct ocfs2_lock_res *lockres)
3073{
3074	int ret;
3075	unsigned long flags;
3076	u32 lkm_flags = 0;
3077
3078	/* We didn't get anywhere near actually using this lockres. */
3079	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3080		goto out;
3081
3082	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3083		lkm_flags |= DLM_LKF_VALBLK;
3084
3085	spin_lock_irqsave(&lockres->l_lock, flags);
3086
3087	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3088			"lockres %s, flags 0x%lx\n",
3089			lockres->l_name, lockres->l_flags);
3090
3091	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3092		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3093		     "%u, unlock_action = %u\n",
3094		     lockres->l_name, lockres->l_flags, lockres->l_action,
3095		     lockres->l_unlock_action);
3096
3097		spin_unlock_irqrestore(&lockres->l_lock, flags);
3098
3099		/* XXX: Today we just wait on any busy
3100		 * locks... Perhaps we need to cancel converts in the
3101		 * future? */
3102		ocfs2_wait_on_busy_lock(lockres);
3103
3104		spin_lock_irqsave(&lockres->l_lock, flags);
3105	}
3106
3107	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3108		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3109		    lockres->l_level == DLM_LOCK_EX &&
3110		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3111			lockres->l_ops->set_lvb(lockres);
3112	}
3113
3114	if (lockres->l_flags & OCFS2_LOCK_BUSY)
3115		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3116		     lockres->l_name);
3117	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3118		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3119
3120	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3121		spin_unlock_irqrestore(&lockres->l_lock, flags);
3122		goto out;
3123	}
3124
3125	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3126
3127	/* make sure we never get here while waiting for an ast to
3128	 * fire. */
3129	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3130
3131	/* is this necessary? */
3132	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3133	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3134	spin_unlock_irqrestore(&lockres->l_lock, flags);
3135
3136	mlog(0, "lock %s\n", lockres->l_name);
3137
3138	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
3139			       lockres);
3140	if (ret) {
3141		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3142		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3143		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3144		BUG();
3145	}
3146	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3147	     lockres->l_name);
3148
3149	ocfs2_wait_on_busy_lock(lockres);
3150out:
3151	mlog_exit(0);
3152	return 0;
3153}
3154
3155/* Mark the lockres as being dropped. It will no longer be
3156 * queued if blocking, but we still may have to wait on it
3157 * being dequeued from the downconvert thread before we can consider
3158 * it safe to drop.
3159 *
3160 * You can *not* attempt to call cluster_lock on this lockres anymore. */
3161void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
3162{
3163	int status;
3164	struct ocfs2_mask_waiter mw;
3165	unsigned long flags;
3166
3167	ocfs2_init_mask_waiter(&mw);
3168
3169	spin_lock_irqsave(&lockres->l_lock, flags);
3170	lockres->l_flags |= OCFS2_LOCK_FREEING;
3171	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3172		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3173		spin_unlock_irqrestore(&lockres->l_lock, flags);
3174
3175		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3176
3177		status = ocfs2_wait_for_mask(&mw);
3178		if (status)
3179			mlog_errno(status);
3180
3181		spin_lock_irqsave(&lockres->l_lock, flags);
3182	}
3183	spin_unlock_irqrestore(&lockres->l_lock, flags);
3184}
3185
3186void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3187			       struct ocfs2_lock_res *lockres)
3188{
3189	int ret;
3190
3191	ocfs2_mark_lockres_freeing(lockres);
3192	ret = ocfs2_drop_lock(osb, lockres);
3193	if (ret)
3194		mlog_errno(ret);
3195}
3196
3197static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3198{
3199	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3200	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3201	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3202	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3203}
3204
3205int ocfs2_drop_inode_locks(struct inode *inode)
3206{
3207	int status, err;
3208
3209	mlog_entry_void();
3210
3211	/* No need to call ocfs2_mark_lockres_freeing here -
3212	 * ocfs2_clear_inode has done it for us. */
3213
3214	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3215			      &OCFS2_I(inode)->ip_open_lockres);
3216	if (err < 0)
3217		mlog_errno(err);
3218
3219	status = err;
3220
3221	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3222			      &OCFS2_I(inode)->ip_inode_lockres);
3223	if (err < 0)
3224		mlog_errno(err);
3225	if (err < 0 && !status)
3226		status = err;
3227
3228	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3229			      &OCFS2_I(inode)->ip_rw_lockres);
3230	if (err < 0)
3231		mlog_errno(err);
3232	if (err < 0 && !status)
3233		status = err;
3234
3235	mlog_exit(status);
3236	return status;
3237}
3238
3239static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3240					      int new_level)
3241{
3242	assert_spin_locked(&lockres->l_lock);
3243
3244	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3245
3246	if (lockres->l_level <= new_level) {
3247		mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
3248		     lockres->l_level, new_level);
3249		BUG();
3250	}
3251
3252	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
3253	     lockres->l_name, new_level, lockres->l_blocking);
3254
3255	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3256	lockres->l_requested = new_level;
3257	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3258	return lockres_set_pending(lockres);
3259}
3260
3261static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3262				  struct ocfs2_lock_res *lockres,
3263				  int new_level,
3264				  int lvb,
3265				  unsigned int generation)
3266{
3267	int ret;
3268	u32 dlm_flags = DLM_LKF_CONVERT;
3269
3270	mlog_entry_void();
3271
3272	if (lvb)
3273		dlm_flags |= DLM_LKF_VALBLK;
3274
3275	ret = ocfs2_dlm_lock(osb->cconn,
3276			     new_level,
3277			     &lockres->l_lksb,
3278			     dlm_flags,
3279			     lockres->l_name,
3280			     OCFS2_LOCK_ID_MAX_LEN - 1,
3281			     lockres);
3282	lockres_clear_pending(lockres, generation, osb);
3283	if (ret) {
3284		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3285		ocfs2_recover_from_dlm_error(lockres, 1);
3286		goto bail;
3287	}
3288
3289	ret = 0;
3290bail:
3291	mlog_exit(ret);
3292	return ret;
3293}
3294
3295/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3296static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3297				        struct ocfs2_lock_res *lockres)
3298{
3299	assert_spin_locked(&lockres->l_lock);
3300
3301	mlog_entry_void();
3302	mlog(0, "lock %s\n", lockres->l_name);
3303
3304	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3305		/* If we're already trying to cancel a lock conversion
3306		 * then just drop the spinlock and allow the caller to
3307		 * requeue this lock. */
3308
3309		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
3310		return 0;
3311	}
3312
3313	/* were we in a convert when the bast fired? */
3314	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3315	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3316	/* set things up for the unlockast to know to just
3317	 * clear out the ast_action and unset busy, etc. */
3318	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3319
3320	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3321			"lock %s, invalid flags: 0x%lx\n",
3322			lockres->l_name, lockres->l_flags);
3323
3324	return 1;
3325}
3326
3327static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3328				struct ocfs2_lock_res *lockres)
3329{
3330	int ret;
3331
3332	mlog_entry_void();
3333	mlog(0, "lock %s\n", lockres->l_name);
3334
3335	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3336			       DLM_LKF_CANCEL, lockres);
3337	if (ret) {
3338		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3339		ocfs2_recover_from_dlm_error(lockres, 0);
3340	}
3341
3342	mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
3343
3344	mlog_exit(ret);
3345	return ret;
3346}
3347
3348static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3349			      struct ocfs2_lock_res *lockres,
3350			      struct ocfs2_unblock_ctl *ctl)
3351{
3352	unsigned long flags;
3353	int blocking;
3354	int new_level;
3355	int ret = 0;
3356	int set_lvb = 0;
3357	unsigned int gen;
3358
3359	mlog_entry_void();
3360
3361	spin_lock_irqsave(&lockres->l_lock, flags);
3362
3363	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
3364
3365recheck:
3366	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3367		/* XXX
3368		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3369		 * exists entirely for one reason - another thread has set
3370		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3371		 *
3372		 * If we do ocfs2_cancel_convert() before the other thread
3373		 * calls dlm_lock(), our cancel will do nothing.  We will
3374		 * get no ast, and we will have no way of knowing the
3375		 * cancel failed.  Meanwhile, the other thread will call
3376		 * into dlm_lock() and wait...forever.
3377		 *
3378		 * Why forever?  Because another node has asked for the
3379		 * lock first; that's why we're here in unblock_lock().
3380		 *
3381		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3382		 * set, we just requeue the unblock.  Only when the other
3383		 * thread has called dlm_lock() and cleared PENDING will
3384		 * we then cancel their request.
3385		 *
3386		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3387		 * at the same time they set OCFS2_LOCK_BUSY.  They must
3388		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3389		 */
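		/*
		 * Illustrative ordering (editor's sketch) of that
		 * convention, as followed by the lock paths in this
		 * file:
		 *
		 *	spin_lock_irqsave(&lockres->l_lock, flags);
		 *	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		 *	gen = lockres_set_pending(lockres);
		 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
		 *	ret = ocfs2_dlm_lock(...);
		 *	lockres_clear_pending(lockres, gen, osb);
		 */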
3390		if (lockres->l_flags & OCFS2_LOCK_PENDING)
3391			goto leave_requeue;
3392
3393		ctl->requeue = 1;
3394		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3395		spin_unlock_irqrestore(&lockres->l_lock, flags);
3396		if (ret) {
3397			ret = ocfs2_cancel_convert(osb, lockres);
3398			if (ret < 0)
3399				mlog_errno(ret);
3400		}
3401		goto leave;
3402	}
3403
3404	/* if we're blocking an exclusive and we have *any* holders,
3405	 * then requeue. */
3406	if ((lockres->l_blocking == DLM_LOCK_EX)
3407	    && (lockres->l_ex_holders || lockres->l_ro_holders))
3408		goto leave_requeue;
3409
3410	/* If it's a PR we're blocking, then only
3411	 * requeue if we've got any EX holders */
3412	if (lockres->l_blocking == DLM_LOCK_PR &&
3413	    lockres->l_ex_holders)
3414		goto leave_requeue;
3415
3416	/*
3417	 * Can we get a lock in this state if the holder counts are
3418	 * zero? The metadata unblock code used to check this.
3419	 */
3420	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3421	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
3422		goto leave_requeue;
3423
3424	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3425
3426	if (lockres->l_ops->check_downconvert
3427	    && !lockres->l_ops->check_downconvert(lockres, new_level))
3428		goto leave_requeue;
3429
3430	/* If we get here, then we know that there are no more
3431	 * incompatible holders (and anyone asking for an incompatible
3432	 * lock is blocked). We can now downconvert the lock */
3433	if (!lockres->l_ops->downconvert_worker)
3434		goto downconvert;
3435
3436	/* Some lockres types want to do a bit of work before
3437	 * downconverting a lock. Allow that here. The worker function
3438	 * may sleep, so we save off a copy of what we're blocking as
3439	 * it may change while we're not holding the spin lock. */
3440	blocking = lockres->l_blocking;
3441	spin_unlock_irqrestore(&lockres->l_lock, flags);
3442
3443	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3444
3445	if (ctl->unblock_action == UNBLOCK_STOP_POST)
3446		goto leave;
3447
3448	spin_lock_irqsave(&lockres->l_lock, flags);
3449	if (blocking != lockres->l_blocking) {
3450		/* If this changed underneath us, then we can't drop
3451		 * it just yet. */
3452		goto recheck;
3453	}
3454
3455downconvert:
3456	ctl->requeue = 0;
3457
3458	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3459		if (lockres->l_level == DLM_LOCK_EX)
3460			set_lvb = 1;
3461
3462		/*
3463		 * We only set the lvb if the lock has been fully
3464		 * refreshed - otherwise we risk setting stale
3465		 * data. In that case, there's no need to actually clear
3466		 * out the lvb here, as its value is still valid.
3467		 */
3468		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3469			lockres->l_ops->set_lvb(lockres);
3470	}
3471
3472	gen = ocfs2_prepare_downconvert(lockres, new_level);
3473	spin_unlock_irqrestore(&lockres->l_lock, flags);
3474	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3475				     gen);
3476
3477leave:
3478	mlog_exit(ret);
3479	return ret;
3480
3481leave_requeue:
3482	spin_unlock_irqrestore(&lockres->l_lock, flags);
3483	ctl->requeue = 1;
3484
3485	mlog_exit(0);
3486	return 0;
3487}
3488
3489static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3490				     int blocking)
3491{
3492	struct inode *inode;
3493	struct address_space *mapping;
3494
3495	inode = ocfs2_lock_res_inode(lockres);
3496	mapping = inode->i_mapping;
3497
3498	if (!S_ISREG(inode->i_mode))
3499		goto out;
3500
3501	/*
3502	 * We need this before the filemap_fdatawrite() so that it can
3503	 * transfer the dirty bit from the PTE to the
3504	 * page. Unfortunately this means that even for EX->PR
3505	 * downconverts, we'll lose our mappings and have to build
3506	 * them up again.
3507	 */
3508	unmap_mapping_range(mapping, 0, 0, 0);
3509
3510	if (filemap_fdatawrite(mapping)) {
3511		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3512		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3513	}
3514	sync_mapping_buffers(mapping);
3515	if (blocking == DLM_LOCK_EX) {
3516		truncate_inode_pages(mapping, 0);
3517	} else {
3518		/* We only need to wait on the I/O if we're not also
3519		 * truncating pages because truncate_inode_pages waits
3520		 * for us above. We don't truncate pages if we're
3521		 * blocking anything < EXMODE because we want to keep
3522		 * them around in that case. */
3523		filemap_fdatawait(mapping);
3524	}
3525
3526out:
3527	return UNBLOCK_CONTINUE;
3528}
3529
3530static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3531				 struct ocfs2_lock_res *lockres,
3532				 int new_level)
3533{
3534	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3535
3536	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3537	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3538
3539	if (checkpointed)
3540		return 1;
3541
3542	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3543	return 0;
3544}
3545
3546static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3547					int new_level)
3548{
3549	struct inode *inode = ocfs2_lock_res_inode(lockres);
3550
3551	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3552}
3553
3554static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3555{
3556	struct inode *inode = ocfs2_lock_res_inode(lockres);
3557
3558	__ocfs2_stuff_meta_lvb(inode);
3559}
3560
3561/*
3562 * Does the final reference drop on our dentry lock. Right now this
3563 * happens in the downconvert thread, but we could choose to simplify the
3564 * dlmglue API and push these off to the ocfs2_wq in the future.
3565 */
3566static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3567				     struct ocfs2_lock_res *lockres)
3568{
3569	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3570	ocfs2_dentry_lock_put(osb, dl);
3571}
3572
3573/*
3574 * d_delete() matching dentries before the lock downconvert.
3575 *
3576 * At this point, any process waiting to destroy the
3577 * dentry_lock due to last ref count is stopped by the
3578 * OCFS2_LOCK_QUEUED flag.
3579 *
3580 * We have two potential problems
3581 *
3582 * 1) If we do the last reference drop on our dentry_lock (via dput)
3583 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3584 *    the downconvert to finish. Instead we take an elevated
3585 *    reference and push the drop until after we've completed our
3586 *    unblock processing.
3587 *
3588 * 2) There might be another process with a final reference,
3589 *    waiting on us to finish processing. If this is the case, we
3590 *    detect it and exit out - there are no more dentries anyway.
3591 */
3592static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3593				       int blocking)
3594{
3595	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3596	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3597	struct dentry *dentry;
3598	unsigned long flags;
3599	int extra_ref = 0;
3600
3601	/*
3602	 * This node is blocking another node from getting a read
3603	 * lock. This happens when we've renamed within a
3604	 * directory. We've forced the other nodes to d_delete(), but
3605	 * we never actually dropped our lock because it's still
3606	 * valid. The downconvert code will retain a PR for this node,
3607	 * so there's no further work to do.
3608	 */
3609	if (blocking == DLM_LOCK_PR)
3610		return UNBLOCK_CONTINUE;
3611
3612	/*
3613	 * Mark this inode as potentially orphaned. The code in
3614	 * ocfs2_delete_inode() will figure out whether it actually
3615	 * needs to be freed or not.
3616	 */
3617	spin_lock(&oi->ip_lock);
3618	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3619	spin_unlock(&oi->ip_lock);
3620
3621	/*
3622	 * Yuck. We need to make sure, however, that the check of
3623	 * OCFS2_LOCK_FREEING and the taking of the extra reference
3624	 * are atomic with respect to a reference decrement or the
3625	 * setting of that flag.
3626	 */
3627	spin_lock_irqsave(&lockres->l_lock, flags);
3628	spin_lock(&dentry_attach_lock);
3629	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3630	    && dl->dl_count) {
3631		dl->dl_count++;
3632		extra_ref = 1;
3633	}
3634	spin_unlock(&dentry_attach_lock);
3635	spin_unlock_irqrestore(&lockres->l_lock, flags);
3636
3637	mlog(0, "extra_ref = %d\n", extra_ref);
3638
3639	/*
3640	 * We have a process waiting on us in ocfs2_dentry_iput(),
3641	 * which means we can't have any more outstanding
3642	 * aliases. There's no need to do any more work.
3643	 */
3644	if (!extra_ref)
3645		return UNBLOCK_CONTINUE;
3646
3647	spin_lock(&dentry_attach_lock);
3648	while (1) {
3649		dentry = ocfs2_find_local_alias(dl->dl_inode,
3650						dl->dl_parent_blkno, 1);
3651		if (!dentry)
3652			break;
3653		spin_unlock(&dentry_attach_lock);
3654
3655		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3656		     dentry->d_name.name);
3657
3658		/*
3659		 * The following dcache calls may do an
3660		 * iput(). Normally we don't want that from the
3661		 * downconverting thread, but in this case it's ok
3662		 * because the requesting node already has an
3663		 * exclusive lock on the inode, so it can't be queued
3664		 * for a downconvert.
3665		 */
3666		d_delete(dentry);
3667		dput(dentry);
3668
3669		spin_lock(&dentry_attach_lock);
3670	}
3671	spin_unlock(&dentry_attach_lock);
3672
3673	/*
3674	 * If we are the last holder of this dentry lock, there is no
3675	 * reason to downconvert so skip straight to the unlock.
3676	 */
3677	if (dl->dl_count == 1)
3678		return UNBLOCK_STOP_POST;
3679
3680	return UNBLOCK_CONTINUE_POST;
3681}
3682
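/*
 * Refcount tree locks follow the same checkpoint rule as inode
 * metadata: don't drop below EX until the changes made under the
 * lock are on disk. The convert worker below then purges the cached
 * tree blocks, since another node may modify them once we've
 * downconverted.
 */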
3683static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
3684					    int new_level)
3685{
3686	struct ocfs2_refcount_tree *tree =
3687				ocfs2_lock_res_refcount_tree(lockres);
3688
3689	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
3690}
3691
3692static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
3693					 int blocking)
3694{
3695	struct ocfs2_refcount_tree *tree =
3696				ocfs2_lock_res_refcount_tree(lockres);
3697
3698	ocfs2_metadata_cache_purge(&tree->rf_ci);
3699
3700	return UNBLOCK_CONTINUE;
3701}
3702
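/*
 * Pack the current quota grace times and global quota file usage
 * info into the LVB. The fields are stored big-endian so every node
 * decodes them identically; ocfs2_refresh_qinfo() below is the
 * reader side of this exchange.
 */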
3703static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
3704{
3705	struct ocfs2_qinfo_lvb *lvb;
3706	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
3707	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3708					    oinfo->dqi_gi.dqi_type);
3709
3710	mlog_entry_void();
3711
3712	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3713	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
3714	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
3715	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
3716	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
3717	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
3718	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
3719	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
3720
3721	mlog_exit_void();
3722}
3723
3724void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3725{
3726	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3727	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3728	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3729
3730	mlog_entry_void();
3731	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
3732		ocfs2_cluster_unlock(osb, lockres, level);
3733	mlog_exit_void();
3734}
3735
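/*
 * Refresh our in-memory quota info, preferring the fast path: if the
 * DLM handed us a valid LVB in a version we understand, decode it;
 * otherwise fall back to reading the global quota info block from
 * disk. Note the endianness split - LVB fields are big-endian
 * (cluster-wide), on-disk fields are little-endian.
 */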
3736static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
3737{
3738	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
3739					    oinfo->dqi_gi.dqi_type);
3740	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3741	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3742	struct buffer_head *bh = NULL;
3743	struct ocfs2_global_disk_dqinfo *gdinfo;
3744	int status = 0;
3745
3746	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
3747	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
3748		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
3749		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
3750		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
3751		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
3752		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
3753		oinfo->dqi_gi.dqi_free_entry =
3754					be32_to_cpu(lvb->lvb_free_entry);
3755	} else {
3756		status = ocfs2_read_quota_block(oinfo->dqi_gqinode, 0, &bh);
3757		if (status) {
3758			mlog_errno(status);
3759			goto bail;
3760		}
3761		gdinfo = (struct ocfs2_global_disk_dqinfo *)
3762					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
3763		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
3764		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
3765		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
3766		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
3767		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
3768		oinfo->dqi_gi.dqi_free_entry =
3769					le32_to_cpu(gdinfo->dqi_free_entry);
3770		brelse(bh);
3771		ocfs2_track_lock_refresh(lockres);
3772	}
3773
3774bail:
3775	return status;
3776}
3777
3778/* Lock quota info. This function expects at least a shared lock on the quota
3779 * file so that we can safely refresh the quota info from disk. */
3780int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
3781{
3782	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
3783	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
3784	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3785	int status = 0;
3786
3787	mlog_entry_void();
3788
3789	/* On RO devices, locking really isn't needed... */
3790	if (ocfs2_is_hard_readonly(osb)) {
3791		if (ex)
3792			status = -EROFS;
3793		goto bail;
3794	}
3795	if (ocfs2_mount_local(osb))
3796		goto bail;
3797
3798	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3799	if (status < 0) {
3800		mlog_errno(status);
3801		goto bail;
3802	}
3803	if (!ocfs2_should_refresh_lock_res(lockres))
3804		goto bail;
3805	/* OK, we have the lock but we need to refresh the quota info */
3806	status = ocfs2_refresh_qinfo(oinfo);
3807	if (status)
3808		ocfs2_qinfo_unlock(oinfo, ex);
3809	ocfs2_complete_lock_res_refresh(lockres, status);
3810bail:
3811	mlog_exit(status);
3812	return status;
3813}
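
#if 0
/*
 * Minimal usage sketch (hypothetical caller, error handling elided):
 * take the global quota info lock shared, consume the values that
 * ocfs2_refresh_qinfo() just pulled in, then drop the lock.
 */
static int ocfs2_example_read_qinfo(struct ocfs2_mem_dqinfo *oinfo)
{
	int status = ocfs2_qinfo_lock(oinfo, 0);

	if (status)
		return status;
	/* ... read oinfo->dqi_gi / sb_dqinfo() fields here ... */
	ocfs2_qinfo_unlock(oinfo, 0);
	return 0;
}
#endif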
3814
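/*
 * Take/drop the cluster lock on a refcount tree. As elsewhere in
 * this file, hard-readonly mounts fail with -EROFS and local mounts
 * skip the DLM entirely.
 */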
3815int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
3816{
3817	int status;
3818	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3819	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3820	struct ocfs2_super *osb = lockres->l_priv;
3821
3823	if (ocfs2_is_hard_readonly(osb))
3824		return -EROFS;
3825
3826	if (ocfs2_mount_local(osb))
3827		return 0;
3828
3829	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
3830	if (status < 0)
3831		mlog_errno(status);
3832
3833	return status;
3834}
3835
3836void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
3837{
3838	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
3839	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
3840	struct ocfs2_super *osb = lockres->l_priv;
3841
3842	if (!ocfs2_mount_local(osb))
3843		ocfs2_cluster_unlock(osb, lockres, level);
3844}
3845
3846/*
3847 * This is the filesystem locking protocol.  It provides the lock handling
3848 * hooks for the underlying DLM.  It has a maximum version number.
3849 * The version number allows interoperability with systems running at
3850 * the same major number and an equal or smaller minor number.
3851 *
3852 * Whenever the filesystem does new things with locks (adds or removes a
3853 * lock, orders them differently, does different things underneath a lock),
3854 * the version must be changed.  The protocol is negotiated when joining
3855 * the dlm domain.  A node may join the domain if its major version is
3856 * identical to all other nodes and its minor version is greater than
3857 * or equal to all other nodes.  When its minor version is greater than
3858 * the other nodes, it will run at the minor version specified by the
3859 * other nodes.
3860 *
3861 * If a locking change is made that will not be compatible with older
3862 * versions, the major number must be increased and the minor version set
3863 * to zero.  If a change merely adds a behavior that can be disabled when
3864 * speaking to older versions, the minor version must be increased.  If a
3865 * change adds a fully backwards compatible change (eg, LVB changes that
3866 * are just ignored by older versions), the version does not need to be
3867 * updated.
3868 */
3869static struct ocfs2_locking_protocol lproto = {
3870	.lp_max_version = {
3871		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3872		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3873	},
3874	.lp_lock_ast		= ocfs2_locking_ast,
3875	.lp_blocking_ast	= ocfs2_blocking_ast,
3876	.lp_unlock_ast		= ocfs2_unlock_ast,
3877};
3878
3879void ocfs2_set_locking_protocol(void)
3880{
3881	ocfs2_stack_glue_set_locking_protocol(&lproto);
3882}
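
#if 0
/*
 * Illustrative sketch of the join rule described above (hypothetical
 * helper, not part of dlmglue): majors must match exactly and the
 * joining node's minor must be >= the domain's; the joiner then runs
 * at the domain's (smaller) minor.
 */
static int ocfs2_example_may_join(struct ocfs2_protocol_version *joiner,
				  struct ocfs2_protocol_version *domain)
{
	return joiner->pv_major == domain->pv_major &&
	       joiner->pv_minor >= domain->pv_minor;
}
#endif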
3883
3885static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3886				       struct ocfs2_lock_res *lockres)
3887{
3888	int status;
3889	struct ocfs2_unblock_ctl ctl = {0, 0};
3890	unsigned long flags;
3891
3892	/* Our reference to the lockres in this function can be
3893	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3894	 * flag. */
3895
3896	mlog_entry_void();
3897
3898	BUG_ON(!lockres);
3899	BUG_ON(!lockres->l_ops);
3900
3901	mlog(0, "lockres %s blocked.\n", lockres->l_name);
3902
3903	/* Detect whether a lock has been marked as going away while
3904	 * the downconvert thread was processing other things. A lock can
3905	 * still be marked with OCFS2_LOCK_FREEING after this check,
3906	 * but short-circuiting here will still save us some
3907	 * work. */
3908	spin_lock_irqsave(&lockres->l_lock, flags);
3909	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3910		goto unqueue;
3911	spin_unlock_irqrestore(&lockres->l_lock, flags);
3912
3913	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3914	if (status < 0)
3915		mlog_errno(status);
3916
3917	spin_lock_irqsave(&lockres->l_lock, flags);
3918unqueue:
3919	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
3920		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3921	else
3922		ocfs2_schedule_blocked_lock(osb, lockres);
3923
3924	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3925	     ctl.requeue ? "yes" : "no");
3926	spin_unlock_irqrestore(&lockres->l_lock, flags);
3927
3928	if (ctl.unblock_action != UNBLOCK_CONTINUE
3929	    && lockres->l_ops->post_unlock)
3930		lockres->l_ops->post_unlock(osb, lockres);
3931
3932	mlog_exit_void();
3933}
3934
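/*
 * Queue a lock for the downconvert thread. Callers hold
 * lockres->l_lock; the OCFS2_LOCK_QUEUED flag keeps the lockres
 * alive until ocfs2_process_blocked_lock() clears it.
 */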
3935static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3936					struct ocfs2_lock_res *lockres)
3937{
3938	mlog_entry_void();
3939
3940	assert_spin_locked(&lockres->l_lock);
3941
3942	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3943		/* Do not schedule a lock for downconvert when it's on
3944		 * the way to destruction - any nodes wanting access
3945		 * to the resource will get it soon. */
3946		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3947		     lockres->l_name, lockres->l_flags);
3948		return;
3949	}
3950
3951	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3952
3953	spin_lock(&osb->dc_task_lock);
3954	if (list_empty(&lockres->l_blocked_list)) {
3955		list_add_tail(&lockres->l_blocked_list,
3956			      &osb->blocked_lock_list);
3957		osb->blocked_lock_count++;
3958	}
3959	spin_unlock(&osb->dc_task_lock);
3960
3961	mlog_exit_void();
3962}
3963
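/*
 * Drain the blocked lock list. Only as many entries as were queued
 * when we took dc_task_lock are processed in one pass, so a lock
 * that requeues itself can't make this loop spin forever.
 */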
3964static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3965{
3966	unsigned long processed;
3967	struct ocfs2_lock_res *lockres;
3968
3969	mlog_entry_void();
3970
3971	spin_lock(&osb->dc_task_lock);
3972	/* Grab this early so we know to try again if a state change and
3973	 * wake-up happen part-way through our work. */
3974	osb->dc_work_sequence = osb->dc_wake_sequence;
3975
3976	processed = osb->blocked_lock_count;
3977	while (processed) {
3978		BUG_ON(list_empty(&osb->blocked_lock_list));
3979
3980		lockres = list_entry(osb->blocked_lock_list.next,
3981				     struct ocfs2_lock_res, l_blocked_list);
3982		list_del_init(&lockres->l_blocked_list);
3983		osb->blocked_lock_count--;
3984		spin_unlock(&osb->dc_task_lock);
3985
3986		BUG_ON(!processed);
3987		processed--;
3988
3989		ocfs2_process_blocked_lock(osb, lockres);
3990
3991		spin_lock(&osb->dc_task_lock);
3992	}
3993	spin_unlock(&osb->dc_task_lock);
3994
3995	mlog_exit_void();
3996}
3997
3998static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3999{
4000	int empty = 0;
4001
4002	spin_lock(&osb->dc_task_lock);
4003	if (list_empty(&osb->blocked_lock_list))
4004		empty = 1;
4005
4006	spin_unlock(&osb->dc_task_lock);
4007	return empty;
4008}
4009
4010static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4011{
4012	int should_wake = 0;
4013
4014	spin_lock(&osb->dc_task_lock);
4015	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4016		should_wake = 1;
4017	spin_unlock(&osb->dc_task_lock);
4018
4019	return should_wake;
4020}
4021
4022static int ocfs2_downconvert_thread(void *arg)
4023{
4024	int status = 0;
4025	struct ocfs2_super *osb = arg;
4026
4027	/* only quit once we've been asked to stop and there is no more
4028	 * work available */
4029	while (!(kthread_should_stop() &&
4030		ocfs2_downconvert_thread_lists_empty(osb))) {
4031
4032		wait_event_interruptible(osb->dc_event,
4033					 ocfs2_downconvert_thread_should_wake(osb) ||
4034					 kthread_should_stop());
4035
4036		mlog(0, "downconvert_thread: awoken\n");
4037
4038		ocfs2_downconvert_thread_do_work(osb);
4039	}
4040
4041	osb->dc_task = NULL;
4042	return status;
4043}
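
#if 0
/*
 * Lifecycle sketch (hypothetical helpers - the real calls live in
 * the mount/unmount paths, not in this file): the thread is created
 * with kthread_run() and torn down with kthread_stop(), which makes
 * the kthread_should_stop() check above return true.
 */
static int ocfs2_example_start_dc_thread(struct ocfs2_super *osb)
{
	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
	if (IS_ERR(osb->dc_task)) {
		int status = PTR_ERR(osb->dc_task);

		osb->dc_task = NULL;
		return status;
	}
	return 0;
}

static void ocfs2_example_stop_dc_thread(struct ocfs2_super *osb)
{
	if (osb->dc_task)
		kthread_stop(osb->dc_task);
}
#endif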
4044
4045void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4046{
4047	spin_lock(&osb->dc_task_lock);
4048	/* make sure the downconvert thread gets a swipe at whatever changes
4049	 * the caller may have made to the blocked lock state */
4050	osb->dc_wake_sequence++;
4051	spin_unlock(&osb->dc_task_lock);
4052	wake_up(&osb->dc_event);
4053}
4054