/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * Client Lustre Object.
 *
 *   Author: Nikita Danilov <nikita.danilov@sun.com>
 */

/*
 * Locking.
 *
 *  i_mutex
 *      PG_locked
 *          ->coh_page_guard
 *          ->coh_lock_guard
 *          ->coh_attr_guard
 *          ->ls_guard
 */

#define DEBUG_SUBSYSTEM S_CLASS

#include "../../include/linux/libcfs/libcfs.h"
/* class_put_type() */
#include "../include/obd_class.h"
#include "../include/obd_support.h"
#include "../include/lustre_fid.h"
#include <linux/list.h>
#include "../../include/linux/libcfs/libcfs_hash.h"	/* for cfs_hash stuff */
#include "../include/cl_object.h"
#include "cl_internal.h"

static struct kmem_cache *cl_env_kmem;

/** Lock class of cl_object_header::coh_page_guard */
static struct lock_class_key cl_page_guard_class;
/** Lock class of cl_object_header::coh_lock_guard */
static struct lock_class_key cl_lock_guard_class;
/** Lock class of cl_object_header::coh_attr_guard */
static struct lock_class_key cl_attr_guard_class;

extern __u32 lu_context_tags_default;
extern __u32 lu_session_tags_default;
/**
 * Initialize cl_object_header.
 */
int cl_object_header_init(struct cl_object_header *h)
{
	int result;

	result = lu_object_header_init(&h->coh_lu);
	if (result == 0) {
		spin_lock_init(&h->coh_page_guard);
		spin_lock_init(&h->coh_lock_guard);
		spin_lock_init(&h->coh_attr_guard);
		lockdep_set_class(&h->coh_page_guard, &cl_page_guard_class);
		lockdep_set_class(&h->coh_lock_guard, &cl_lock_guard_class);
		lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
		h->coh_pages = 0;
		/* XXX hard coded GFP_* mask. */
		INIT_RADIX_TREE(&h->coh_tree, GFP_ATOMIC);
		INIT_LIST_HEAD(&h->coh_locks);
		h->coh_page_bufsize = ALIGN(sizeof(struct cl_page), 8);
	}
	return result;
}
EXPORT_SYMBOL(cl_object_header_init);

/**
 * Finalize cl_object_header.
 */
void cl_object_header_fini(struct cl_object_header *h)
{
	LASSERT(list_empty(&h->coh_locks));
	lu_object_header_fini(&h->coh_lu);
}
EXPORT_SYMBOL(cl_object_header_fini);

/**
 * Returns a cl_object with a given \a fid.
 *
 * Returns either a cached or a newly created object. An additional reference
 * on the returned object is acquired.
 *
 * \see lu_object_find(), cl_page_find(), cl_lock_find()
 */
struct cl_object *cl_object_find(const struct lu_env *env,
				 struct cl_device *cd, const struct lu_fid *fid,
				 const struct cl_object_conf *c)
{
	might_sleep();
	return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
}
EXPORT_SYMBOL(cl_object_find);

/**
 * Releases a reference on \a o.
 *
 * When the last reference is released, the object is returned to the cache,
 * unless the lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in
 * its header.
 *
 * \see cl_page_put(), cl_lock_put().
 */
void cl_object_put(const struct lu_env *env, struct cl_object *o)
{
	lu_object_put(env, &o->co_lu);
}
EXPORT_SYMBOL(cl_object_put);

/**
 * Acquire an additional reference to the object \a o.
 *
 * This can only be used to acquire an _additional_ reference, i.e., the
 * caller must already hold at least one reference to \a o before calling
 * this.
 *
 * \see cl_page_get(), cl_lock_get().
 */
void cl_object_get(struct cl_object *o)
{
	lu_object_get(&o->co_lu);
}
EXPORT_SYMBOL(cl_object_get);

/**
 * Returns the top-object for a given \a o.
 *
 * \see cl_page_top(), cl_io_top()
 */
struct cl_object *cl_object_top(struct cl_object *o)
{
	struct cl_object_header *hdr = cl_object_header(o);
	struct cl_object *top;

	while (hdr->coh_parent != NULL)
		hdr = hdr->coh_parent;

	top = lu2cl(lu_object_top(&hdr->coh_lu));
	CDEBUG(D_TRACE, "%p -> %p\n", o, top);
	return top;
}
EXPORT_SYMBOL(cl_object_top);

/**
 * Returns a pointer to the lock protecting data-attributes of the given
 * object \a o.
 *
 * Data-attributes are protected by the cl_object_header::coh_attr_guard
 * spin-lock in the top-object.
 *
 * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
 */
static spinlock_t *cl_object_attr_guard(struct cl_object *o)
{
	return &cl_object_header(cl_object_top(o))->coh_attr_guard;
}

/**
 * Locks data-attributes.
 *
 * Prevents data-attributes from changing until the lock is released by
 * cl_object_attr_unlock(). This has to be called before calls to
 * cl_object_attr_get() and cl_object_attr_set().
 */
void cl_object_attr_lock(struct cl_object *o)
{
	spin_lock(cl_object_attr_guard(o));
}
EXPORT_SYMBOL(cl_object_attr_lock);

/**
 * Releases the data-attributes lock, acquired by cl_object_attr_lock().
 */
void cl_object_attr_unlock(struct cl_object *o)
{
	spin_unlock(cl_object_attr_guard(o));
}
EXPORT_SYMBOL(cl_object_attr_unlock);

/**
 * Returns data-attributes of an object \a obj.
 *
 * Every layer is asked (by calling cl_object_operations::coo_attr_get())
 * top-to-bottom to fill in the parts of \a attr that the layer is responsible
 * for.
 */
int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
		       struct cl_attr *attr)
{
	struct lu_object_header *top;
	int result;

	assert_spin_locked(cl_object_attr_guard(obj));

	top = obj->co_lu.lo_header;
	result = 0;
	list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
		if (obj->co_ops->coo_attr_get != NULL) {
			result = obj->co_ops->coo_attr_get(env, obj, attr);
			if (result != 0) {
				if (result > 0)
					result = 0;
				break;
			}
		}
	}
	return result;
}
EXPORT_SYMBOL(cl_object_attr_get);
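
/*
 * A minimal usage sketch of the attribute locking protocol described above
 * (illustrative caller code only, not part of this file; "env" and "obj" are
 * assumed to be a valid environment and object owned by the caller):
 *
 *	struct cl_attr attr = { 0 };
 *	int rc;
 *
 *	cl_object_attr_lock(obj);
 *	rc = cl_object_attr_get(env, obj, &attr);
 *	cl_object_attr_unlock(obj);
 *	if (rc == 0) {
 *		... attr.cat_size, attr.cat_mtime, etc. are now valid ...
 *	}
 */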

/**
 * Updates data-attributes of an object \a obj.
 *
 * Only the attributes mentioned in the validity bit-mask \a v are
 * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
 * to top.
 */
int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
		       const struct cl_attr *attr, unsigned v)
{
	struct lu_object_header *top;
	int result;

	assert_spin_locked(cl_object_attr_guard(obj));

	top = obj->co_lu.lo_header;
	result = 0;
	list_for_each_entry_reverse(obj, &top->loh_layers,
				    co_lu.lo_linkage) {
		if (obj->co_ops->coo_attr_set != NULL) {
			result = obj->co_ops->coo_attr_set(env, obj, attr, v);
			if (result != 0) {
				if (result > 0)
					result = 0;
				break;
			}
		}
	}
	return result;
}
EXPORT_SYMBOL(cl_object_attr_set);

/**
 * Notifies layers (bottom-to-top) that a glimpse AST was received.
 *
 * Layers have to fill the \a lvb fields with information that will be
 * shipped back to the glimpse issuer.
 *
 * \see cl_lock_operations::clo_glimpse()
 */
int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
		      struct ost_lvb *lvb)
{
	struct lu_object_header *top;
	int result;

	top = obj->co_lu.lo_header;
	result = 0;
	list_for_each_entry_reverse(obj, &top->loh_layers,
				    co_lu.lo_linkage) {
		if (obj->co_ops->coo_glimpse != NULL) {
			result = obj->co_ops->coo_glimpse(env, obj, lvb);
			if (result != 0)
				break;
		}
	}
	LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
			 "size: %llu mtime: %llu atime: %llu ctime: %llu blocks: %llu\n",
			 lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
			 lvb->lvb_ctime, lvb->lvb_blocks);
	return result;
}
EXPORT_SYMBOL(cl_object_glimpse);

/**
 * Updates the configuration of an object \a obj.
 */
int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
		const struct cl_object_conf *conf)
{
	struct lu_object_header *top;
	int result;

	top = obj->co_lu.lo_header;
	result = 0;
	list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
		if (obj->co_ops->coo_conf_set != NULL) {
			result = obj->co_ops->coo_conf_set(env, obj, conf);
			if (result != 0)
				break;
		}
	}
	return result;
}
EXPORT_SYMBOL(cl_conf_set);

/**
 * Helper function that removes all object locks and marks the object for
 * deletion. All object pages must have been deleted at this point.
 *
 * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
 * and sub- objects respectively.
 */
void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
{
	struct cl_object_header *hdr;

	hdr = cl_object_header(obj);
	LASSERT(hdr->coh_tree.rnode == NULL);
	LASSERT(hdr->coh_pages == 0);

	set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
	/*
	 * Destroy all locks. Object destruction (including cl_inode_fini())
	 * cannot cancel the locks, because in the case of a local client,
	 * where client and server share the same thread running
	 * prune_icache(), this can deadlock with ldlm_cancel_handler()
	 * waiting on __wait_on_freeing_inode().
	 */
	cl_locks_prune(env, obj, 0);
}
EXPORT_SYMBOL(cl_object_kill);

/**
 * Prunes caches of pages and locks for this object.
 */
void cl_object_prune(const struct lu_env *env, struct cl_object *obj)
{
	cl_pages_prune(env, obj);
	cl_locks_prune(env, obj, 1);
}
EXPORT_SYMBOL(cl_object_prune);

/**
 * Check if the object has locks.
 */
int cl_object_has_locks(struct cl_object *obj)
{
	struct cl_object_header *head = cl_object_header(obj);
	int has;

	spin_lock(&head->coh_lock_guard);
	has = list_empty(&head->coh_locks);
	spin_unlock(&head->coh_lock_guard);

	return (has == 0);
}
EXPORT_SYMBOL(cl_object_has_locks);

void cache_stats_init(struct cache_stats *cs, const char *name)
{
	int i;

	cs->cs_name = name;
	for (i = 0; i < CS_NR; i++)
		atomic_set(&cs->cs_stats[i], 0);
}

int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
{
	int i;
	/*
	 *   lookup    hit    total  cached create
	 * env: ...... ...... ...... ...... ......
	 */
	if (h) {
		const char *names[CS_NR] = CS_NAMES;

		seq_printf(m, "%6s", " ");
		for (i = 0; i < CS_NR; i++)
			seq_printf(m, "%8s", names[i]);
		seq_printf(m, "\n");
	}

	seq_printf(m, "%5.5s:", cs->cs_name);
	for (i = 0; i < CS_NR; i++)
		seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
	return 0;
}

/**
 * Initialize client site.
 *
 * Perform common initialization (lu_site_init()), and initialize statistical
 * counters. Also perform global initializations on the first call.
 */
int cl_site_init(struct cl_site *s, struct cl_device *d)
{
	int i;
	int result;

	result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
	if (result == 0) {
		cache_stats_init(&s->cs_pages, "pages");
		cache_stats_init(&s->cs_locks, "locks");
		for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
			atomic_set(&s->cs_pages_state[i], 0);
		for (i = 0; i < ARRAY_SIZE(s->cs_locks_state); ++i)
			atomic_set(&s->cs_locks_state[i], 0);
	}
	return result;
}
EXPORT_SYMBOL(cl_site_init);

/**
 * Finalize client site. Dual to cl_site_init().
 */
void cl_site_fini(struct cl_site *s)
{
	lu_site_fini(&s->cs_lu);
}
EXPORT_SYMBOL(cl_site_fini);

static struct cache_stats cl_env_stats = {
	.cs_name    = "envs",
	.cs_stats = { ATOMIC_INIT(0), }
};

/**
 * Outputs client site statistical counters into a buffer. Suitable for
 * ll_rd_*()-style functions.
 */
int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
{
	int i;
	static const char *pstate[] = {
		[CPS_CACHED]  = "c",
		[CPS_OWNED]   = "o",
		[CPS_PAGEOUT] = "w",
		[CPS_PAGEIN]  = "r",
		[CPS_FREEING] = "f"
	};
	static const char *lstate[] = {
		[CLS_NEW]       = "n",
		[CLS_QUEUING]   = "q",
		[CLS_ENQUEUED]  = "e",
		[CLS_HELD]      = "h",
		[CLS_INTRANSIT] = "t",
		[CLS_CACHED]    = "c",
		[CLS_FREEING]   = "f"
	};
/*
       lookup    hit  total   busy create
pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
  env: ...... ...... ...... ...... ......
 */
	lu_site_stats_print(&site->cs_lu, m);
	cache_stats_print(&site->cs_pages, m, 1);
	seq_printf(m, " [");
	for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
		seq_printf(m, "%s: %u ", pstate[i],
			   atomic_read(&site->cs_pages_state[i]));
	seq_printf(m, "]\n");
	cache_stats_print(&site->cs_locks, m, 0);
	seq_printf(m, " [");
	for (i = 0; i < ARRAY_SIZE(site->cs_locks_state); ++i)
		seq_printf(m, "%s: %u ", lstate[i],
			   atomic_read(&site->cs_locks_state[i]));
	seq_printf(m, "]\n");
	cache_stats_print(&cl_env_stats, m, 0);
	seq_printf(m, "\n");
	return 0;
}
EXPORT_SYMBOL(cl_site_stats_print);

/*****************************************************************************
 *
 * lu_env handling on client.
 *
 */

/**
 * The most efficient way is to store the cl_env pointer in task-specific
 * structures. On Linux, it won't be easy to use task_struct->journal_info,
 * because Lustre code may call into other filesystems that make certain
 * assumptions about journal_info. Currently the following fields in
 * task_struct have been identified as usable for this purpose:
 *  - cl_env: for liblustre.
 *  - tux_info: only on RedHat kernels.
 *  - ...
 * \note As long as we use task_struct to store cl_env, we assume that once
 * called into Lustre, we'll never call into other parts of the kernel
 * that use those fields in task_struct without explicitly exiting
 * Lustre.
 *
 * If no space in task_struct is available, a hash table is used.
 * bz20044, bz22683.
 */

struct cl_env {
	void	     *ce_magic;
	struct lu_env     ce_lu;
	struct lu_context ce_ses;

	/**
	 * This allows cl_env to be entered into cl_env_hash, which implements
	 * the current thread -> client environment lookup.
	 */
	struct hlist_node  ce_node;
	/**
	 * Owner of the current cl_env.
	 *
	 * If LL_TASK_CL_ENV is defined, this points to the owning current,
	 * for debugging purposes only;
	 * otherwise a hash table is used, and this is the key for cfs_hash.
	 * Currently the thread pid is stored. Note that using the thread
	 * pointer would lead to an unbalanced hash because of its specific
	 * allocation locality, which can vary across platforms, OSes and
	 * even OS versions.
	 */
	void	     *ce_owner;

	/*
	 * Linkage into the global list of all client environments. Used for
	 * garbage collection.
	 */
	struct list_head	ce_linkage;
	/*
	 * Reference counter of this cl_env.
	 */
	int	       ce_ref;
	/*
	 * Debugging field: address of the caller who made original
	 * allocation.
	 */
	void	     *ce_debug;
};

/* These expand to nothing: cl_env usage statistics are not tracked here. */
#define CL_ENV_INC(counter)
#define CL_ENV_DEC(counter)

static void cl_env_init0(struct cl_env *cle, void *debug)
{
	LASSERT(cle->ce_ref == 0);
	LASSERT(cle->ce_magic == &cl_env_init0);
	LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);

	cle->ce_ref = 1;
	cle->ce_debug = debug;
	CL_ENV_INC(busy);
}

/*
 * The implementation that uses a hash table to connect a cl_env to its
 * thread.
 */

static struct cfs_hash *cl_env_hash;

static unsigned cl_env_hops_hash(struct cfs_hash *lh,
				 const void *key, unsigned mask)
{
#if BITS_PER_LONG == 64
	return cfs_hash_u64_hash((__u64)key, mask);
#else
	return cfs_hash_u32_hash((__u32)key, mask);
#endif
}

static void *cl_env_hops_obj(struct hlist_node *hn)
{
	struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);

	LASSERT(cle->ce_magic == &cl_env_init0);
	return (void *)cle;
}

static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
{
	struct cl_env *cle = cl_env_hops_obj(hn);

	LASSERT(cle->ce_owner != NULL);
	return (key == cle->ce_owner);
}

static void cl_env_hops_noop(struct cfs_hash *hs, struct hlist_node *hn)
{
	struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);

	LASSERT(cle->ce_magic == &cl_env_init0);
}

static cfs_hash_ops_t cl_env_hops = {
	.hs_hash	= cl_env_hops_hash,
	.hs_key		= cl_env_hops_obj,
	.hs_keycmp	= cl_env_hops_keycmp,
	.hs_object	= cl_env_hops_obj,
	.hs_get		= cl_env_hops_noop,
	.hs_put_locked	= cl_env_hops_noop,
};

static inline struct cl_env *cl_env_fetch(void)
{
	struct cl_env *cle;

	cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
	LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
	return cle;
}

static inline void cl_env_attach(struct cl_env *cle)
{
	if (cle) {
		int rc;

		LASSERT(cle->ce_owner == NULL);
		cle->ce_owner = (void *) (long) current->pid;
		rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
					 &cle->ce_node);
		LASSERT(rc == 0);
	}
}

static inline void cl_env_do_detach(struct cl_env *cle)
{
	void *cookie;

	LASSERT(cle->ce_owner == (void *) (long) current->pid);
	cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
			      &cle->ce_node);
	LASSERT(cookie == cle);
	cle->ce_owner = NULL;
}

static int cl_env_store_init(void)
{
	cl_env_hash = cfs_hash_create("cl_env",
				      HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
				      HASH_CL_ENV_BKT_BITS, 0,
				      CFS_HASH_MIN_THETA,
				      CFS_HASH_MAX_THETA,
				      &cl_env_hops,
				      CFS_HASH_RW_BKTLOCK);
	return cl_env_hash != NULL ? 0 : -ENOMEM;
}

static void cl_env_store_fini(void)
{
	cfs_hash_putref(cl_env_hash);
}

static inline struct cl_env *cl_env_detach(struct cl_env *cle)
{
	if (cle == NULL)
		cle = cl_env_fetch();

	if (cle && cle->ce_owner)
		cl_env_do_detach(cle);

	return cle;
}

static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
{
	struct lu_env *env;
	struct cl_env *cle;

	OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, GFP_NOFS);
	if (cle != NULL) {
		int rc;

		INIT_LIST_HEAD(&cle->ce_linkage);
		cle->ce_magic = &cl_env_init0;
		env = &cle->ce_lu;
		rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
		if (rc == 0) {
			rc = lu_context_init(&cle->ce_ses,
					     LCT_SESSION | ses_tags);
			if (rc == 0) {
				lu_context_enter(&cle->ce_ses);
				env->le_ses = &cle->ce_ses;
				cl_env_init0(cle, debug);
			} else
				lu_env_fini(env);
		}
		if (rc != 0) {
			OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
			env = ERR_PTR(rc);
		} else {
			CL_ENV_INC(create);
			CL_ENV_INC(total);
		}
	} else
		env = ERR_PTR(-ENOMEM);
	return env;
}

static void cl_env_fini(struct cl_env *cle)
{
	CL_ENV_DEC(total);
	lu_context_fini(&cle->ce_lu.le_ctx);
	lu_context_fini(&cle->ce_ses);
	OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
}

static inline struct cl_env *cl_env_container(struct lu_env *env)
{
	return container_of(env, struct cl_env, ce_lu);
}

struct lu_env *cl_env_peek(int *refcheck)
{
	struct lu_env *env;
	struct cl_env *cle;

	CL_ENV_INC(lookup);

	/* check that we don't go far from untrusted pointer */
	CLASSERT(offsetof(struct cl_env, ce_magic) == 0);

	env = NULL;
	cle = cl_env_fetch();
	if (cle != NULL) {
		CL_ENV_INC(hit);
		env = &cle->ce_lu;
		*refcheck = ++cle->ce_ref;
	}
	CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
	return env;
}
EXPORT_SYMBOL(cl_env_peek);

/**
 * Returns an lu_env: if there already is an environment associated with the
 * current thread, it is returned; otherwise, a new environment is allocated.
 *
 * \param refcheck pointer to a counter used to detect environment leaks. In
 * the usual case cl_env_get() and cl_env_put() are called in the same lexical
 * scope and a pointer to the same integer is passed as \a refcheck. This is
 * used to detect missed cl_env_put().
 *
 * \see cl_env_put()
 */
struct lu_env *cl_env_get(int *refcheck)
{
	struct lu_env *env;

	env = cl_env_peek(refcheck);
	if (env == NULL) {
		env = cl_env_new(lu_context_tags_default,
				 lu_session_tags_default,
				 __builtin_return_address(0));

		if (!IS_ERR(env)) {
			struct cl_env *cle;

			cle = cl_env_container(env);
			cl_env_attach(cle);
			*refcheck = cle->ce_ref;
			CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
		}
	}
	return env;
}
EXPORT_SYMBOL(cl_env_get);
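
/*
 * A minimal illustrative sketch of the cl_env_get()/cl_env_put() pairing
 * described above (hypothetical caller code, not part of this file):
 *
 *	struct lu_env *env;
 *	int refcheck;
 *
 *	env = cl_env_get(&refcheck);
 *	if (IS_ERR(env))
 *		return PTR_ERR(env);
 *	... use the environment ...
 *	cl_env_put(env, &refcheck);
 *
 * Passing the same &refcheck to both calls allows cl_env_put() to assert
 * that no references were leaked in between.
 */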

/**
 * Forces allocation of a fresh environment with the given tags.
 *
 * \see cl_env_get()
 */
struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
{
	struct lu_env *env;

	LASSERT(cl_env_peek(refcheck) == NULL);
	env = cl_env_new(tags, tags, __builtin_return_address(0));
	if (!IS_ERR(env)) {
		struct cl_env *cle;

		cle = cl_env_container(env);
		*refcheck = cle->ce_ref;
		CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
	}
	return env;
}
EXPORT_SYMBOL(cl_env_alloc);

static void cl_env_exit(struct cl_env *cle)
{
	LASSERT(cle->ce_owner == NULL);
	lu_context_exit(&cle->ce_lu.le_ctx);
	lu_context_exit(&cle->ce_ses);
}

/**
 * Releases an environment.
 *
 * Decrements the \a env reference counter. When the counter drops to 0,
 * nothing in this thread is using the environment any more: it is detached
 * from the thread and freed.
 */
void cl_env_put(struct lu_env *env, int *refcheck)
{
	struct cl_env *cle;

	cle = cl_env_container(env);

	LASSERT(cle->ce_ref > 0);
	LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));

	CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
	if (--cle->ce_ref == 0) {
		CL_ENV_DEC(busy);
		cl_env_detach(cle);
		cle->ce_debug = NULL;
		cl_env_exit(cle);
		cl_env_fini(cle);
	}
}
EXPORT_SYMBOL(cl_env_put);

/**
 * Declares a point of re-entrancy.
 *
 * \see cl_env_reexit()
 */
void *cl_env_reenter(void)
{
	return cl_env_detach(NULL);
}
EXPORT_SYMBOL(cl_env_reenter);

/**
 * Exits re-entrancy.
 */
void cl_env_reexit(void *cookie)
{
	cl_env_detach(NULL);
	cl_env_attach(cookie);
}
EXPORT_SYMBOL(cl_env_reexit);

/**
 * Sets up a user-supplied \a env as the current environment. This can be
 * used to guarantee that an environment exists even when cl_env_get() fails.
 * It is up to the user to ensure proper concurrency control.
 *
 * \see cl_env_unplant()
 */
void cl_env_implant(struct lu_env *env, int *refcheck)
{
	struct cl_env *cle = cl_env_container(env);

	LASSERT(cle->ce_ref > 0);

	cl_env_attach(cle);
	cl_env_get(refcheck);
	CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
}
EXPORT_SYMBOL(cl_env_implant);

/**
 * Detaches an environment installed earlier by cl_env_implant().
 */
void cl_env_unplant(struct lu_env *env, int *refcheck)
{
	struct cl_env *cle = cl_env_container(env);

	LASSERT(cle->ce_ref > 1);

	CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);

	cl_env_detach(cle);
	cl_env_put(env, refcheck);
}
EXPORT_SYMBOL(cl_env_unplant);

struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
{
	struct lu_env *env;

	nest->cen_cookie = NULL;
	env = cl_env_peek(&nest->cen_refcheck);
	if (env != NULL) {
		if (!cl_io_is_going(env))
			return env;

		cl_env_put(env, &nest->cen_refcheck);
		nest->cen_cookie = cl_env_reenter();
	}
	env = cl_env_get(&nest->cen_refcheck);
	if (IS_ERR(env)) {
		cl_env_reexit(nest->cen_cookie);
		return env;
	}

	LASSERT(!cl_io_is_going(env));
	return env;
}
EXPORT_SYMBOL(cl_env_nested_get);

void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
{
	cl_env_put(env, &nest->cen_refcheck);
	cl_env_reexit(nest->cen_cookie);
}
EXPORT_SYMBOL(cl_env_nested_put);
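
/*
 * A minimal illustrative sketch of the nested environment helpers above
 * (hypothetical caller code; useful, for example, in call-backs that may run
 * while an I/O is already using the thread's environment):
 *
 *	struct cl_env_nest nest;
 *	struct lu_env *env;
 *
 *	env = cl_env_nested_get(&nest);
 *	if (IS_ERR(env))
 *		return PTR_ERR(env);
 *	... use an environment with no cl_io in progress ...
 *	cl_env_nested_put(&nest, env);
 */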

/**
 * Converts struct cl_attr to struct ost_lvb.
 *
 * \see cl_lvb2attr
 */
void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
{
	lvb->lvb_size   = attr->cat_size;
	lvb->lvb_mtime  = attr->cat_mtime;
	lvb->lvb_atime  = attr->cat_atime;
	lvb->lvb_ctime  = attr->cat_ctime;
	lvb->lvb_blocks = attr->cat_blocks;
}
EXPORT_SYMBOL(cl_attr2lvb);

/**
 * Converts struct ost_lvb to struct cl_attr.
 *
 * \see cl_attr2lvb
 */
void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
{
	attr->cat_size   = lvb->lvb_size;
	attr->cat_mtime  = lvb->lvb_mtime;
	attr->cat_atime  = lvb->lvb_atime;
	attr->cat_ctime  = lvb->lvb_ctime;
	attr->cat_blocks = lvb->lvb_blocks;
}
EXPORT_SYMBOL(cl_lvb2attr);

/*****************************************************************************
 *
 * Temporary prototype thing: mirror obd-devices into cl devices.
 *
 */

struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
				struct lu_device_type *ldt,
				struct lu_device *next)
{
	const char       *typename;
	struct lu_device *d;

	LASSERT(ldt != NULL);

	typename = ldt->ldt_name;
	d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
	if (!IS_ERR(d)) {
		int rc;

		if (site != NULL)
			d->ld_site = site;
		rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
		if (rc == 0) {
			lu_device_get(d);
			lu_ref_add(&d->ld_reference,
				   "lu-stack", &lu_site_init);
		} else {
			ldt->ldt_ops->ldto_device_free(env, d);
			CERROR("can't init device '%s', %d\n", typename, rc);
			d = ERR_PTR(rc);
		}
	} else
		CERROR("Cannot allocate device: '%s'\n", typename);
	return lu2cl_dev(d);
}
EXPORT_SYMBOL(cl_type_setup);

/**
 * Finalize device stack by calling lu_stack_fini().
 */
void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
{
	lu_stack_fini(env, cl2lu_dev(cl));
}
EXPORT_SYMBOL(cl_stack_fini);

int  cl_lock_init(void);
void cl_lock_fini(void);

int  cl_page_init(void);
void cl_page_fini(void);

static struct lu_context_key cl_key;

struct cl_thread_info *cl_env_info(const struct lu_env *env)
{
	return lu_context_key_get(&env->le_ctx, &cl_key);
}

/* defines cl0_key_{init,fini}() */
LU_KEY_INIT_FINI(cl0, struct cl_thread_info);

static void *cl_key_init(const struct lu_context *ctx,
			 struct lu_context_key *key)
{
	struct cl_thread_info *info;

	info = cl0_key_init(ctx, key);
	if (!IS_ERR(info)) {
		int i;

		for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
			lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
	}
	return info;
}

static void cl_key_fini(const struct lu_context *ctx,
			struct lu_context_key *key, void *data)
{
	struct cl_thread_info *info;
	int i;

	info = data;
	for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
		lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
	cl0_key_fini(ctx, key, data);
}

static void cl_key_exit(const struct lu_context *ctx,
			struct lu_context_key *key, void *data)
{
	struct cl_thread_info *info = data;
	int i;

	for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
		LASSERT(info->clt_counters[i].ctc_nr_held == 0);
		LASSERT(info->clt_counters[i].ctc_nr_used == 0);
		LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
		LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
		lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
		lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
	}
}

static struct lu_context_key cl_key = {
	.lct_tags = LCT_CL_THREAD,
	.lct_init = cl_key_init,
	.lct_fini = cl_key_fini,
	.lct_exit = cl_key_exit
};

static struct lu_kmem_descr cl_object_caches[] = {
	{
		.ckd_cache = &cl_env_kmem,
		.ckd_name  = "cl_env_kmem",
		.ckd_size  = sizeof(struct cl_env)
	},
	{
		.ckd_cache = NULL
	}
};

/**
 * Global initialization of cl-data. Create kmem caches, register
 * lu_context_key's, etc.
 *
 * \see cl_global_fini()
 */
int cl_global_init(void)
{
	int result;

	result = cl_env_store_init();
	if (result)
		return result;

	result = lu_kmem_init(cl_object_caches);
	if (result)
		goto out_store;

	LU_CONTEXT_KEY_INIT(&cl_key);
	result = lu_context_key_register(&cl_key);
	if (result)
		goto out_kmem;

	result = cl_lock_init();
	if (result)
		goto out_context;

	result = cl_page_init();
	if (result)
		goto out_lock;

	return 0;
out_lock:
	cl_lock_fini();
out_context:
	lu_context_key_degister(&cl_key);
out_kmem:
	lu_kmem_fini(cl_object_caches);
out_store:
	cl_env_store_fini();
	return result;
}

/**
 * Finalization of global cl-data. Dual to cl_global_init().
 */
void cl_global_fini(void)
{
	cl_lock_fini();
	cl_page_fini();
	lu_context_key_degister(&cl_key);
	lu_kmem_fini(cl_object_caches);
	cl_env_store_fini();
}
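
/*
 * A minimal illustrative sketch of how the global state above is expected to
 * be paired by module setup and teardown code (hypothetical caller, not part
 * of this file):
 *
 *	rc = cl_global_init();
 *	if (rc != 0)
 *		return rc;
 *	...
 *	cl_global_fini();
 */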