osc_request.c revision 21aef7d9d654416b8167ad8047a628d3968a97da
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"
#include "../include/obd_ost.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_log.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		return lmm_size;

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}

static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do MDS-to-OST setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct lov_stripe_md  *lsm;
	int		    rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	struct ost_body	 *body;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa     = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_punch(const struct lu_env *env, struct obd_export *exp,
		     struct obd_info *oinfo, struct obd_trans_info *oti,
		     struct ptlrpc_request_set *rqset)
{
	oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
	oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
	oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
	return osc_punch_base(exp, oinfo,
			      oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_sync(const struct lu_env *env, struct obd_export *exp,
		    struct obd_info *oinfo, u64 start, u64 end,
		    struct ptlrpc_request_set *set)
{
	if (!oinfo->oi_oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	oinfo->oi_oa->o_size = start;
	oinfo->oi_oa->o_blocks = end;
	oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

	return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid. Found locks are added to the @cancels list. Returns the number
 * of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes it from the case when ELC is not supported at
	 * all, where we still want to cancel locks in advance and just
	 * cancel them locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
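/*
 * Illustrative sketch of the recovery flow described above (informal, not
 * a protocol definition):
 *
 *   MDS unlink(name)   -> llog unlink record per OST object
 *   MDS -> OST         -> OST_DESTROY RPC (osc_destroy() below)
 *   OST commits txn    -> llog cancel cookie returned to the MDS
 *   MDS                -> cancels the matching llog record
 *
 * If the cookie round trip never completes, the llog record survives and
 * the destroy is replayed when the OST next reconnects to the MDS.
 */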
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying an unlink orphan, the request is
	 * sent from MDT to OST and should not be blocked here, because the
	 * process might be triggered by ptlrpcd, and it is not good to block
	 * a ptlrpcd thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	return 0;
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() and atomic_inc() are not covered by a
		 * lock, thus they may safely race and trip this CERROR()
		 * unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
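/*
 * Worked example (illustrative numbers only): with 4KB pages,
 * cl_max_pages_per_rpc = 256 (1MB RPCs) and cl_max_rpcs_in_flight = 8,
 * the first shrink targets (8 + 1) * 1MB = 9MB of grant; once at or
 * below that, the next shrink targets a single RPC's worth, i.e. 1MB.
 */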
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	*body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
				cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
			client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state has
	 * already left EVICTED state, then cl_dirty must be 0 already.
	 */
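	/*
	 * Example with hypothetical numbers: on a normal reconnect,
	 * ocd_grant = 2MB with cl_dirty = 512KB leaves cl_avail_grant =
	 * 1.5MB; after an eviction cl_dirty is draining to 0, so the
	 * full 2MB is taken as cl_avail_grant.
	 */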
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
	       "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is that it read
 * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
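/*
 * Example (illustrative): a 3-page read of 4KB pages that returns
 * nob_read = 6144 leaves page 0 intact, zero-fills the last 2KB of
 * page 1, and zero-fills all of page 2.
 */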
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int     i;
	__u32   *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
				i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

static u32 osc_checksum_bulk(int nob, u32 pg_count,
				   struct brw_page **pga, int opc,
				   cksum_type_t cksum_type)
{
	__u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
				  pga[i]->off & ~CFS_PAGE_MASK,
				  count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request   *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body	 *body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote    *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule      *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request.  The actual number
	 * is decided when the RPC is finally sent in ptlrpc_register_bulk().
	 * It sends "max - 1" for old client compatibility sending "0", and
	 * also so that the actual maximum is a power-of-two number, not one
	 * less. LU-1431 */
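	/*
	 * E.g. (illustrative): bd_md_max_brw = 4 goes on the wire as 3 and
	 * is decoded back as 4 bulks, while an old client that sends 0 is
	 * decoded as 1, keeping the decoded maximum a power of two.
	 */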
1301	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1302	osc_pack_capa(req, body, ocapa);
1303	LASSERT(page_count > 0);
1304	pg_prev = pga[0];
1305	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1306		struct brw_page *pg = pga[i];
1307		int poff = pg->off & ~CFS_PAGE_MASK;
1308
1309		LASSERT(pg->count > 0);
1310		/* make sure there is no gap in the middle of page array */
1311		LASSERTF(page_count == 1 ||
1312			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1313			  ergo(i > 0 && i < page_count - 1,
1314			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1315			  ergo(i == page_count - 1, poff == 0)),
1316			 "i: %d/%d pg: %p off: %llu, count: %u\n",
1317			 i, page_count, pg, pg->off, pg->count);
1318		LASSERTF(i == 0 || pg->off > pg_prev->off,
1319			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1320			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1321			 i, page_count,
1322			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1323			 pg_prev->pg, page_private(pg_prev->pg),
1324			 pg_prev->pg->index, pg_prev->off);
1325		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1326			(pg->flag & OBD_BRW_SRVLOCK));
1327
1328		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1329		requested_nob += pg->count;
1330
1331		if (i > 0 && can_merge_pages(pg_prev, pg)) {
1332			niobuf--;
1333			niobuf->len += pg->count;
1334		} else {
1335			niobuf->offset = pg->off;
1336			niobuf->len    = pg->count;
1337			niobuf->flags  = pg->flag;
1338		}
1339		pg_prev = pg;
1340	}
1341
1342	LASSERTF((void *)(niobuf - niocount) ==
1343		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1344		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1345		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1346
1347	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1348	if (resend) {
1349		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350			body->oa.o_valid |= OBD_MD_FLFLAGS;
1351			body->oa.o_flags = 0;
1352		}
1353		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1354	}
1355
1356	if (osc_should_shrink_grant(cli))
1357		osc_shrink_grant_local(cli, &body->oa);
1358
1359	/* size[REQ_REC_OFF] still sizeof (*body) */
1360	if (opc == OST_WRITE) {
1361		if (cli->cl_checksum &&
1362		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1363			/* store cl_cksum_type in a local variable since
1364			 * it can be changed via lprocfs */
1365			cksum_type_t cksum_type = cli->cl_cksum_type;
1366
1367			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368				oa->o_flags &= OBD_FL_LOCAL_MASK;
1369				body->oa.o_flags = 0;
1370			}
1371			body->oa.o_flags |= cksum_type_pack(cksum_type);
1372			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1373			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1374							     page_count, pga,
1375							     OST_WRITE,
1376							     cksum_type);
1377			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1378			       body->oa.o_cksum);
1379			/* save this in 'oa', too, for later checking */
1380			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381			oa->o_flags |= cksum_type_pack(cksum_type);
1382		} else {
1383			/* clear out the checksum flag, in case this is a
1384			 * resend but cl_checksum is no longer set. b=11238 */
1385			oa->o_valid &= ~OBD_MD_FLCKSUM;
1386		}
1387		oa->o_cksum = body->oa.o_cksum;
1388		/* 1 RC per niobuf */
1389		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1390				     sizeof(__u32) * niocount);
1391	} else {
1392		if (cli->cl_checksum &&
1393		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1395				body->oa.o_flags = 0;
1396			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1397			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398		}
1399	}
1400	ptlrpc_request_set_replen(req);
1401
1402	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1403	aa = ptlrpc_req_async_args(req);
1404	aa->aa_oa = oa;
1405	aa->aa_requested_nob = requested_nob;
1406	aa->aa_nio_count = niocount;
1407	aa->aa_page_count = page_count;
1408	aa->aa_resends = 0;
1409	aa->aa_ppga = pga;
1410	aa->aa_cli = cli;
1411	INIT_LIST_HEAD(&aa->aa_oaps);
1412	if (ocapa && reserve)
1413		aa->aa_ocapa = capa_get(ocapa);
1414
1415	*reqp = req;
1416	return 0;
1417
1418 out:
1419	ptlrpc_req_finished(req);
1420	return rc;
1421}
1422
1423static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424				__u32 client_cksum, __u32 server_cksum, int nob,
1425				u32 page_count, struct brw_page **pga,
1426				cksum_type_t client_cksum_type)
1427{
1428	__u32 new_cksum;
1429	char *msg;
1430	cksum_type_t cksum_type;
1431
1432	if (server_cksum == client_cksum) {
1433		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434		return 0;
1435	}
1436
1437	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438				       oa->o_flags : 0);
1439	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440				      cksum_type);
1441
1442	if (cksum_type != client_cksum_type)
1443		msg = "the server did not use the checksum type specified in "
1444		      "the original request - likely a protocol problem";
1445	else if (new_cksum == server_cksum)
1446		msg = "changed on the client after we checksummed it - "
1447		      "likely false positive due to mmap IO (bug 11742)";
1448	else if (new_cksum == client_cksum)
1449		msg = "changed in transit before arrival at OST";
1450	else
1451		msg = "changed in transit AND doesn't match the original - "
1452		      "likely false positive due to mmap IO (bug 11742)";
1453
1454	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455			   " object "DOSTID" extent [%llu-%llu]\n",
1456			   msg, libcfs_nid2str(peer->nid),
1457			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460			   POSTID(&oa->o_oi), pga[0]->off,
1461			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462	CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463	       "client csum now %x\n", client_cksum, client_cksum_type,
1464	       server_cksum, cksum_type, new_cksum);
1465	return 1;
1466}
1467
1468/* Note rc enters this function as number of bytes transferred */
1469static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470{
1471	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472	const lnet_process_id_t *peer =
1473			&req->rq_import->imp_connection->c_peer;
1474	struct client_obd *cli = aa->aa_cli;
1475	struct ost_body *body;
1476	__u32 client_cksum = 0;
1477
1478	if (rc < 0 && rc != -EDQUOT) {
1479		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1480		return rc;
1481	}
1482
1483	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1485	if (body == NULL) {
1486		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1487		return -EPROTO;
1488	}
1489
1490	/* set/clear over quota flag for a uid/gid */
1491	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1496		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497		       body->oa.o_flags);
1498		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1499	}
1500
1501	osc_update_grant(cli, body);
1502
1503	if (rc < 0)
1504		return rc;
1505
1506	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507		client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510		if (rc > 0) {
1511			CERROR("Unexpected +ve rc %d\n", rc);
1512			return -EPROTO;
1513		}
1514		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517			return -EAGAIN;
1518
1519		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520		    check_write_checksum(&body->oa, peer, client_cksum,
1521					 body->oa.o_cksum, aa->aa_requested_nob,
1522					 aa->aa_page_count, aa->aa_ppga,
1523					 cksum_type_unpack(aa->aa_oa->o_flags)))
1524			return -EAGAIN;
1525
1526		rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1527				     aa->aa_page_count, aa->aa_ppga);
1528		GOTO(out, rc);
1529	}
1530
1531	/* The rest of this function executes only for OST_READs */
1532
1533	/* if unwrap_bulk failed, return -EAGAIN to retry */
1534	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535	if (rc < 0)
1536		GOTO(out, rc = -EAGAIN);
1537
1538	if (rc > aa->aa_requested_nob) {
1539		CERROR("Unexpected rc %d (%d requested)\n", rc,
1540		       aa->aa_requested_nob);
1541		return -EPROTO;
1542	}
1543
1544	if (rc != req->rq_bulk->bd_nob_transferred) {
1545		CERROR ("Unexpected rc %d (%d transferred)\n",
1546			rc, req->rq_bulk->bd_nob_transferred);
1547		return (-EPROTO);
1548	}
1549
1550	if (rc < aa->aa_requested_nob)
1551		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554		static int cksum_counter;
1555		__u32      server_cksum = body->oa.o_cksum;
1556		char      *via;
1557		char      *router;
1558		cksum_type_t cksum_type;
1559
1560		cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1561					       body->oa.o_flags : 0);
1562		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563						 aa->aa_ppga, OST_READ,
1564						 cksum_type);
1565
1566		if (peer->nid == req->rq_bulk->bd_sender) {
1567			via = router = "";
1568		} else {
1569			via = " via ";
1570			router = libcfs_nid2str(req->rq_bulk->bd_sender);
1571		}
1572
1573		if (server_cksum != client_cksum) {
1574			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1575					   "%s%s%s inode "DFID" object "DOSTID
1576					   " extent [%llu-%llu]\n",
1577					   req->rq_import->imp_obd->obd_name,
1578					   libcfs_nid2str(peer->nid),
1579					   via, router,
1580					   body->oa.o_valid & OBD_MD_FLFID ?
1581						body->oa.o_parent_seq : (__u64)0,
1582					   body->oa.o_valid & OBD_MD_FLFID ?
1583						body->oa.o_parent_oid : 0,
1584					   body->oa.o_valid & OBD_MD_FLFID ?
1585						body->oa.o_parent_ver : 0,
1586					   POSTID(&body->oa.o_oi),
1587					   aa->aa_ppga[0]->off,
1588					   aa->aa_ppga[aa->aa_page_count-1]->off +
1589					   aa->aa_ppga[aa->aa_page_count-1]->count -
1590									1);
1591			CERROR("client %x, server %x, cksum_type %x\n",
1592			       client_cksum, server_cksum, cksum_type);
1593			cksum_counter = 0;
1594			aa->aa_oa->o_cksum = client_cksum;
1595			rc = -EAGAIN;
1596		} else {
1597			cksum_counter++;
1598			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1599			rc = 0;
1600		}
1601	} else if (unlikely(client_cksum)) {
1602		static int cksum_missed;
1603
1604		cksum_missed++;
1605		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1606			CERROR("Checksum %u requested from %s but not sent\n",
1607			       cksum_missed, libcfs_nid2str(peer->nid));
1608	} else {
1609		rc = 0;
1610	}
1611out:
1612	if (rc >= 0)
1613		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1614				     aa->aa_oa, &body->oa);
1615
1616	return rc;
1617}
1618
1619static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1620			    struct lov_stripe_md *lsm,
1621			    u32 page_count, struct brw_page **pga,
1622			    struct obd_capa *ocapa)
1623{
1624	struct ptlrpc_request *req;
1625	int		    rc;
1626	wait_queue_head_t	    waitq;
1627	int		    generation, resends = 0;
1628	struct l_wait_info     lwi;
1629
1630	init_waitqueue_head(&waitq);
1631	generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1632
1633restart_bulk:
1634	rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1635				  page_count, pga, &req, ocapa, 0, resends);
1636	if (rc != 0)
1637		return (rc);
1638
1639	if (resends) {
1640		req->rq_generation_set = 1;
1641		req->rq_import_generation = generation;
1642		req->rq_sent = get_seconds() + resends;
1643	}
1644
1645	rc = ptlrpc_queue_wait(req);
1646
1647	if (rc == -ETIMEDOUT && req->rq_resend) {
1648		DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1649		ptlrpc_req_finished(req);
1650		goto restart_bulk;
1651	}
1652
1653	rc = osc_brw_fini_request(req, rc);
1654
1655	ptlrpc_req_finished(req);
1656	/* When server return -EINPROGRESS, client should always retry
1657	 * regardless of the number of times the bulk was resent already.*/
1658	if (osc_recoverable_error(rc)) {
1659		resends++;
1660		if (rc != -EINPROGRESS &&
1661		    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1662			CERROR("%s: too many resend retries for object: "
1663			       ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1664			       POSTID(&oa->o_oi), rc);
1665			goto out;
1666		}
1667		if (generation !=
1668		    exp->exp_obd->u.cli.cl_import->imp_generation) {
1669			CDEBUG(D_HA, "%s: resend cross eviction for object: "
1670			       ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1671			       POSTID(&oa->o_oi), rc);
1672			goto out;
1673		}
1674
1675		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1676				       NULL);
1677		l_wait_event(waitq, 0, &lwi);
1678
1679		goto restart_bulk;
1680	}
1681out:
1682	if (rc == -EAGAIN || rc == -EINPROGRESS)
1683		rc = -EIO;
1684	return rc;
1685}
1686
1687static int osc_brw_redo_request(struct ptlrpc_request *request,
1688				struct osc_brw_async_args *aa, int rc)
1689{
1690	struct ptlrpc_request *new_req;
1691	struct osc_brw_async_args *new_aa;
1692	struct osc_async_page *oap;
1693
1694	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1695		  "redo for recoverable error %d", rc);
1696
1697	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1698					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1699				  aa->aa_cli, aa->aa_oa,
1700				  NULL /* lsm unused by osc currently */,
1701				  aa->aa_page_count, aa->aa_ppga,
1702				  &new_req, aa->aa_ocapa, 0, 1);
1703	if (rc)
1704		return rc;
1705
1706	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1707		if (oap->oap_request != NULL) {
1708			LASSERTF(request == oap->oap_request,
1709				 "request %p != oap_request %p\n",
1710				 request, oap->oap_request);
1711			if (oap->oap_interrupted) {
1712				ptlrpc_req_finished(new_req);
1713				return -EINTR;
1714			}
1715		}
1716	}
1717	/* New request takes over pga and oaps from old request.
1718	 * Note that copying a list_head doesn't work, need to move it... */
1719	aa->aa_resends++;
1720	new_req->rq_interpret_reply = request->rq_interpret_reply;
1721	new_req->rq_async_args = request->rq_async_args;
1722	/* cap resend delay to the current request timeout, this is similar to
1723	 * what ptlrpc does (see after_reply()) */
1724	if (aa->aa_resends > new_req->rq_timeout)
1725		new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1726	else
1727		new_req->rq_sent = get_seconds() + aa->aa_resends;
1728	new_req->rq_generation_set = 1;
1729	new_req->rq_import_generation = request->rq_import_generation;
1730
1731	new_aa = ptlrpc_req_async_args(new_req);
1732
1733	INIT_LIST_HEAD(&new_aa->aa_oaps);
1734	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1735	INIT_LIST_HEAD(&new_aa->aa_exts);
1736	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1737	new_aa->aa_resends = aa->aa_resends;
1738
1739	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1740		if (oap->oap_request) {
1741			ptlrpc_req_finished(oap->oap_request);
1742			oap->oap_request = ptlrpc_request_addref(new_req);
1743		}
1744	}
1745
1746	new_aa->aa_ocapa = aa->aa_ocapa;
1747	aa->aa_ocapa = NULL;
1748
	/* XXX: This code will run into problems if we ever support adding
	 * a series of BRW RPCs into a self-defined ptlrpc_request_set and
	 * waiting for all of them to finish. We should inherit the request
	 * set from the old request. */
1753	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1754
1755	DEBUG_REQ(D_INFO, new_req, "new request");
1756	return 0;
1757}
1758
/*
 * Ugh, we want disk allocation on the target to happen in offset order. We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation. It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's 1 and the array is sorted.
 */
1766static void sort_brw_pages(struct brw_page **array, int num)
1767{
1768	int stride, i, j;
1769	struct brw_page *tmp;
1770
1771	if (num == 1)
1772		return;
	for (stride = 1; stride < num; stride = (stride * 3) + 1)
1774		;
1775
1776	do {
1777		stride /= 3;
		for (i = stride; i < num; i++) {
1779			tmp = array[i];
1780			j = i;
1781			while (j >= stride && array[j - stride]->off > tmp->off) {
1782				array[j] = array[j - stride];
1783				j -= stride;
1784			}
1785			array[j] = tmp;
1786		}
1787	} while (stride > 1);
1788}
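
/*
 * A worked trace of the shellsort above (illustrative only): for num == 100
 * the first loop grows the stride 1 -> 4 -> 13 -> 40 -> 121, then the
 * do/while divides it back down, making insertion passes with strides 40,
 * 13, 4 and finally 1, after which the array is fully sorted by ->off.
 */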
1789
1790static u32 max_unfragmented_pages(struct brw_page **pg, u32 pages)
1791{
1792	int count = 1;
1793	int offset;
1794	int i = 0;
1795
	LASSERT(pages > 0);
1797	offset = pg[i]->off & ~CFS_PAGE_MASK;
1798
1799	for (;;) {
1800		pages--;
1801		if (pages == 0)	 /* that's all */
1802			return count;
1803
1804		if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1805			return count;   /* doesn't end on page boundary */
1806
1807		i++;
1808		offset = pg[i]->off & ~CFS_PAGE_MASK;
1809		if (offset != 0)	/* doesn't start on page boundary */
1810			return count;
1811
1812		count++;
1813	}
1814}
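
/*
 * Example of the boundary checks above, assuming 4096-byte pages: for pages
 * at (off, count) = (0, 4096), (4096, 4096), (8192, 1024) the run is
 * unfragmented and the function returns 3 -- only the last page ends short.
 * If the first page were (0, 1024) instead, it would end mid-page and the
 * function would return 1.
 */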
1815
1816static struct brw_page **osc_build_ppga(struct brw_page *pga, u32 count)
1817{
1818	struct brw_page **ppga;
1819	int i;
1820
1821	OBD_ALLOC(ppga, sizeof(*ppga) * count);
1822	if (ppga == NULL)
1823		return NULL;
1824
1825	for (i = 0; i < count; i++)
1826		ppga[i] = pga + i;
1827	return ppga;
1828}
1829
1830static void osc_release_ppga(struct brw_page **ppga, u32 count)
1831{
1832	LASSERT(ppga != NULL);
1833	OBD_FREE(ppga, sizeof(*ppga) * count);
1834}
1835
1836static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1837		   u32 page_count, struct brw_page *pga,
1838		   struct obd_trans_info *oti)
1839{
1840	struct obdo *saved_oa = NULL;
1841	struct brw_page **ppga, **orig;
1842	struct obd_import *imp = class_exp2cliimp(exp);
1843	struct client_obd *cli;
1844	int rc, page_count_orig;
1845
1846	LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1847	cli = &imp->imp_obd->u.cli;
1848
1849	if (cmd & OBD_BRW_CHECK) {
1850		/* The caller just wants to know if there's a chance that this
1851		 * I/O can succeed */
1852
1853		if (imp->imp_invalid)
1854			return -EIO;
1855		return 0;
1856	}
1857
1858	/* test_brw with a failed create can trip this, maybe others. */
1859	LASSERT(cli->cl_max_pages_per_rpc);
1860
1861	rc = 0;
1862
1863	orig = ppga = osc_build_ppga(pga, page_count);
1864	if (ppga == NULL)
1865		return -ENOMEM;
1866	page_count_orig = page_count;
1867
1868	sort_brw_pages(ppga, page_count);
1869	while (page_count) {
1870		u32 pages_per_brw;
1871
1872		if (page_count > cli->cl_max_pages_per_rpc)
1873			pages_per_brw = cli->cl_max_pages_per_rpc;
1874		else
1875			pages_per_brw = page_count;
1876
1877		pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
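
		/*
		 * Illustrative example (assuming cl_max_pages_per_rpc == 256
		 * and no fragmentation): a 300-page request is split into a
		 * 256-page BRW followed by a 44-page BRW. saved_oa keeps a
		 * pristine copy of the obdo across iterations because
		 * osc_brw_internal() clobbers it.
		 */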
1878
1879		if (saved_oa != NULL) {
1880			/* restore previously saved oa */
1881			*oinfo->oi_oa = *saved_oa;
1882		} else if (page_count > pages_per_brw) {
1883			/* save a copy of oa (brw will clobber it) */
1884			OBDO_ALLOC(saved_oa);
1885			if (saved_oa == NULL)
1886				GOTO(out, rc = -ENOMEM);
1887			*saved_oa = *oinfo->oi_oa;
1888		}
1889
1890		rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1891				      pages_per_brw, ppga, oinfo->oi_capa);
1892
1893		if (rc != 0)
1894			break;
1895
1896		page_count -= pages_per_brw;
1897		ppga += pages_per_brw;
1898	}
1899
1900out:
1901	osc_release_ppga(orig, page_count_orig);
1902
1903	if (saved_oa != NULL)
1904		OBDO_FREE(saved_oa);
1905
1906	return rc;
1907}
1908
1909static int brw_interpret(const struct lu_env *env,
1910			 struct ptlrpc_request *req, void *data, int rc)
1911{
1912	struct osc_brw_async_args *aa = data;
1913	struct osc_extent *ext;
1914	struct osc_extent *tmp;
1915	struct cl_object  *obj = NULL;
1916	struct client_obd *cli = aa->aa_cli;
1917
1918	rc = osc_brw_fini_request(req, rc);
1919	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
	/* When the server returns -EINPROGRESS, the client should always
	 * retry, regardless of how many times the bulk was already resent. */
1922	if (osc_recoverable_error(rc)) {
1923		if (req->rq_import_generation !=
1924		    req->rq_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "DOSTID", rc = %d.\n",
1927			       req->rq_import->imp_obd->obd_name,
1928			       POSTID(&aa->aa_oa->o_oi), rc);
1929		} else if (rc == -EINPROGRESS ||
1930		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1931			rc = osc_brw_redo_request(req, aa, rc);
1932		} else {
			CERROR("%s: too many resend retries for object: %llu:%llu, rc = %d.\n",
1934			       req->rq_import->imp_obd->obd_name,
1935			       POSTID(&aa->aa_oa->o_oi), rc);
1936		}
1937
1938		if (rc == 0)
1939			return 0;
1940		else if (rc == -EAGAIN || rc == -EINPROGRESS)
1941			rc = -EIO;
1942	}
1943
1944	if (aa->aa_ocapa) {
1945		capa_put(aa->aa_ocapa);
1946		aa->aa_ocapa = NULL;
1947	}
1948
1949	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1950		if (obj == NULL && rc == 0) {
1951			obj = osc2cl(ext->oe_obj);
1952			cl_object_get(obj);
1953		}
1954
1955		list_del_init(&ext->oe_link);
1956		osc_extent_finish(env, ext, 1, rc);
1957	}
1958	LASSERT(list_empty(&aa->aa_exts));
1959	LASSERT(list_empty(&aa->aa_oaps));
1960
1961	if (obj != NULL) {
1962		struct obdo *oa = aa->aa_oa;
1963		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1964		unsigned long valid = 0;
1965
1966		LASSERT(rc == 0);
1967		if (oa->o_valid & OBD_MD_FLBLOCKS) {
1968			attr->cat_blocks = oa->o_blocks;
1969			valid |= CAT_BLOCKS;
1970		}
1971		if (oa->o_valid & OBD_MD_FLMTIME) {
1972			attr->cat_mtime = oa->o_mtime;
1973			valid |= CAT_MTIME;
1974		}
1975		if (oa->o_valid & OBD_MD_FLATIME) {
1976			attr->cat_atime = oa->o_atime;
1977			valid |= CAT_ATIME;
1978		}
1979		if (oa->o_valid & OBD_MD_FLCTIME) {
1980			attr->cat_ctime = oa->o_ctime;
1981			valid |= CAT_CTIME;
1982		}
1983		if (valid != 0) {
1984			cl_object_attr_lock(obj);
1985			cl_object_attr_set(env, obj, attr, valid);
1986			cl_object_attr_unlock(obj);
1987		}
1988		cl_object_put(env, obj);
1989	}
1990	OBDO_FREE(aa->aa_oa);
1991
1992	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1993			  req->rq_bulk->bd_nob_transferred);
1994	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1995	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1996
1997	client_obd_list_lock(&cli->cl_loi_list_lock);
1998	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1999	 * is called so we know whether to go to sync BRWs or wait for more
2000	 * RPCs to complete */
2001	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2002		cli->cl_w_in_flight--;
2003	else
2004		cli->cl_r_in_flight--;
2005	osc_wake_cache_waiters(cli);
2006	client_obd_list_unlock(&cli->cl_loi_list_lock);
2007
2008	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2009	return rc;
2010}
2011
2012/**
 * Build an RPC from the list of extents @ext_list. The caller must ensure
 * that the total number of pages in this list is NOT over the max pages per RPC.
2015 * Extents in the list must be in OES_RPC state.
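 *
 * Sizing note (an illustrative assumption, not enforced here): with
 * 4096-byte pages and a typical cl_max_pages_per_rpc of 256, a single BRW
 * RPC built by this function covers at most 1 MiB of file data.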
2016 */
2017int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2018		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
2019{
2020	struct ptlrpc_request		*req = NULL;
2021	struct osc_extent		*ext;
2022	struct brw_page			**pga = NULL;
2023	struct osc_brw_async_args	*aa = NULL;
2024	struct obdo			*oa = NULL;
2025	struct osc_async_page		*oap;
2026	struct osc_async_page		*tmp;
2027	struct cl_req			*clerq = NULL;
2028	enum cl_req_type		crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2029								      CRT_READ;
2030	struct ldlm_lock		*lock = NULL;
2031	struct cl_req_attr		*crattr = NULL;
2032	u64				starting_offset = OBD_OBJECT_EOF;
2033	u64				ending_offset = 0;
2034	int				mpflag = 0;
2035	int				mem_tight = 0;
2036	int				page_count = 0;
2037	int				i;
2038	int				rc;
2039	LIST_HEAD(rpc_list);
2040
2041	LASSERT(!list_empty(ext_list));
2042
2043	/* add pages into rpc_list to build BRW rpc */
2044	list_for_each_entry(ext, ext_list, oe_link) {
2045		LASSERT(ext->oe_state == OES_RPC);
2046		mem_tight |= ext->oe_memalloc;
2047		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2048			++page_count;
2049			list_add_tail(&oap->oap_rpc_item, &rpc_list);
2050			if (starting_offset > oap->oap_obj_off)
2051				starting_offset = oap->oap_obj_off;
2052			else
2053				LASSERT(oap->oap_page_off == 0);
2054			if (ending_offset < oap->oap_obj_off + oap->oap_count)
2055				ending_offset = oap->oap_obj_off +
2056						oap->oap_count;
2057			else
2058				LASSERT(oap->oap_page_off + oap->oap_count ==
2059					PAGE_CACHE_SIZE);
2060		}
2061	}
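
	/*
	 * Worked example of the offsets computed above (4096-byte pages
	 * assumed): pages at object offsets 4096, 8192 and 12288, the last
	 * holding 512 bytes, give starting_offset = 4096 and
	 * ending_offset = 12800; the asserts check that only the first page
	 * may start, and only the last page end, inside a page.
	 */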
2062
2063	if (mem_tight)
2064		mpflag = cfs_memory_pressure_get_and_set();
2065
2066	OBD_ALLOC(crattr, sizeof(*crattr));
2067	if (crattr == NULL)
2068		GOTO(out, rc = -ENOMEM);
2069
2070	OBD_ALLOC(pga, sizeof(*pga) * page_count);
2071	if (pga == NULL)
2072		GOTO(out, rc = -ENOMEM);
2073
2074	OBDO_ALLOC(oa);
2075	if (oa == NULL)
2076		GOTO(out, rc = -ENOMEM);
2077
2078	i = 0;
2079	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2080		struct cl_page *page = oap2cl_page(oap);
2081		if (clerq == NULL) {
2082			clerq = cl_req_alloc(env, page, crt,
2083					     1 /* only 1-object rpcs for now */);
2084			if (IS_ERR(clerq))
2085				GOTO(out, rc = PTR_ERR(clerq));
2086			lock = oap->oap_ldlm_lock;
2087		}
2088		if (mem_tight)
2089			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2090		pga[i] = &oap->oap_brw_page;
2091		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2092		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2093		       pga[i]->pg, page_index(oap->oap_page), oap,
2094		       pga[i]->flag);
2095		i++;
2096		cl_req_page_add(env, clerq, page);
2097	}
2098
2099	/* always get the data for the obdo for the rpc */
2100	LASSERT(clerq != NULL);
2101	crattr->cra_oa = oa;
2102	cl_req_attr_set(env, clerq, crattr, ~0ULL);
2103	if (lock) {
2104		oa->o_handle = lock->l_remote_handle;
2105		oa->o_valid |= OBD_MD_FLHANDLE;
2106	}
2107
2108	rc = cl_req_prep(env, clerq);
2109	if (rc != 0) {
2110		CERROR("cl_req_prep failed: %d\n", rc);
2111		GOTO(out, rc);
2112	}
2113
2114	sort_brw_pages(pga, page_count);
2115	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2116			pga, &req, crattr->cra_capa, 1, 0);
2117	if (rc != 0) {
2118		CERROR("prep_req failed: %d\n", rc);
2119		GOTO(out, rc);
2120	}
2121
2122	req->rq_interpret_reply = brw_interpret;
2123
2124	if (mem_tight != 0)
2125		req->rq_memalloc = 1;
2126
	/* Need to update the timestamps after the request is built in case
	 * we race with setattr (locally or in queue at the OST).  If the OST
	 * gets the later setattr before the earlier BRW (as determined by
	 * the request xid), the OST will not use the BRW timestamps.  Sadly,
	 * there is no obvious way to do this in a single call.  bug 10150 */
2132	cl_req_attr_set(env, clerq, crattr,
2133			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2134
2135	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2136
2137	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2138	aa = ptlrpc_req_async_args(req);
2139	INIT_LIST_HEAD(&aa->aa_oaps);
2140	list_splice_init(&rpc_list, &aa->aa_oaps);
2141	INIT_LIST_HEAD(&aa->aa_exts);
2142	list_splice_init(ext_list, &aa->aa_exts);
2143	aa->aa_clerq = clerq;
2144
	/* queued sync pages can be torn down while the pages
	 * are between the pending list and the rpc */
2147	tmp = NULL;
2148	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2149		/* only one oap gets a request reference */
2150		if (tmp == NULL)
2151			tmp = oap;
2152		if (oap->oap_interrupted && !req->rq_intr) {
2153			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2154					oap, req);
2155			ptlrpc_mark_interrupted(req);
2156		}
2157	}
2158	if (tmp != NULL)
2159		tmp->oap_request = ptlrpc_request_addref(req);
2160
2161	client_obd_list_lock(&cli->cl_loi_list_lock);
2162	starting_offset >>= PAGE_CACHE_SHIFT;
2163	if (cmd == OBD_BRW_READ) {
2164		cli->cl_r_in_flight++;
2165		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2166		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2167		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2168				      starting_offset + 1);
2169	} else {
2170		cli->cl_w_in_flight++;
2171		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2172		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2173		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2174				      starting_offset + 1);
2175	}
2176	client_obd_list_unlock(&cli->cl_loi_list_lock);
2177
2178	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2179		  page_count, aa, cli->cl_r_in_flight,
2180		  cli->cl_w_in_flight);
2181
2182	/* XXX: Maybe the caller can check the RPC bulk descriptor to
2183	 * see which CPU/NUMA node the majority of pages were allocated
2184	 * on, and try to assign the async RPC to the CPU core
2185	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2186	 *
	 * But on the other hand, we expect that multiple ptlrpcd
	 * threads and the initial write sponsor can run in parallel,
	 * especially when data checksumming is enabled, which is a
	 * CPU-bound operation that a single ptlrpcd thread cannot
	 * process in time. So more ptlrpcd threads sharing the BRW
	 * load (with PDL_POLICY_ROUND) seems better.
2193	 */
2194	ptlrpcd_add_req(req, pol, -1);
2195	rc = 0;
2196
2197out:
2198	if (mem_tight != 0)
2199		cfs_memory_pressure_restore(mpflag);
2200
2201	if (crattr != NULL) {
2202		capa_put(crattr->cra_capa);
2203		OBD_FREE(crattr, sizeof(*crattr));
2204	}
2205
2206	if (rc != 0) {
2207		LASSERT(req == NULL);
2208
2209		if (oa)
2210			OBDO_FREE(oa);
2211		if (pga)
2212			OBD_FREE(pga, sizeof(*pga) * page_count);
		/* This should happen rarely and is pretty bad; it makes the
		 * pending list not follow the dirty order. */
2215		while (!list_empty(ext_list)) {
2216			ext = list_entry(ext_list->next, struct osc_extent,
2217					     oe_link);
2218			list_del_init(&ext->oe_link);
2219			osc_extent_finish(env, ext, 0, rc);
2220		}
2221		if (clerq && !IS_ERR(clerq))
2222			cl_req_completion(env, clerq, rc);
2223	}
2224	return rc;
2225}
2226
2227static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2228					struct ldlm_enqueue_info *einfo)
2229{
2230	void *data = einfo->ei_cbdata;
2231	int set = 0;
2232
2233	LASSERT(lock != NULL);
2234	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2235	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2236	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2237	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2238
2239	lock_res_and_lock(lock);
2240	spin_lock(&osc_ast_guard);
2241
2242	if (lock->l_ast_data == NULL)
2243		lock->l_ast_data = data;
2244	if (lock->l_ast_data == data)
2245		set = 1;
2246
2247	spin_unlock(&osc_ast_guard);
2248	unlock_res_and_lock(lock);
2249
2250	return set;
2251}
2252
2253static int osc_set_data_with_check(struct lustre_handle *lockh,
2254				   struct ldlm_enqueue_info *einfo)
2255{
2256	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2257	int set = 0;
2258
2259	if (lock != NULL) {
2260		set = osc_set_lock_data_with_check(lock, einfo);
2261		LDLM_LOCK_PUT(lock);
2262	} else
2263		CERROR("lockh %p, data %p - client evicted?\n",
2264		       lockh, einfo->ei_cbdata);
2265	return set;
2266}
2267
2268static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2269			     ldlm_iterator_t replace, void *data)
2270{
2271	struct ldlm_res_id res_id;
2272	struct obd_device *obd = class_exp2obd(exp);
2273
2274	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2275	ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2276	return 0;
2277}
2278
/* Find any ldlm lock of the inode in osc.
 * Return 0 if none was found,
 *        1 if one was found,
 *      < 0 on error. */
2283static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2284			   ldlm_iterator_t replace, void *data)
2285{
2286	struct ldlm_res_id res_id;
2287	struct obd_device *obd = class_exp2obd(exp);
2288	int rc = 0;
2289
2290	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2291	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
	if (rc == LDLM_ITER_STOP)
		return 1;
	if (rc == LDLM_ITER_CONTINUE)
		return 0;
	return rc;
2297}
2298
2299static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2300			    obd_enqueue_update_f upcall, void *cookie,
2301			    __u64 *flags, int agl, int rc)
2302{
2303	int intent = *flags & LDLM_FL_HAS_INTENT;
2304
2305	if (intent) {
2306		/* The request was created before ldlm_cli_enqueue call. */
2307		if (rc == ELDLM_LOCK_ABORTED) {
2308			struct ldlm_reply *rep;
2309			rep = req_capsule_server_get(&req->rq_pill,
2310						     &RMF_DLM_REP);
2311
2312			LASSERT(rep != NULL);
2313			rep->lock_policy_res1 =
2314				ptlrpc_status_ntoh(rep->lock_policy_res1);
2315			if (rep->lock_policy_res1)
2316				rc = rep->lock_policy_res1;
2317		}
2318	}
2319
2320	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2321	    (rc == 0)) {
2322		*flags |= LDLM_FL_LVB_READY;
		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2324		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2325	}
2326
2327	/* Call the update callback. */
2328	rc = (*upcall)(cookie, rc);
2329	return rc;
2330}
2331
2332static int osc_enqueue_interpret(const struct lu_env *env,
2333				 struct ptlrpc_request *req,
2334				 struct osc_enqueue_args *aa, int rc)
2335{
2336	struct ldlm_lock *lock;
2337	struct lustre_handle handle;
2338	__u32 mode;
2339	struct ost_lvb *lvb;
2340	__u32 lvb_len;
2341	__u64 *flags = aa->oa_flags;
2342
2343	/* Make a local copy of a lock handle and a mode, because aa->oa_*
2344	 * might be freed anytime after lock upcall has been called. */
2345	lustre_handle_copy(&handle, aa->oa_lockh);
2346	mode = aa->oa_ei->ei_mode;
2347
2348	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
2349	 * be valid. */
2350	lock = ldlm_handle2lock(&handle);
2351
2352	/* Take an additional reference so that a blocking AST that
2353	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2354	 * to arrive after an upcall has been executed by
2355	 * osc_enqueue_fini(). */
2356	ldlm_lock_addref(&handle, mode);
2357
	/* Let the CP AST grant the lock first. */
2359	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2360
2361	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2362		lvb = NULL;
2363		lvb_len = 0;
2364	} else {
2365		lvb = aa->oa_lvb;
2366		lvb_len = sizeof(*aa->oa_lvb);
2367	}
2368
2369	/* Complete obtaining the lock procedure. */
2370	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2371				   mode, flags, lvb, lvb_len, &handle, rc);
2372	/* Complete osc stuff. */
2373	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2374			      flags, aa->oa_agl, rc);
2375
2376	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2377
2378	/* Release the lock for async request. */
2379	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2380		/*
2381		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2382		 * not already released by
2383		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2384		 */
2385		ldlm_lock_decref(&handle, mode);
2386
2387	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2388		 aa->oa_lockh, req, aa);
2389	ldlm_lock_decref(&handle, mode);
2390	LDLM_LOCK_PUT(lock);
2391	return rc;
2392}
2393
2394void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2395			struct lov_oinfo *loi, __u64 flags,
2396			struct ost_lvb *lvb, __u32 mode, int rc)
2397{
2398	struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2399
2400	if (rc == ELDLM_OK) {
2401		__u64 tmp;
2402
2403		LASSERT(lock != NULL);
2404		loi->loi_lvb = *lvb;
2405		tmp = loi->loi_lvb.lvb_size;
2406		/* Extend KMS up to the end of this lock and no further
2407		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
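		/*
		 * Example (illustrative values): a lock on [0, 8191] with
		 * lvb_size == 1048576 caps tmp at 8192, so kms is raised to
		 * 8192 here rather than to the full file size.
		 */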
2408		if (tmp > lock->l_policy_data.l_extent.end)
2409			tmp = lock->l_policy_data.l_extent.end + 1;
2410		if (tmp >= loi->loi_kms) {
2411			LDLM_DEBUG(lock, "lock acquired, setting rss=%llu, kms=%llu",
2412				   loi->loi_lvb.lvb_size, tmp);
2413			loi_kms_set(loi, tmp);
2414		} else {
2415			LDLM_DEBUG(lock, "lock acquired, setting rss=%llu; leaving kms=%llu, end=%llu",
2416				   loi->loi_lvb.lvb_size, loi->loi_kms,
2417				   lock->l_policy_data.l_extent.end);
2418		}
2419		ldlm_lock_allow_match(lock);
2420	} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2421		LASSERT(lock != NULL);
2422		loi->loi_lvb = *lvb;
2423		ldlm_lock_allow_match(lock);
2424		CDEBUG(D_INODE, "glimpsed, setting rss=%llu; leaving kms=%llu\n",
2425		       loi->loi_lvb.lvb_size, loi->loi_kms);
2426		rc = ELDLM_OK;
2427	}
2428
2429	if (lock != NULL) {
2430		if (rc != ELDLM_OK)
2431			ldlm_lock_fail_match(lock);
2432
2433		LDLM_LOCK_PUT(lock);
2434	}
2435}
2436EXPORT_SYMBOL(osc_update_enqueue);
2437
2438struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2439
/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
 * other synchronous requests; however, keeping some locks and trying to obtain
 * others may take a considerable amount of time in the case of OST failure, and
 * when a client does not release locks that other sync requests are waiting
 * for, that client is excluded from the cluster -- such scenarios make life
 * difficult, so release locks just after they are obtained. */
2447int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2448		     __u64 *flags, ldlm_policy_data_t *policy,
2449		     struct ost_lvb *lvb, int kms_valid,
2450		     obd_enqueue_update_f upcall, void *cookie,
2451		     struct ldlm_enqueue_info *einfo,
2452		     struct lustre_handle *lockh,
2453		     struct ptlrpc_request_set *rqset, int async, int agl)
2454{
2455	struct obd_device *obd = exp->exp_obd;
2456	struct ptlrpc_request *req = NULL;
2457	int intent = *flags & LDLM_FL_HAS_INTENT;
2458	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2459	ldlm_mode_t mode;
2460	int rc;
2461
2462	/* Filesystem lock extents are extended to page boundaries so that
2463	 * dealing with the page cache is a little smoother.  */
2464	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2465	policy->l_extent.end |= ~CFS_PAGE_MASK;
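	/*
	 * E.g. with 4096-byte pages (an assumption for illustration), a
	 * requested extent [5000, 6000] is widened by the two lines above
	 * to [4096, 8191].
	 */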
2466
2467	/*
2468	 * kms is not valid when either object is completely fresh (so that no
2469	 * locks are cached), or object was evicted. In the latter case cached
2470	 * lock cannot be used, because it would prime inode state with
2471	 * potentially stale LVB.
2472	 */
2473	if (!kms_valid)
2474		goto no_match;
2475
2476	/* Next, search for already existing extent locks that will cover us */
2477	/* If we're trying to read, we also search for an existing PW lock.  The
2478	 * VFS and page cache already protect us locally, so lots of readers/
2479	 * writers can share a single PW lock.
2480	 *
2481	 * There are problems with conversion deadlocks, so instead of
2482	 * converting a read lock to a write lock, we'll just enqueue a new
2483	 * one.
2484	 *
2485	 * At some point we should cancel the read lock instead of making them
2486	 * send us a blocking callback, but there are problems with canceling
2487	 * locks out from other users right now, too. */
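	/*
	 * E.g. a reader asking for LCK_PR below will also match a cached
	 * LCK_PW lock on the same extent, since PW is strong enough to
	 * cover reads.
	 */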
2488	mode = einfo->ei_mode;
2489	if (einfo->ei_mode == LCK_PR)
2490		mode |= LCK_PW;
2491	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2492			       einfo->ei_type, policy, mode, lockh, 0);
2493	if (mode) {
2494		struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2495
2496		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
			/* For AGL, if the enqueue RPC was sent but the lock
			 * was not granted, then skip processing this stripe.
			 * Return -ECANCELED to tell the caller. */
2500			ldlm_lock_decref(lockh, mode);
2501			LDLM_LOCK_PUT(matched);
2502			return -ECANCELED;
2503		} else if (osc_set_lock_data_with_check(matched, einfo)) {
2504			*flags |= LDLM_FL_LVB_READY;
			/* Addref the lock only for non-async requests where
			 * a PW lock was matched whereas we asked for PR. */
2507			if (!rqset && einfo->ei_mode != mode)
2508				ldlm_lock_addref(lockh, LCK_PR);
2509			if (intent) {
2510				/* I would like to be able to ASSERT here that
2511				 * rss <= kms, but I can't, for reasons which
2512				 * are explained in lov_enqueue() */
2513			}
2514
2515			/* We already have a lock, and it's referenced.
2516			 *
2517			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2518			 * AGL upcall may change it to CLS_HELD directly. */
2519			(*upcall)(cookie, ELDLM_OK);
2520
2521			if (einfo->ei_mode != mode)
2522				ldlm_lock_decref(lockh, LCK_PW);
2523			else if (rqset)
2524				/* For async requests, decref the lock. */
2525				ldlm_lock_decref(lockh, einfo->ei_mode);
2526			LDLM_LOCK_PUT(matched);
2527			return ELDLM_OK;
2528		} else {
2529			ldlm_lock_decref(lockh, mode);
2530			LDLM_LOCK_PUT(matched);
2531		}
2532	}
2533
2534 no_match:
2535	if (intent) {
2536		LIST_HEAD(cancels);
2537		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2538					   &RQF_LDLM_ENQUEUE_LVB);
2539		if (req == NULL)
2540			return -ENOMEM;
2541
2542		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2543		if (rc) {
2544			ptlrpc_request_free(req);
2545			return rc;
2546		}
2547
2548		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2549				     sizeof(*lvb));
2550		ptlrpc_request_set_replen(req);
2551	}
2552
2553	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2554	*flags &= ~LDLM_FL_BLOCK_GRANTED;
2555
2556	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2557			      sizeof(*lvb), LVB_T_OST, lockh, async);
2558	if (rqset) {
2559		if (!rc) {
2560			struct osc_enqueue_args *aa;
			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2562			aa = ptlrpc_req_async_args(req);
2563			aa->oa_ei = einfo;
2564			aa->oa_exp = exp;
2565			aa->oa_flags  = flags;
2566			aa->oa_upcall = upcall;
2567			aa->oa_cookie = cookie;
2568			aa->oa_lvb    = lvb;
2569			aa->oa_lockh  = lockh;
2570			aa->oa_agl    = !!agl;
2571
2572			req->rq_interpret_reply =
2573				(ptlrpc_interpterer_t)osc_enqueue_interpret;
2574			if (rqset == PTLRPCD_SET)
2575				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2576			else
2577				ptlrpc_set_add_req(rqset, req);
2578		} else if (intent) {
2579			ptlrpc_req_finished(req);
2580		}
2581		return rc;
2582	}
2583
2584	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2585	if (intent)
2586		ptlrpc_req_finished(req);
2587
2588	return rc;
2589}
2590
2591static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2592		       struct ldlm_enqueue_info *einfo,
2593		       struct ptlrpc_request_set *rqset)
2594{
2595	struct ldlm_res_id res_id;
2596	int rc;
2597
2598	ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2599	rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2600			      &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2601			      oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2602			      oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2603			      rqset, rqset != NULL, 0);
2604	return rc;
2605}
2606
2607int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2608		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2609		   __u64 *flags, void *data, struct lustre_handle *lockh,
2610		   int unref)
2611{
2612	struct obd_device *obd = exp->exp_obd;
2613	__u64 lflags = *flags;
2614	ldlm_mode_t rc;
2615
2616	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2617		return -EIO;
2618
2619	/* Filesystem lock extents are extended to page boundaries so that
2620	 * dealing with the page cache is a little smoother */
2621	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2622	policy->l_extent.end |= ~CFS_PAGE_MASK;
2623
2624	/* Next, search for already existing extent locks that will cover us */
2625	/* If we're trying to read, we also search for an existing PW lock.  The
2626	 * VFS and page cache already protect us locally, so lots of readers/
2627	 * writers can share a single PW lock. */
2628	rc = mode;
2629	if (mode == LCK_PR)
2630		rc |= LCK_PW;
2631	rc = ldlm_lock_match(obd->obd_namespace, lflags,
2632			     res_id, type, policy, rc, lockh, unref);
2633	if (rc) {
2634		if (data != NULL) {
2635			if (!osc_set_data_with_check(lockh, data)) {
2636				if (!(lflags & LDLM_FL_TEST_LOCK))
2637					ldlm_lock_decref(lockh, rc);
2638				return 0;
2639			}
2640		}
2641		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2642			ldlm_lock_addref(lockh, LCK_PR);
2643			ldlm_lock_decref(lockh, LCK_PW);
2644		}
2645		return rc;
2646	}
2647	return rc;
2648}
2649
2650int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2651{
2652	if (unlikely(mode == LCK_GROUP))
2653		ldlm_lock_decref_and_cancel(lockh, mode);
2654	else
2655		ldlm_lock_decref(lockh, mode);
2656
2657	return 0;
2658}
2659
2660static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2661		      __u32 mode, struct lustre_handle *lockh)
2662{
2663	return osc_cancel_base(lockh, mode);
2664}
2665
2666static int osc_cancel_unused(struct obd_export *exp,
2667			     struct lov_stripe_md *lsm,
2668			     ldlm_cancel_flags_t flags,
2669			     void *opaque)
2670{
2671	struct obd_device *obd = class_exp2obd(exp);
2672	struct ldlm_res_id res_id, *resp = NULL;
2673
2674	if (lsm != NULL) {
2675		ostid_build_res_name(&lsm->lsm_oi, &res_id);
2676		resp = &res_id;
2677	}
2678
2679	return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2680}
2681
2682static int osc_statfs_interpret(const struct lu_env *env,
2683				struct ptlrpc_request *req,
2684				struct osc_async_args *aa, int rc)
2685{
2686	struct obd_statfs *msfs;
2687
2688	if (rc == -EBADR)
		/* The request has in fact never been sent
		 * due to issues at a higher level (LOV).
		 * Exit immediately since the caller is
		 * aware of the problem and takes care
		 * of the cleanup. */
		return rc;
2695
2696	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2697	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2698		GOTO(out, rc = 0);
2699
2700	if (rc != 0)
2701		GOTO(out, rc);
2702
2703	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2704	if (msfs == NULL) {
2705		GOTO(out, rc = -EPROTO);
2706	}
2707
2708	*aa->aa_oi->oi_osfs = *msfs;
2709out:
2710	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2711	return rc;
2712}
2713
2714static int osc_statfs_async(struct obd_export *exp,
2715			    struct obd_info *oinfo, __u64 max_age,
2716			    struct ptlrpc_request_set *rqset)
2717{
2718	struct obd_device     *obd = class_exp2obd(exp);
2719	struct ptlrpc_request *req;
2720	struct osc_async_args *aa;
2721	int		    rc;
2722
2723	/* We could possibly pass max_age in the request (as an absolute
2724	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2725	 * extra calls into the filesystem if that isn't necessary (e.g.
2726	 * during mount that would help a bit).  Having relative timestamps
2727	 * is not so great if request processing is slow, while absolute
2728	 * timestamps are not ideal because they need time synchronization. */
2729	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2730	if (req == NULL)
2731		return -ENOMEM;
2732
2733	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2734	if (rc) {
2735		ptlrpc_request_free(req);
2736		return rc;
2737	}
2738	ptlrpc_request_set_replen(req);
2739	req->rq_request_portal = OST_CREATE_PORTAL;
2740	ptlrpc_at_set_req_timeout(req);
2741
2742	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not block waiting for stats,
		 * to avoid a deadlock */
2744		req->rq_no_resend = 1;
2745		req->rq_no_delay = 1;
2746	}
2747
2748	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2750	aa = ptlrpc_req_async_args(req);
2751	aa->aa_oi = oinfo;
2752
2753	ptlrpc_set_add_req(rqset, req);
2754	return 0;
2755}
2756
2757static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2758		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2759{
2760	struct obd_device     *obd = class_exp2obd(exp);
2761	struct obd_statfs     *msfs;
2762	struct ptlrpc_request *req;
2763	struct obd_import     *imp = NULL;
2764	int rc;
2765
	/* Since the request might also come from lprocfs, we need to
	 * sync this with client_disconnect_export (Bug15684). */
2768	down_read(&obd->u.cli.cl_sem);
2769	if (obd->u.cli.cl_import)
2770		imp = class_import_get(obd->u.cli.cl_import);
2771	up_read(&obd->u.cli.cl_sem);
2772	if (!imp)
2773		return -ENODEV;
2774
2775	/* We could possibly pass max_age in the request (as an absolute
2776	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2777	 * extra calls into the filesystem if that isn't necessary (e.g.
2778	 * during mount that would help a bit).  Having relative timestamps
2779	 * is not so great if request processing is slow, while absolute
2780	 * timestamps are not ideal because they need time synchronization. */
2781	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2782
2783	class_import_put(imp);
2784
2785	if (req == NULL)
2786		return -ENOMEM;
2787
2788	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2789	if (rc) {
2790		ptlrpc_request_free(req);
2791		return rc;
2792	}
2793	ptlrpc_request_set_replen(req);
2794	req->rq_request_portal = OST_CREATE_PORTAL;
2795	ptlrpc_at_set_req_timeout(req);
2796
2797	if (flags & OBD_STATFS_NODELAY) {
		/* procfs requests should not block waiting for stats,
		 * to avoid a deadlock */
2799		req->rq_no_resend = 1;
2800		req->rq_no_delay = 1;
2801	}
2802
2803	rc = ptlrpc_queue_wait(req);
2804	if (rc)
2805		GOTO(out, rc);
2806
2807	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2808	if (msfs == NULL) {
2809		GOTO(out, rc = -EPROTO);
2810	}
2811
2812	*osfs = *msfs;
2813
2814 out:
2815	ptlrpc_req_finished(req);
2816	return rc;
2817}
2818
2819/* Retrieve object striping information.
2820 *
 * @lump is a pointer to an in-core struct with lmm_ost_count indicating
2822 * the maximum number of OST indices which will fit in the user buffer.
2823 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2824 */
2825static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2826{
2827	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2828	struct lov_user_md_v3 lum, *lumk;
2829	struct lov_user_ost_data_v1 *lmm_objects;
2830	int rc = 0, lum_size;
2831
2832	if (!lsm)
2833		return -ENODATA;
2834
	/* we only need the header part from user space to get lmm_magic and
	 * lmm_stripe_count (the header part is common to v1 and v3) */
2837	lum_size = sizeof(struct lov_user_md_v1);
2838	if (copy_from_user(&lum, lump, lum_size))
2839		return -EFAULT;
2840
2841	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2842	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
2843		return -EINVAL;
2844
2845	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
2846	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2847	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2848	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2849
2850	/* we can use lov_mds_md_size() to compute lum_size
2851	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2852	if (lum.lmm_stripe_count > 0) {
2853		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2854		OBD_ALLOC(lumk, lum_size);
2855		if (!lumk)
2856			return -ENOMEM;
2857
2858		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2859			lmm_objects =
2860			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2861		else
2862			lmm_objects = &(lumk->lmm_objects[0]);
2863		lmm_objects->l_ost_oi = lsm->lsm_oi;
2864	} else {
2865		lum_size = lov_mds_md_size(0, lum.lmm_magic);
2866		lumk = &lum;
2867	}
2868
2869	lumk->lmm_oi = lsm->lsm_oi;
2870	lumk->lmm_stripe_count = 1;
2871
2872	if (copy_to_user(lump, lumk, lum_size))
2873		rc = -EFAULT;
2874
2875	if (lumk != &lum)
2876		OBD_FREE(lumk, lum_size);
2877
2878	return rc;
2879}
2880
2881
2882static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2883			 void *karg, void *uarg)
2884{
2885	struct obd_device *obd = exp->exp_obd;
2886	struct obd_ioctl_data *data = karg;
2887	int err = 0;
2888
2889	if (!try_module_get(THIS_MODULE)) {
		CERROR("Can't get module. Is it alive?\n");
2891		return -EINVAL;
2892	}
2893	switch (cmd) {
2894	case OBD_IOC_LOV_GET_CONFIG: {
2895		char *buf;
2896		struct lov_desc *desc;
2897		struct obd_uuid uuid;
2898
2899		buf = NULL;
2900		len = 0;
2901		if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2902			GOTO(out, err = -EINVAL);
2903
2904		data = (struct obd_ioctl_data *)buf;
2905
2906		if (sizeof(*desc) > data->ioc_inllen1) {
2907			obd_ioctl_freedata(buf, len);
2908			GOTO(out, err = -EINVAL);
2909		}
2910
2911		if (data->ioc_inllen2 < sizeof(uuid)) {
2912			obd_ioctl_freedata(buf, len);
2913			GOTO(out, err = -EINVAL);
2914		}
2915
2916		desc = (struct lov_desc *)data->ioc_inlbuf1;
2917		desc->ld_tgt_count = 1;
2918		desc->ld_active_tgt_count = 1;
2919		desc->ld_default_stripe_count = 1;
2920		desc->ld_default_stripe_size = 0;
2921		desc->ld_default_stripe_offset = 0;
2922		desc->ld_pattern = 0;
2923		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2924
2925		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2926
2927		err = copy_to_user((void *)uarg, buf, len);
2928		if (err)
2929			err = -EFAULT;
2930		obd_ioctl_freedata(buf, len);
2931		GOTO(out, err);
2932	}
2933	case LL_IOC_LOV_SETSTRIPE:
2934		err = obd_alloc_memmd(exp, karg);
2935		if (err > 0)
2936			err = 0;
2937		GOTO(out, err);
2938	case LL_IOC_LOV_GETSTRIPE:
2939		err = osc_getstripe(karg, uarg);
2940		GOTO(out, err);
2941	case OBD_IOC_CLIENT_RECOVER:
2942		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2943					    data->ioc_inlbuf1, 0);
2944		if (err > 0)
2945			err = 0;
2946		GOTO(out, err);
2947	case IOC_OSC_SET_ACTIVE:
2948		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2949					       data->ioc_offset);
2950		GOTO(out, err);
2951	case OBD_IOC_POLL_QUOTACHECK:
2952		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2953		GOTO(out, err);
2954	case OBD_IOC_PING_TARGET:
2955		err = ptlrpc_obd_ping(obd);
2956		GOTO(out, err);
2957	default:
2958		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2959		       cmd, current_comm());
2960		GOTO(out, err = -ENOTTY);
2961	}
2962out:
2963	module_put(THIS_MODULE);
2964	return err;
2965}
2966
2967static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2968			u32 keylen, void *key, __u32 *vallen, void *val,
2969			struct lov_stripe_md *lsm)
2970{
2971	if (!vallen || !val)
2972		return -EFAULT;
2973
2974	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2975		__u32 *stripe = val;
2976		*vallen = sizeof(*stripe);
2977		*stripe = 0;
2978		return 0;
2979	} else if (KEY_IS(KEY_LAST_ID)) {
2980		struct ptlrpc_request *req;
2981		u64		*reply;
2982		char		  *tmp;
2983		int		    rc;
2984
2985		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2986					   &RQF_OST_GET_INFO_LAST_ID);
2987		if (req == NULL)
2988			return -ENOMEM;
2989
2990		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2991				     RCL_CLIENT, keylen);
2992		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2993		if (rc) {
2994			ptlrpc_request_free(req);
2995			return rc;
2996		}
2997
2998		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2999		memcpy(tmp, key, keylen);
3000
3001		req->rq_no_delay = req->rq_no_resend = 1;
3002		ptlrpc_request_set_replen(req);
3003		rc = ptlrpc_queue_wait(req);
3004		if (rc)
3005			GOTO(out, rc);
3006
3007		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3008		if (reply == NULL)
3009			GOTO(out, rc = -EPROTO);
3010
3011		*((u64 *)val) = *reply;
3012	out:
3013		ptlrpc_req_finished(req);
3014		return rc;
3015	} else if (KEY_IS(KEY_FIEMAP)) {
3016		struct ll_fiemap_info_key *fm_key =
3017				(struct ll_fiemap_info_key *)key;
3018		struct ldlm_res_id	 res_id;
3019		ldlm_policy_data_t	 policy;
3020		struct lustre_handle	 lockh;
3021		ldlm_mode_t		 mode = 0;
3022		struct ptlrpc_request	*req;
3023		struct ll_user_fiemap	*reply;
3024		char			*tmp;
3025		int			 rc;
3026
3027		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3028			goto skip_locking;
3029
3030		policy.l_extent.start = fm_key->fiemap.fm_start &
3031						CFS_PAGE_MASK;
3032
3033		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3034		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3035			policy.l_extent.end = OBD_OBJECT_EOF;
3036		else
3037			policy.l_extent.end = (fm_key->fiemap.fm_start +
3038				fm_key->fiemap.fm_length +
3039				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3040
3041		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3042		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3043				       LDLM_FL_BLOCK_GRANTED |
3044				       LDLM_FL_LVB_READY,
3045				       &res_id, LDLM_EXTENT, &policy,
3046				       LCK_PR | LCK_PW, &lockh, 0);
3047		if (mode) { /* lock is cached on client */
3048			if (mode != LCK_PR) {
3049				ldlm_lock_addref(&lockh, LCK_PR);
3050				ldlm_lock_decref(&lockh, LCK_PW);
3051			}
		} else { /* no cached lock; must acquire the lock server-side */
3053			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3054			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3055		}
3056
3057skip_locking:
3058		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3059					   &RQF_OST_GET_INFO_FIEMAP);
3060		if (req == NULL)
3061			GOTO(drop_lock, rc = -ENOMEM);
3062
3063		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3064				     RCL_CLIENT, keylen);
3065		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3066				     RCL_CLIENT, *vallen);
3067		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3068				     RCL_SERVER, *vallen);
3069
3070		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3071		if (rc) {
3072			ptlrpc_request_free(req);
3073			GOTO(drop_lock, rc);
3074		}
3075
3076		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3077		memcpy(tmp, key, keylen);
3078		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3079		memcpy(tmp, val, *vallen);
3080
3081		ptlrpc_request_set_replen(req);
3082		rc = ptlrpc_queue_wait(req);
3083		if (rc)
3084			GOTO(fini_req, rc);
3085
3086		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3087		if (reply == NULL)
3088			GOTO(fini_req, rc = -EPROTO);
3089
3090		memcpy(val, reply, *vallen);
3091fini_req:
3092		ptlrpc_req_finished(req);
3093drop_lock:
3094		if (mode)
3095			ldlm_lock_decref(&lockh, LCK_PR);
3096		return rc;
3097	}
3098
3099	return -EINVAL;
3100}
3101
3102static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3103			      u32 keylen, void *key, u32 vallen,
3104			      void *val, struct ptlrpc_request_set *set)
3105{
3106	struct ptlrpc_request *req;
3107	struct obd_device     *obd = exp->exp_obd;
3108	struct obd_import     *imp = class_exp2cliimp(exp);
3109	char		  *tmp;
3110	int		    rc;
3111
3112	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3113
3114	if (KEY_IS(KEY_CHECKSUM)) {
3115		if (vallen != sizeof(int))
3116			return -EINVAL;
3117		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118		return 0;
3119	}
3120
3121	if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122		sptlrpc_conf_client_adapt(obd);
3123		return 0;
3124	}
3125
3126	if (KEY_IS(KEY_FLUSH_CTX)) {
3127		sptlrpc_import_flush_my_ctx(imp);
3128		return 0;
3129	}
3130
3131	if (KEY_IS(KEY_CACHE_SET)) {
3132		struct client_obd *cli = &obd->u.cli;
3133
3134		LASSERT(cli->cl_cache == NULL); /* only once */
3135		cli->cl_cache = (struct cl_client_cache *)val;
3136		atomic_inc(&cli->cl_cache->ccc_users);
3137		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3138
3139		/* add this osc into entity list */
3140		LASSERT(list_empty(&cli->cl_lru_osc));
3141		spin_lock(&cli->cl_cache->ccc_lru_lock);
3142		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3143		spin_unlock(&cli->cl_cache->ccc_lru_lock);
3144
3145		return 0;
3146	}
3147
3148	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3149		struct client_obd *cli = &obd->u.cli;
3150		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3151		int target = *(int *)val;
3152
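		/*
		 * Illustrative example (values assumed): with
		 * cl_lru_in_list == 200 and a caller target of 70, the call
		 * below asks osc_lru_shrink() for min(100, 70) == 70 pages;
		 * if it frees 60, *val drops to 10 so the caller can press
		 * other OSCs for the rest.
		 */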
3153		nr = osc_lru_shrink(cli, min(nr, target));
3154		*(int *)val -= nr;
3155		return 0;
3156	}
3157
3158	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3159		return -EINVAL;
3160
	/* We pass all other commands directly to OST. Since nobody calls osc
	 * methods directly and everybody is supposed to go through LOV, we
	 * assume lov checked invalid values for us.
	 * The only recognised values so far are evict_by_nid and mds_conn.
	 * Even if something bad goes through, we'd get a -EINVAL from OST
	 * anyway. */
3167
3168	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3169						&RQF_OST_SET_GRANT_INFO :
3170						&RQF_OBD_SET_INFO);
3171	if (req == NULL)
3172		return -ENOMEM;
3173
3174	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3175			     RCL_CLIENT, keylen);
3176	if (!KEY_IS(KEY_GRANT_SHRINK))
3177		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3178				     RCL_CLIENT, vallen);
3179	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3180	if (rc) {
3181		ptlrpc_request_free(req);
3182		return rc;
3183	}
3184
3185	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3186	memcpy(tmp, key, keylen);
3187	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3188							&RMF_OST_BODY :
3189							&RMF_SETINFO_VAL);
3190	memcpy(tmp, val, vallen);
3191
3192	if (KEY_IS(KEY_GRANT_SHRINK)) {
3193		struct osc_grant_args *aa;
3194		struct obdo *oa;
3195
3196		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3197		aa = ptlrpc_req_async_args(req);
3198		OBDO_ALLOC(oa);
3199		if (!oa) {
3200			ptlrpc_req_finished(req);
3201			return -ENOMEM;
3202		}
3203		*oa = ((struct ost_body *)val)->oa;
3204		aa->aa_oa = oa;
3205		req->rq_interpret_reply = osc_shrink_grant_interpret;
3206	}
3207
3208	ptlrpc_request_set_replen(req);
3209	if (!KEY_IS(KEY_GRANT_SHRINK)) {
3210		LASSERT(set != NULL);
3211		ptlrpc_set_add_req(set, req);
3212		ptlrpc_check_set(NULL, set);
3213	} else
3214		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3215
3216	return 0;
3217}
3218
3219
3220static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3221			 struct obd_device *disk_obd, int *index)
3222{
	/* This code is not supposed to be used with LOD/OSP;
	 * it is to be removed soon. */
3225	LBUG();
3226	return 0;
3227}
3228
3229static int osc_llog_finish(struct obd_device *obd, int count)
3230{
3231	struct llog_ctxt *ctxt;
3232
3233	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3234	if (ctxt) {
3235		llog_cat_close(NULL, ctxt->loc_handle);
3236		llog_cleanup(NULL, ctxt);
3237	}
3238
3239	ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3240	if (ctxt)
3241		llog_cleanup(NULL, ctxt);
3242	return 0;
3243}
3244
3245static int osc_reconnect(const struct lu_env *env,
3246			 struct obd_export *exp, struct obd_device *obd,
3247			 struct obd_uuid *cluuid,
3248			 struct obd_connect_data *data,
3249			 void *localdata)
3250{
3251	struct client_obd *cli = &obd->u.cli;
3252
3253	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3254		long lost_grant;
3255
3256		client_obd_list_lock(&cli->cl_loi_list_lock);
3257		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3258				2 * cli_brw_size(obd);
3259		lost_grant = cli->cl_lost_grant;
3260		cli->cl_lost_grant = 0;
3261		client_obd_list_unlock(&cli->cl_loi_list_lock);
3262
3263		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3264		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3265		       data->ocd_version, data->ocd_grant, lost_grant);
3266	}
3267
3268	return 0;
3269}
3270
3271static int osc_disconnect(struct obd_export *exp)
3272{
3273	struct obd_device *obd = class_exp2obd(exp);
3274	struct llog_ctxt  *ctxt;
3275	int rc;
3276
3277	ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3278	if (ctxt) {
3279		if (obd->u.cli.cl_conn_count == 1) {
3280			/* Flush any remaining cancel messages out to the
3281			 * target */
3282			llog_sync(ctxt, exp, 0);
3283		}
3284		llog_ctxt_put(ctxt);
3285	} else {
3286		CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3287		       obd);
3288	}
3289
3290	rc = client_disconnect_export(exp);
3291	/**
3292	 * Initially we put del_shrink_grant before disconnect_export, but it
3293	 * causes the following problem if setup (connect) and cleanup
3294	 * (disconnect) are tangled together.
3295	 *      connect p1		     disconnect p2
3296	 *   ptlrpc_connect_import
3297	 *     ...............	       class_manual_cleanup
3298	 *				     osc_disconnect
3299	 *				     del_shrink_grant
3300	 *   ptlrpc_connect_interrupt
3301	 *     init_grant_shrink
3302	 *   add this client to shrink list
3303	 *				      cleanup_osc
	 * Bang! The pinger triggers the shrink.
3305	 * So the osc should be disconnected from the shrink list, after we
3306	 * are sure the import has been destroyed. BUG18662
3307	 */
3308	if (obd->u.cli.cl_import == NULL)
3309		osc_del_shrink_grant(&obd->u.cli);
3310	return rc;
3311}
3312
3313static int osc_import_event(struct obd_device *obd,
3314			    struct obd_import *imp,
3315			    enum obd_import_event event)
3316{
3317	struct client_obd *cli;
3318	int rc = 0;
3319
3320	LASSERT(imp->imp_obd == obd);
3321
3322	switch (event) {
3323	case IMP_EVENT_DISCON: {
3324		cli = &obd->u.cli;
3325		client_obd_list_lock(&cli->cl_loi_list_lock);
3326		cli->cl_avail_grant = 0;
3327		cli->cl_lost_grant = 0;
3328		client_obd_list_unlock(&cli->cl_loi_list_lock);
3329		break;
3330	}
3331	case IMP_EVENT_INACTIVE: {
3332		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3333		break;
3334	}
3335	case IMP_EVENT_INVALIDATE: {
3336		struct ldlm_namespace *ns = obd->obd_namespace;
3337		struct lu_env	 *env;
3338		int		    refcheck;
3339
3340		env = cl_env_get(&refcheck);
3341		if (!IS_ERR(env)) {
3342			/* Reset grants */
3343			cli = &obd->u.cli;
3344			/* all pages go to failing rpcs due to the invalid
3345			 * import */
3346			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3347
3348			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3349			cl_env_put(env, &refcheck);
3350		} else
3351			rc = PTR_ERR(env);
3352		break;
3353	}
3354	case IMP_EVENT_ACTIVE: {
3355		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3356		break;
3357	}
3358	case IMP_EVENT_OCD: {
3359		struct obd_connect_data *ocd = &imp->imp_connect_data;
3360
3361		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3362			osc_init_grant(&obd->u.cli, ocd);
3363
3364		/* See bug 7198 */
3365		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3367
3368		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3369		break;
3370	}
3371	case IMP_EVENT_DEACTIVATE: {
3372		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3373		break;
3374	}
3375	case IMP_EVENT_ACTIVATE: {
3376		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3377		break;
3378	}
3379	default:
3380		CERROR("Unknown import event %d\n", event);
3381		LBUG();
3382	}
3383	return rc;
3384}
3385
3386/**
3387 * Determine whether the lock can be canceled before replaying the lock
3388 * during recovery, see bug16774 for detailed information.
3389 *
3390 * \retval zero the lock can't be canceled
3391 * \retval other ok to cancel
3392 */
3393static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3394{
3395	check_res_locked(lock->l_resource);
3396
3397	/*
3398	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3399	 *
3400	 * XXX as a future improvement, we can also cancel unused write lock
3401	 * if it doesn't have dirty data and active mmaps.
3402	 */
3403	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3404	    (lock->l_granted_mode == LCK_PR ||
3405	     lock->l_granted_mode == LCK_CR) &&
3406	    (osc_dlm_lock_pageref(lock) == 0))
3407		return 1;
3408
3409	return 0;
3410}
3411
3412static int brw_queue_work(const struct lu_env *env, void *data)
3413{
3414	struct client_obd *cli = data;
3415
3416	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3417
3418	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3419	return 0;
3420}
3421
3422int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3423{
3424	struct lprocfs_static_vars lvars = { NULL };
3425	struct client_obd	  *cli = &obd->u.cli;
3426	void		       *handler;
3427	int			rc;
3428
3429	rc = ptlrpcd_addref();
3430	if (rc)
3431		return rc;
3432
3433	rc = client_obd_setup(obd, lcfg);
3434	if (rc)
3435		GOTO(out_ptlrpcd, rc);
3436
3437	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3438	if (IS_ERR(handler))
3439		GOTO(out_client_setup, rc = PTR_ERR(handler));
3440	cli->cl_writeback_work = handler;
3441
3442	rc = osc_quota_setup(obd);
3443	if (rc)
3444		GOTO(out_ptlrpcd_work, rc);
3445
3446	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3447	lprocfs_osc_init_vars(&lvars);
3448	if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3449		lproc_osc_attach_seqstat(obd);
3450		sptlrpc_lprocfs_cliobd_attach(obd);
3451		ptlrpc_lprocfs_register_obd(obd);
3452	}
3453
	/* We need to allocate a few more requests, because brw_interpret
	 * tries to create new requests before freeing previous ones.  Ideally
	 * we want to have 2x max_rpcs_in_flight reserved, but that might be
	 * too much wasted RAM in fact, so +2 is just a guess that should
	 * still work. */
3459	cli->cl_import->imp_rq_pool =
3460		ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3461				    OST_MAXREQSIZE,
3462				    ptlrpc_add_rqs_to_pool);
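
	/*
	 * E.g. with cl_max_rpcs_in_flight == 8 (a typical value, assumed
	 * here only for illustration) the pool above preallocates 10
	 * requests of OST_MAXREQSIZE bytes each.
	 */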
3463
3464	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3465	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3466	return rc;
3467
3468out_ptlrpcd_work:
3469	ptlrpcd_destroy_work(handler);
3470out_client_setup:
3471	client_obd_cleanup(obd);
3472out_ptlrpcd:
3473	ptlrpcd_decref();
3474	return rc;
3475}
3476
3477static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3478{
3479	int rc = 0;
3480
3481	switch (stage) {
3482	case OBD_CLEANUP_EARLY: {
3483		struct obd_import *imp;
3484		imp = obd->u.cli.cl_import;
3485		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3486		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3487		ptlrpc_deactivate_import(imp);
3488		spin_lock(&imp->imp_lock);
3489		imp->imp_pingable = 0;
3490		spin_unlock(&imp->imp_lock);
3491		break;
3492	}
3493	case OBD_CLEANUP_EXPORTS: {
3494		struct client_obd *cli = &obd->u.cli;
3495		/* LU-464
3496		 * for echo client, export may be on zombie list, wait for
3497		 * zombie thread to cull it, because cli.cl_import will be
3498		 * cleared in client_disconnect_export():
3499		 *   class_export_destroy() -> obd_cleanup() ->
3500		 *   echo_device_free() -> echo_client_cleanup() ->
3501		 *   obd_disconnect() -> osc_disconnect() ->
3502		 *   client_disconnect_export()
3503		 */
3504		obd_zombie_barrier();
3505		if (cli->cl_writeback_work) {
3506			ptlrpcd_destroy_work(cli->cl_writeback_work);
3507			cli->cl_writeback_work = NULL;
3508		}
3509		obd_cleanup_client_import(obd);
3510		ptlrpc_lprocfs_unregister_obd(obd);
3511		lprocfs_obd_cleanup(obd);
3512		rc = obd_llog_finish(obd, 0);
3513		if (rc != 0)
3514			CERROR("failed to cleanup llogging subsystems\n");
3515		break;
3516		}
3517	}
3518	return rc;
3519}
3520
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	/* LRU cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free the memory of the OSC quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	return rc;
}

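/*
 * Handle a configuration command for this device.  Any command that is
 * not handled explicitly is treated as a proc parameter update via
 * class_process_proc_param().
 */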
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

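/* obd_ops entry point; a thin wrapper around osc_process_config_base(). */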
static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
{
	return osc_process_config_base(obd, buf);
}

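/* Method table registered for the OSC device type in osc_init(). */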
struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup		= osc_precleanup,
	.o_cleanup		= osc_cleanup,
	.o_add_conn		= client_import_add_conn,
	.o_del_conn		= client_import_del_conn,
	.o_connect		= client_connect_import,
	.o_reconnect		= osc_reconnect,
	.o_disconnect		= osc_disconnect,
	.o_statfs		= osc_statfs,
	.o_statfs_async		= osc_statfs_async,
	.o_packmd		= osc_packmd,
	.o_unpackmd		= osc_unpackmd,
	.o_create		= osc_create,
	.o_destroy		= osc_destroy,
	.o_getattr		= osc_getattr,
	.o_getattr_async	= osc_getattr_async,
	.o_setattr		= osc_setattr,
	.o_setattr_async	= osc_setattr_async,
	.o_brw			= osc_brw,
	.o_punch		= osc_punch,
	.o_sync			= osc_sync,
	.o_enqueue		= osc_enqueue,
	.o_change_cbdata	= osc_change_cbdata,
	.o_find_cbdata		= osc_find_cbdata,
	.o_cancel		= osc_cancel,
	.o_cancel_unused	= osc_cancel_unused,
	.o_iocontrol		= osc_iocontrol,
	.o_get_info		= osc_get_info,
	.o_set_info_async	= osc_set_info_async,
	.o_import_event		= osc_import_event,
	.o_llog_init		= osc_llog_init,
	.o_llog_finish		= osc_llog_finish,
	.o_process_config	= osc_process_config,
	.o_quotactl		= osc_quotactl,
	.o_quotacheck		= osc_quotacheck,
};

extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

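/*
 * Module init: set up the cl_object caches, register the OSC device type
 * with its obd_ops and lu_device_type, and initialize the AST guard lock.
 */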
int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { NULL };
	int rc;

	/* Print the address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with a gdb that does not support
	 * data symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		return rc;

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc) {
		lu_kmem_fini(osc_caches);
		return rc;
	}

	spin_lock_init(&osc_ast_guard);
	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

	return rc;
}

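/* Module exit: unregister the device type and release the caches. */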
static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
MODULE_VERSION(LUSTRE_VERSION_STRING);

module_init(osc_init);
module_exit(osc_exit);