/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require e.g. that a lock may be only partially released and thus split
 * into two parts, and that two adjacent locks from the same process may be
 * merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *	      and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *	      have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
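 *
 * Unlike list_for_each_safe(), iteration starts from \a pos (which must
 * already point into the list) rather than from the first entry after
 * \a head.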
 */
#define list_for_remaining_safe(pos, n, head) \
	for (n = pos->next; pos != (head); pos = n, n = pos->next)

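/*
 * Two flock locks are considered to have the same owner when both the
 * flock owner identifier and the export they were granted on match.
 */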
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.owner ==
		 lock->l_policy_data.l_flock.owner) &&
		(new->l_export == lock->l_export));
}

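/*
 * Flock extents are inclusive [start, end] ranges, so two locks overlap
 * when each one starts at or before the point where the other one ends.
 */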
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.start <=
		 lock->l_policy_data.l_flock.end) &&
		(new->l_policy_data.l_flock.end >=
		 lock->l_policy_data.l_flock.start));
}

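/*
 * Server side only: record which owner/export the blocked request \a req
 * is waiting behind and add it to the per-export flock hash that is used
 * for deadlock detection.
 */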
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
					    struct ldlm_lock *lock)
{
	/* For server only */
	if (!req->l_export)
		return;

	LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

	req->l_policy_data.l_flock.blocking_owner =
		lock->l_policy_data.l_flock.owner;
	req->l_policy_data.l_flock.blocking_export =
		lock->l_export;
	req->l_policy_data.l_flock.blocking_refs = 0;

	cfs_hash_add(req->l_export->exp_flock_hash,
		     &req->l_policy_data.l_flock.owner,
		     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
	/* For server only */
	if (!req->l_export)
		return;

	check_res_locked(req->l_resource);
	if (req->l_export->exp_flock_hash &&
	    !hlist_unhashed(&req->l_exp_flock_hash))
		cfs_hash_del(req->l_export->exp_flock_hash,
			     &req->l_policy_data.l_flock.owner,
			     &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
		   mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC &&
	    !(lock->l_flags & LDLM_FL_FAILED)) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* when reaching here, it is under lock_res_and_lock(), so we
		 * need to call the nolock version of
		 * ldlm_lock_decref_internal() */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. one client holds
 * a lock on something and wants a lock on something else, while at the
 * same time another client is in the opposite situation).
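 *
 * The check walks the chain of blocking owners through the exports' flock
 * hashes; if the chain leads back to the owner and export of \a req, a
 * deadlock exists.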
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
	struct obd_export *req_exp = req->l_export;
	struct obd_export *bl_exp = bl_lock->l_export;
	__u64 req_owner = req->l_policy_data.l_flock.owner;
	__u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

	/* For server only */
	if (!req_exp)
		return 0;

	class_export_get(bl_exp);
	while (1) {
		struct obd_export *bl_exp_new;
		struct ldlm_lock *lock = NULL;
		struct ldlm_flock *flock;

		if (bl_exp->exp_flock_hash)
			lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
					       &bl_owner);
		if (!lock)
			break;

		LASSERT(req != lock);
		flock = &lock->l_policy_data.l_flock;
		LASSERT(flock->owner == bl_owner);
		bl_owner = flock->blocking_owner;
		bl_exp_new = class_export_get(flock->blocking_export);
		class_export_put(bl_exp);

		cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
		bl_exp = bl_exp_new;

		if (bl_owner == req_owner && bl_exp == req_exp) {
			class_export_put(bl_exp);
			return 1;
		}
	}
	class_export_put(bl_exp);

	return 0;
}

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
					  struct list_head *work_list)
{
	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

	if ((exp_connect_flags(lock->l_export) &
	     OBD_CONNECT_FLOCK_DEAD) == 0) {
		CERROR("deadlock found, but client doesn't support flock cancellation\n");
	} else {
		LASSERT(lock->l_completion_ast);
		LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
		lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
			LDLM_FL_FLOCK_DEADLOCK;
		ldlm_flock_blocking_unlink(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_add_ast_work_item(lock, NULL, work_list);
	}
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
			ldlm_error_t *err, struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct list_head *tmp;
	struct list_head *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	ldlm_mode_t mode = req->l_req_mode;
	int local = ns_is_client(ns);
	int added = (mode == LCK_NL);
	int overlaps = 0;
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };

	CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	if (local) {
		/* No blocking ASTs are sent to the clients for
		 * POSIX file & record locks */
		req->l_blocking_ast = NULL;
	} else {
		/* Called on the server for lock cancels. */
		req->l_blocking_ast = ldlm_flock_blocking_ast;
	}

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = tmp;
				break;
			}
		}
	} else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request. */
		list_for_each(tmp, &res->lr_granted) {
			lock = list_entry(tmp, struct ldlm_lock,
					  l_res_link);

			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = tmp;
				continue;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (!first_enq) {
				reprocess_failed = 1;
				if (ldlm_flock_deadlock(req, lock)) {
					ldlm_flock_cancel_on_deadlock(req,
								      work_list);
					return LDLM_ITER_CONTINUE;
				}
				continue;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				return LDLM_ITER_STOP;
			}

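			/* F_GETLK-style test request: report the conflicting
			 * lock back in \a req instead of blocking on it. */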
			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				return LDLM_ITER_STOP;
			}

			/* add lock to blocking list before deadlock
			 * check to prevent race */
			ldlm_flock_blocking_link(req, lock);

			if (ldlm_flock_deadlock(req, lock)) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EDEADLK;
				return LDLM_ITER_STOP;
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			return LDLM_ITER_STOP;
		}
		if (reprocess_failed)
			return LDLM_ITER_CONTINUE;
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		return LDLM_ITER_STOP;
	}

	/* In case we had slept on this lock request take it off of the
	 * deadlock detection hash list. */
	ldlm_flock_blocking_unlink(req);

	/* Scan the locks owned by this process that overlap this request.
	 * We may have to merge or split existing locks. */

	if (!ownlocks)
		ownlocks = &res->lr_granted;

	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow. */
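			/* e.g. a granted lock on [0, 9] and a new request for
			 * [10, 19] of the same mode adjoin, so below they are
			 * merged into a single lock covering [0, 19]. */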
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;

			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		++overlaps;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */
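		/* e.g. unlocking [10, 19] out of a granted [0, 29] leaves
		 * [0, 9] (moved to the new2 lock allocated below) and
		 * [20, 29] (kept in the existing lock). */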

		/* If this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request, but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request so
		 * it must see the original lock data in the reply. */

		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock. */
		if (!new2) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (!new2) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = -ENOLCK;
				return LDLM_ITER_STOP;
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, ownlocks, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 is created but never used, destroy it */
	if (!splitted && new2)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	/* Add req to the granted queue before calling ldlm_reprocess_all(). */
	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert new lock before ownlocks in list. */
		ldlm_resource_add_lock(res, ownlocks, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
		/* The only possible case for a client-side call into this
		 * flock policy function is ldlm_flock_completion_ast(),
		 * which always passes the LDLM_FL_WAIT_NOREPROC flag. */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent.  */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	return LDLM_ITER_CONTINUE;
}

struct ldlm_flock_wait_data {
	struct ldlm_lock *fwd_lock;
	int fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
	struct ldlm_lock *lock;

	lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);

	/* client side - set flag to prevent lock from being put on LRU list */
	lock->l_flags |= LDLM_FL_CBPENDING;
	unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock *getlk = lock->l_ast_data;
	struct obd_device *obd;
	struct obd_import *imp = NULL;
	struct ldlm_flock_wait_data fwd;
	struct l_wait_info lwi;
	ldlm_error_t err;
	int rc = 0;

	CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
	       flags, data, getlk);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. There is no point
	 * in holding the lock even if the app still believes it has it,
	 * since the server already dropped it anyway. This applies only to
	 * granted locks. */
	if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
	    (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
		if (lock->l_req_mode == lock->l_granted_mode &&
		    lock->l_granted_mode != LCK_NL &&
		    !data)
			ldlm_lock_decref_internal(lock, lock->l_req_mode);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
		       LDLM_FL_BLOCK_CONV))) {
		if (!data)
			/* the MDS granted the lock in the reply */
			goto granted;
		/* CP AST RPC: the lock got granted, wake it up */
		wake_up(&lock->l_waitq);
		return 0;
	}

	LDLM_DEBUG(lock,
		   "client-side enqueue returned a blocked lock, sleeping");
	fwd.fwd_lock = lock;
	obd = class_exp2obd(lock->l_conn_export);

	/* if this is a local lock, there is no import */
	if (obd)
		imp = obd->u.cli.cl_import;

	if (imp) {
		spin_lock(&imp->imp_lock);
		fwd.fwd_generation = imp->imp_generation;
		spin_unlock(&imp->imp_lock);
	}

	lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

	/* Go to sleep until the lock is granted. */
	rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

granted:
	OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (lock->l_flags & LDLM_FL_DESTROYED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
		return 0;
	}

	if (lock->l_flags & LDLM_FL_FAILED) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
		return -EIO;
	}

	if (rc) {
		LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
			   rc);
		return rc;
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	lock_res_and_lock(lock);

	/* take lock off the deadlock detection hash list. */
	ldlm_flock_blocking_unlink(lock);

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	list_del_init(&lock->l_res_link);

	if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
		LDLM_DEBUG(lock, "client-side enqueue deadlock received");
		rc = -EDEADLK;
	} else if (flags & LDLM_FL_TEST_LOCK) {
		/* fcntl(F_GETLK) request */
		/* The old mode was saved in getlk->fl_type so that if the
		 * mode in the lock changes we can decref the appropriate
		 * refcount. */
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process. */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag)
{
	LASSERT(lock);
	LASSERT(flag == LDLM_CB_CANCELING);

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);
	unlock_res_and_lock(lock);
	return 0;
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	/* Compat code: old clients did not know about the owner field and
	 * relied solely on the pid for ownership. Introduced in LU-104, 2.1,
	 * April 2011 */
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
				       ldlm_policy_data_t *lpolicy)
{
	memset(lpolicy, 0, sizeof(*lpolicy));
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
				     ldlm_wire_policy_data_t *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
	return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
	return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

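/*
 * Each referenced entry in an export's flock hash pins both the blocked
 * lock and its blocking export, so neither can be freed while deadlock
 * detection may still traverse them.
 */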
static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_GET(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_get(flock->blocking_export);
	flock->blocking_refs++;
}

static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_RELEASE(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_put(flock->blocking_export);
	if (--flock->blocking_refs == 0) {
		flock->blocking_owner = 0;
		flock->blocking_export = NULL;
	}
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
	.hs_hash       = ldlm_export_flock_hash,
	.hs_key        = ldlm_export_flock_key,
	.hs_keycmp     = ldlm_export_flock_keycmp,
	.hs_object     = ldlm_export_flock_object,
	.hs_get        = ldlm_export_flock_get,
	.hs_put        = ldlm_export_flock_put,
	.hs_put_locked = ldlm_export_flock_put,
};

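/*
 * Flock deadlock detection runs on the MDS, so the flock hash is created
 * only for MDT exports.
 */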
int ldlm_init_flock_export(struct obd_export *exp)
{
	if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
		return 0;

	exp->exp_flock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_flock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
	if (!exp->exp_flock_hash)
		return -ENOMEM;

	return 0;
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
	if (exp->exp_flock_hash) {
		cfs_hash_putref(exp->exp_flock_hash);
		exp->exp_flock_hash = NULL;
	}
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);