dlmglue.c revision 8ddb7b004dfa1832a750e199df8bff4b75b73565
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
	struct list_head	mw_item;
	int			mw_status;
	struct completion	mw_complete;
	unsigned long		mw_mask;
	unsigned long		mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
	unsigned long long 	mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);


#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
				     const char *function,
				     unsigned int line,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb =
		(struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optionally called in the downconvert thread after a
	 * successful downconvert. The lockres will not be referenced
	 * after this callback is called, so it is safe to free
	 * memory, etc.
	 *
	 * The exact semantics of when this is called are controlled
	 * by ->downconvert_worker()
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
	BUG_ON(!ocfs2_is_inode_lock(lockres));

	return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

	return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
	if (lockres->l_ops->get_osb)
		return lockres->l_ops->get_osb(lockres);

	return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {			\
	mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \
	     _err, _func, _lockres->l_name);				\
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
					      int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb,
				  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
				        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

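	/*
	 * The name is a fixed-width string: the lock type character, a
	 * short pad, then the block number and generation as zero-filled
	 * hex.  For example (assuming OCFS2_LOCK_ID_PAD is a run of
	 * '0's), a metadata lock on block 0x1234 with generation 0xabcd
	 * would be named "M00000000000000000012340000abcd".
	 */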
	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
	res->l_lock_num_prmode = 0;
	res->l_lock_num_prmode_failed = 0;
	res->l_lock_total_prmode = 0;
	res->l_lock_max_prmode = 0;
	res->l_lock_num_exmode = 0;
	res->l_lock_num_exmode_failed = 0;
	res->l_lock_total_exmode = 0;
	res->l_lock_max_exmode = 0;
	res->l_lock_refresh = 0;
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
				    struct ocfs2_mask_waiter *mw, int ret)
{
	unsigned long long *num, *sum;
	unsigned int *max, *failed;
	struct timespec ts = current_kernel_time();
	unsigned long long time = timespec_to_ns(&ts) - mw->mw_lock_start;

	if (level == LKM_PRMODE) {
		num = &res->l_lock_num_prmode;
		sum = &res->l_lock_total_prmode;
		max = &res->l_lock_max_prmode;
		failed = &res->l_lock_num_prmode_failed;
	} else if (level == LKM_EXMODE) {
		num = &res->l_lock_num_exmode;
		sum = &res->l_lock_total_exmode;
		max = &res->l_lock_max_exmode;
		failed = &res->l_lock_num_exmode_failed;
	} else
		return;

	(*num)++;
	(*sum) += time;
	if (time > *max)
		*max = time;
	if (ret)
		(*failed)++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
	lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
	struct timespec ts = current_kernel_time();
	mw->mw_lock_start = timespec_to_ns(&ts);
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
			   int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type          = type;
	res->l_ops           = ops;
	res->l_priv          = priv;

	res->l_level         = DLM_LOCK_IV;
	res->l_requested     = DLM_LOCK_IV;
	res->l_blocking      = DLM_LOCK_IV;
	res->l_action        = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags         = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

	ocfs2_init_lock_stats(res);
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
		case OCFS2_LOCK_TYPE_RW:
			ops = &ocfs2_inode_rw_lops;
			break;
		case OCFS2_LOCK_TYPE_META:
			ops = &ocfs2_inode_inode_lops;
			break;
		case OCFS2_LOCK_TYPE_OPEN:
			ops = &ocfs2_inode_open_lops;
			break;
		default:
			mlog_bug_on_msg(1, "type: %d\n", type);
			ops = NULL; /* thanks, gcc */
			break;
	}

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_file_private *fp = lockres->l_priv;

	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = lockres->l_priv;

	return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
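	/*
	 * A sketch of the resulting buffer layout (assuming
	 * OCFS2_DENTRY_LOCK_INO_START is 18, i.e. type char + 16 hex
	 * digits + NUL, per the BUG_ON below):
	 *
	 *   [0]      lock type character
	 *   [1..16]  parent dir blkno as zero-filled hex
	 *   [17]     '\0' (replaces the usual pad)
	 *   [18..25] inode blkno as a raw big-endian u64
	 */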
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually.  */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
			      struct ocfs2_file_private *fp)
{
	struct inode *inode = fp->fp_file->f_mapping->host;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);

	ocfs2_lock_res_init_once(lockres);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
			      inode->i_generation, lockres->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
				   fp);
	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}

static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		lockres->l_ex_holders++;
		break;
	case DLM_LOCK_PR:
		lockres->l_ro_holders++;
		break;
	default:
		BUG();
	}

	mlog_exit_void();
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
				     int level)
{
	mlog_entry_void();

	BUG_ON(!lockres);

	switch(level) {
	case DLM_LOCK_EX:
		BUG_ON(!lockres->l_ex_holders);
		lockres->l_ex_holders--;
		break;
	case DLM_LOCK_PR:
		BUG_ON(!lockres->l_ro_holders);
		lockres->l_ro_holders--;
		break;
	default:
		BUG();
	}
	mlog_exit_void();
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
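/*
 * In other words: if another node wants EX, the most we can keep is
 * NL; if it wants PR, we can keep up to PR; an NL request (the
 * default case) is compatible with us holding EX.
 */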
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = DLM_LOCK_EX;

	if (level == DLM_LOCK_EX)
		new_level = DLM_LOCK_NL;
	else if (level == DLM_LOCK_PR)
		new_level = DLM_LOCK_PR;
	return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct ocfs2_mask_waiter *mw, *tmp;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = DLM_LOCK_NL;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == DLM_LOCK_NL &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > DLM_LOCK_NL &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.  this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
 *     clear PENDING			 ocfs2_unblock_lock()
 *					  take_l_lock
 *					  !BUSY
 *					  ocfs2_prepare_downconvert()
 *					   set BUSY
 *					   set PENDING
 *					  drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *			<window>
 *					  ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
				    unsigned int generation,
				    struct ocfs2_super *osb)
{
	assert_spin_locked(&lockres->l_lock);

	/*
	 * The ast and locking functions can race us here.  The winner
	 * will clear pending, the loser will not.
	 */
	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
	    (lockres->l_pending_gen != generation))
		return;

	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
	lockres->l_pending_gen++;

	/*
	 * The downconvert thread may have skipped us because we
	 * were PENDING.  Wake it up.
	 */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
		ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
				  unsigned int generation,
				  struct ocfs2_super *osb)
{
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);
	__lockres_clear_pending(lockres, generation, osb);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

	return lockres->l_pending_gen;
}


static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	BUG_ON(level <= DLM_LOCK_NL);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	/*
	 * We can skip the bast for locks which don't enable caching -
	 * they'll be dropped at the earliest possible time anyway.
	 */
	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
		return;

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	int status;

	spin_lock_irqsave(&lockres->l_lock, flags);

	status = ocfs2_dlm_lock_status(&lockres->l_lksb);

	if (status == -EAGAIN) {
		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
		goto out;
	}

	if (status) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
		     lockres->l_name, status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}
out:
	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	/* Did we try to cancel this lock?  Clear that state */
	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

	/*
	 * We may have beaten the locking functions here.  We certainly
	 * know that dlm_lock() has been called :-)
	 * Because we can't have two lock calls in flight at once, we
	 * can use lockres->l_pending_gen.
	 */
	__lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     u32 dlm_flags)
{
	int ret = 0;
	unsigned long flags;
	unsigned int gen;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	gen = lockres_set_pending(lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn,
			     level,
			     &lockres->l_lksb,
			     dlm_flags,
			     lockres->l_name,
			     OCFS2_LOCK_ID_MAX_LEN - 1,
			     lockres);
	lockres_clear_pending(lockres, gen, osb);
	if (ret) {
		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
					int flag)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&lockres->l_lock, flags);
	ret = lockres->l_flags & flag;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
	ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}

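/*
 * Queue @mw to be woken when (l_flags & @mask) == @goal.
 * lockres_set_flags() completes the waiter as soon as a flag update
 * satisfies that condition.
 */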
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}

static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
					     struct ocfs2_lock_res *lockres)
{
	int ret;

	ret = wait_for_completion_interruptible(&mw->mw_complete);
	if (ret)
		lockres_remove_mask_waiter(lockres, mw);
	else
		ret = mw->mw_status;
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return ret;
}

static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      u32 lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;
	unsigned int gen;
	int noqueue_attempted = 0;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= DLM_LKF_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (noqueue_attempted > 0) {
			ret = -EAGAIN;
			goto unlock;
		}
		if (lkm_flags & DLM_LKF_NOQUEUE)
			noqueue_attempted = 1;

		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
			lockres->l_action = OCFS2_AST_ATTACH;
			lkm_flags &= ~DLM_LKF_CONVERT;
		} else {
			lockres->l_action = OCFS2_AST_CONVERT;
			lkm_flags |= DLM_LKF_CONVERT;
		}

		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		gen = lockres_set_pending(lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == DLM_LOCK_IV);
		BUG_ON(level == DLM_LOCK_NL);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		ret = ocfs2_dlm_lock(osb->cconn,
				     level,
				     &lockres->l_lksb,
				     lkm_flags,
				     lockres->l_name,
				     OCFS2_LOCK_ID_MAX_LEN - 1,
				     lockres);
		lockres_clear_pending(lockres, gen, osb);
		if (ret) {
			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
			    (ret != -EAGAIN)) {
				ocfs2_log_dlm_error("ocfs2_dlm_lock",
						    ret, lockres);
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks.  One path holds the page lock while calling aops
	 * which block acquiring dlm locks.  The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}
	ocfs2_update_lock_stats(lockres, level, &mw, ret);

	mlog_exit(ret);
	return ret;
}

static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_downconvert_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned long flags;
	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to.  As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
	 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/*
 * ocfs2_open_lock always gets a PR mode lock.
 */
int ocfs2_open_lock(struct inode *inode)
{
	int status = 0;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take PRMODE open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    DLM_LOCK_PR, 0, 0);
	if (status < 0)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

int ocfs2_try_open_lock(struct inode *inode, int write)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu try to take %s open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_open_lockres;

	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;

	/*
	 * The file system may already be holding a PRMODE/EXMODE open lock.
	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
	 * other nodes and the -EAGAIN will indicate to the caller that
	 * this inode is still in use.
	 */
	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
				    level, DLM_LKF_NOQUEUE, 0);

out:
	mlog_exit(status);
	return status;
}

/*
 * ocfs2_open_unlock unlocks PR and EX mode open locks.
 */
void ocfs2_open_unlock(struct inode *inode)
{
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop open lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno);

	if (ocfs2_mount_local(osb))
		goto out;

	if (lockres->l_ro_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     DLM_LOCK_PR);
	if (lockres->l_ex_holders)
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
				     DLM_LOCK_EX);

out:
	mlog_exit_void();
}

static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
				     int level)
{
	int ret;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	unsigned long flags;
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

retry_cancel:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		if (ret) {
			spin_unlock_irqrestore(&lockres->l_lock, flags);
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0) {
				mlog_errno(ret);
				goto out;
			}
			goto retry_cancel;
		}
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_for_mask(&mw);
		goto retry_cancel;
	}

	ret = -ERESTARTSYS;
	/*
	 * We may still have gotten the lock, in which case there's no
	 * point to restarting the syscall.
	 */
	if (lockres->l_level == level)
		ret = 0;

	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
	     lockres->l_flags, lockres->l_level, lockres->l_action);

	spin_unlock_irqrestore(&lockres->l_lock, flags);

out:
	return ret;
}

/*
 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
 * flock() calls. The locking approach this requires is sufficiently
 * different from all other cluster lock types that we implement a
 * separate path to the "low-level" dlm calls. In particular:
 *
 * - No optimization of lock levels is done - we take exactly
 *   what's been requested.
 *
 * - No lock caching is employed. We immediately downconvert to
 *   no-lock at unlock time. This also means flock locks never go on
 *   the blocking list.
 *
 * - Since userspace can trivially deadlock itself with flock, we make
 *   sure to allow cancellation of a misbehaving application's flock()
 *   request.
 *
 * - Access to any flock lockres doesn't require concurrency, so we
 *   can simplify the code by requiring the caller to guarantee
 *   serialization of dlmglue flock calls.
 */
int ocfs2_file_lock(struct file *file, int ex, int trylock)
{
	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
	unsigned long flags;
	struct ocfs2_file_private *fp = file->private_data;
	struct ocfs2_lock_res *lockres = &fp->fp_flock;
	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
	    (lockres->l_level > DLM_LOCK_NL)) {
		mlog(ML_ERROR,
		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
		     "level: %u\n", lockres->l_name, lockres->l_flags,
		     lockres->l_level);
		return -EINVAL;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/*
		 * Get the lock at NLMODE to start - that way we
		 * can cancel the upconvert request if need be.
		 */
		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}

		ret = ocfs2_wait_for_mask(&mw);
		if (ret) {
			mlog_errno(ret);
			goto out;
		}
		spin_lock_irqsave(&lockres->l_lock, flags);
	}

	lockres->l_action = OCFS2_AST_CONVERT;
	lkm_flags |= DLM_LKF_CONVERT;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);

	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
			     lockres);
	if (ret) {
		if (!trylock || (ret != -EAGAIN)) {
			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
			ret = -EINVAL;
		}

		ocfs2_recover_from_dlm_error(lockres, 1);
		lockres_remove_mask_waiter(lockres, &mw);
		goto out;
	}

	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
	if (ret == -ERESTARTSYS) {
		/*
		 * Userspace can cause deadlock itself with
		 * flock(). Current behavior locally is to allow the
		 * deadlock, but abort the system call if a signal is
		 * received. We follow this example, otherwise a
		 * poorly written program could sit in kernel until
		 * reboot.
		 *
		 * Handling this is a bit more complicated for Ocfs2
		 * though. We can't exit this function with an
		 * outstanding lock request, so a cancel convert is
		 * required. We intentionally overwrite 'ret' - if the
		 * cancel fails and the lock was granted, it's easier
		 * to just bubble success back up to the user.
		 */
		ret = ocfs2_flock_handle_signal(lockres, level);
	} else if (!ret && (level > lockres->l_level)) {
		/* Trylock failed asynchronously */
		BUG_ON(!trylock);
		ret = -EAGAIN;
	}

out:

	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
	     lockres->l_name, ex, trylock, ret);
	return ret;
}

void ocfs2_file_unlock(struct file *file)
{
	int ret;
	unsigned int gen;
	unsigned long flags;
	struct ocfs2_file_private *fp = file->private_data;
	struct ocfs2_lock_res *lockres = &fp->fp_flock;
	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
	struct ocfs2_mask_waiter mw;

	ocfs2_init_mask_waiter(&mw);

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
		return;

	if (lockres->l_level == DLM_LOCK_NL)
		return;

	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
	     lockres->l_name, lockres->l_flags, lockres->l_level,
	     lockres->l_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/*
	 * Fake a blocking ast for the downconvert code.
	 */
	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
	lockres->l_blocking = DLM_LOCK_EX;

	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
	if (ret) {
		mlog_errno(ret);
		return;
	}

	ret = ocfs2_wait_for_mask(&mw);
	if (ret)
		mlog_errno(ret);
}

static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the downconvert thread pre-emptively when we reach a release
	 * condition. */
1783	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1784		switch(lockres->l_blocking) {
1785		case DLM_LOCK_EX:
1786			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1787				kick = 1;
1788			break;
1789		case DLM_LOCK_PR:
1790			if (!lockres->l_ex_holders)
1791				kick = 1;
1792			break;
1793		default:
1794			BUG();
1795		}
1796	}
1797
1798	if (kick)
1799		ocfs2_wake_downconvert_thread(osb);
1800
1801	mlog_exit_void();
1802}
1803
1804#define OCFS2_SEC_BITS   34
1805#define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
1806#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1807
1808/* LVB only has room for 64 bits of time here so we pack it for
1809 * now. */
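/*
 * A worked example of the encoding (illustrative only): with
 * OCFS2_SEC_SHIFT == 30, tv_sec = 2 and tv_nsec = 5 pack into
 * (2ULL << 30) | 5 == 0x80000005.  tv_nsec always fits in the low
 * bits since 999999999 < 2^30, and ocfs2_unpack_timespec() below is
 * the exact inverse.
 */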
1810static u64 ocfs2_pack_timespec(struct timespec *spec)
1811{
1812	u64 res;
1813	u64 sec = spec->tv_sec;
1814	u32 nsec = spec->tv_nsec;
1815
1816	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1817
1818	return res;
1819}
1820
1821/* Call this with the lockres locked. I am reasonably sure we don't
1822 * need ip_lock in this function as anyone who would be changing those
1823 * values is supposed to be blocked in ocfs2_inode_lock right now. */
1824static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1825{
1826	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1827	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1828	struct ocfs2_meta_lvb *lvb;
1829
1830	mlog_entry_void();
1831
1832	lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1833
1834	/*
1835	 * Invalidate the LVB of a deleted inode - this way other
1836	 * nodes are forced to go to disk and discover the new inode
1837	 * status.
1838	 */
1839	if (oi->ip_flags & OCFS2_INODE_DELETED) {
1840		lvb->lvb_version = 0;
1841		goto out;
1842	}
1843
1844	lvb->lvb_version   = OCFS2_LVB_VERSION;
1845	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
1846	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1847	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1848	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1849	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1850	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1851	lvb->lvb_iatime_packed  =
1852		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1853	lvb->lvb_ictime_packed =
1854		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1855	lvb->lvb_imtime_packed =
1856		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1857	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1858	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1859	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1860
1861out:
1862	mlog_meta_lvb(0, lockres);
1863
1864	mlog_exit_void();
1865}
1866
1867static void ocfs2_unpack_timespec(struct timespec *spec,
1868				  u64 packed_time)
1869{
1870	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1871	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1872}
1873
1874static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1875{
1876	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1877	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1878	struct ocfs2_meta_lvb *lvb;
1879
1880	mlog_entry_void();
1881
1882	mlog_meta_lvb(0, lockres);
1883
1884	lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1885
1886	/* We're safe here without the lockres lock... */
1887	spin_lock(&oi->ip_lock);
1888	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1889	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1890
1891	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1892	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
1893	ocfs2_set_inode_flags(inode);
1894
1895	/* fast-symlinks are a special case */
1896	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1897		inode->i_blocks = 0;
1898	else
1899		inode->i_blocks = ocfs2_inode_sector_count(inode);
1900
1901	inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1902	inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1903	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1904	inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1905	ocfs2_unpack_timespec(&inode->i_atime,
1906			      be64_to_cpu(lvb->lvb_iatime_packed));
1907	ocfs2_unpack_timespec(&inode->i_mtime,
1908			      be64_to_cpu(lvb->lvb_imtime_packed));
1909	ocfs2_unpack_timespec(&inode->i_ctime,
1910			      be64_to_cpu(lvb->lvb_ictime_packed));
1911	spin_unlock(&oi->ip_lock);
1912
1913	mlog_exit_void();
1914}
1915
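/* An LVB is only trusted when it carries the current OCFS2_LVB_VERSION
 * and its generation matches the inode's - anything else (e.g. the
 * zeroed LVB of a deleted inode above) forces a read from disk. */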
1916static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1917					      struct ocfs2_lock_res *lockres)
1918{
1919	struct ocfs2_meta_lvb *lvb =
1920		(struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb);
1921
1922	if (lvb->lvb_version == OCFS2_LVB_VERSION
1923	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1924		return 1;
1925	return 0;
1926}
1927
1928/* Determine whether a lock resource needs to be refreshed, and
1929 * arbitrate who gets to refresh it.
1930 *
1931 *   0 means no refresh needed.
1932 *
1933 *   > 0 means you need to refresh this and you MUST call
1934 *   ocfs2_complete_lock_res_refresh afterwards. */
1935static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1936{
1937	unsigned long flags;
1938	int status = 0;
1939
1940	mlog_entry_void();
1941
1942refresh_check:
1943	spin_lock_irqsave(&lockres->l_lock, flags);
1944	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1945		spin_unlock_irqrestore(&lockres->l_lock, flags);
1946		goto bail;
1947	}
1948
1949	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1950		spin_unlock_irqrestore(&lockres->l_lock, flags);
1951
1952		ocfs2_wait_on_refreshing_lock(lockres);
1953		goto refresh_check;
1954	}
1955
1956	/* Ok, I'll be the one to refresh this lock. */
1957	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1958	spin_unlock_irqrestore(&lockres->l_lock, flags);
1959
1960	status = 1;
1961bail:
1962	mlog_exit(status);
1963	return status;
1964}
1965
1966/* If status is non-zero, I'll mark it as not being in refresh
1967 * anymore, but I won't clear the needs-refresh flag. */
1968static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1969						   int status)
1970{
1971	unsigned long flags;
1972	mlog_entry_void();
1973
1974	spin_lock_irqsave(&lockres->l_lock, flags);
1975	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1976	if (!status)
1977		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1978	spin_unlock_irqrestore(&lockres->l_lock, flags);
1979
1980	wake_up(&lockres->l_event);
1981
1982	mlog_exit_void();
1983}
1984
1985/* may or may not return a bh if it went to disk. */
1986static int ocfs2_inode_lock_update(struct inode *inode,
1987				  struct buffer_head **bh)
1988{
1989	int status = 0;
1990	struct ocfs2_inode_info *oi = OCFS2_I(inode);
1991	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1992	struct ocfs2_dinode *fe;
1993	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1994
1995	mlog_entry_void();
1996
1997	if (ocfs2_mount_local(osb))
1998		goto bail;
1999
2000	spin_lock(&oi->ip_lock);
2001	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2002		mlog(0, "Orphaned inode %llu was deleted while we "
2003		     "were waiting on a lock. ip_flags = 0x%x\n",
2004		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2005		spin_unlock(&oi->ip_lock);
2006		status = -ENOENT;
2007		goto bail;
2008	}
2009	spin_unlock(&oi->ip_lock);
2010
2011	if (!ocfs2_should_refresh_lock_res(lockres))
2012		goto bail;
2013
2014	/* This will discard any caching information we might have had
2015	 * for the inode metadata. */
2016	ocfs2_metadata_cache_purge(inode);
2017
2018	ocfs2_extent_map_trunc(inode, 0);
2019
2020	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2021		mlog(0, "Trusting LVB on inode %llu\n",
2022		     (unsigned long long)oi->ip_blkno);
2023		ocfs2_refresh_inode_from_lvb(inode);
2024	} else {
2025		/* Boo, we have to go to disk. */
2026		/* read bh, cast, ocfs2_refresh_inode */
2027		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
2028					  bh, OCFS2_BH_CACHED, inode);
2029		if (status < 0) {
2030			mlog_errno(status);
2031			goto bail_refresh;
2032		}
2033		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2034
2035		/* This is a good chance to make sure we're not
2036		 * locking an invalid object.
2037		 *
2038		 * We bug on a stale inode here because we checked
2039		 * above whether it was wiped from disk. The wiping
2040		 * node provides a guarantee that we receive that
2041		 * message and can mark the inode before dropping any
2042		 * locks associated with it. */
2043		if (!OCFS2_IS_VALID_DINODE(fe)) {
2044			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
2045			status = -EIO;
2046			goto bail_refresh;
2047		}
2048		mlog_bug_on_msg(inode->i_generation !=
2049				le32_to_cpu(fe->i_generation),
2050				"Invalid dinode %llu disk generation: %u "
2051				"inode->i_generation: %u\n",
2052				(unsigned long long)oi->ip_blkno,
2053				le32_to_cpu(fe->i_generation),
2054				inode->i_generation);
2055		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2056				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2057				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2058				(unsigned long long)oi->ip_blkno,
2059				(unsigned long long)le64_to_cpu(fe->i_dtime),
2060				le32_to_cpu(fe->i_flags));
2061
2062		ocfs2_refresh_inode(inode, fe);
2063		ocfs2_track_lock_refresh(lockres);
2064	}
2065
2066	status = 0;
2067bail_refresh:
2068	ocfs2_complete_lock_res_refresh(lockres, status);
2069bail:
2070	mlog_exit(status);
2071	return status;
2072}
2073
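/* Hand the caller a bh: reuse the one ocfs2_inode_lock_update() may
 * have already read for us, otherwise go to disk for the inode block. */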
2074static int ocfs2_assign_bh(struct inode *inode,
2075			   struct buffer_head **ret_bh,
2076			   struct buffer_head *passed_bh)
2077{
2078	int status;
2079
2080	if (passed_bh) {
2081		/* Ok, the update went to disk for us, use the
2082		 * returned bh. */
2083		*ret_bh = passed_bh;
2084		get_bh(*ret_bh);
2085
2086		return 0;
2087	}
2088
2089	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
2090				  OCFS2_I(inode)->ip_blkno,
2091				  ret_bh,
2092				  OCFS2_BH_CACHED,
2093				  inode);
2094	if (status < 0)
2095		mlog_errno(status);
2096
2097	return status;
2098}
2099
2100/*
2101 * returns < 0 error if the callback will never be called, otherwise
2102 * the result of the lock will be communicated via the callback.
2103 */
2104int ocfs2_inode_lock_full(struct inode *inode,
2105			 struct buffer_head **ret_bh,
2106			 int ex,
2107			 int arg_flags)
2108{
2109	int status, level, acquired;
2110	u32 dlm_flags;
2111	struct ocfs2_lock_res *lockres = NULL;
2112	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2113	struct buffer_head *local_bh = NULL;
2114
2115	BUG_ON(!inode);
2116
2117	mlog_entry_void();
2118
2119	mlog(0, "inode %llu, take %s META lock\n",
2120	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2121	     ex ? "EXMODE" : "PRMODE");
2122
2123	status = 0;
2124	acquired = 0;
2125	/* We'll allow faking a read-only metadata lock for
2126	 * read-only devices. */
2127	if (ocfs2_is_hard_readonly(osb)) {
2128		if (ex)
2129			status = -EROFS;
2130		goto bail;
2131	}
2132
2133	if (ocfs2_mount_local(osb))
2134		goto local;
2135
2136	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2137		ocfs2_wait_for_recovery(osb);
2138
2139	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2140	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2141	dlm_flags = 0;
2142	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2143		dlm_flags |= DLM_LKF_NOQUEUE;
2144
2145	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
2146	if (status < 0) {
2147		if (status != -EAGAIN && status != -EIOCBRETRY)
2148			mlog_errno(status);
2149		goto bail;
2150	}
2151
2152	/* Notify the error cleanup path to drop the cluster lock. */
2153	acquired = 1;
2154
2155	/* We wait twice because a node may have died while we were in
2156	 * the lower dlm layers. The second time though, we've
2157	 * committed to owning this lock so we don't allow signals to
2158	 * abort the operation. */
2159	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2160		ocfs2_wait_for_recovery(osb);
2161
2162local:
2163	/*
2164	 * We only see this flag if we're being called from
2165	 * ocfs2_read_locked_inode(). It means we're locking an inode
2166	 * which hasn't been populated yet, so clear the refresh flag
2167	 * and let the caller handle it.
2168	 */
2169	if (inode->i_state & I_NEW) {
2170		status = 0;
2171		if (lockres)
2172			ocfs2_complete_lock_res_refresh(lockres, 0);
2173		goto bail;
2174	}
2175
2176	/* This is fun. The caller may want a bh back, or it may
2177	 * not. ocfs2_inode_lock_update definitely wants one in, but
2178	 * may or may not read one, depending on what's in the
2179	 * LVB. The result of all of this is that we've *only* gone to
2180	 * disk if we have to, so the complexity is worthwhile. */
2181	status = ocfs2_inode_lock_update(inode, &local_bh);
2182	if (status < 0) {
2183		if (status != -ENOENT)
2184			mlog_errno(status);
2185		goto bail;
2186	}
2187
2188	if (ret_bh) {
2189		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2190		if (status < 0) {
2191			mlog_errno(status);
2192			goto bail;
2193		}
2194	}
2195
2196bail:
2197	if (status < 0) {
2198		if (ret_bh && (*ret_bh)) {
2199			brelse(*ret_bh);
2200			*ret_bh = NULL;
2201		}
2202		if (acquired)
2203			ocfs2_inode_unlock(inode, ex);
2204	}
2205
2206	brelse(local_bh);
2208
2209	mlog_exit(status);
2210	return status;
2211}
2212
2213/*
2214 * This is working around a lock inversion between tasks acquiring DLM
2215 * locks while holding a page lock and the downconvert thread which
2216 * blocks dlm lock acquiry while acquiring page locks.
2217 *
2218 * ** These _with_page variants are only intended to be called from aop
2219 * methods that hold page locks and return a very specific *positive* error
2220 * code that aop methods pass up to the VFS -- test for errors with != 0. **
2221 *
2222 * The DLM is called such that it returns -EAGAIN if it would have
2223 * blocked waiting for the downconvert thread.  In that case we unlock
2224 * our page so the downconvert thread can make progress.  Once we've
2225 * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2226 * that called us can bubble that back up into the VFS who will then
2227 * immediately retry the aop call.
2228 *
2229 * We do a blocking lock and immediate unlock before returning, though, so that
2230 * the lock has a great chance of being cached on this node by the time the VFS
2231 * calls back to retry the aop. This has the potential to livelock as nodes
2232 * ping locks back and forth, but that's a risk we're willing to take in
2233 * exchange for this simple way around the lock inversion.
2234 */
2235int ocfs2_inode_lock_with_page(struct inode *inode,
2236			      struct buffer_head **ret_bh,
2237			      int ex,
2238			      struct page *page)
2239{
2240	int ret;
2241
2242	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2243	if (ret == -EAGAIN) {
2244		unlock_page(page);
2245		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2246			ocfs2_inode_unlock(inode, ex);
2247		ret = AOP_TRUNCATED_PAGE;
2248	}
2249
2250	return ret;
2251}
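/*
 * A sketch of the intended use from an aop method (illustrative only,
 * not a real caller):
 *
 *	ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
 *	if (ret != 0)
 *		return ret;	(possibly AOP_TRUNCATED_PAGE)
 *	... work on the page under the cluster lock ...
 *	ocfs2_inode_unlock(inode, 0);
 */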
2252
2253int ocfs2_inode_lock_atime(struct inode *inode,
2254			  struct vfsmount *vfsmnt,
2255			  int *level)
2256{
2257	int ret;
2258
2259	mlog_entry_void();
2260	ret = ocfs2_inode_lock(inode, NULL, 0);
2261	if (ret < 0) {
2262		mlog_errno(ret);
2263		return ret;
2264	}
2265
2266	/*
2267	 * If we should update atime, we will get EX lock,
2268	 * otherwise we just get PR lock.
2269	 */
2270	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2271		struct buffer_head *bh = NULL;
2272
2273		ocfs2_inode_unlock(inode, 0);
2274		ret = ocfs2_inode_lock(inode, &bh, 1);
2275		if (ret < 0) {
2276			mlog_errno(ret);
2277			return ret;
2278		}
2279		*level = 1;
2280		if (ocfs2_should_update_atime(inode, vfsmnt))
2281			ocfs2_update_inode_atime(inode, bh);
2282		brelse(bh);
2284	} else
2285		*level = 0;
2286
2287	mlog_exit(ret);
2288	return ret;
2289}
2290
2291void ocfs2_inode_unlock(struct inode *inode,
2292		       int ex)
2293{
2294	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2295	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2296	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2297
2298	mlog_entry_void();
2299
2300	mlog(0, "inode %llu drop %s META lock\n",
2301	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2302	     ex ? "EXMODE" : "PRMODE");
2303
2304	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2305	    !ocfs2_mount_local(osb))
2306		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2307
2308	mlog_exit_void();
2309}
2310
2311int ocfs2_super_lock(struct ocfs2_super *osb,
2312		     int ex)
2313{
2314	int status = 0;
2315	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2316	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2317
2318	mlog_entry_void();
2319
2320	if (ocfs2_is_hard_readonly(osb))
2321		return -EROFS;
2322
2323	if (ocfs2_mount_local(osb))
2324		goto bail;
2325
2326	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2327	if (status < 0) {
2328		mlog_errno(status);
2329		goto bail;
2330	}
2331
2332	/* The super block lock path is really in the best position to
2333	 * know when resources covered by the lock need to be
2334	 * refreshed, so we do it here. Of course, making sense of
2335	 * everything is up to the caller :) */
2336	status = ocfs2_should_refresh_lock_res(lockres);
2337	if (status < 0) {
2338		mlog_errno(status);
2339		goto bail;
2340	}
2341	if (status) {
2342		status = ocfs2_refresh_slot_info(osb);
2343
2344		ocfs2_complete_lock_res_refresh(lockres, status);
2345
2346		if (status < 0)
2347			mlog_errno(status);
2348		ocfs2_track_lock_refresh(lockres);
2349	}
2350bail:
2351	mlog_exit(status);
2352	return status;
2353}
2354
2355void ocfs2_super_unlock(struct ocfs2_super *osb,
2356			int ex)
2357{
2358	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2359	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2360
2361	if (!ocfs2_mount_local(osb))
2362		ocfs2_cluster_unlock(osb, lockres, level);
2363}
2364
2365int ocfs2_rename_lock(struct ocfs2_super *osb)
2366{
2367	int status;
2368	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2369
2370	if (ocfs2_is_hard_readonly(osb))
2371		return -EROFS;
2372
2373	if (ocfs2_mount_local(osb))
2374		return 0;
2375
2376	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2377	if (status < 0)
2378		mlog_errno(status);
2379
2380	return status;
2381}
2382
2383void ocfs2_rename_unlock(struct ocfs2_super *osb)
2384{
2385	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2386
2387	if (!ocfs2_mount_local(osb))
2388		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2389}
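/*
 * The rename lock is a single cluster-wide EX lock serializing
 * cross-directory renames.  Illustrative pairing (not a real caller):
 *
 *	status = ocfs2_rename_lock(osb);
 *	if (status < 0)
 *		goto bail;
 *	... perform the rename ...
 *	ocfs2_rename_unlock(osb);
 */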
2390
2391int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2392{
2393	int ret;
2394	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2395	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2396	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2397
2398	BUG_ON(!dl);
2399
2400	if (ocfs2_is_hard_readonly(osb))
2401		return -EROFS;
2402
2403	if (ocfs2_mount_local(osb))
2404		return 0;
2405
2406	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2407	if (ret < 0)
2408		mlog_errno(ret);
2409
2410	return ret;
2411}
2412
2413void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2414{
2415	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2416	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2417	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2418
2419	if (!ocfs2_mount_local(osb))
2420		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2421}
2422
2423/* Reference counting of the dlm debug structure. We want this because
2424 * open references on the debug inodes can outlive the mount, so
2425 * we can't rely on the ocfs2_super to always exist. */
2426static void ocfs2_dlm_debug_free(struct kref *kref)
2427{
2428	struct ocfs2_dlm_debug *dlm_debug;
2429
2430	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2431
2432	kfree(dlm_debug);
2433}
2434
2435void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2436{
2437	if (dlm_debug)
2438		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2439}
2440
2441static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2442{
2443	kref_get(&debug->d_refcnt);
2444}
2445
2446struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2447{
2448	struct ocfs2_dlm_debug *dlm_debug;
2449
2450	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2451	if (!dlm_debug) {
2452		mlog_errno(-ENOMEM);
2453		goto out;
2454	}
2455
2456	kref_init(&dlm_debug->d_refcnt);
2457	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2458	dlm_debug->d_locking_state = NULL;
2459out:
2460	return dlm_debug;
2461}
2462
2463/* Access to this is arbitrated for us via the seq_file's mutex. */
2464struct ocfs2_dlm_seq_priv {
2465	struct ocfs2_dlm_debug *p_dlm_debug;
2466	struct ocfs2_lock_res p_iter_res;
2467	struct ocfs2_lock_res p_tmp_res;
2468};
2469
2470static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2471						 struct ocfs2_dlm_seq_priv *priv)
2472{
2473	struct ocfs2_lock_res *iter, *ret = NULL;
2474	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2475
2476	assert_spin_locked(&ocfs2_dlm_tracking_lock);
2477
2478	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2479		/* discover the head of the list */
2480		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2481			mlog(0, "End of list found, %p\n", ret);
2482			break;
2483		}
2484
2485		/* We track our "dummy" iteration lockres' by a NULL
2486		 * l_ops field. */
2487		if (iter->l_ops != NULL) {
2488			ret = iter;
2489			break;
2490		}
2491	}
2492
2493	return ret;
2494}
2495
2496static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2497{
2498	struct ocfs2_dlm_seq_priv *priv = m->private;
2499	struct ocfs2_lock_res *iter;
2500
2501	spin_lock(&ocfs2_dlm_tracking_lock);
2502	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2503	if (iter) {
2504		/* Since lockres' have the lifetime of their container
2505		 * (which can be inodes, ocfs2_supers, etc) we want to
2506		 * copy this out to a temporary lockres while still
2507		 * under the spinlock. Obviously after this we can't
2508		 * trust any pointers on the copy returned, but that's
2509		 * ok as the information we want isn't typically held
2510		 * in them. */
2511		priv->p_tmp_res = *iter;
2512		iter = &priv->p_tmp_res;
2513	}
2514	spin_unlock(&ocfs2_dlm_tracking_lock);
2515
2516	return iter;
2517}
2518
2519static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2520{
2521}
2522
2523static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2524{
2525	struct ocfs2_dlm_seq_priv *priv = m->private;
2526	struct ocfs2_lock_res *iter = v;
2527	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2528
2529	spin_lock(&ocfs2_dlm_tracking_lock);
2530	iter = ocfs2_dlm_next_res(iter, priv);
2531	list_del_init(&dummy->l_debug_list);
2532	if (iter) {
2533		list_add(&dummy->l_debug_list, &iter->l_debug_list);
2534		priv->p_tmp_res = *iter;
2535		iter = &priv->p_tmp_res;
2536	}
2537	spin_unlock(&ocfs2_dlm_tracking_lock);
2538
2539	return iter;
2540}
2541
2542/* So that debugfs.ocfs2 can determine which format is being used */
2543#define OCFS2_DLM_DEBUG_STR_VERSION 2
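/*
 * Each record below is one tab-separated line: the version above, the
 * lock name, then level, flags, action, unlock_action, the ro/ex
 * holder counts, requested and blocking levels, the raw LVB bytes
 * and, since version 2, the stats fields.
 */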
2544static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2545{
2546	int i;
2547	char *lvb;
2548	struct ocfs2_lock_res *lockres = v;
2549
2550	if (!lockres)
2551		return -EINVAL;
2552
2553	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2554
2555	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2556		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2557			   lockres->l_name,
2558			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2559	else
2560		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2561
2562	seq_printf(m, "%d\t"
2563		   "0x%lx\t"
2564		   "0x%x\t"
2565		   "0x%x\t"
2566		   "%u\t"
2567		   "%u\t"
2568		   "%d\t"
2569		   "%d\t",
2570		   lockres->l_level,
2571		   lockres->l_flags,
2572		   lockres->l_action,
2573		   lockres->l_unlock_action,
2574		   lockres->l_ro_holders,
2575		   lockres->l_ex_holders,
2576		   lockres->l_requested,
2577		   lockres->l_blocking);
2578
2579	/* Dump the raw LVB */
2580	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2581	for (i = 0; i < DLM_LVB_LEN; i++)
2582		seq_printf(m, "0x%x\t", lvb[i]);
2583
2584#ifdef CONFIG_OCFS2_FS_STATS
2585# define lock_num_prmode(_l)		(_l)->l_lock_num_prmode
2586# define lock_num_exmode(_l)		(_l)->l_lock_num_exmode
2587# define lock_num_prmode_failed(_l)	(_l)->l_lock_num_prmode_failed
2588# define lock_num_exmode_failed(_l)	(_l)->l_lock_num_exmode_failed
2589# define lock_total_prmode(_l)		(_l)->l_lock_total_prmode
2590# define lock_total_exmode(_l)		(_l)->l_lock_total_exmode
2591# define lock_max_prmode(_l)		(_l)->l_lock_max_prmode
2592# define lock_max_exmode(_l)		(_l)->l_lock_max_exmode
2593# define lock_refresh(_l)		(_l)->l_lock_refresh
2594#else
2595# define lock_num_prmode(_l)		(0)
2596# define lock_num_exmode(_l)		(0)
2597# define lock_num_prmode_failed(_l)	(0)
2598# define lock_num_exmode_failed(_l)	(0)
2599# define lock_total_prmode(_l)		(0)
2600# define lock_total_exmode(_l)		(0)
2601# define lock_max_prmode(_l)		(0)
2602# define lock_max_exmode(_l)		(0)
2603# define lock_refresh(_l)		(0)
2604#endif
2605	/* The following seq_print was added in version 2 of this output */
2606	seq_printf(m, "%llu\t"
2607		   "%llu\t"
2608		   "%u\t"
2609		   "%u\t"
2610		   "%llu\t"
2611		   "%llu\t"
2612		   "%u\t"
2613		   "%u\t"
2614		   "%u\t",
2615		   lock_num_prmode(lockres),
2616		   lock_num_exmode(lockres),
2617		   lock_num_prmode_failed(lockres),
2618		   lock_num_exmode_failed(lockres),
2619		   lock_total_prmode(lockres),
2620		   lock_total_exmode(lockres),
2621		   lock_max_prmode(lockres),
2622		   lock_max_exmode(lockres),
2623		   lock_refresh(lockres));
2624
2625	/* End the line */
2626	seq_printf(m, "\n");
2627	return 0;
2628}
2629
2630static const struct seq_operations ocfs2_dlm_seq_ops = {
2631	.start =	ocfs2_dlm_seq_start,
2632	.stop =		ocfs2_dlm_seq_stop,
2633	.next =		ocfs2_dlm_seq_next,
2634	.show =		ocfs2_dlm_seq_show,
2635};
2636
2637static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2638{
2639	struct seq_file *seq = (struct seq_file *) file->private_data;
2640	struct ocfs2_dlm_seq_priv *priv = seq->private;
2641	struct ocfs2_lock_res *res = &priv->p_iter_res;
2642
2643	ocfs2_remove_lockres_tracking(res);
2644	ocfs2_put_dlm_debug(priv->p_dlm_debug);
2645	return seq_release_private(inode, file);
2646}
2647
2648static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2649{
2650	int ret;
2651	struct ocfs2_dlm_seq_priv *priv;
2652	struct seq_file *seq;
2653	struct ocfs2_super *osb;
2654
2655	priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2656	if (!priv) {
2657		ret = -ENOMEM;
2658		mlog_errno(ret);
2659		goto out;
2660	}
2661	osb = inode->i_private;
2662	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2663	priv->p_dlm_debug = osb->osb_dlm_debug;
2664	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2665
2666	ret = seq_open(file, &ocfs2_dlm_seq_ops);
2667	if (ret) {
2668		kfree(priv);
2669		mlog_errno(ret);
2670		goto out;
2671	}
2672
2673	seq = (struct seq_file *) file->private_data;
2674	seq->private = priv;
2675
2676	ocfs2_add_lockres_tracking(&priv->p_iter_res,
2677				   priv->p_dlm_debug);
2678
2679out:
2680	return ret;
2681}
2682
2683static const struct file_operations ocfs2_dlm_debug_fops = {
2684	.open =		ocfs2_dlm_debug_open,
2685	.release =	ocfs2_dlm_debug_release,
2686	.read =		seq_read,
2687	.llseek =	seq_lseek,
2688};
2689
2690static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2691{
2692	int ret = 0;
2693	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2694
2695	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2696							 S_IFREG|S_IRUSR,
2697							 osb->osb_debug_root,
2698							 osb,
2699							 &ocfs2_dlm_debug_fops);
2700	if (!dlm_debug->d_locking_state) {
2701		ret = -EINVAL;
2702		mlog(ML_ERROR,
2703		     "Unable to create locking state debugfs file.\n");
2704		goto out;
2705	}
2706
2707	ocfs2_get_dlm_debug(dlm_debug);
2708out:
2709	return ret;
2710}
2711
2712static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2713{
2714	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2715
2716	if (dlm_debug) {
2717		debugfs_remove(dlm_debug->d_locking_state);
2718		ocfs2_put_dlm_debug(dlm_debug);
2719	}
2720}
2721
2722int ocfs2_dlm_init(struct ocfs2_super *osb)
2723{
2724	int status = 0;
2725	struct ocfs2_cluster_connection *conn = NULL;
2726
2727	mlog_entry_void();
2728
2729	if (ocfs2_mount_local(osb)) {
2730		osb->node_num = 0;
2731		goto local;
2732	}
2733
2734	status = ocfs2_dlm_init_debug(osb);
2735	if (status < 0) {
2736		mlog_errno(status);
2737		goto bail;
2738	}
2739
2740	/* launch downconvert thread */
2741	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2742	if (IS_ERR(osb->dc_task)) {
2743		status = PTR_ERR(osb->dc_task);
2744		osb->dc_task = NULL;
2745		mlog_errno(status);
2746		goto bail;
2747	}
2748
2749	/* for now, uuid == domain */
2750	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
2751				       osb->uuid_str,
2752				       strlen(osb->uuid_str),
2753				       ocfs2_do_node_down, osb,
2754				       &conn);
2755	if (status) {
2756		mlog_errno(status);
2757		goto bail;
2758	}
2759
2760	status = ocfs2_cluster_this_node(&osb->node_num);
2761	if (status < 0) {
2762		mlog_errno(status);
2763		mlog(ML_ERROR,
2764		     "could not find this host's node number\n");
2765		ocfs2_cluster_disconnect(conn, 0);
2766		goto bail;
2767	}
2768
2769local:
2770	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2771	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2772
2773	osb->cconn = conn;
2774
2775	status = 0;
2776bail:
2777	if (status < 0) {
2778		ocfs2_dlm_shutdown_debug(osb);
2779		if (osb->dc_task)
2780			kthread_stop(osb->dc_task);
2781	}
2782
2783	mlog_exit(status);
2784	return status;
2785}
2786
2787void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
2788			int hangup_pending)
2789{
2790	mlog_entry_void();
2791
2792	ocfs2_drop_osb_locks(osb);
2793
2794	/*
2795	 * Now that we have dropped all locks and ocfs2_dismount_volume()
2796	 * has disabled recovery, the DLM won't be talking to us.  It's
2797	 * safe to tear things down before disconnecting the cluster.
2798	 */
2799
2800	if (osb->dc_task) {
2801		kthread_stop(osb->dc_task);
2802		osb->dc_task = NULL;
2803	}
2804
2805	ocfs2_lock_res_free(&osb->osb_super_lockres);
2806	ocfs2_lock_res_free(&osb->osb_rename_lockres);
2807
2808	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
2809	osb->cconn = NULL;
2810
2811	ocfs2_dlm_shutdown_debug(osb);
2812
2813	mlog_exit_void();
2814}
2815
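/* Fired by the DLM when an unlock or a cancelled convert completes: a
 * cancel just forgets the in-flight convert (l_action goes back to
 * OCFS2_AST_INVALID), while a full unlock drops us to DLM_LOCK_IV. */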
2816static void ocfs2_unlock_ast(void *opaque, int error)
2817{
2818	struct ocfs2_lock_res *lockres = opaque;
2819	unsigned long flags;
2820
2821	mlog_entry_void();
2822
2823	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2824	     lockres->l_unlock_action);
2825
2826	spin_lock_irqsave(&lockres->l_lock, flags);
2827	if (error) {
2828		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
2829		     "unlock_action %d\n", error, lockres->l_name,
2830		     lockres->l_unlock_action);
2831		spin_unlock_irqrestore(&lockres->l_lock, flags);
2832		return;
2833	}
2834
2835	switch(lockres->l_unlock_action) {
2836	case OCFS2_UNLOCK_CANCEL_CONVERT:
2837		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2838		lockres->l_action = OCFS2_AST_INVALID;
2839		break;
2840	case OCFS2_UNLOCK_DROP_LOCK:
2841		lockres->l_level = DLM_LOCK_IV;
2842		break;
2843	default:
2844		BUG();
2845	}
2846
2847	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2848	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2849	spin_unlock_irqrestore(&lockres->l_lock, flags);
2850
2851	wake_up(&lockres->l_event);
2852
2853	mlog_exit_void();
2854}
2855
2856static int ocfs2_drop_lock(struct ocfs2_super *osb,
2857			   struct ocfs2_lock_res *lockres)
2858{
2859	int ret;
2860	unsigned long flags;
2861	u32 lkm_flags = 0;
2862
2863	/* We didn't get anywhere near actually using this lockres. */
2864	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2865		goto out;
2866
2867	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2868		lkm_flags |= DLM_LKF_VALBLK;
2869
2870	spin_lock_irqsave(&lockres->l_lock, flags);
2871
2872	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2873			"lockres %s, flags 0x%lx\n",
2874			lockres->l_name, lockres->l_flags);
2875
2876	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2877		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2878		     "%u, unlock_action = %u\n",
2879		     lockres->l_name, lockres->l_flags, lockres->l_action,
2880		     lockres->l_unlock_action);
2881
2882		spin_unlock_irqrestore(&lockres->l_lock, flags);
2883
2884		/* XXX: Today we just wait on any busy
2885		 * locks... Perhaps we need to cancel converts in the
2886		 * future? */
2887		ocfs2_wait_on_busy_lock(lockres);
2888
2889		spin_lock_irqsave(&lockres->l_lock, flags);
2890	}
2891
2892	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2893		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2894		    lockres->l_level == DLM_LOCK_EX &&
2895		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2896			lockres->l_ops->set_lvb(lockres);
2897	}
2898
2899	if (lockres->l_flags & OCFS2_LOCK_BUSY)
2900		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2901		     lockres->l_name);
2902	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2903		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2904
2905	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2906		spin_unlock_irqrestore(&lockres->l_lock, flags);
2907		goto out;
2908	}
2909
2910	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2911
2912	/* make sure we never get here while waiting for an ast to
2913	 * fire. */
2914	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2915
2916	/* is this necessary? */
2917	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2918	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2919	spin_unlock_irqrestore(&lockres->l_lock, flags);
2920
2921	mlog(0, "lock %s\n", lockres->l_name);
2922
2923	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags,
2924			       lockres);
2925	if (ret) {
2926		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
2927		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2928		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
2929		BUG();
2930	}
2931	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
2932	     lockres->l_name);
2933
2934	ocfs2_wait_on_busy_lock(lockres);
2935out:
2936	mlog_exit(0);
2937	return 0;
2938}
2939
2940/* Mark the lockres as being dropped. It will no longer be
2941 * queued if blocking, but we still may have to wait on it
2942 * being dequeued from the downconvert thread before we can consider
2943 * it safe to drop.
2944 *
2945 * You can *not* attempt to call cluster_lock on this lockres anymore. */
2946void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2947{
2948	int status;
2949	struct ocfs2_mask_waiter mw;
2950	unsigned long flags;
2951
2952	ocfs2_init_mask_waiter(&mw);
2953
2954	spin_lock_irqsave(&lockres->l_lock, flags);
2955	lockres->l_flags |= OCFS2_LOCK_FREEING;
2956	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2957		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2958		spin_unlock_irqrestore(&lockres->l_lock, flags);
2959
2960		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2961
2962		status = ocfs2_wait_for_mask(&mw);
2963		if (status)
2964			mlog_errno(status);
2965
2966		spin_lock_irqsave(&lockres->l_lock, flags);
2967	}
2968	spin_unlock_irqrestore(&lockres->l_lock, flags);
2969}
2970
2971void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2972			       struct ocfs2_lock_res *lockres)
2973{
2974	int ret;
2975
2976	ocfs2_mark_lockres_freeing(lockres);
2977	ret = ocfs2_drop_lock(osb, lockres);
2978	if (ret)
2979		mlog_errno(ret);
2980}
2981
2982static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2983{
2984	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2985	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2986}
2987
2988int ocfs2_drop_inode_locks(struct inode *inode)
2989{
2990	int status, err;
2991
2992	mlog_entry_void();
2993
2994	/* No need to call ocfs2_mark_lockres_freeing here -
2995	 * ocfs2_clear_inode has done it for us. */
2996
2997	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2998			      &OCFS2_I(inode)->ip_open_lockres);
2999	if (err < 0)
3000		mlog_errno(err);
3001
3002	status = err;
3003
3004	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3005			      &OCFS2_I(inode)->ip_inode_lockres);
3006	if (err < 0)
3007		mlog_errno(err);
3008	if (err < 0 && !status)
3009		status = err;
3010
3011	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3012			      &OCFS2_I(inode)->ip_rw_lockres);
3013	if (err < 0)
3014		mlog_errno(err);
3015	if (err < 0 && !status)
3016		status = err;
3017
3018	mlog_exit(status);
3019	return status;
3020}
3021
3022static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3023					      int new_level)
3024{
3025	assert_spin_locked(&lockres->l_lock);
3026
3027	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3028
3029	if (lockres->l_level <= new_level) {
3030		mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n",
3031		     lockres->l_level, new_level);
3032		BUG();
3033	}
3034
3035	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
3036	     lockres->l_name, new_level, lockres->l_blocking);
3037
3038	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3039	lockres->l_requested = new_level;
3040	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3041	return lockres_set_pending(lockres);
3042}
3043
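/* The generation returned by ocfs2_prepare_downconvert() pairs with
 * lockres_clear_pending() below; see the OCFS2_LOCK_PENDING discussion
 * in ocfs2_unblock_lock() for the race this closes. */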
3044static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3045				  struct ocfs2_lock_res *lockres,
3046				  int new_level,
3047				  int lvb,
3048				  unsigned int generation)
3049{
3050	int ret;
3051	u32 dlm_flags = DLM_LKF_CONVERT;
3052
3053	mlog_entry_void();
3054
3055	if (lvb)
3056		dlm_flags |= DLM_LKF_VALBLK;
3057
3058	ret = ocfs2_dlm_lock(osb->cconn,
3059			     new_level,
3060			     &lockres->l_lksb,
3061			     dlm_flags,
3062			     lockres->l_name,
3063			     OCFS2_LOCK_ID_MAX_LEN - 1,
3064			     lockres);
3065	lockres_clear_pending(lockres, generation, osb);
3066	if (ret) {
3067		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3068		ocfs2_recover_from_dlm_error(lockres, 1);
3069		goto bail;
3070	}
3071
3072	ret = 0;
3073bail:
3074	mlog_exit(ret);
3075	return ret;
3076}
3077
3078/* returns 1 when the caller should drop the spinlock and call ocfs2_cancel_convert() */
3079static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3080				        struct ocfs2_lock_res *lockres)
3081{
3082	assert_spin_locked(&lockres->l_lock);
3083
3084	mlog_entry_void();
3085	mlog(0, "lock %s\n", lockres->l_name);
3086
3087	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3088		/* If we're already trying to cancel a lock conversion
3089		 * then just drop the spinlock and allow the caller to
3090		 * requeue this lock. */
3091
3092		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
3093		return 0;
3094	}
3095
3096	/* were we in a convert when we got the bast fire? */
3097	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3098	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3099	/* set things up for the unlock ast to know to just
3100	 * clear out the ast_action and unset busy, etc. */
3101	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3102
3103	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3104			"lock %s, invalid flags: 0x%lx\n",
3105			lockres->l_name, lockres->l_flags);
3106
3107	return 1;
3108}
3109
3110static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3111				struct ocfs2_lock_res *lockres)
3112{
3113	int ret;
3114
3115	mlog_entry_void();
3116	mlog(0, "lock %s\n", lockres->l_name);
3117
3118	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3119			       DLM_LKF_CANCEL, lockres);
3120	if (ret) {
3121		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3122		ocfs2_recover_from_dlm_error(lockres, 0);
3123	}
3124
3125	mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name);
3126
3127	mlog_exit(ret);
3128	return ret;
3129}
3130
3131static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3132			      struct ocfs2_lock_res *lockres,
3133			      struct ocfs2_unblock_ctl *ctl)
3134{
3135	unsigned long flags;
3136	int blocking;
3137	int new_level;
3138	int ret = 0;
3139	int set_lvb = 0;
3140	unsigned int gen;
3141
3142	mlog_entry_void();
3143
3144	spin_lock_irqsave(&lockres->l_lock, flags);
3145
3146	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
3147
3148recheck:
3149	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3150		/* XXX
3151		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3152		 * exists entirely for one reason - another thread has set
3153		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3154		 *
3155		 * If we do ocfs2_cancel_convert() before the other thread
3156		 * calls dlm_lock(), our cancel will do nothing.  We will
3157		 * get no ast, and we will have no way of knowing the
3158		 * cancel failed.  Meanwhile, the other thread will call
3159		 * into dlm_lock() and wait...forever.
3160		 *
3161		 * Why forever?  Because another node has asked for the
3162		 * lock first; that's why we're here in unblock_lock().
3163		 *
3164		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3165		 * set, we just requeue the unblock.  Only when the other
3166		 * thread has called dlm_lock() and cleared PENDING will
3167		 * we then cancel their request.
3168		 *
3169		 * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3170		 * at the same time they set OCFS2_LOCK_BUSY.  They must
3171		 * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3172		 */
3173		if (lockres->l_flags & OCFS2_LOCK_PENDING)
3174			goto leave_requeue;
3175
3176		ctl->requeue = 1;
3177		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3178		spin_unlock_irqrestore(&lockres->l_lock, flags);
3179		if (ret) {
3180			ret = ocfs2_cancel_convert(osb, lockres);
3181			if (ret < 0)
3182				mlog_errno(ret);
3183		}
3184		goto leave;
3185	}
3186
3187	/* if we're blocking an exclusive and we have *any* holders,
3188	 * then requeue. */
3189	if ((lockres->l_blocking == DLM_LOCK_EX)
3190	    && (lockres->l_ex_holders || lockres->l_ro_holders))
3191		goto leave_requeue;
3192
3193	/* If it's a PR we're blocking, then only
3194	 * requeue if we've got any EX holders */
3195	if (lockres->l_blocking == DLM_LOCK_PR &&
3196	    lockres->l_ex_holders)
3197		goto leave_requeue;
3198
3199	/*
3200	 * Can we get a lock in this state if the holder counts are
3201 * zero? The metadata unblock code used to check this.
3202	 */
3203	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3204	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
3205		goto leave_requeue;
3206
3207	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3208
3209	if (lockres->l_ops->check_downconvert
3210	    && !lockres->l_ops->check_downconvert(lockres, new_level))
3211		goto leave_requeue;
3212
3213	/* If we get here, then we know that there are no more
3214	 * incompatible holders (and anyone asking for an incompatible
3215	 * lock is blocked). We can now downconvert the lock */
3216	if (!lockres->l_ops->downconvert_worker)
3217		goto downconvert;
3218
3219	/* Some lockres types want to do a bit of work before
3220	 * downconverting a lock. Allow that here. The worker function
3221	 * may sleep, so we save off a copy of what we're blocking as
3222	 * it may change while we're not holding the spin lock. */
3223	blocking = lockres->l_blocking;
3224	spin_unlock_irqrestore(&lockres->l_lock, flags);
3225
3226	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3227
3228	if (ctl->unblock_action == UNBLOCK_STOP_POST)
3229		goto leave;
3230
3231	spin_lock_irqsave(&lockres->l_lock, flags);
3232	if (blocking != lockres->l_blocking) {
3233		/* If this changed underneath us, then we can't drop
3234		 * it just yet. */
3235		goto recheck;
3236	}
3237
3238downconvert:
3239	ctl->requeue = 0;
3240
3241	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3242		if (lockres->l_level == DLM_LOCK_EX)
3243			set_lvb = 1;
3244
3245		/*
3246		 * We only set the lvb if the lock has been fully
3247		 * refreshed - otherwise we risk writing stale
3248		 * data. If it hasn't been refreshed, there's no need to
3249		 * clear out the lvb here as its value is still valid.
3250		 */
3251		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3252			lockres->l_ops->set_lvb(lockres);
3253	}
3254
3255	gen = ocfs2_prepare_downconvert(lockres, new_level);
3256	spin_unlock_irqrestore(&lockres->l_lock, flags);
3257	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3258				     gen);
3259
3260leave:
3261	mlog_exit(ret);
3262	return ret;
3263
3264leave_requeue:
3265	spin_unlock_irqrestore(&lockres->l_lock, flags);
3266	ctl->requeue = 1;
3267
3268	mlog_exit(0);
3269	return 0;
3270}
3271
3272static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3273				     int blocking)
3274{
3275	struct inode *inode;
3276	struct address_space *mapping;
3277
3278	inode = ocfs2_lock_res_inode(lockres);
3279	mapping = inode->i_mapping;
3280
3281	if (!S_ISREG(inode->i_mode))
3282		goto out;
3283
3284	/*
3285	 * We need this before the filemap_fdatawrite() so that it can
3286	 * transfer the dirty bit from the PTE to the
3287	 * page. Unfortunately this means that even for EX->PR
3288	 * downconverts, we'll lose our mappings and have to build
3289	 * them up again.
3290	 */
3291	unmap_mapping_range(mapping, 0, 0, 0);
3292
3293	if (filemap_fdatawrite(mapping)) {
3294		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!\n",
3295		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3296	}
3297	sync_mapping_buffers(mapping);
3298	if (blocking == DLM_LOCK_EX) {
3299		truncate_inode_pages(mapping, 0);
3300	} else {
3301		/* We only need to wait on the I/O if we're not also
3302		 * truncating pages because truncate_inode_pages waits
3303		 * for us above. We don't truncate pages if we're
3304		 * blocking anything < EXMODE because we want to keep
3305		 * them around in that case. */
3306		filemap_fdatawait(mapping);
3307	}
3308
3309out:
3310	return UNBLOCK_CONTINUE;
3311}
3312
3313static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3314					int new_level)
3315{
3316	struct inode *inode = ocfs2_lock_res_inode(lockres);
3317	int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3318
3319	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3320	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3321
3322	if (checkpointed)
3323		return 1;
3324
3325	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3326	return 0;
3327}
3328
3329static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3330{
3331	struct inode *inode = ocfs2_lock_res_inode(lockres);
3332
3333	__ocfs2_stuff_meta_lvb(inode);
3334}
3335
3336/*
3337 * Does the final reference drop on our dentry lock. Right now this
3338 * happens in the downconvert thread, but we could choose to simplify the
3339 * dlmglue API and push these off to the ocfs2_wq in the future.
3340 */
3341static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3342				     struct ocfs2_lock_res *lockres)
3343{
3344	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3345	ocfs2_dentry_lock_put(osb, dl);
3346}
3347
3348/*
3349 * d_delete() matching dentries before the lock downconvert.
3350 *
3351 * At this point, any process waiting to destroy the
3352 * dentry_lock due to last ref count is stopped by the
3353 * OCFS2_LOCK_QUEUED flag.
3354 *
3355 * We have two potential problems:
3356 *
3357 * 1) If we do the last reference drop on our dentry_lock (via dput)
3358 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3359 *    the downconvert to finish. Instead we take an elevated
3360 *    reference and push the drop until after we've completed our
3361 *    unblock processing.
3362 *
3363 * 2) There might be another process with a final reference,
3364 *    waiting on us to finish processing. If this is the case, we
3365 *    detect it and exit out - there are no more dentries anyway.
3366 */
3367static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3368				       int blocking)
3369{
3370	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3371	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3372	struct dentry *dentry;
3373	unsigned long flags;
3374	int extra_ref = 0;
3375
3376	/*
3377	 * This node is blocking another node from getting a read
3378	 * lock. This happens when we've renamed within a
3379	 * directory. We've forced the other nodes to d_delete(), but
3380	 * we never actually dropped our lock because it's still
3381	 * valid. The downconvert code will retain a PR for this node,
3382	 * so there's no further work to do.
3383	 */
3384	if (blocking == DLM_LOCK_PR)
3385		return UNBLOCK_CONTINUE;
3386
3387	/*
3388	 * Mark this inode as potentially orphaned. The code in
3389	 * ocfs2_delete_inode() will figure out whether it actually
3390	 * needs to be freed or not.
3391	 */
3392	spin_lock(&oi->ip_lock);
3393	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3394	spin_unlock(&oi->ip_lock);
3395
3396	/*
3397	 * Yuck. We need to make sure however that the check of
3398	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
3399	 * respect to a reference decrement or the setting of that
3400	 * flag.
3401	 */
3402	spin_lock_irqsave(&lockres->l_lock, flags);
3403	spin_lock(&dentry_attach_lock);
3404	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3405	    && dl->dl_count) {
3406		dl->dl_count++;
3407		extra_ref = 1;
3408	}
3409	spin_unlock(&dentry_attach_lock);
3410	spin_unlock_irqrestore(&lockres->l_lock, flags);
3411
3412	mlog(0, "extra_ref = %d\n", extra_ref);
3413
3414	/*
3415	 * We have a process waiting on us in ocfs2_dentry_iput(),
3416	 * which means we can't have any more outstanding
3417	 * aliases. There's no need to do any more work.
3418	 */
3419	if (!extra_ref)
3420		return UNBLOCK_CONTINUE;
3421
3422	spin_lock(&dentry_attach_lock);
3423	while (1) {
3424		dentry = ocfs2_find_local_alias(dl->dl_inode,
3425						dl->dl_parent_blkno, 1);
3426		if (!dentry)
3427			break;
3428		spin_unlock(&dentry_attach_lock);
3429
3430		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3431		     dentry->d_name.name);
3432
3433		/*
3434		 * The following dcache calls may do an
3435		 * iput(). Normally we don't want that from the
3436		 * downconverting thread, but in this case it's ok
3437		 * because the requesting node already has an
3438		 * exclusive lock on the inode, so it can't be queued
3439		 * for a downconvert.
3440		 */
3441		d_delete(dentry);
3442		dput(dentry);
3443
3444		spin_lock(&dentry_attach_lock);
3445	}
3446	spin_unlock(&dentry_attach_lock);
3447
3448	/*
3449	 * If we are the last holder of this dentry lock, there is no
3450	 * reason to downconvert so skip straight to the unlock.
3451	 */
3452	if (dl->dl_count == 1)
3453		return UNBLOCK_STOP_POST;
3454
3455	return UNBLOCK_CONTINUE_POST;
3456}
3457
3458/*
3459 * This is the filesystem locking protocol.  It provides the lock handling
3460 * hooks for the underlying DLM.  It has a maximum version number.
3461 * The version number allows interoperability with systems running at
3462 * the same major number and an equal or smaller minor number.
3463 *
3464 * Whenever the filesystem does new things with locks (adds or removes a
3465 * lock, orders them differently, does different things underneath a lock),
3466 * the version must be changed.  The protocol is negotiated when joining
3467 * the dlm domain.  A node may join the domain if its major version is
3468 * identical to all other nodes and its minor version is greater than
3469 * or equal to all other nodes.  When its minor version is greater than
3470 * the other nodes, it will run at the minor version specified by the
3471 * other nodes.
3472 *
3473 * If a locking change is made that will not be compatible with older
3474 * versions, the major number must be increased and the minor version set
3475 * to zero.  If a change merely adds a behavior that can be disabled when
3476 * speaking to older versions, the minor version must be increased.  If a
3477 * change is fully backwards compatible (e.g., LVB changes that
3478 * are just ignored by older versions), the version does not need to be
3479 * updated.
3480 */
3481static struct ocfs2_locking_protocol lproto = {
3482	.lp_max_version = {
3483		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
3484		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
3485	},
3486	.lp_lock_ast		= ocfs2_locking_ast,
3487	.lp_blocking_ast	= ocfs2_blocking_ast,
3488	.lp_unlock_ast		= ocfs2_unlock_ast,
3489};
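/*
 * An example of the negotiation described above: a node whose maximum
 * is 1.1 may join a domain of 1.0 nodes and will then speak 1.0, but a
 * node whose maximum is 1.0 can never join a 2.x domain, because major
 * numbers must match exactly.
 */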
3490
3491void ocfs2_set_locking_protocol(void)
3492{
3493	ocfs2_stack_glue_set_locking_protocol(&lproto);
3494}
3495
3496
3497static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3498				       struct ocfs2_lock_res *lockres)
3499{
3500	int status;
3501	struct ocfs2_unblock_ctl ctl = {0, 0,};
3502	unsigned long flags;
3503
3504	/* Our reference to the lockres in this function can be
3505	 * considered valid until we remove the OCFS2_LOCK_QUEUED
3506	 * flag. */
3507
3508	mlog_entry_void();
3509
3510	BUG_ON(!lockres);
3511	BUG_ON(!lockres->l_ops);
3512
3513	mlog(0, "lockres %s blocked.\n", lockres->l_name);
3514
3515	/* Detect whether a lock has been marked as going away while
3516	 * the downconvert thread was processing other things. A lock can
3517	 * still be marked with OCFS2_LOCK_FREEING after this check,
3518	 * but short circuiting here will still save us some
3519	 * performance. */
3520	spin_lock_irqsave(&lockres->l_lock, flags);
3521	if (lockres->l_flags & OCFS2_LOCK_FREEING)
3522		goto unqueue;
3523	spin_unlock_irqrestore(&lockres->l_lock, flags);
3524
3525	status = ocfs2_unblock_lock(osb, lockres, &ctl);
3526	if (status < 0)
3527		mlog_errno(status);
3528
3529	spin_lock_irqsave(&lockres->l_lock, flags);
3530unqueue:
3531	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue)
3532		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3533	else
3534		ocfs2_schedule_blocked_lock(osb, lockres);
3535
3536	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3537	     ctl.requeue ? "yes" : "no");
3538	spin_unlock_irqrestore(&lockres->l_lock, flags);
3539
3540	if (ctl.unblock_action != UNBLOCK_CONTINUE
3541	    && lockres->l_ops->post_unlock)
3542		lockres->l_ops->post_unlock(osb, lockres);
3543
3544	mlog_exit_void();
3545}
3546
3547static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3548					struct ocfs2_lock_res *lockres)
3549{
3550	mlog_entry_void();
3551
3552	assert_spin_locked(&lockres->l_lock);
3553
3554	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3555		/* Do not schedule a lock for downconvert when it's on
3556		 * the way to destruction - any nodes wanting access
3557		 * to the resource will get it soon. */
3558		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3559		     lockres->l_name, lockres->l_flags);
3560		return;
3561	}
3562
3563	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3564
3565	spin_lock(&osb->dc_task_lock);
3566	if (list_empty(&lockres->l_blocked_list)) {
3567		list_add_tail(&lockres->l_blocked_list,
3568			      &osb->blocked_lock_list);
3569		osb->blocked_lock_count++;
3570	}
3571	spin_unlock(&osb->dc_task_lock);
3572
3573	mlog_exit_void();
3574}
3575
3576static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3577{
3578	unsigned long processed;
3579	struct ocfs2_lock_res *lockres;
3580
3581	mlog_entry_void();
3582
3583	spin_lock(&osb->dc_task_lock);
3584	/* grab this early so we know to try again if a state change and
3585	 * wake happen part-way through our work */
3586	osb->dc_work_sequence = osb->dc_wake_sequence;
3587
3588	processed = osb->blocked_lock_count;
3589	while (processed) {
3590		BUG_ON(list_empty(&osb->blocked_lock_list));
3591
3592		lockres = list_entry(osb->blocked_lock_list.next,
3593				     struct ocfs2_lock_res, l_blocked_list);
3594		list_del_init(&lockres->l_blocked_list);
3595		osb->blocked_lock_count--;
3596		spin_unlock(&osb->dc_task_lock);
3597
3598		BUG_ON(!processed);
3599		processed--;
3600
3601		ocfs2_process_blocked_lock(osb, lockres);
3602
3603		spin_lock(&osb->dc_task_lock);
3604	}
3605	spin_unlock(&osb->dc_task_lock);
3606
3607	mlog_exit_void();
3608}
3609
3610static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3611{
3612	int empty = 0;
3613
3614	spin_lock(&osb->dc_task_lock);
3615	if (list_empty(&osb->blocked_lock_list))
3616		empty = 1;
3617
3618	spin_unlock(&osb->dc_task_lock);
3619	return empty;
3620}
3621
3622static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3623{
3624	int should_wake = 0;
3625
3626	spin_lock(&osb->dc_task_lock);
3627	if (osb->dc_work_sequence != osb->dc_wake_sequence)
3628		should_wake = 1;
3629	spin_unlock(&osb->dc_task_lock);
3630
3631	return should_wake;
3632}
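/* The two sequence counters form a simple missed-wakeup check:
 * ocfs2_wake_downconvert_thread() bumps dc_wake_sequence, and the
 * worker copies it into dc_work_sequence before scanning, so a wake
 * racing with a scan leaves the counters unequal and forces another
 * pass. */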
3633
3634static int ocfs2_downconvert_thread(void *arg)
3635{
3636	int status = 0;
3637	struct ocfs2_super *osb = arg;
3638
3639	/* only quit once we've been asked to stop and there is no more
3640	 * work available */
3641	while (!(kthread_should_stop() &&
3642		ocfs2_downconvert_thread_lists_empty(osb))) {
3643
3644		wait_event_interruptible(osb->dc_event,
3645					 ocfs2_downconvert_thread_should_wake(osb) ||
3646					 kthread_should_stop());
3647
3648		mlog(0, "downconvert_thread: awoken\n");
3649
3650		ocfs2_downconvert_thread_do_work(osb);
3651	}
3652
3653	osb->dc_task = NULL;
3654	return status;
3655}
3656
3657void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3658{
3659	spin_lock(&osb->dc_task_lock);
3660	/* make sure the downconvert thread gets a swipe at whatever changes
3661	 * the caller may have made to the lock state */
3662	osb->dc_wake_sequence++;
3663	spin_unlock(&osb->dc_task_lock);
3664	wake_up(&osb->dc_event);
3665}
3666