osc_request.c revision 9d8654397d0dcb1885457a2188b59995f2219676
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/libcfs/libcfs.h>


#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;
	ENTRY;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		RETURN(lmm_size);

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		RETURN(0);
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		RETURN(-EBADF);
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			RETURN(-ENOMEM);
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);
	ENTRY;

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			RETURN(-EINVAL);
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			RETURN(-EINVAL);
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		RETURN(lsm_size);

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		RETURN(0);
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			RETURN(-ENOMEM);
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			RETURN(-ENOMEM);
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		RETURN(-EBADF);
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

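/* Fill the OST request body from @oinfo: convert the obdo to its wire
 * format for this connection, then pack the capability (if any) into the
 * RMF_CAPA1 field via osc_pack_capa(). */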
static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}

static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	RETURN(rc);
}

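/* Queue an OST_GETATTR on @set without blocking; the reply is handled by
 * osc_getattr_interpret(), which invokes oinfo->oi_cb_up with the result. */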
static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int		    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	RETURN(0);
}

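/* Synchronous counterpart of osc_getattr_async(): send OST_GETATTR and
 * wait for the reply before unpacking the returned attributes. */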
static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;
	ENTRY;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}

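/* Send an OST_SETATTR.  @rqset selects the delivery path: NULL fires the
 * request through ptlrpcd without waiting for a reply, PTLRPCD_SET queues
 * it on ptlrpcd with the interpret callback attached, and any other set
 * lets the caller drive completion.  @upcall/@cookie are invoked from
 * osc_setattr_interpret() once the reply (or error) arrives. */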
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	int		      rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct lov_stripe_md  *lsm;
	int		    rc;
	ENTRY;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			RETURN(rc);
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: "LPD64"\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	struct ost_body	 *body;
	int		      rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa     = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

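/* Truncate/punch: encode the extent in the obdo by overloading o_size with
 * l_extent.start and o_blocks with l_extent.end before issuing OST_PUNCH
 * through osc_punch_base(). */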
static int osc_punch(const struct lu_env *env, struct obd_export *exp,
		     struct obd_info *oinfo, struct obd_trans_info *oti,
		     struct ptlrpc_request_set *rqset)
{
	oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
	oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
	oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
	return osc_punch_base(exp, oinfo,
			      oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;
	ENTRY;

	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR ("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		    rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN (0);
}

static int osc_sync(const struct lu_env *env, struct obd_export *exp,
		    struct obd_info *oinfo, obd_size start, obd_size end,
		    struct ptlrpc_request_set *set)
{
	ENTRY;

	if (!oinfo->oi_oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	oinfo->oi_oa->o_size = start;
	oinfo->oi_oa->o_blocks = end;
	oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

	RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @objid.  Found locks are added to the @cancels list.  Returns the
 * number of locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, int lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This is distinct from the case where ELC is not supported at all,
	 * in which we still want to cancel locks in advance, just cancel
	 * them locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

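/* Reserve a destroy-RPC slot optimistically: bump the in-flight counter and
 * keep the slot if we are still within cl_max_rpcs_in_flight, otherwise drop
 * the reservation again.  The wake_up() on the undo path covers the case
 * where another thread freed a slot between the two atomic operations. */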
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;
	ENTRY;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		RETURN(osc_real_create(exp, oa, ea, oti));
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		RETURN(osc_real_create(exp, oa, ea, oti));

	/* we should not get here anymore */
	LBUG();

	RETURN(rc);
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything
 * at all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing the destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	LIST_HEAD(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying an unlink orphan, it was sent
	 * from the MDT to the OST and should not be blocked here, because
	 * the process might be triggered by ptlrpcd, and it is not good to
	 * block a ptlrpcd thread (b=16006). */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs
			 * drops below cl_max_rpcs_in_flight.
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	RETURN(0);
}

static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read()s and atomic_inc()s are not covered by a
		 * lock, thus they may safely race and trip this CERROR()
		 * unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT)*
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      obd_count keylen, void *key, obd_count vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
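/* Illustration (hypothetical numbers): with 256 pages per RPC (1 MiB on
 * 4 KiB pages) and cl_max_rpcs_in_flight = 8, the first shrink targets
 * (8 + 1) * 1 MiB = 9 MiB; once avail_grant is at or below that, a later
 * shrink drops the target to a single RPC's worth, i.e. 1 MiB. */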
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	*body;
	ENTRY;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already at or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	cfs_time_t time = cfs_time_current();
	cfs_time_t next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
				cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
			client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we have
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state has
	 * already left the EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
		"chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
		cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT (page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

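/* Validate the per-niobuf return codes in a BRW_WRITE reply: fail if the
 * RC vector is missing or short, if any niobuf reported an error or a
 * nonzero code, or if the bulk byte count doesn't match what we asked for. */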
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   obd_count page_count, struct brw_page **pga)
{
	int     i;
	__u32   *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return(-EPROTO);
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return(remote_rcs[i]);

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
				i, remote_rcs[i], req);
			return(-EPROTO);
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return(-EPROTO);
	}

	return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
				  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

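/* Checksum the first @nob bytes of the page array with the algorithm
 * selected by @cksum_type.  The OBD_FAIL_CHECK() branches deliberately
 * corrupt the data (reads) or the checksum (writes) for fault-injection
 * testing of the checksum machinery. */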
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
				   struct brw_page **pga, int opc,
				   cksum_type_t cksum_type)
{
	__u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;
			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
				  pga[i]->off & ~CFS_PAGE_MASK,
				  count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sends we only compute a wrong checksum instead of corrupting
	 * the data, so it is still correct on a resend */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}
1241
1242static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1243				struct lov_stripe_md *lsm, obd_count page_count,
1244				struct brw_page **pga,
1245				struct ptlrpc_request **reqp,
1246				struct obd_capa *ocapa, int reserve,
1247				int resend)
1248{
1249	struct ptlrpc_request   *req;
1250	struct ptlrpc_bulk_desc *desc;
1251	struct ost_body	 *body;
1252	struct obd_ioobj	*ioobj;
1253	struct niobuf_remote    *niobuf;
1254	int niocount, i, requested_nob, opc, rc;
1255	struct osc_brw_async_args *aa;
1256	struct req_capsule      *pill;
1257	struct brw_page *pg_prev;
1258
1259	ENTRY;
1260	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1261		RETURN(-ENOMEM); /* Recoverable */
1262	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1263		RETURN(-EINVAL); /* Fatal */
1264
1265	if ((cmd & OBD_BRW_WRITE) != 0) {
1266		opc = OST_WRITE;
1267		req = ptlrpc_request_alloc_pool(cli->cl_import,
1268						cli->cl_import->imp_rq_pool,
1269						&RQF_OST_BRW_WRITE);
1270	} else {
1271		opc = OST_READ;
1272		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1273	}
1274	if (req == NULL)
1275		RETURN(-ENOMEM);
1276
1277	for (niocount = i = 1; i < page_count; i++) {
1278		if (!can_merge_pages(pga[i - 1], pga[i]))
1279			niocount++;
1280	}
1281
1282	pill = &req->rq_pill;
1283	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1284			     sizeof(*ioobj));
1285	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1286			     niocount * sizeof(*niobuf));
1287	osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1288
1289	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1290	if (rc) {
1291		ptlrpc_request_free(req);
1292		RETURN(rc);
1293	}
1294	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1295	ptlrpc_at_set_req_timeout(req);
1296	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1297	 * retry logic */
1298	req->rq_no_retry_einprogress = 1;
1299
1300	desc = ptlrpc_prep_bulk_imp(req, page_count,
1301		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1302		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1303		OST_BULK_PORTAL);
1304
1305	if (desc == NULL)
1306		GOTO(out, rc = -ENOMEM);
1307	/* NB request now owns desc and will free it when it gets freed */
1308
1309	body = req_capsule_client_get(pill, &RMF_OST_BODY);
1310	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1311	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1312	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1313
1314	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1315
1316	obdo_to_ioobj(oa, ioobj);
1317	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request.  The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk().  It
	 * sends "max - 1", both for compatibility with old clients that send
	 * "0" and so that the actual maximum is a power-of-two number, not
	 * one less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len    = pg->count;
			niobuf->flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	RETURN(0);

 out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				obd_count page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;
	ENTRY;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			RETURN(-EPROTO);
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR ("Unexpected rc %d (%d transferred)\n",
			rc, req->rq_bulk->bd_nob_transferred);
		return (-EPROTO);
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32      server_cksum = body->oa.o_cksum;
		char      *via;
		char      *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum == ~0 && rc > 0) {
			CERROR("Protocol error: server %s set the 'checksum' "
			       "bit, but didn't send a checksum.  Not fatal, "
			       "but please notify on http://bugs.whamcloud.com/\n",
			       libcfs_nid2str(peer->nid));
		} else if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent ["LPU64"-"LPU64"]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
									1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}

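/* Synchronous bulk I/O: build the BRW request, queue it, and retry on
 * recoverable errors with a backoff of @resends seconds per attempt,
 * giving up when the resend budget is exhausted (except on -EINPROGRESS)
 * or when the import generation changes, i.e. across an eviction. */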
static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
			    struct lov_stripe_md *lsm,
			    obd_count page_count, struct brw_page **pga,
			    struct obd_capa *ocapa)
{
	struct ptlrpc_request *req;
	int		    rc;
	wait_queue_head_t	    waitq;
	int		    generation, resends = 0;
	struct l_wait_info     lwi;

	ENTRY;

	init_waitqueue_head(&waitq);
	generation = exp->exp_obd->u.cli.cl_import->imp_generation;

restart_bulk:
	rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
				  page_count, pga, &req, ocapa, 0, resends);
	if (rc != 0)
		return (rc);

	if (resends) {
		req->rq_generation_set = 1;
		req->rq_import_generation = generation;
		req->rq_sent = cfs_time_current_sec() + resends;
	}

	rc = ptlrpc_queue_wait(req);

	if (rc == -ETIMEDOUT && req->rq_resend) {
		DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
		ptlrpc_req_finished(req);
		goto restart_bulk;
	}

	rc = osc_brw_fini_request(req, rc);

	ptlrpc_req_finished(req);
	/* When the server returns -EINPROGRESS, the client should always
	 * retry regardless of how many times the bulk was already resent. */
	if (osc_recoverable_error(rc)) {
		resends++;
		if (rc != -EINPROGRESS &&
		    !client_should_resend(resends, &exp->exp_obd->u.cli)) {
			CERROR("%s: too many resend retries for object: "
			       ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
			       POSTID(&oa->o_oi), rc);
			goto out;
		}
		if (generation !=
		    exp->exp_obd->u.cli.cl_import->imp_generation) {
			CDEBUG(D_HA, "%s: resend cross eviction for object: "
			       ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
			       POSTID(&oa->o_oi), rc);
			goto out;
		}

		lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
				       NULL);
		l_wait_event(waitq, 0, &lwi);

		goto restart_bulk;
	}
out:
	if (rc == -EAGAIN || rc == -EINPROGRESS)
		rc = -EIO;
	RETURN (rc);
}

1717static int osc_brw_redo_request(struct ptlrpc_request *request,
1718				struct osc_brw_async_args *aa, int rc)
1719{
1720	struct ptlrpc_request *new_req;
1721	struct osc_brw_async_args *new_aa;
1722	struct osc_async_page *oap;
1723	ENTRY;
1724
1725	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1726		  "redo for recoverable error %d", rc);
1727
1728	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729					OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730				  aa->aa_cli, aa->aa_oa,
1731				  NULL /* lsm unused by osc currently */,
1732				  aa->aa_page_count, aa->aa_ppga,
1733				  &new_req, aa->aa_ocapa, 0, 1);
1734	if (rc)
1735		RETURN(rc);
1736
1737	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738		if (oap->oap_request != NULL) {
1739			LASSERTF(request == oap->oap_request,
1740				 "request %p != oap_request %p\n",
1741				 request, oap->oap_request);
1742			if (oap->oap_interrupted) {
1743				ptlrpc_req_finished(new_req);
1744				RETURN(-EINTR);
1745			}
1746		}
1747	}
1748	/* New request takes over pga and oaps from old request.
1749	 * Note that copying a list_head doesn't work, need to move it... */
1750	aa->aa_resends++;
1751	new_req->rq_interpret_reply = request->rq_interpret_reply;
1752	new_req->rq_async_args = request->rq_async_args;
1753	/* cap resend delay to the current request timeout, this is similar to
1754	 * what ptlrpc does (see after_reply()) */
1755	if (aa->aa_resends > new_req->rq_timeout)
1756		new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1757	else
1758		new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759	new_req->rq_generation_set = 1;
1760	new_req->rq_import_generation = request->rq_import_generation;
1761
1762	new_aa = ptlrpc_req_async_args(new_req);
1763
1764	INIT_LIST_HEAD(&new_aa->aa_oaps);
1765	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1766	INIT_LIST_HEAD(&new_aa->aa_exts);
1767	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1768	new_aa->aa_resends = aa->aa_resends;
1769
1770	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1771		if (oap->oap_request) {
1772			ptlrpc_req_finished(oap->oap_request);
1773			oap->oap_request = ptlrpc_request_addref(new_req);
1774		}
1775	}
1776
1777	new_aa->aa_ocapa = aa->aa_ocapa;
1778	aa->aa_ocapa = NULL;
1779
1780	/* XXX: This code will run into problems if we ever support adding
1781	 * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1782	 * waiting for all of them to finish. We should inherit the request
1783	 * set from the old request. */
1784	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1785
1786	DEBUG_REQ(D_INFO, new_req, "new request");
1787	RETURN(0);
1788}
1789
1790/*
1791 * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1792 * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1793 * fine for our small page arrays and doesn't require allocation.  It's an
1794 * insertion sort that swaps elements that are strides apart, shrinking the
1795 * stride down until it is 1 and the array is sorted.
1796 */
1797static void sort_brw_pages(struct brw_page **array, int num)
1798{
1799	int stride, i, j;
1800	struct brw_page *tmp;
1801
1802	if (num == 1)
1803		return;
1804	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1805		;
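	/* e.g. for num == 100 the loop above stops with stride == 121, so
	 * the do-while below sorts with strides 40, 13, 4 and finally 1
	 * (Knuth's 3h+1 increment sequence) */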
1806
1807	do {
1808		stride /= 3;
1809		for (i = stride ; i < num ; i++) {
1810			tmp = array[i];
1811			j = i;
1812			while (j >= stride && array[j - stride]->off > tmp->off) {
1813				array[j] = array[j - stride];
1814				j -= stride;
1815			}
1816			array[j] = tmp;
1817		}
1818	} while (stride > 1);
1819}
1820
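/* Count how many of the leading pages in @pg form a single contiguous,
 * unfragmented chunk: every page but the last must end on a page boundary
 * and every page but the first must start on one. */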
1821static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1822{
1823	int count = 1;
1824	int offset;
1825	int i = 0;
1826
1827	LASSERT (pages > 0);
1828	offset = pg[i]->off & ~CFS_PAGE_MASK;
1829
1830	for (;;) {
1831		pages--;
1832		if (pages == 0)	 /* that's all */
1833			return count;
1834
1835		if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1836			return count;   /* doesn't end on page boundary */
1837
1838		i++;
1839		offset = pg[i]->off & ~CFS_PAGE_MASK;
1840		if (offset != 0)	/* doesn't start on page boundary */
1841			return count;
1842
1843		count++;
1844	}
1845}
1846
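/* Build an array of pointers into the flat brw_page array so the pages can
 * be sorted and carved into per-RPC chunks without copying them. */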
1847static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1848{
1849	struct brw_page **ppga;
1850	int i;
1851
1852	OBD_ALLOC(ppga, sizeof(*ppga) * count);
1853	if (ppga == NULL)
1854		return NULL;
1855
1856	for (i = 0; i < count; i++)
1857		ppga[i] = pga + i;
1858	return ppga;
1859}
1860
1861static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1862{
1863	LASSERT(ppga != NULL);
1864	OBD_FREE(ppga, sizeof(*ppga) * count);
1865}
1866
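/* Synchronous bulk I/O entry point: sort the pages by offset, split them
 * into chunks of at most cl_max_pages_per_rpc unfragmented pages, and issue
 * one blocking BRW RPC per chunk. */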
1867static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1868		   obd_count page_count, struct brw_page *pga,
1869		   struct obd_trans_info *oti)
1870{
1871	struct obdo *saved_oa = NULL;
1872	struct brw_page **ppga, **orig;
1873	struct obd_import *imp = class_exp2cliimp(exp);
1874	struct client_obd *cli;
1875	int rc, page_count_orig;
1876	ENTRY;
1877
1878	LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1879	cli = &imp->imp_obd->u.cli;
1880
1881	if (cmd & OBD_BRW_CHECK) {
1882		/* The caller just wants to know if there's a chance that this
1883		 * I/O can succeed */
1884
1885		if (imp->imp_invalid)
1886			RETURN(-EIO);
1887		RETURN(0);
1888	}
1889
1890	/* test_brw with a failed create can trip this, maybe others. */
1891	LASSERT(cli->cl_max_pages_per_rpc);
1892
1893	rc = 0;
1894
1895	orig = ppga = osc_build_ppga(pga, page_count);
1896	if (ppga == NULL)
1897		RETURN(-ENOMEM);
1898	page_count_orig = page_count;
1899
1900	sort_brw_pages(ppga, page_count);
1901	while (page_count) {
1902		obd_count pages_per_brw;
1903
1904		if (page_count > cli->cl_max_pages_per_rpc)
1905			pages_per_brw = cli->cl_max_pages_per_rpc;
1906		else
1907			pages_per_brw = page_count;
1908
1909		pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1910
1911		if (saved_oa != NULL) {
1912			/* restore previously saved oa */
1913			*oinfo->oi_oa = *saved_oa;
1914		} else if (page_count > pages_per_brw) {
1915			/* save a copy of oa (brw will clobber it) */
1916			OBDO_ALLOC(saved_oa);
1917			if (saved_oa == NULL)
1918				GOTO(out, rc = -ENOMEM);
1919			*saved_oa = *oinfo->oi_oa;
1920		}
1921
1922		rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1923				      pages_per_brw, ppga, oinfo->oi_capa);
1924
1925		if (rc != 0)
1926			break;
1927
1928		page_count -= pages_per_brw;
1929		ppga += pages_per_brw;
1930	}
1931
1932out:
1933	osc_release_ppga(orig, page_count_orig);
1934
1935	if (saved_oa != NULL)
1936		OBDO_FREE(saved_oa);
1937
1938	RETURN(rc);
1939}
1940
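/* Interpret callback for an asynchronous BRW RPC: redo the request on
 * recoverable errors, finish the extents it covered, apply the object
 * attributes returned by the server, and kick off more queued I/O. */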
1941static int brw_interpret(const struct lu_env *env,
1942			 struct ptlrpc_request *req, void *data, int rc)
1943{
1944	struct osc_brw_async_args *aa = data;
1945	struct osc_extent *ext;
1946	struct osc_extent *tmp;
1947	struct cl_object  *obj = NULL;
1948	struct client_obd *cli = aa->aa_cli;
1949	ENTRY;
1950
1951	rc = osc_brw_fini_request(req, rc);
1952	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1953	/* When the server returns -EINPROGRESS, the client should always retry
1954	 * regardless of how many times the bulk was already resent. */
1955	if (osc_recoverable_error(rc)) {
1956		if (req->rq_import_generation !=
1957		    req->rq_import->imp_generation) {
1958			CDEBUG(D_HA, "%s: resend cross eviction for object: "
1959			       ""DOSTID", rc = %d.\n",
1960			       req->rq_import->imp_obd->obd_name,
1961			       POSTID(&aa->aa_oa->o_oi), rc);
1962		} else if (rc == -EINPROGRESS ||
1963		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1964			rc = osc_brw_redo_request(req, aa, rc);
1965		} else {
1966			CERROR("%s: too many resend retries for object: "
1967			       ""LPU64":"LPU64", rc = %d.\n",
1968			       req->rq_import->imp_obd->obd_name,
1969			       POSTID(&aa->aa_oa->o_oi), rc);
1970		}
1971
1972		if (rc == 0)
1973			RETURN(0);
1974		else if (rc == -EAGAIN || rc == -EINPROGRESS)
1975			rc = -EIO;
1976	}
1977
1978	if (aa->aa_ocapa) {
1979		capa_put(aa->aa_ocapa);
1980		aa->aa_ocapa = NULL;
1981	}
1982
1983	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1984		if (obj == NULL && rc == 0) {
1985			obj = osc2cl(ext->oe_obj);
1986			cl_object_get(obj);
1987		}
1988
1989		list_del_init(&ext->oe_link);
1990		osc_extent_finish(env, ext, 1, rc);
1991	}
1992	LASSERT(list_empty(&aa->aa_exts));
1993	LASSERT(list_empty(&aa->aa_oaps));
1994
1995	if (obj != NULL) {
1996		struct obdo *oa = aa->aa_oa;
1997		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1998		unsigned long valid = 0;
1999
2000		LASSERT(rc == 0);
2001		if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002			attr->cat_blocks = oa->o_blocks;
2003			valid |= CAT_BLOCKS;
2004		}
2005		if (oa->o_valid & OBD_MD_FLMTIME) {
2006			attr->cat_mtime = oa->o_mtime;
2007			valid |= CAT_MTIME;
2008		}
2009		if (oa->o_valid & OBD_MD_FLATIME) {
2010			attr->cat_atime = oa->o_atime;
2011			valid |= CAT_ATIME;
2012		}
2013		if (oa->o_valid & OBD_MD_FLCTIME) {
2014			attr->cat_ctime = oa->o_ctime;
2015			valid |= CAT_CTIME;
2016		}
2017		if (valid != 0) {
2018			cl_object_attr_lock(obj);
2019			cl_object_attr_set(env, obj, attr, valid);
2020			cl_object_attr_unlock(obj);
2021		}
2022		cl_object_put(env, obj);
2023	}
2024	OBDO_FREE(aa->aa_oa);
2025
2026	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2027			  req->rq_bulk->bd_nob_transferred);
2028	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2029	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2030
2031	client_obd_list_lock(&cli->cl_loi_list_lock);
2032	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2033	 * is called so we know whether to go to sync BRWs or wait for more
2034	 * RPCs to complete */
2035	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2036		cli->cl_w_in_flight--;
2037	else
2038		cli->cl_r_in_flight--;
2039	osc_wake_cache_waiters(cli);
2040	client_obd_list_unlock(&cli->cl_loi_list_lock);
2041
2042	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2043	RETURN(rc);
2044}
2045
2046/**
2047 * Build an RPC from the list of extents @ext_list. The caller must ensure
2048 * that the total number of pages in this list does not exceed the maximum
2049 * number of pages per RPC. Extents in the list must be in OES_RPC state.
2050 */
2051int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2052		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
2053{
2054	struct ptlrpc_request		*req = NULL;
2055	struct osc_extent		*ext;
2056	struct brw_page			**pga = NULL;
2057	struct osc_brw_async_args	*aa = NULL;
2058	struct obdo			*oa = NULL;
2059	struct osc_async_page		*oap;
2060	struct osc_async_page		*tmp;
2061	struct cl_req			*clerq = NULL;
2062	enum cl_req_type		crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2063								      CRT_READ;
2064	struct ldlm_lock		*lock = NULL;
2065	struct cl_req_attr		*crattr = NULL;
2066	obd_off				starting_offset = OBD_OBJECT_EOF;
2067	obd_off				ending_offset = 0;
2068	int				mpflag = 0;
2069	int				mem_tight = 0;
2070	int				page_count = 0;
2071	int				i;
2072	int				rc;
2073	LIST_HEAD(rpc_list);
2074
2075	ENTRY;
2076	LASSERT(!list_empty(ext_list));
2077
2078	/* add pages into rpc_list to build BRW rpc */
2079	list_for_each_entry(ext, ext_list, oe_link) {
2080		LASSERT(ext->oe_state == OES_RPC);
2081		mem_tight |= ext->oe_memalloc;
2082		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2083			++page_count;
2084			list_add_tail(&oap->oap_rpc_item, &rpc_list);
2085			if (starting_offset > oap->oap_obj_off)
2086				starting_offset = oap->oap_obj_off;
2087			else
2088				LASSERT(oap->oap_page_off == 0);
2089			if (ending_offset < oap->oap_obj_off + oap->oap_count)
2090				ending_offset = oap->oap_obj_off +
2091						oap->oap_count;
2092			else
2093				LASSERT(oap->oap_page_off + oap->oap_count ==
2094					PAGE_CACHE_SIZE);
2095		}
2096	}
2097
2098	if (mem_tight)
2099		mpflag = cfs_memory_pressure_get_and_set();
2100
2101	OBD_ALLOC(crattr, sizeof(*crattr));
2102	if (crattr == NULL)
2103		GOTO(out, rc = -ENOMEM);
2104
2105	OBD_ALLOC(pga, sizeof(*pga) * page_count);
2106	if (pga == NULL)
2107		GOTO(out, rc = -ENOMEM);
2108
2109	OBDO_ALLOC(oa);
2110	if (oa == NULL)
2111		GOTO(out, rc = -ENOMEM);
2112
2113	i = 0;
2114	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115		struct cl_page *page = oap2cl_page(oap);
2116		if (clerq == NULL) {
2117			clerq = cl_req_alloc(env, page, crt,
2118					     1 /* only 1-object rpcs for now */);
2119			if (IS_ERR(clerq))
2120				GOTO(out, rc = PTR_ERR(clerq));
2121			lock = oap->oap_ldlm_lock;
2122		}
2123		if (mem_tight)
2124			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2125		pga[i] = &oap->oap_brw_page;
2126		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128		       pga[i]->pg, page_index(oap->oap_page), oap,
2129		       pga[i]->flag);
2130		i++;
2131		cl_req_page_add(env, clerq, page);
2132	}
2133
2134	/* always fill in the obdo data for the RPC */
2135	LASSERT(clerq != NULL);
2136	crattr->cra_oa = oa;
2137	cl_req_attr_set(env, clerq, crattr, ~0ULL);
2138	if (lock) {
2139		oa->o_handle = lock->l_remote_handle;
2140		oa->o_valid |= OBD_MD_FLHANDLE;
2141	}
2142
2143	rc = cl_req_prep(env, clerq);
2144	if (rc != 0) {
2145		CERROR("cl_req_prep failed: %d\n", rc);
2146		GOTO(out, rc);
2147	}
2148
2149	sort_brw_pages(pga, page_count);
2150	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2151			pga, &req, crattr->cra_capa, 1, 0);
2152	if (rc != 0) {
2153		CERROR("prep_req failed: %d\n", rc);
2154		GOTO(out, rc);
2155	}
2156
2157	req->rq_interpret_reply = brw_interpret;
2158
2159	if (mem_tight != 0)
2160		req->rq_memalloc = 1;
2161
2162	/* Need to update the timestamps after the request is built in case
2163	 * we race with setattr (locally or in queue at OST).  If OST gets
2164	 * later setattr before earlier BRW (as determined by the request xid),
2165	 * the OST will not use BRW timestamps.  Sadly, there is no obvious
2166	 * way to do this in a single call.  bug 10150 */
2167	cl_req_attr_set(env, clerq, crattr,
2168			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2169
2170	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2171
2172	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2173	aa = ptlrpc_req_async_args(req);
2174	INIT_LIST_HEAD(&aa->aa_oaps);
2175	list_splice_init(&rpc_list, &aa->aa_oaps);
2176	INIT_LIST_HEAD(&aa->aa_exts);
2177	list_splice_init(ext_list, &aa->aa_exts);
2178	aa->aa_clerq = clerq;
2179
2180	/* queued sync pages can be torn down while the pages
2181	 * are between the pending list and the RPC */
2182	tmp = NULL;
2183	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2184		/* only one oap gets a request reference */
2185		if (tmp == NULL)
2186			tmp = oap;
2187		if (oap->oap_interrupted && !req->rq_intr) {
2188			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2189					oap, req);
2190			ptlrpc_mark_interrupted(req);
2191		}
2192	}
2193	if (tmp != NULL)
2194		tmp->oap_request = ptlrpc_request_addref(req);
2195
2196	client_obd_list_lock(&cli->cl_loi_list_lock);
2197	starting_offset >>= PAGE_CACHE_SHIFT;
2198	if (cmd == OBD_BRW_READ) {
2199		cli->cl_r_in_flight++;
2200		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2201		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2202		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2203				      starting_offset + 1);
2204	} else {
2205		cli->cl_w_in_flight++;
2206		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2207		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2208		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2209				      starting_offset + 1);
2210	}
2211	client_obd_list_unlock(&cli->cl_loi_list_lock);
2212
2213	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2214		  page_count, aa, cli->cl_r_in_flight,
2215		  cli->cl_w_in_flight);
2216
2217	/* XXX: Maybe the caller can check the RPC bulk descriptor to
2218	 * see which CPU/NUMA node the majority of pages were allocated
2219	 * on, and try to assign the async RPC to the CPU core
2220	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2221	 *
2222	 * But on the other hand, we expect that multiple ptlrpcd
2223	 * threads and the initial write sponsor can run in parallel,
2224	 * especially when data checksumming is enabled, which is a CPU-bound
2225	 * operation that a single ptlrpcd thread cannot process in time.
2226	 * So having more ptlrpcd threads share the BRW load
2227	 * (with PDL_POLICY_ROUND) seems better.
2228	 */
2229	ptlrpcd_add_req(req, pol, -1);
2230	rc = 0;
2231	EXIT;
2232
2233out:
2234	if (mem_tight != 0)
2235		cfs_memory_pressure_restore(mpflag);
2236
2237	if (crattr != NULL) {
2238		capa_put(crattr->cra_capa);
2239		OBD_FREE(crattr, sizeof(*crattr));
2240	}
2241
2242	if (rc != 0) {
2243		LASSERT(req == NULL);
2244
2245		if (oa)
2246			OBDO_FREE(oa);
2247		if (pga)
2248			OBD_FREE(pga, sizeof(*pga) * page_count);
2249		/* this should happen rarely and is pretty bad; it makes the
2250		 * pending list stop following the dirty order */
2251		while (!list_empty(ext_list)) {
2252			ext = list_entry(ext_list->next, struct osc_extent,
2253					     oe_link);
2254			list_del_init(&ext->oe_link);
2255			osc_extent_finish(env, ext, 0, rc);
2256		}
2257		if (clerq && !IS_ERR(clerq))
2258			cl_req_completion(env, clerq, rc);
2259	}
2260	RETURN(rc);
2261}
2262
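/* Attach einfo->ei_cbdata to @lock unless the lock already carries other
 * data; return 1 if the lock now carries this data, 0 otherwise. */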
2263static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2264					struct ldlm_enqueue_info *einfo)
2265{
2266	void *data = einfo->ei_cbdata;
2267	int set = 0;
2268
2269	LASSERT(lock != NULL);
2270	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2271	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2272	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2273	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2274
2275	lock_res_and_lock(lock);
2276	spin_lock(&osc_ast_guard);
2277
2278	if (lock->l_ast_data == NULL)
2279		lock->l_ast_data = data;
2280	if (lock->l_ast_data == data)
2281		set = 1;
2282
2283	spin_unlock(&osc_ast_guard);
2284	unlock_res_and_lock(lock);
2285
2286	return set;
2287}
2288
2289static int osc_set_data_with_check(struct lustre_handle *lockh,
2290				   struct ldlm_enqueue_info *einfo)
2291{
2292	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2293	int set = 0;
2294
2295	if (lock != NULL) {
2296		set = osc_set_lock_data_with_check(lock, einfo);
2297		LDLM_LOCK_PUT(lock);
2298	} else
2299		CERROR("lockh %p, data %p - client evicted?\n",
2300		       lockh, einfo->ei_cbdata);
2301	return set;
2302}
2303
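/* Walk all cached locks on the object's resource and let @replace update
 * their callback data. */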
2304static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305			     ldlm_iterator_t replace, void *data)
2306{
2307	struct ldlm_res_id res_id;
2308	struct obd_device *obd = class_exp2obd(exp);
2309
2310	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2311	ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2312	return 0;
2313}
2314
2315/* Find any LDLM lock of the inode in the OSC.
2316 * Return 0 if no lock was found,
2317 *        1 if one was found,
2318 *      < 0 on error. */
2319static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2320			   ldlm_iterator_t replace, void *data)
2321{
2322	struct ldlm_res_id res_id;
2323	struct obd_device *obd = class_exp2obd(exp);
2324	int rc = 0;
2325
2326	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2327	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2328	if (rc == LDLM_ITER_STOP)
2329		return(1);
2330	if (rc == LDLM_ITER_CONTINUE)
2331		return(0);
2332	return(rc);
2333}
2334
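/* Post-process an enqueue result: pick up the intent error code if any,
 * mark the LVB ready when the lock (or glimpse) succeeded, and invoke the
 * caller's update callback. */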
2335static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2336			    obd_enqueue_update_f upcall, void *cookie,
2337			    __u64 *flags, int agl, int rc)
2338{
2339	int intent = *flags & LDLM_FL_HAS_INTENT;
2340	ENTRY;
2341
2342	if (intent) {
2343		/* The request was created before ldlm_cli_enqueue call. */
2344		if (rc == ELDLM_LOCK_ABORTED) {
2345			struct ldlm_reply *rep;
2346			rep = req_capsule_server_get(&req->rq_pill,
2347						     &RMF_DLM_REP);
2348
2349			LASSERT(rep != NULL);
2350			if (rep->lock_policy_res1)
2351				rc = rep->lock_policy_res1;
2352		}
2353	}
2354
2355	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2356	    (rc == 0)) {
2357		*flags |= LDLM_FL_LVB_READY;
2358		CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2359		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2360	}
2361
2362	/* Call the update callback. */
2363	rc = (*upcall)(cookie, rc);
2364	RETURN(rc);
2365}
2366
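/* Interpret callback for an asynchronous enqueue RPC: complete the LDLM
 * enqueue, then run the OSC-level fini and upcall while holding an extra
 * lock reference so a blocking AST cannot overtake the upcall. */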
2367static int osc_enqueue_interpret(const struct lu_env *env,
2368				 struct ptlrpc_request *req,
2369				 struct osc_enqueue_args *aa, int rc)
2370{
2371	struct ldlm_lock *lock;
2372	struct lustre_handle handle;
2373	__u32 mode;
2374	struct ost_lvb *lvb;
2375	__u32 lvb_len;
2376	__u64 *flags = aa->oa_flags;
2377
2378	/* Make a local copy of a lock handle and a mode, because aa->oa_*
2379	 * might be freed anytime after lock upcall has been called. */
2380	lustre_handle_copy(&handle, aa->oa_lockh);
2381	mode = aa->oa_ei->ei_mode;
2382
2383	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
2384	 * be valid. */
2385	lock = ldlm_handle2lock(&handle);
2386
2387	/* Take an additional reference so that a blocking AST that
2388	 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2389	 * to arrive after an upcall has been executed by
2390	 * osc_enqueue_fini(). */
2391	ldlm_lock_addref(&handle, mode);
2392
2393	/* Let the CP AST grant the lock first. */
2394	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2395
2396	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2397		lvb = NULL;
2398		lvb_len = 0;
2399	} else {
2400		lvb = aa->oa_lvb;
2401		lvb_len = sizeof(*aa->oa_lvb);
2402	}
2403
2404	/* Complete the lock-acquisition procedure. */
2405	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2406				   mode, flags, lvb, lvb_len, &handle, rc);
2407	/* Complete the OSC-side processing. */
2408	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2409			      flags, aa->oa_agl, rc);
2410
2411	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2412
2413	/* Release the lock for async request. */
2414	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2415		/*
2416		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2417		 * not already released by
2418		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2419		 */
2420		ldlm_lock_decref(&handle, mode);
2421
2422	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2423		 aa->oa_lockh, req, aa);
2424	ldlm_lock_decref(&handle, mode);
2425	LDLM_LOCK_PUT(lock);
2426	return rc;
2427}
2428
2429void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2430			struct lov_oinfo *loi, int flags,
2431			struct ost_lvb *lvb, __u32 mode, int rc)
2432{
2433	struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2434
2435	if (rc == ELDLM_OK) {
2436		__u64 tmp;
2437
2438		LASSERT(lock != NULL);
2439		loi->loi_lvb = *lvb;
2440		tmp = loi->loi_lvb.lvb_size;
2441		/* Extend KMS up to the end of this lock and no further.
2442		 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2443		if (tmp > lock->l_policy_data.l_extent.end)
2444			tmp = lock->l_policy_data.l_extent.end + 1;
2445		if (tmp >= loi->loi_kms) {
2446			LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2447				   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2448			loi_kms_set(loi, tmp);
2449		} else {
2450			LDLM_DEBUG(lock, "lock acquired, setting rss="
2451				   LPU64"; leaving kms="LPU64", end="LPU64,
2452				   loi->loi_lvb.lvb_size, loi->loi_kms,
2453				   lock->l_policy_data.l_extent.end);
2454		}
2455		ldlm_lock_allow_match(lock);
2456	} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2457		LASSERT(lock != NULL);
2458		loi->loi_lvb = *lvb;
2459		ldlm_lock_allow_match(lock);
2460		CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2461		       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2462		rc = ELDLM_OK;
2463	}
2464
2465	if (lock != NULL) {
2466		if (rc != ELDLM_OK)
2467			ldlm_lock_fail_match(lock);
2468
2469		LDLM_LOCK_PUT(lock);
2470	}
2471}
2472EXPORT_SYMBOL(osc_update_enqueue);
2473
2474struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2475
2476/* When enqueuing asynchronously, locks are not ordered, so we can obtain a lock
2477 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2478 * other synchronous requests; however, holding some locks while trying to obtain
2479 * others may take a considerable amount of time in case of an OST failure, and
2480 * when other sync requests cannot get a lock released by a client, that client
2481 * is excluded from the cluster -- such scenarios make life difficult, so
2482 * release locks just after they are obtained. */
2483int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2484		     __u64 *flags, ldlm_policy_data_t *policy,
2485		     struct ost_lvb *lvb, int kms_valid,
2486		     obd_enqueue_update_f upcall, void *cookie,
2487		     struct ldlm_enqueue_info *einfo,
2488		     struct lustre_handle *lockh,
2489		     struct ptlrpc_request_set *rqset, int async, int agl)
2490{
2491	struct obd_device *obd = exp->exp_obd;
2492	struct ptlrpc_request *req = NULL;
2493	int intent = *flags & LDLM_FL_HAS_INTENT;
2494	int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2495	ldlm_mode_t mode;
2496	int rc;
2497	ENTRY;
2498
2499	/* Filesystem lock extents are extended to page boundaries so that
2500	 * dealing with the page cache is a little smoother.  */
2501	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2502	policy->l_extent.end |= ~CFS_PAGE_MASK;
2503
2504	/*
2505	 * kms is not valid when either object is completely fresh (so that no
2506	 * locks are cached), or object was evicted. In the latter case cached
2507	 * lock cannot be used, because it would prime inode state with
2508	 * potentially stale LVB.
2509	 */
2510	if (!kms_valid)
2511		goto no_match;
2512
2513	/* Next, search for already existing extent locks that will cover us */
2514	/* If we're trying to read, we also search for an existing PW lock.  The
2515	 * VFS and page cache already protect us locally, so lots of readers/
2516	 * writers can share a single PW lock.
2517	 *
2518	 * There are problems with conversion deadlocks, so instead of
2519	 * converting a read lock to a write lock, we'll just enqueue a new
2520	 * one.
2521	 *
2522	 * At some point we should cancel the read lock instead of making them
2523	 * send us a blocking callback, but there are problems with canceling
2524	 * locks out from other users right now, too. */
2525	mode = einfo->ei_mode;
2526	if (einfo->ei_mode == LCK_PR)
2527		mode |= LCK_PW;
2528	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2529			       einfo->ei_type, policy, mode, lockh, 0);
2530	if (mode) {
2531		struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2532
2533		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2534			/* For AGL, if the enqueue RPC was sent but the lock
2535			 * was not granted, then skip processing this stripe.
2536			 * Return -ECANCELED to tell the caller. */
2537			ldlm_lock_decref(lockh, mode);
2538			LDLM_LOCK_PUT(matched);
2539			RETURN(-ECANCELED);
2540		} else if (osc_set_lock_data_with_check(matched, einfo)) {
2541			*flags |= LDLM_FL_LVB_READY;
2542			/* addref the lock only for non-async requests when a
2543			 * PW lock was matched although we asked for PR. */
2544			if (!rqset && einfo->ei_mode != mode)
2545				ldlm_lock_addref(lockh, LCK_PR);
2546			if (intent) {
2547				/* I would like to be able to ASSERT here that
2548				 * rss <= kms, but I can't, for reasons which
2549				 * are explained in lov_enqueue() */
2550			}
2551
2552			/* We already have a lock, and it's referenced.
2553			 *
2554			 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2555			 * AGL upcall may change it to CLS_HELD directly. */
2556			(*upcall)(cookie, ELDLM_OK);
2557
2558			if (einfo->ei_mode != mode)
2559				ldlm_lock_decref(lockh, LCK_PW);
2560			else if (rqset)
2561				/* For async requests, decref the lock. */
2562				ldlm_lock_decref(lockh, einfo->ei_mode);
2563			LDLM_LOCK_PUT(matched);
2564			RETURN(ELDLM_OK);
2565		} else {
2566			ldlm_lock_decref(lockh, mode);
2567			LDLM_LOCK_PUT(matched);
2568		}
2569	}
2570
2571 no_match:
2572	if (intent) {
2573		LIST_HEAD(cancels);
2574		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2575					   &RQF_LDLM_ENQUEUE_LVB);
2576		if (req == NULL)
2577			RETURN(-ENOMEM);
2578
2579		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2580		if (rc) {
2581			ptlrpc_request_free(req);
2582			RETURN(rc);
2583		}
2584
2585		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2586				     sizeof *lvb);
2587		ptlrpc_request_set_replen(req);
2588	}
2589
2590	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2591	*flags &= ~LDLM_FL_BLOCK_GRANTED;
2592
2593	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2594			      sizeof(*lvb), LVB_T_OST, lockh, async);
2595	if (rqset) {
2596		if (!rc) {
2597			struct osc_enqueue_args *aa;
2598			CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2599			aa = ptlrpc_req_async_args(req);
2600			aa->oa_ei = einfo;
2601			aa->oa_exp = exp;
2602			aa->oa_flags  = flags;
2603			aa->oa_upcall = upcall;
2604			aa->oa_cookie = cookie;
2605			aa->oa_lvb    = lvb;
2606			aa->oa_lockh  = lockh;
2607			aa->oa_agl    = !!agl;
2608
2609			req->rq_interpret_reply =
2610				(ptlrpc_interpterer_t)osc_enqueue_interpret;
2611			if (rqset == PTLRPCD_SET)
2612				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2613			else
2614				ptlrpc_set_add_req(rqset, req);
2615		} else if (intent) {
2616			ptlrpc_req_finished(req);
2617		}
2618		RETURN(rc);
2619	}
2620
2621	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2622	if (intent)
2623		ptlrpc_req_finished(req);
2624
2625	RETURN(rc);
2626}
2627
2628static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2629		       struct ldlm_enqueue_info *einfo,
2630		       struct ptlrpc_request_set *rqset)
2631{
2632	struct ldlm_res_id res_id;
2633	int rc;
2634	ENTRY;
2635
2636	ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2637	rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2638			      &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2639			      oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2640			      oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2641			      rqset, rqset != NULL, 0);
2642	RETURN(rc);
2643}
2644
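/* Match an existing extent lock without enqueuing a new one. A PR request
 * may be satisfied by a cached PW lock, in which case a PR reference is
 * taken and the PW reference dropped before returning. */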
2645int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2646		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2647		   int *flags, void *data, struct lustre_handle *lockh,
2648		   int unref)
2649{
2650	struct obd_device *obd = exp->exp_obd;
2651	int lflags = *flags;
2652	ldlm_mode_t rc;
2653	ENTRY;
2654
2655	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2656		RETURN(-EIO);
2657
2658	/* Filesystem lock extents are extended to page boundaries so that
2659	 * dealing with the page cache is a little smoother */
2660	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2661	policy->l_extent.end |= ~CFS_PAGE_MASK;
2662
2663	/* Next, search for already existing extent locks that will cover us */
2664	/* If we're trying to read, we also search for an existing PW lock.  The
2665	 * VFS and page cache already protect us locally, so lots of readers/
2666	 * writers can share a single PW lock. */
2667	rc = mode;
2668	if (mode == LCK_PR)
2669		rc |= LCK_PW;
2670	rc = ldlm_lock_match(obd->obd_namespace, lflags,
2671			     res_id, type, policy, rc, lockh, unref);
2672	if (rc) {
2673		if (data != NULL) {
2674			if (!osc_set_data_with_check(lockh, data)) {
2675				if (!(lflags & LDLM_FL_TEST_LOCK))
2676					ldlm_lock_decref(lockh, rc);
2677				RETURN(0);
2678			}
2679		}
2680		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2681			ldlm_lock_addref(lockh, LCK_PR);
2682			ldlm_lock_decref(lockh, LCK_PW);
2683		}
2684		RETURN(rc);
2685	}
2686	RETURN(rc);
2687}
2688
2689int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2690{
2691	ENTRY;
2692
2693	if (unlikely(mode == LCK_GROUP))
2694		ldlm_lock_decref_and_cancel(lockh, mode);
2695	else
2696		ldlm_lock_decref(lockh, mode);
2697
2698	RETURN(0);
2699}
2700
2701static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2702		      __u32 mode, struct lustre_handle *lockh)
2703{
2704	ENTRY;
2705	RETURN(osc_cancel_base(lockh, mode));
2706}
2707
2708static int osc_cancel_unused(struct obd_export *exp,
2709			     struct lov_stripe_md *lsm,
2710			     ldlm_cancel_flags_t flags,
2711			     void *opaque)
2712{
2713	struct obd_device *obd = class_exp2obd(exp);
2714	struct ldlm_res_id res_id, *resp = NULL;
2715
2716	if (lsm != NULL) {
2717		ostid_build_res_name(&lsm->lsm_oi, &res_id);
2718		resp = &res_id;
2719	}
2720
2721	return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2722}
2723
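/* Interpret callback for an asynchronous OST_STATFS request: copy the
 * reply into the caller's obd_statfs buffer and run the registered
 * upcall. */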
2724static int osc_statfs_interpret(const struct lu_env *env,
2725				struct ptlrpc_request *req,
2726				struct osc_async_args *aa, int rc)
2727{
2728	struct obd_statfs *msfs;
2729	ENTRY;
2730
2731	if (rc == -EBADR)
2732		/* The request has in fact never been sent
2733		 * due to issues at a higher level (LOV).
2734		 * Exit immediately since the caller is
2735		 * aware of the problem and takes care
2736		 * of the clean up */
2737		 RETURN(rc);
2738
2739	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2740	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2741		GOTO(out, rc = 0);
2742
2743	if (rc != 0)
2744		GOTO(out, rc);
2745
2746	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2747	if (msfs == NULL) {
2748		GOTO(out, rc = -EPROTO);
2749	}
2750
2751	*aa->aa_oi->oi_osfs = *msfs;
2752out:
2753	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2754	RETURN(rc);
2755}
2756
2757static int osc_statfs_async(struct obd_export *exp,
2758			    struct obd_info *oinfo, __u64 max_age,
2759			    struct ptlrpc_request_set *rqset)
2760{
2761	struct obd_device     *obd = class_exp2obd(exp);
2762	struct ptlrpc_request *req;
2763	struct osc_async_args *aa;
2764	int		    rc;
2765	ENTRY;
2766
2767	/* We could possibly pass max_age in the request (as an absolute
2768	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2769	 * extra calls into the filesystem if that isn't necessary (e.g.
2770	 * during mount that would help a bit).  Having relative timestamps
2771	 * is not so great if request processing is slow, while absolute
2772	 * timestamps are not ideal because they need time synchronization. */
2773	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2774	if (req == NULL)
2775		RETURN(-ENOMEM);
2776
2777	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2778	if (rc) {
2779		ptlrpc_request_free(req);
2780		RETURN(rc);
2781	}
2782	ptlrpc_request_set_replen(req);
2783	req->rq_request_portal = OST_CREATE_PORTAL;
2784	ptlrpc_at_set_req_timeout(req);
2785
2786	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2787		/* procfs requests should not wait for stats, to avoid a deadlock */
2788		req->rq_no_resend = 1;
2789		req->rq_no_delay = 1;
2790	}
2791
2792	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2793	CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2794	aa = ptlrpc_req_async_args(req);
2795	aa->aa_oi = oinfo;
2796
2797	ptlrpc_set_add_req(rqset, req);
2798	RETURN(0);
2799}
2800
2801static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2802		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2803{
2804	struct obd_device     *obd = class_exp2obd(exp);
2805	struct obd_statfs     *msfs;
2806	struct ptlrpc_request *req;
2807	struct obd_import     *imp = NULL;
2808	int rc;
2809	ENTRY;
2810
2811	/* Since the request might also come from lprocfs, we need to
2812	 * sync this with client_disconnect_export(). Bug 15684 */
2813	down_read(&obd->u.cli.cl_sem);
2814	if (obd->u.cli.cl_import)
2815		imp = class_import_get(obd->u.cli.cl_import);
2816	up_read(&obd->u.cli.cl_sem);
2817	if (!imp)
2818		RETURN(-ENODEV);
2819
2820	/* We could possibly pass max_age in the request (as an absolute
2821	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2822	 * extra calls into the filesystem if that isn't necessary (e.g.
2823	 * during mount that would help a bit).  Having relative timestamps
2824	 * is not so great if request processing is slow, while absolute
2825	 * timestamps are not ideal because they need time synchronization. */
2826	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2827
2828	class_import_put(imp);
2829
2830	if (req == NULL)
2831		RETURN(-ENOMEM);
2832
2833	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2834	if (rc) {
2835		ptlrpc_request_free(req);
2836		RETURN(rc);
2837	}
2838	ptlrpc_request_set_replen(req);
2839	req->rq_request_portal = OST_CREATE_PORTAL;
2840	ptlrpc_at_set_req_timeout(req);
2841
2842	if (flags & OBD_STATFS_NODELAY) {
2843		/* procfs requests should not wait for stats, to avoid a deadlock */
2844		req->rq_no_resend = 1;
2845		req->rq_no_delay = 1;
2846	}
2847
2848	rc = ptlrpc_queue_wait(req);
2849	if (rc)
2850		GOTO(out, rc);
2851
2852	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2853	if (msfs == NULL) {
2854		GOTO(out, rc = -EPROTO);
2855	}
2856
2857	*osfs = *msfs;
2858
2859	EXIT;
2860 out:
2861	ptlrpc_req_finished(req);
2862	return rc;
2863}
2864
2865/* Retrieve object striping information.
2866 *
2867 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2868 * the maximum number of OST indices which will fit in the user buffer.
2869 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2870 */
2871static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2872{
2873	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2874	struct lov_user_md_v3 lum, *lumk;
2875	struct lov_user_ost_data_v1 *lmm_objects;
2876	int rc = 0, lum_size;
2877	ENTRY;
2878
2879	if (!lsm)
2880		RETURN(-ENODATA);
2881
2882	/* we only need the header part from user space to get lmm_magic and
2883	 * lmm_stripe_count (the header part is common to v1 and v3) */
2884	lum_size = sizeof(struct lov_user_md_v1);
2885	if (copy_from_user(&lum, lump, lum_size))
2886		RETURN(-EFAULT);
2887
2888	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2889	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
2890		RETURN(-EINVAL);
2891
2892	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
2893	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2894	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2895	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2896
2897	/* we can use lov_mds_md_size() to compute lum_size
2898	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2899	if (lum.lmm_stripe_count > 0) {
2900		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2901		OBD_ALLOC(lumk, lum_size);
2902		if (!lumk)
2903			RETURN(-ENOMEM);
2904
2905		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2906			lmm_objects =
2907			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2908		else
2909			lmm_objects = &(lumk->lmm_objects[0]);
2910		lmm_objects->l_ost_oi = lsm->lsm_oi;
2911	} else {
2912		lum_size = lov_mds_md_size(0, lum.lmm_magic);
2913		lumk = &lum;
2914	}
2915
2916	lumk->lmm_oi = lsm->lsm_oi;
2917	lumk->lmm_stripe_count = 1;
2918
2919	if (copy_to_user(lump, lumk, lum_size))
2920		rc = -EFAULT;
2921
2922	if (lumk != &lum)
2923		OBD_FREE(lumk, lum_size);
2924
2925	RETURN(rc);
2926}
2927
2928
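/* Dispatch OSC-specific ioctls; unknown commands return -ENOTTY. */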
2929static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2930			 void *karg, void *uarg)
2931{
2932	struct obd_device *obd = exp->exp_obd;
2933	struct obd_ioctl_data *data = karg;
2934	int err = 0;
2935	ENTRY;
2936
2937	if (!try_module_get(THIS_MODULE)) {
2938		CERROR("Can't get module. Is it alive?");
2939		return -EINVAL;
2940	}
2941	switch (cmd) {
2942	case OBD_IOC_LOV_GET_CONFIG: {
2943		char *buf;
2944		struct lov_desc *desc;
2945		struct obd_uuid uuid;
2946
2947		buf = NULL;
2948		len = 0;
2949		if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2950			GOTO(out, err = -EINVAL);
2951
2952		data = (struct obd_ioctl_data *)buf;
2953
2954		if (sizeof(*desc) > data->ioc_inllen1) {
2955			obd_ioctl_freedata(buf, len);
2956			GOTO(out, err = -EINVAL);
2957		}
2958
2959		if (data->ioc_inllen2 < sizeof(uuid)) {
2960			obd_ioctl_freedata(buf, len);
2961			GOTO(out, err = -EINVAL);
2962		}
2963
2964		desc = (struct lov_desc *)data->ioc_inlbuf1;
2965		desc->ld_tgt_count = 1;
2966		desc->ld_active_tgt_count = 1;
2967		desc->ld_default_stripe_count = 1;
2968		desc->ld_default_stripe_size = 0;
2969		desc->ld_default_stripe_offset = 0;
2970		desc->ld_pattern = 0;
2971		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2972
2973		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2974
2975		err = copy_to_user((void *)uarg, buf, len);
2976		if (err)
2977			err = -EFAULT;
2978		obd_ioctl_freedata(buf, len);
2979		GOTO(out, err);
2980	}
2981	case LL_IOC_LOV_SETSTRIPE:
2982		err = obd_alloc_memmd(exp, karg);
2983		if (err > 0)
2984			err = 0;
2985		GOTO(out, err);
2986	case LL_IOC_LOV_GETSTRIPE:
2987		err = osc_getstripe(karg, uarg);
2988		GOTO(out, err);
2989	case OBD_IOC_CLIENT_RECOVER:
2990		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2991					    data->ioc_inlbuf1, 0);
2992		if (err > 0)
2993			err = 0;
2994		GOTO(out, err);
2995	case IOC_OSC_SET_ACTIVE:
2996		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2997					       data->ioc_offset);
2998		GOTO(out, err);
2999	case OBD_IOC_POLL_QUOTACHECK:
3000		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3001		GOTO(out, err);
3002	case OBD_IOC_PING_TARGET:
3003		err = ptlrpc_obd_ping(obd);
3004		GOTO(out, err);
3005	default:
3006		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3007		       cmd, current_comm());
3008		GOTO(out, err = -ENOTTY);
3009	}
3010out:
3011	module_put(THIS_MODULE);
3012	return err;
3013}
3014
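/* Handle "get info" requests. Lock-to-stripe mapping and last object id
 * are answered with simple RPCs; FIEMAP additionally tries to match a
 * cached PR lock, falling back to a server-side lock otherwise. */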
3015static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3016			obd_count keylen, void *key, __u32 *vallen, void *val,
3017			struct lov_stripe_md *lsm)
3018{
3019	ENTRY;
3020	if (!vallen || !val)
3021		RETURN(-EFAULT);
3022
3023	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3024		__u32 *stripe = val;
3025		*vallen = sizeof(*stripe);
3026		*stripe = 0;
3027		RETURN(0);
3028	} else if (KEY_IS(KEY_LAST_ID)) {
3029		struct ptlrpc_request *req;
3030		obd_id		*reply;
3031		char		  *tmp;
3032		int		    rc;
3033
3034		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3035					   &RQF_OST_GET_INFO_LAST_ID);
3036		if (req == NULL)
3037			RETURN(-ENOMEM);
3038
3039		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3040				     RCL_CLIENT, keylen);
3041		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3042		if (rc) {
3043			ptlrpc_request_free(req);
3044			RETURN(rc);
3045		}
3046
3047		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3048		memcpy(tmp, key, keylen);
3049
3050		req->rq_no_delay = req->rq_no_resend = 1;
3051		ptlrpc_request_set_replen(req);
3052		rc = ptlrpc_queue_wait(req);
3053		if (rc)
3054			GOTO(out, rc);
3055
3056		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3057		if (reply == NULL)
3058			GOTO(out, rc = -EPROTO);
3059
3060		*((obd_id *)val) = *reply;
3061	out:
3062		ptlrpc_req_finished(req);
3063		RETURN(rc);
3064	} else if (KEY_IS(KEY_FIEMAP)) {
3065		struct ll_fiemap_info_key *fm_key =
3066				(struct ll_fiemap_info_key *)key;
3067		struct ldlm_res_id	 res_id;
3068		ldlm_policy_data_t	 policy;
3069		struct lustre_handle	 lockh;
3070		ldlm_mode_t		 mode = 0;
3071		struct ptlrpc_request	*req;
3072		struct ll_user_fiemap	*reply;
3073		char			*tmp;
3074		int			 rc;
3075
3076		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3077			goto skip_locking;
3078
3079		policy.l_extent.start = fm_key->fiemap.fm_start &
3080						CFS_PAGE_MASK;
3081
3082		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3083		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3084			policy.l_extent.end = OBD_OBJECT_EOF;
3085		else
3086			policy.l_extent.end = (fm_key->fiemap.fm_start +
3087				fm_key->fiemap.fm_length +
3088				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3089
3090		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3091		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3092				       LDLM_FL_BLOCK_GRANTED |
3093				       LDLM_FL_LVB_READY,
3094				       &res_id, LDLM_EXTENT, &policy,
3095				       LCK_PR | LCK_PW, &lockh, 0);
3096		if (mode) { /* lock is cached on client */
3097			if (mode != LCK_PR) {
3098				ldlm_lock_addref(&lockh, LCK_PR);
3099				ldlm_lock_decref(&lockh, LCK_PW);
3100			}
3101		} else { /* no cached lock, need to acquire one on the server side */
3102			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3103			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3104		}
3105
3106skip_locking:
3107		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3108					   &RQF_OST_GET_INFO_FIEMAP);
3109		if (req == NULL)
3110			GOTO(drop_lock, rc = -ENOMEM);
3111
3112		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3113				     RCL_CLIENT, keylen);
3114		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3115				     RCL_CLIENT, *vallen);
3116		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3117				     RCL_SERVER, *vallen);
3118
3119		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3120		if (rc) {
3121			ptlrpc_request_free(req);
3122			GOTO(drop_lock, rc);
3123		}
3124
3125		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3126		memcpy(tmp, key, keylen);
3127		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3128		memcpy(tmp, val, *vallen);
3129
3130		ptlrpc_request_set_replen(req);
3131		rc = ptlrpc_queue_wait(req);
3132		if (rc)
3133			GOTO(fini_req, rc);
3134
3135		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3136		if (reply == NULL)
3137			GOTO(fini_req, rc = -EPROTO);
3138
3139		memcpy(val, reply, *vallen);
3140fini_req:
3141		ptlrpc_req_finished(req);
3142drop_lock:
3143		if (mode)
3144			ldlm_lock_decref(&lockh, LCK_PR);
3145		RETURN(rc);
3146	}
3147
3148	RETURN(-EINVAL);
3149}
3150
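/* Handle "set info" requests. Checksum, sptlrpc, cache and LRU keys are
 * processed locally; everything else is forwarded to the OST as an
 * OST_SET_INFO RPC. */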
3151static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3152			      obd_count keylen, void *key, obd_count vallen,
3153			      void *val, struct ptlrpc_request_set *set)
3154{
3155	struct ptlrpc_request *req;
3156	struct obd_device     *obd = exp->exp_obd;
3157	struct obd_import     *imp = class_exp2cliimp(exp);
3158	char		  *tmp;
3159	int		    rc;
3160	ENTRY;
3161
3162	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3163
3164	if (KEY_IS(KEY_CHECKSUM)) {
3165		if (vallen != sizeof(int))
3166			RETURN(-EINVAL);
3167		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3168		RETURN(0);
3169	}
3170
3171	if (KEY_IS(KEY_SPTLRPC_CONF)) {
3172		sptlrpc_conf_client_adapt(obd);
3173		RETURN(0);
3174	}
3175
3176	if (KEY_IS(KEY_FLUSH_CTX)) {
3177		sptlrpc_import_flush_my_ctx(imp);
3178		RETURN(0);
3179	}
3180
3181	if (KEY_IS(KEY_CACHE_SET)) {
3182		struct client_obd *cli = &obd->u.cli;
3183
3184		LASSERT(cli->cl_cache == NULL); /* only once */
3185		cli->cl_cache = (struct cl_client_cache *)val;
3186		atomic_inc(&cli->cl_cache->ccc_users);
3187		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3188
3189		/* add this osc into entity list */
3190		LASSERT(list_empty(&cli->cl_lru_osc));
3191		spin_lock(&cli->cl_cache->ccc_lru_lock);
3192		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3193		spin_unlock(&cli->cl_cache->ccc_lru_lock);
3194
3195		RETURN(0);
3196	}
3197
3198	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3199		struct client_obd *cli = &obd->u.cli;
3200		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3201		int target = *(int *)val;
3202
3203		nr = osc_lru_shrink(cli, min(nr, target));
3204		*(int *)val -= nr;
3205		RETURN(0);
3206	}
3207
3208	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3209		RETURN(-EINVAL);
3210
3211	/* We pass all other commands directly to OST. Since nobody calls osc
3212	   methods directly and everybody is supposed to go through LOV, we
3213	   assume LOV has checked for invalid values on our behalf.
3214	   The only recognised values so far are evict_by_nid and mds_conn.
3215	   Even if something bad goes through, we'd get a -EINVAL from OST
3216	   anyway. */
3217
3218	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3219						&RQF_OST_SET_GRANT_INFO :
3220						&RQF_OBD_SET_INFO);
3221	if (req == NULL)
3222		RETURN(-ENOMEM);
3223
3224	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3225			     RCL_CLIENT, keylen);
3226	if (!KEY_IS(KEY_GRANT_SHRINK))
3227		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3228				     RCL_CLIENT, vallen);
3229	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3230	if (rc) {
3231		ptlrpc_request_free(req);
3232		RETURN(rc);
3233	}
3234
3235	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3236	memcpy(tmp, key, keylen);
3237	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3238							&RMF_OST_BODY :
3239							&RMF_SETINFO_VAL);
3240	memcpy(tmp, val, vallen);
3241
3242	if (KEY_IS(KEY_GRANT_SHRINK)) {
3243		struct osc_grant_args *aa;
3244		struct obdo *oa;
3245
3246		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3247		aa = ptlrpc_req_async_args(req);
3248		OBDO_ALLOC(oa);
3249		if (!oa) {
3250			ptlrpc_req_finished(req);
3251			RETURN(-ENOMEM);
3252		}
3253		*oa = ((struct ost_body *)val)->oa;
3254		aa->aa_oa = oa;
3255		req->rq_interpret_reply = osc_shrink_grant_interpret;
3256	}
3257
3258	ptlrpc_request_set_replen(req);
3259	if (!KEY_IS(KEY_GRANT_SHRINK)) {
3260		LASSERT(set != NULL);
3261		ptlrpc_set_add_req(set, req);
3262		ptlrpc_check_set(NULL, set);
3263	} else
3264		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3265
3266	RETURN(0);
3267}
3268
3269
3270static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3271			 struct obd_device *disk_obd, int *index)
3272{
3273	/* this code is not supposed to be used with LOD/OSP;
3274	 * it is to be removed soon */
3275	LBUG();
3276	return 0;
3277}
3278
3279static int osc_llog_finish(struct obd_device *obd, int count)
3280{
3281	struct llog_ctxt *ctxt;
3282
3283	ENTRY;
3284
3285	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3286	if (ctxt) {
3287		llog_cat_close(NULL, ctxt->loc_handle);
3288		llog_cleanup(NULL, ctxt);
3289	}
3290
3291	ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3292	if (ctxt)
3293		llog_cleanup(NULL, ctxt);
3294	RETURN(0);
3295}
3296
3297static int osc_reconnect(const struct lu_env *env,
3298			 struct obd_export *exp, struct obd_device *obd,
3299			 struct obd_uuid *cluuid,
3300			 struct obd_connect_data *data,
3301			 void *localdata)
3302{
3303	struct client_obd *cli = &obd->u.cli;
3304
3305	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3306		long lost_grant;
3307
3308		client_obd_list_lock(&cli->cl_loi_list_lock);
3309		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3310				2 * cli_brw_size(obd);
3311		lost_grant = cli->cl_lost_grant;
3312		cli->cl_lost_grant = 0;
3313		client_obd_list_unlock(&cli->cl_loi_list_lock);
3314
3315		CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3316		       " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3317		       data->ocd_version, data->ocd_grant, lost_grant);
3318	}
3319
3320	RETURN(0);
3321}
3322
3323static int osc_disconnect(struct obd_export *exp)
3324{
3325	struct obd_device *obd = class_exp2obd(exp);
3326	struct llog_ctxt  *ctxt;
3327	int rc;
3328
3329	ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3330	if (ctxt) {
3331		if (obd->u.cli.cl_conn_count == 1) {
3332			/* Flush any remaining cancel messages out to the
3333			 * target */
3334			llog_sync(ctxt, exp, 0);
3335		}
3336		llog_ctxt_put(ctxt);
3337	} else {
3338		CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3339		       obd);
3340	}
3341
3342	rc = client_disconnect_export(exp);
3343	/**
3344	 * Initially we put del_shrink_grant before disconnect_export, but it
3345	 * causes the following problem if setup (connect) and cleanup
3346	 * (disconnect) are tangled together.
3347	 *      connect p1		     disconnect p2
3348	 *   ptlrpc_connect_import
3349	 *     ...............	       class_manual_cleanup
3350	 *				     osc_disconnect
3351	 *				     del_shrink_grant
3352	 *   ptlrpc_connect_interrupt
3353	 *     init_grant_shrink
3354	 *   add this client to shrink list
3355	 *				      cleanup_osc
3356	 * Bang! The pinger triggers the shrink.
3357	 * So the OSC should be disconnected from the shrink list only after we
3358	 * are sure the import has been destroyed. Bug 18662
3359	 */
3360	if (obd->u.cli.cl_import == NULL)
3361		osc_del_shrink_grant(&obd->u.cli);
3362	return rc;
3363}
3364
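/* React to import state changes: reset grants on disconnect, flush pages
 * and clean up the namespace on invalidation, and forward (de)activation
 * events to the observer. */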
3365static int osc_import_event(struct obd_device *obd,
3366			    struct obd_import *imp,
3367			    enum obd_import_event event)
3368{
3369	struct client_obd *cli;
3370	int rc = 0;
3371
3372	ENTRY;
3373	LASSERT(imp->imp_obd == obd);
3374
3375	switch (event) {
3376	case IMP_EVENT_DISCON: {
3377		cli = &obd->u.cli;
3378		client_obd_list_lock(&cli->cl_loi_list_lock);
3379		cli->cl_avail_grant = 0;
3380		cli->cl_lost_grant = 0;
3381		client_obd_list_unlock(&cli->cl_loi_list_lock);
3382		break;
3383	}
3384	case IMP_EVENT_INACTIVE: {
3385		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3386		break;
3387	}
3388	case IMP_EVENT_INVALIDATE: {
3389		struct ldlm_namespace *ns = obd->obd_namespace;
3390		struct lu_env	 *env;
3391		int		    refcheck;
3392
3393		env = cl_env_get(&refcheck);
3394		if (!IS_ERR(env)) {
3395			/* Reset grants */
3396			cli = &obd->u.cli;
3397			/* all pages go to failing rpcs due to the invalid
3398			 * import */
3399			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3400
3401			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3402			cl_env_put(env, &refcheck);
3403		} else
3404			rc = PTR_ERR(env);
3405		break;
3406	}
3407	case IMP_EVENT_ACTIVE: {
3408		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3409		break;
3410	}
3411	case IMP_EVENT_OCD: {
3412		struct obd_connect_data *ocd = &imp->imp_connect_data;
3413
3414		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3415			osc_init_grant(&obd->u.cli, ocd);
3416
3417		/* See bug 7198 */
3418		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3419			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3420
3421		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3422		break;
3423	}
3424	case IMP_EVENT_DEACTIVATE: {
3425		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3426		break;
3427	}
3428	case IMP_EVENT_ACTIVATE: {
3429		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3430		break;
3431	}
3432	default:
3433		CERROR("Unknown import event %d\n", event);
3434		LBUG();
3435	}
3436	RETURN(rc);
3437}
3438
3439/**
3440 * Determine whether the lock can be canceled before replaying the lock
3441 * during recovery, see bug16774 for detailed information.
3442 *
3443 * \retval zero the lock can't be canceled
3444 * \retval other ok to cancel
3445 */
3446static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3447{
3448	check_res_locked(lock->l_resource);
3449
3450	/*
3451	 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3452	 *
3453	 * XXX as a future improvement, we can also cancel unused write lock
3454	 * if it doesn't have dirty data and active mmaps.
3455	 */
3456	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3457	    (lock->l_granted_mode == LCK_PR ||
3458	     lock->l_granted_mode == LCK_CR) &&
3459	    (osc_dlm_lock_pageref(lock) == 0))
3460		RETURN(1);
3461
3462	RETURN(0);
3463}
3464
3465static int brw_queue_work(const struct lu_env *env, void *data)
3466{
3467	struct client_obd *cli = data;
3468
3469	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3470
3471	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3472	RETURN(0);
3473}
3474
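/* Set up the OSC device: connect the client obd, start the writeback work
 * queue, initialise quota and grant shrinking, register lprocfs entries
 * and set up a request pool for resends. */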
3475int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3476{
3477	struct lprocfs_static_vars lvars = { 0 };
3478	struct client_obd	  *cli = &obd->u.cli;
3479	void		       *handler;
3480	int			rc;
3481	ENTRY;
3482
3483	rc = ptlrpcd_addref();
3484	if (rc)
3485		RETURN(rc);
3486
3487	rc = client_obd_setup(obd, lcfg);
3488	if (rc)
3489		GOTO(out_ptlrpcd, rc);
3490
3491	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3492	if (IS_ERR(handler))
3493		GOTO(out_client_setup, rc = PTR_ERR(handler));
3494	cli->cl_writeback_work = handler;
3495
3496	rc = osc_quota_setup(obd);
3497	if (rc)
3498		GOTO(out_ptlrpcd_work, rc);
3499
3500	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3501	lprocfs_osc_init_vars(&lvars);
3502	if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3503		lproc_osc_attach_seqstat(obd);
3504		sptlrpc_lprocfs_cliobd_attach(obd);
3505		ptlrpc_lprocfs_register_obd(obd);
3506	}
3507
3508	/* We need to allocate a few extra requests, because
3509	 * brw_interpret tries to create new requests before freeing
3510	 * previous ones.  Ideally we would want 2x max_rpcs_in_flight
3511	 * reserved, but I'm afraid that might be too much wasted RAM
3512	 * in fact, so 2 is just my guess and should still work. */
3513	cli->cl_import->imp_rq_pool =
3514		ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3515				    OST_MAXREQSIZE,
3516				    ptlrpc_add_rqs_to_pool);
3517
3518	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3519	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3520	RETURN(rc);
3521
3522out_ptlrpcd_work:
3523	ptlrpcd_destroy_work(handler);
3524out_client_setup:
3525	client_obd_cleanup(obd);
3526out_ptlrpcd:
3527	ptlrpcd_decref();
3528	RETURN(rc);
3529}
3530
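/* Two-stage pre-cleanup: deactivate the import early, then tear down the
 * writeback work, the client import and the llog contexts once exports
 * are being cleaned up. */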
3531static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3532{
3533	int rc = 0;
3534	ENTRY;
3535
3536	switch (stage) {
3537	case OBD_CLEANUP_EARLY: {
3538		struct obd_import *imp;
3539		imp = obd->u.cli.cl_import;
3540		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3541		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3542		ptlrpc_deactivate_import(imp);
3543		spin_lock(&imp->imp_lock);
3544		imp->imp_pingable = 0;
3545		spin_unlock(&imp->imp_lock);
3546		break;
3547	}
3548	case OBD_CLEANUP_EXPORTS: {
3549		struct client_obd *cli = &obd->u.cli;
		/* LU-464
		 * For the echo client, the export may be on the zombie list;
		 * wait for the zombie thread to cull it, because
		 * cli.cl_import will be cleared in client_disconnect_export():
		 *   class_export_destroy() -> obd_cleanup() ->
		 *   echo_device_free() -> echo_client_cleanup() ->
		 *   obd_disconnect() -> osc_disconnect() ->
		 *   client_disconnect_export()
		 */
		obd_zombie_barrier();
		if (cli->cl_writeback_work) {
			ptlrpcd_destroy_work(cli->cl_writeback_work);
			cli->cl_writeback_work = NULL;
		}
		obd_cleanup_client_import(obd);
		ptlrpc_lprocfs_unregister_obd(obd);
		lprocfs_obd_cleanup(obd);
		rc = obd_llog_finish(obd, 0);
		if (rc != 0)
			CERROR("failed to clean up llog subsystems\n");
		break;
	}
	}
	RETURN(rc);
}

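/*
 * Final teardown, mirroring osc_setup(): unhook the client from the shared
 * LRU cache, free the quota cache, clean up the generic client obd, and
 * drop the ptlrpcd reference taken in osc_setup().
 */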
int osc_cleanup(struct obd_device *obd)
{
	struct client_obd *cli = &obd->u.cli;
	int rc;

	ENTRY;

	/* lru cleanup */
	if (cli->cl_cache != NULL) {
		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
		spin_lock(&cli->cl_cache->ccc_lru_lock);
		list_del_init(&cli->cl_lru_osc);
		spin_unlock(&cli->cl_cache->ccc_lru_lock);
		cli->cl_lru_left = NULL;
		atomic_dec(&cli->cl_cache->ccc_users);
		cli->cl_cache = NULL;
	}

	/* free memory of osc quota cache */
	osc_quota_cleanup(obd);

	rc = client_obd_cleanup(obd);

	ptlrpcd_decref();
	RETURN(rc);
}

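/*
 * Process a configuration command.  Anything not handled explicitly is
 * passed to class_process_proc_param() as a PARAM_OSC tunable; a positive
 * return from it is treated as success here.
 */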
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
	struct lprocfs_static_vars lvars = { 0 };
	int rc = 0;

	lprocfs_osc_init_vars(&lvars);

	switch (lcfg->lcfg_command) {
	default:
		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
					      lcfg, obd);
		if (rc > 0)
			rc = 0;
		break;
	}

	return rc;
}

static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
	return osc_process_config_base(obd, buf);
}

struct obd_ops osc_obd_ops = {
	.o_owner		= THIS_MODULE,
	.o_setup		= osc_setup,
	.o_precleanup		= osc_precleanup,
	.o_cleanup		= osc_cleanup,
	.o_add_conn		= client_import_add_conn,
	.o_del_conn		= client_import_del_conn,
	.o_connect		= client_connect_import,
	.o_reconnect		= osc_reconnect,
	.o_disconnect		= osc_disconnect,
	.o_statfs		= osc_statfs,
	.o_statfs_async		= osc_statfs_async,
	.o_packmd		= osc_packmd,
	.o_unpackmd		= osc_unpackmd,
	.o_create		= osc_create,
	.o_destroy		= osc_destroy,
	.o_getattr		= osc_getattr,
	.o_getattr_async	= osc_getattr_async,
	.o_setattr		= osc_setattr,
	.o_setattr_async	= osc_setattr_async,
	.o_brw			= osc_brw,
	.o_punch		= osc_punch,
	.o_sync			= osc_sync,
	.o_enqueue		= osc_enqueue,
	.o_change_cbdata	= osc_change_cbdata,
	.o_find_cbdata		= osc_find_cbdata,
	.o_cancel		= osc_cancel,
	.o_cancel_unused	= osc_cancel_unused,
	.o_iocontrol		= osc_iocontrol,
	.o_get_info		= osc_get_info,
	.o_set_info_async	= osc_set_info_async,
	.o_import_event		= osc_import_event,
	.o_llog_init		= osc_llog_init,
	.o_llog_finish		= osc_llog_finish,
	.o_process_config	= osc_process_config,
	.o_quotactl		= osc_quotactl,
	.o_quotacheck		= osc_quotacheck,
};
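
/*
 * osc_obd_ops is the OSC's method table for the generic obd layer; it is
 * registered (together with osc_device_type) via class_register_type() in
 * osc_init() below.
 */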

extern struct lu_kmem_descr osc_caches[];
extern spinlock_t osc_ast_guard;
extern struct lock_class_key osc_ast_guard_class;

int __init osc_init(void)
{
	struct lprocfs_static_vars lvars = { 0 };
	int rc;
	ENTRY;

	/* Print the address of _any_ initialized kernel symbol from this
	 * module, to allow debugging with a gdb that doesn't support data
	 * symbols from modules. */
	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);

	rc = lu_kmem_init(osc_caches);
	if (rc)
		RETURN(rc);

	lprocfs_osc_init_vars(&lvars);

	rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
				 LUSTRE_OSC_NAME, &osc_device_type);
	if (rc) {
		lu_kmem_fini(osc_caches);
		RETURN(rc);
	}

	spin_lock_init(&osc_ast_guard);
	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);

	RETURN(rc);
}

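/* Module unload: unregister the obd type and release the kmem caches set
 * up in osc_init(). */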
static void /*__exit*/ osc_exit(void)
{
	class_unregister_type(LUSTRE_OSC_NAME);
	lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);