osc_request.c revision 26c4ea46a55c9056fa20e3c91b1989f3cd9473d7
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_dlm.h"
#include "../include/lustre_net.h"
#include "../include/lustre/lustre_user.h"
#include "../include/obd_cksum.h"

#include "../include/lustre_ha.h"
#include "../include/lprocfs_status.h"
#include "../include/lustre_debug.h"
#include "../include/lustre_param.h"
#include "../include/lustre_fid.h"
#include "../include/obd_class.h"
#include "osc_internal.h"
#include "osc_cl_internal.h"

struct osc_brw_async_args {
	struct obdo       *aa_oa;
	int		aa_requested_nob;
	int		aa_nio_count;
	u32		aa_page_count;
	int		aa_resends;
	struct brw_page  **aa_ppga;
	struct client_obd *aa_cli;
	struct list_head	 aa_oaps;
	struct list_head	 aa_exts;
	struct obd_capa   *aa_ocapa;
	struct cl_req     *aa_clerq;
};

struct osc_async_args {
	struct obd_info   *aa_oi;
};

struct osc_setattr_args {
	struct obdo	 *sa_oa;
	obd_enqueue_update_f sa_upcall;
	void		*sa_cookie;
};

struct osc_fsync_args {
	struct obd_info     *fa_oi;
	obd_enqueue_update_f fa_upcall;
	void		*fa_cookie;
};

struct osc_enqueue_args {
	struct obd_export	*oa_exp;
	__u64		    *oa_flags;
	obd_enqueue_update_f      oa_upcall;
	void		     *oa_cookie;
	struct ost_lvb	   *oa_lvb;
	struct lustre_handle     *oa_lockh;
	struct ldlm_enqueue_info *oa_ei;
	unsigned int	      oa_agl:1;
};

static void osc_release_ppga(struct brw_page **ppga, u32 count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		return lmm_size;

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		return 0;
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			return -ENOMEM;
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	return lmm_size;
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			return -EINVAL;
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			return -EINVAL;
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		return lsm_size;

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		return 0;
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			return -ENOMEM;
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			return -ENOMEM;
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		return -EBADF;
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	return lsm_size;
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}

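/* Reply handler for async getattr: unpack the wire obdo from the reply into
 * the caller's obd_info, then invoke the oi_cb_up completion callback with
 * the final status. */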
static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	return rc;
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	return 0;
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	int		    rc;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;

	if (rc != 0)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out;
	}

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	return rc;
}

int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do mds to ost setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	return 0;
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct lov_stripe_md  *lsm;
	int		    rc;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			return rc;
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		goto out;
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		goto out_req;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		rc = -EPROTO;
		goto out_req;
	}

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: %lld\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	return rc;
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request   *req;
	struct osc_setattr_args *sa;
	struct ost_body	 *body;
	int		      rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa     = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;

	if (rc)
		goto out;

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		rc = -EPROTO;
		goto out;
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	return rc;
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body       *body;
	struct osc_fsync_args *fa;
	int		    rc;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		return -ENOMEM;

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	return 0;
}

/* Find and cancel locally the locks matched by @mode in the resource found
 * by @oa. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, __u64 lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This distinguishes from a case when ELC is not supported originally,
	 * when we still want to cancel locks in advance and just cancel them
	 * locally, without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		return 0;

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		return 0;

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	return count;
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

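/* Throttle destroy RPCs to cl_max_rpcs_in_flight. The counter is incremented
 * optimistically; if the limit is exceeded it is decremented again, and a
 * waiter is woken if another thread changed the counter between the two
 * atomic operations. Returns 1 if the destroy may be sent now, 0 if the
 * caller must wait on cl_destroy_waitq. */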
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		return osc_real_create(exp, oa, ea, oti);
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		return osc_real_create(exp, oa, ea, oti);

	/* we should not get here anymore */
	LBUG();

	return rc;
}

/* Destroy requests can always be async on the client, and we don't even
 * really care about the return code, since the client cannot do anything at
 * all about a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST reconnects to the MDS next,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd     *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body       *body;
	LIST_HEAD(cancels);
	int rc, count;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		return -EINVAL;
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		return -ENOMEM;
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying the unlink orphan, sent from MDT
	 * to OST, it should not be blocked here, because the process might
	 * be triggered by ptlrpcd, and it is not good to block the ptlrpcd
	 * thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	return 0;
}

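/* Piggyback client-side cache accounting onto an outgoing obdo: report how
 * much is dirty, how much more could be dirtied (o_undirty), the grant
 * currently held, and any grant lost while the OST was unreachable
 * (o_dropped). Inconsistent counters are clamped to o_undirty = 0. */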
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read() allowing the atomic_inc() are
		 * not covered by a lock thus they may safely race and trip
		 * this CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT)*
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, u64 grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      u32 keylen, void *key, u32 vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		goto out;
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC.  This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int			rc = 0;
	struct ost_body	*body;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		return 0;
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		return -ENOMEM;

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	return rc;
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	unsigned long time = cfs_time_current();
	unsigned long next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
				cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
			client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s\n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we've
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as in-flight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state has
	 * already left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. chunk bits: %d.\n",
	       cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, u32 page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
				(pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

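/* Verify a BRW_WRITE reply: every per-niobuf return code must be zero and
 * the bulk must have transferred exactly the requested byte count, otherwise
 * fail with the niobuf error or -EPROTO. */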
static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   u32 page_count, struct brw_page **pga)
{
	int     i;
	__u32   *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return -EPROTO;
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return remote_rcs[i];

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
				i, remote_rcs[i], req);
			return -EPROTO;
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return -EPROTO;
	}

	return 0;
}

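/* Two brw_pages may share one remote niobuf only if their flags match and
 * the second page starts at the byte where the first one ends. */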
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
				  OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

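/* Checksum the first @nob bytes of a page array with the algorithm selected
 * by @cksum_type. Under the OBD_FAIL_OSC_CHECKSUM_* fault-injection points
 * this deliberately corrupts read data or returns an incremented checksum
 * for writes, to exercise the resend paths. */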
static u32 osc_checksum_bulk(int nob, u32 pg_count,
				   struct brw_page **pga, int opc,
				   cksum_type_t cksum_type)
{
	__u32				cksum;
	int				i = 0;
	struct cfs_crypto_hash_desc	*hdesc;
	unsigned int			bufsize;
	int				err;
	unsigned char			cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;

			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
				  pga[i]->off & ~CFS_PAGE_MASK,
				  count);
		CDEBUG(D_PAGE,
		       "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
		       pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
		       (long)pga[i]->pg->flags, page_count(pga[i]->pg),
		       page_private(pga[i]->pg),
		       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data, so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

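/* Build a bulk read/write RPC for the given page array: merge contiguous
 * pages into niobufs, attach the bulk descriptor, pack the obdo, capa and
 * grant information, and optionally a client-side checksum. On success the
 * prepared request is returned via @reqp. */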
static int osc_brw_prep_request(int cmd, struct client_obd *cli,
				struct obdo *oa,
				struct lov_stripe_md *lsm, u32 page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request   *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body	 *body;
	struct obd_ioobj	*ioobj;
	struct niobuf_remote    *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule      *pill;
	struct brw_page *pg_prev;

	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		return -ENOMEM; /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		return -EINVAL; /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		return -ENOMEM;

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		return rc;
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL) {
		rc = -ENOMEM;
		goto out;
	}
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request.  The actual number
	 * is decided when the RPC is finally sent in ptlrpc_register_bulk().
	 * It sends "max - 1" for old client compatibility sending "0", and
	 * also so that the actual maximum is a power-of-two number, not one
	 * less. LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: %llu, count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
			 " prev_pg %p [pri %lu ind %lu] off %llu\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len    = pg->count;
			niobuf->flags  = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		"want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	return 0;

 out:
	ptlrpc_req_finished(req);
	return rc;
}

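/* A write checksum mismatch was reported by the server. Recompute the
 * checksum locally to diagnose whether the data changed on the client
 * (e.g. mmap IO), in transit, or the server used a different checksum type.
 * Returns 1 if the mismatch is confirmed, 0 if the checksums actually
 * agree. */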
static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				u32 page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent [%llu-%llu]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		return rc;
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		return -EPROTO;
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		return rc;

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			return -EPROTO;
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			return -EAGAIN;

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			return -EAGAIN;

		rc = check_write_rcs(req, aa->aa_requested_nob,
				     aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		goto out;
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0) {
		rc = -EAGAIN;
		goto out;
	}

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		return -EPROTO;
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return -EPROTO;
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32      server_cksum = body->oa.o_cksum;
		char      *via;
		char      *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent [%llu-%llu]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
									1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	return rc;
}

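/* Resend a failed bulk RPC: build a fresh request for the same pages and
 * hand the pga, oaps and extents over to it, delaying the send by the number
 * of resends so far (capped at the request timeout). */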
static int osc_brw_redo_request(struct ptlrpc_request *request,
				struct osc_brw_async_args *aa, int rc)
{
	struct ptlrpc_request *new_req;
	struct osc_brw_async_args *new_aa;
	struct osc_async_page *oap;

	DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
		  "redo for recoverable error %d", rc);

	rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
					OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
				  aa->aa_cli, aa->aa_oa,
				  NULL /* lsm unused by osc currently */,
				  aa->aa_page_count, aa->aa_ppga,
				  &new_req, aa->aa_ocapa, 0, 1);
	if (rc)
		return rc;

	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request != NULL) {
			LASSERTF(request == oap->oap_request,
				 "request %p != oap_request %p\n",
				 request, oap->oap_request);
			if (oap->oap_interrupted) {
				ptlrpc_req_finished(new_req);
				return -EINTR;
			}
		}
	}
	/* New request takes over pga and oaps from old request.
	 * Note that copying a list_head doesn't work, need to move it... */
	aa->aa_resends++;
	new_req->rq_interpret_reply = request->rq_interpret_reply;
	new_req->rq_async_args = request->rq_async_args;
	/* cap resend delay to the current request timeout, this is similar to
	 * what ptlrpc does (see after_reply()) */
	if (aa->aa_resends > new_req->rq_timeout)
		new_req->rq_sent = get_seconds() + new_req->rq_timeout;
	else
		new_req->rq_sent = get_seconds() + aa->aa_resends;
	new_req->rq_generation_set = 1;
	new_req->rq_import_generation = request->rq_import_generation;

	new_aa = ptlrpc_req_async_args(new_req);

	INIT_LIST_HEAD(&new_aa->aa_oaps);
	list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
	INIT_LIST_HEAD(&new_aa->aa_exts);
	list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
	new_aa->aa_resends = aa->aa_resends;

	list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
		if (oap->oap_request) {
			ptlrpc_req_finished(oap->oap_request);
			oap->oap_request = ptlrpc_request_addref(new_req);
		}
	}

	new_aa->aa_ocapa = aa->aa_ocapa;
	aa->aa_ocapa = NULL;

	/* XXX: This code will run into problems if we're ever going to
	 * support adding a series of BRW RPCs into a self-defined
	 * ptlrpc_request_set and waiting for all of them to finish. We
	 * should inherit the request set from the old request. */
	ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);

	DEBUG_REQ(D_INFO, new_req, "new request");
	return 0;
}

/*
 * ugh, we want disk allocation on the target to happen in offset order.  We'll
 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
 * fine for our small page arrays and doesn't require allocation.  It's an
 * insertion sort that swaps elements that are strides apart, shrinking the
 * stride down until it's '1' and the array is sorted.
 */
static void sort_brw_pages(struct brw_page **array, int num)
{
	int stride, i, j;
	struct brw_page *tmp;

	if (num == 1)
		return;
	for (stride = 1; stride < num ; stride = (stride * 3) + 1)
		;

	do {
		stride /= 3;
		for (i = stride ; i < num ; i++) {
			tmp = array[i];
			j = i;
			while (j >= stride && array[j - stride]->off > tmp->off) {
				array[j] = array[j - stride];
				j -= stride;
			}
			array[j] = tmp;
		}
	} while (stride > 1);
}

static void osc_release_ppga(struct brw_page **ppga, u32 count)
{
	LASSERT(ppga != NULL);
	OBD_FREE(ppga, sizeof(*ppga) * count);
}

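/* Completion handler for bulk RPCs: finish the request, resend it if the
 * error is recoverable, propagate returned attributes to the cl_object,
 * finish all attached extents, and update the in-flight RPC counters before
 * kicking the IO unplug path. */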
1758static int brw_interpret(const struct lu_env *env,
1759			 struct ptlrpc_request *req, void *data, int rc)
1760{
1761	struct osc_brw_async_args *aa = data;
1762	struct osc_extent *ext;
1763	struct osc_extent *tmp;
1764	struct cl_object  *obj = NULL;
1765	struct client_obd *cli = aa->aa_cli;
1766
1767	rc = osc_brw_fini_request(req, rc);
1768	CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1769	/* When server return -EINPROGRESS, client should always retry
1770	 * regardless of the number of times the bulk was resent already. */
1771	if (osc_recoverable_error(rc)) {
1772		if (req->rq_import_generation !=
1773		    req->rq_import->imp_generation) {
1774			CDEBUG(D_HA, "%s: resend cross eviction for object: "
1775			       DOSTID", rc = %d.\n",
1776			       req->rq_import->imp_obd->obd_name,
1777			       POSTID(&aa->aa_oa->o_oi), rc);
1778		} else if (rc == -EINPROGRESS ||
1779		    client_should_resend(aa->aa_resends, aa->aa_cli)) {
1780			rc = osc_brw_redo_request(req, aa, rc);
1781		} else {
1782			CERROR("%s: too many resend retries for object: %llu:%llu, rc = %d.\n",
1783			       req->rq_import->imp_obd->obd_name,
1784			       POSTID(&aa->aa_oa->o_oi), rc);
1785		}
1786
1787		if (rc == 0)
1788			return 0;
1789		else if (rc == -EAGAIN || rc == -EINPROGRESS)
1790			rc = -EIO;
1791	}
1792
1793	if (aa->aa_ocapa) {
1794		capa_put(aa->aa_ocapa);
1795		aa->aa_ocapa = NULL;
1796	}
1797
1798	list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799		if (obj == NULL && rc == 0) {
1800			obj = osc2cl(ext->oe_obj);
1801			cl_object_get(obj);
1802		}
1803
1804		list_del_init(&ext->oe_link);
1805		osc_extent_finish(env, ext, 1, rc);
1806	}
1807	LASSERT(list_empty(&aa->aa_exts));
1808	LASSERT(list_empty(&aa->aa_oaps));
1809
1810	if (obj != NULL) {
1811		struct obdo *oa = aa->aa_oa;
1812		struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1813		unsigned long valid = 0;
1814
1815		LASSERT(rc == 0);
1816		if (oa->o_valid & OBD_MD_FLBLOCKS) {
1817			attr->cat_blocks = oa->o_blocks;
1818			valid |= CAT_BLOCKS;
1819		}
1820		if (oa->o_valid & OBD_MD_FLMTIME) {
1821			attr->cat_mtime = oa->o_mtime;
1822			valid |= CAT_MTIME;
1823		}
1824		if (oa->o_valid & OBD_MD_FLATIME) {
1825			attr->cat_atime = oa->o_atime;
1826			valid |= CAT_ATIME;
1827		}
1828		if (oa->o_valid & OBD_MD_FLCTIME) {
1829			attr->cat_ctime = oa->o_ctime;
1830			valid |= CAT_CTIME;
1831		}
1832		if (valid != 0) {
1833			cl_object_attr_lock(obj);
1834			cl_object_attr_set(env, obj, attr, valid);
1835			cl_object_attr_unlock(obj);
1836		}
1837		cl_object_put(env, obj);
1838	}
1839	OBDO_FREE(aa->aa_oa);
1840
1841	cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1842			  req->rq_bulk->bd_nob_transferred);
1843	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1844	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1845
1846	client_obd_list_lock(&cli->cl_loi_list_lock);
1847	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1848	 * is called, so we know whether to go to sync BRWs or wait for more
1849	 * RPCs to complete. */
1850	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1851		cli->cl_w_in_flight--;
1852	else
1853		cli->cl_r_in_flight--;
1854	osc_wake_cache_waiters(cli);
1855	client_obd_list_unlock(&cli->cl_loi_list_lock);
1856
1857	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1858	return rc;
1859}
1860
1861/**
1862 * Build an RPC from the list of extents @ext_list. The caller must ensure
1863 * that the total number of pages in this list does not exceed the maximum
1864 * pages per RPC. Extents in the list must be in OES_RPC state.
1865 */
1866int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1867		  struct list_head *ext_list, int cmd, pdl_policy_t pol)
1868{
1869	struct ptlrpc_request		*req = NULL;
1870	struct osc_extent		*ext;
1871	struct brw_page			**pga = NULL;
1872	struct osc_brw_async_args	*aa = NULL;
1873	struct obdo			*oa = NULL;
1874	struct osc_async_page		*oap;
1875	struct osc_async_page		*tmp;
1876	struct cl_req			*clerq = NULL;
1877	enum cl_req_type		crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1878								      CRT_READ;
1879	struct ldlm_lock		*lock = NULL;
1880	struct cl_req_attr		*crattr = NULL;
1881	u64				starting_offset = OBD_OBJECT_EOF;
1882	u64				ending_offset = 0;
1883	int				mpflag = 0;
1884	int				mem_tight = 0;
1885	int				page_count = 0;
1886	int				i;
1887	int				rc;
1888	LIST_HEAD(rpc_list);
1889
1890	LASSERT(!list_empty(ext_list));
1891
1892	/* add pages into rpc_list to build BRW rpc */
1893	list_for_each_entry(ext, ext_list, oe_link) {
1894		LASSERT(ext->oe_state == OES_RPC);
1895		mem_tight |= ext->oe_memalloc;
1896		list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1897			++page_count;
1898			list_add_tail(&oap->oap_rpc_item, &rpc_list);
1899			if (starting_offset > oap->oap_obj_off)
1900				starting_offset = oap->oap_obj_off;
1901			else
1902				LASSERT(oap->oap_page_off == 0);
1903			if (ending_offset < oap->oap_obj_off + oap->oap_count)
1904				ending_offset = oap->oap_obj_off +
1905						oap->oap_count;
1906			else
1907				LASSERT(oap->oap_page_off + oap->oap_count ==
1908					PAGE_CACHE_SIZE);
1909		}
1910	}
1911
1912	if (mem_tight)
1913		mpflag = cfs_memory_pressure_get_and_set();
1914
1915	OBD_ALLOC(crattr, sizeof(*crattr));
1916	if (crattr == NULL) {
1917		rc = -ENOMEM;
1918		goto out;
1919	}
1920
1921	OBD_ALLOC(pga, sizeof(*pga) * page_count);
1922	if (pga == NULL) {
1923		rc = -ENOMEM;
1924		goto out;
1925	}
1926
1927	OBDO_ALLOC(oa);
1928	if (oa == NULL) {
1929		rc = -ENOMEM;
1930		goto out;
1931	}
1932
1933	i = 0;
1934	list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1935		struct cl_page *page = oap2cl_page(oap);
1936		if (clerq == NULL) {
1937			clerq = cl_req_alloc(env, page, crt,
1938					     1 /* only 1-object rpcs for now */);
1939			if (IS_ERR(clerq)) {
1940				rc = PTR_ERR(clerq);
1941				goto out;
1942			}
1943			lock = oap->oap_ldlm_lock;
1944		}
1945		if (mem_tight)
1946			oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1947		pga[i] = &oap->oap_brw_page;
1948		pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1949		CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1950		       pga[i]->pg, page_index(oap->oap_page), oap,
1951		       pga[i]->flag);
1952		i++;
1953		cl_req_page_add(env, clerq, page);
1954	}
1955
1956	/* always get the data for the obdo for the rpc */
1957	LASSERT(clerq != NULL);
1958	crattr->cra_oa = oa;
1959	cl_req_attr_set(env, clerq, crattr, ~0ULL);
1960	if (lock) {
1961		oa->o_handle = lock->l_remote_handle;
1962		oa->o_valid |= OBD_MD_FLHANDLE;
1963	}
1964
1965	rc = cl_req_prep(env, clerq);
1966	if (rc != 0) {
1967		CERROR("cl_req_prep failed: %d\n", rc);
1968		goto out;
1969	}
1970
1971	sort_brw_pages(pga, page_count);
1972	rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1973			pga, &req, crattr->cra_capa, 1, 0);
1974	if (rc != 0) {
1975		CERROR("prep_req failed: %d\n", rc);
1976		goto out;
1977	}
1978
1979	req->rq_interpret_reply = brw_interpret;
1980
1981	if (mem_tight != 0)
1982		req->rq_memalloc = 1;
1983
1984	/* Need to update the timestamps after the request is built in case
1985	 * we race with setattr (locally or in queue at the OST).  If the OST
1986	 * gets a later setattr before an earlier BRW (as determined by the
1987	 * request xid), the OST will not use the BRW timestamps.  Sadly, there
1988	 * is no obvious way to do this in a single call.  bug 10150 */
1989	cl_req_attr_set(env, clerq, crattr,
1990			OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1991
1992	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
1993
1994	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1995	aa = ptlrpc_req_async_args(req);
1996	INIT_LIST_HEAD(&aa->aa_oaps);
1997	list_splice_init(&rpc_list, &aa->aa_oaps);
1998	INIT_LIST_HEAD(&aa->aa_exts);
1999	list_splice_init(ext_list, &aa->aa_exts);
2000	aa->aa_clerq = clerq;
2001
2002	/* Queued sync pages can be torn down while the pages
2003	 * are in flight between the pending list and the RPC. */
2004	tmp = NULL;
2005	list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2006		/* only one oap gets a request reference */
2007		if (tmp == NULL)
2008			tmp = oap;
2009		if (oap->oap_interrupted && !req->rq_intr) {
2010			CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2011					oap, req);
2012			ptlrpc_mark_interrupted(req);
2013		}
2014	}
2015	if (tmp != NULL)
2016		tmp->oap_request = ptlrpc_request_addref(req);
2017
2018	client_obd_list_lock(&cli->cl_loi_list_lock);
2019	starting_offset >>= PAGE_CACHE_SHIFT;
2020	if (cmd == OBD_BRW_READ) {
2021		cli->cl_r_in_flight++;
2022		lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2023		lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2024		lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2025				      starting_offset + 1);
2026	} else {
2027		cli->cl_w_in_flight++;
2028		lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2029		lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2030		lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2031				      starting_offset + 1);
2032	}
2033	client_obd_list_unlock(&cli->cl_loi_list_lock);
2034
2035	DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2036		  page_count, aa, cli->cl_r_in_flight,
2037		  cli->cl_w_in_flight);
2038
2039	/* XXX: Maybe the caller can check the RPC bulk descriptor to
2040	 * see which CPU/NUMA node the majority of pages were allocated
2041	 * on, and try to assign the async RPC to the CPU core
2042	 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2043	 *
2044	 * But on the other hand, we expect that multiple ptlrpcd
2045	 * threads and the initial write sponsor can run in parallel,
2046	 * especially when data checksum is enabled, which is CPU-bound
2047	 * operation and single ptlrpcd thread cannot process in time.
2048	 * So more ptlrpcd threads sharing BRW load
2049	 * (with PDL_POLICY_ROUND) seems better.
2050	 */
2051	ptlrpcd_add_req(req, pol, -1);
2052	rc = 0;
2053
2054out:
2055	if (mem_tight != 0)
2056		cfs_memory_pressure_restore(mpflag);
2057
2058	if (crattr != NULL) {
2059		capa_put(crattr->cra_capa);
2060		OBD_FREE(crattr, sizeof(*crattr));
2061	}
2062
2063	if (rc != 0) {
2064		LASSERT(req == NULL);
2065
2066		if (oa)
2067			OBDO_FREE(oa);
2068		if (pga)
2069			OBD_FREE(pga, sizeof(*pga) * page_count);
2070		/* This should happen rarely and is pretty bad; it makes the
2071		 * pending list not follow the dirty order. */
2072		while (!list_empty(ext_list)) {
2073			ext = list_entry(ext_list->next, struct osc_extent,
2074					     oe_link);
2075			list_del_init(&ext->oe_link);
2076			osc_extent_finish(env, ext, 0, rc);
2077		}
2078		if (clerq && !IS_ERR(clerq))
2079			cl_req_completion(env, clerq, rc);
2080	}
2081	return rc;
2082}
2083
2084static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2085					struct ldlm_enqueue_info *einfo)
2086{
2087	void *data = einfo->ei_cbdata;
2088	int set = 0;
2089
2090	LASSERT(lock != NULL);
2091	LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2092	LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2093	LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2094	LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2095
2096	lock_res_and_lock(lock);
2097	spin_lock(&osc_ast_guard);
2098
2099	if (lock->l_ast_data == NULL)
2100		lock->l_ast_data = data;
2101	if (lock->l_ast_data == data)
2102		set = 1;
2103
2104	spin_unlock(&osc_ast_guard);
2105	unlock_res_and_lock(lock);
2106
2107	return set;
2108}
2109
2110static int osc_set_data_with_check(struct lustre_handle *lockh,
2111				   struct ldlm_enqueue_info *einfo)
2112{
2113	struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2114	int set = 0;
2115
2116	if (lock != NULL) {
2117		set = osc_set_lock_data_with_check(lock, einfo);
2118		LDLM_LOCK_PUT(lock);
2119	} else
2120		CERROR("lockh %p, data %p - client evicted?\n",
2121		       lockh, einfo->ei_cbdata);
2122	return set;
2123}
2124
2125/* Find any LDLM lock of the inode in OSC.
2126 * Return 0 if no lock is found,
2127 *	1 if one is found,
2128 *	< 0 on error. */
2129static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2130			   ldlm_iterator_t replace, void *data)
2131{
2132	struct ldlm_res_id res_id;
2133	struct obd_device *obd = class_exp2obd(exp);
2134	int rc = 0;
2135
2136	ostid_build_res_name(&lsm->lsm_oi, &res_id);
2137	rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2138	if (rc == LDLM_ITER_STOP)
2139		return 1;
2140	if (rc == LDLM_ITER_CONTINUE)
2141		return 0;
2142	return rc;
2143}
2144
2145static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2146			    obd_enqueue_update_f upcall, void *cookie,
2147			    __u64 *flags, int agl, int rc)
2148{
2149	int intent = *flags & LDLM_FL_HAS_INTENT;
2150
2151	if (intent) {
2152		/* The request was created before the ldlm_cli_enqueue() call. */
2153		if (rc == ELDLM_LOCK_ABORTED) {
2154			struct ldlm_reply *rep;
2155			rep = req_capsule_server_get(&req->rq_pill,
2156						     &RMF_DLM_REP);
2157
2158			LASSERT(rep != NULL);
2159			rep->lock_policy_res1 =
2160				ptlrpc_status_ntoh(rep->lock_policy_res1);
2161			if (rep->lock_policy_res1)
2162				rc = rep->lock_policy_res1;
2163		}
2164	}
2165
2166	if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2167	    (rc == 0)) {
2168		*flags |= LDLM_FL_LVB_READY;
2169		CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2170		       lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2171	}
2172
2173	/* Call the update callback. */
2174	rc = (*upcall)(cookie, rc);
2175	return rc;
2176}
2177
2178static int osc_enqueue_interpret(const struct lu_env *env,
2179				 struct ptlrpc_request *req,
2180				 struct osc_enqueue_args *aa, int rc)
2181{
2182	struct ldlm_lock *lock;
2183	struct lustre_handle handle;
2184	__u32 mode;
2185	struct ost_lvb *lvb;
2186	__u32 lvb_len;
2187	__u64 *flags = aa->oa_flags;
2188
2189	/* Make a local copy of the lock handle and the mode, because aa->oa_*
2190	 * might be freed any time after the lock upcall has been called. */
2191	lustre_handle_copy(&handle, aa->oa_lockh);
2192	mode = aa->oa_ei->ei_mode;
2193
2194	/* ldlm_cli_enqueue is holding a reference on the lock, so it must
2195	 * be valid. */
2196	lock = ldlm_handle2lock(&handle);
2197
2198	/* Take an additional reference so that a blocking AST that
2199	 * ldlm_cli_enqueue_fini() might post for a failed lock is guaranteed
2200	 * to arrive after an upcall has been executed by
2201	 * osc_enqueue_fini(). */
2202	ldlm_lock_addref(&handle, mode);
2203
2204	/* Let the CP AST grant the lock first. */
2205	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2206
2207	if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2208		lvb = NULL;
2209		lvb_len = 0;
2210	} else {
2211		lvb = aa->oa_lvb;
2212		lvb_len = sizeof(*aa->oa_lvb);
2213	}
2214
2215	/* Complete the lock-obtaining procedure. */
2216	rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2217				   mode, flags, lvb, lvb_len, &handle, rc);
2218	/* Complete osc stuff. */
2219	rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2220			      flags, aa->oa_agl, rc);
2221
2222	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2223
2224	/* Release the lock for async request. */
2225	if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2226		/*
2227		 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2228		 * not already released by
2229		 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2230		 */
2231		ldlm_lock_decref(&handle, mode);
2232
2233	LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2234		 aa->oa_lockh, req, aa);
2235	ldlm_lock_decref(&handle, mode);
2236	LDLM_LOCK_PUT(lock);
2237	return rc;
2238}
2239
2240struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
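
/*
 * Editor's note: PTLRPCD_SET is a sentinel pointer that is never
 * dereferenced; callers pass it as the @rqset argument so that
 * osc_enqueue_base() hands the request to a ptlrpcd daemon (see the
 * rqset == PTLRPCD_SET check below) instead of a real request set.
 */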
2241
2242/* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2243 * from the 2nd OSC before a lock from the 1st one. This does not deadlock
2244 * with other synchronous requests; however, holding some locks while trying
2245 * to obtain others may take a considerable amount of time in case of OST
2246 * failure, and when other sync requests cannot get a lock released by a
2247 * client, that client is excluded from the cluster -- such scenarios make
2248 * life difficult, so release locks just after they are obtained. */
2249int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2250		     __u64 *flags, ldlm_policy_data_t *policy,
2251		     struct ost_lvb *lvb, int kms_valid,
2252		     obd_enqueue_update_f upcall, void *cookie,
2253		     struct ldlm_enqueue_info *einfo,
2254		     struct lustre_handle *lockh,
2255		     struct ptlrpc_request_set *rqset, int async, int agl)
2256{
2257	struct obd_device *obd = exp->exp_obd;
2258	struct ptlrpc_request *req = NULL;
2259	int intent = *flags & LDLM_FL_HAS_INTENT;
2260	__u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2261	ldlm_mode_t mode;
2262	int rc;
2263
2264	/* Filesystem lock extents are extended to page boundaries so that
2265	 * dealing with the page cache is a little smoother.  */
2266	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2267	policy->l_extent.end |= ~CFS_PAGE_MASK;
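	/*
	 * Illustrative sketch (editor's note, assuming 4 KiB pages, so
	 * CFS_PAGE_MASK == ~0xfffULL): the byte range [5000, 6000] becomes
	 * start = 5000 - (5000 & 0xfff) = 4096 and end = 6000 | 0xfff = 8191,
	 * i.e. the extent is rounded out to whole pages.
	 */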
2268
2269	/*
2270	 * kms is not valid when either object is completely fresh (so that no
2271	 * locks are cached), or object was evicted. In the latter case cached
2272	 * lock cannot be used, because it would prime inode state with
2273	 * potentially stale LVB.
2274	 */
2275	if (!kms_valid)
2276		goto no_match;
2277
2278	/* Next, search for already existing extent locks that will cover us */
2279	/* If we're trying to read, we also search for an existing PW lock.  The
2280	 * VFS and page cache already protect us locally, so lots of readers/
2281	 * writers can share a single PW lock.
2282	 *
2283	 * There are problems with conversion deadlocks, so instead of
2284	 * converting a read lock to a write lock, we'll just enqueue a new
2285	 * one.
2286	 *
2287	 * At some point we should cancel the read lock instead of making them
2288	 * send us a blocking callback, but there are problems with canceling
2289	 * locks out from other users right now, too. */
2290	mode = einfo->ei_mode;
2291	if (einfo->ei_mode == LCK_PR)
2292		mode |= LCK_PW;
2293	mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2294			       einfo->ei_type, policy, mode, lockh, 0);
2295	if (mode) {
2296		struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2297
2298		if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2299			/* For AGL, if the enqueue RPC is sent but the lock is
2300			 * not granted, then skip processing this stripe.
2301			 * Return -ECANCELED to tell the caller. */
2302			ldlm_lock_decref(lockh, mode);
2303			LDLM_LOCK_PUT(matched);
2304			return -ECANCELED;
2305		} else if (osc_set_lock_data_with_check(matched, einfo)) {
2306			*flags |= LDLM_FL_LVB_READY;
2307			/* addref the lock only if this is not an async request
2308			 * and a PW lock was matched whereas we asked for PR. */
2309			if (!rqset && einfo->ei_mode != mode)
2310				ldlm_lock_addref(lockh, LCK_PR);
2311			if (intent) {
2312				/* I would like to be able to ASSERT here that
2313				 * rss <= kms, but I can't, for reasons which
2314				 * are explained in lov_enqueue() */
2315			}
2316
2317			/* We already have a lock, and it's referenced.
2318			 *
2319			 * At this point, the cl_lock::cll_state is CLS_QUEUING;
2320			 * the AGL upcall may change it to CLS_HELD directly. */
2321			(*upcall)(cookie, ELDLM_OK);
2322
2323			if (einfo->ei_mode != mode)
2324				ldlm_lock_decref(lockh, LCK_PW);
2325			else if (rqset)
2326				/* For async requests, decref the lock. */
2327				ldlm_lock_decref(lockh, einfo->ei_mode);
2328			LDLM_LOCK_PUT(matched);
2329			return ELDLM_OK;
2330		} else {
2331			ldlm_lock_decref(lockh, mode);
2332			LDLM_LOCK_PUT(matched);
2333		}
2334	}
2335
2336 no_match:
2337	if (intent) {
2338		LIST_HEAD(cancels);
2339		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2340					   &RQF_LDLM_ENQUEUE_LVB);
2341		if (req == NULL)
2342			return -ENOMEM;
2343
2344		rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2345		if (rc) {
2346			ptlrpc_request_free(req);
2347			return rc;
2348		}
2349
2350		req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2351				     sizeof(*lvb));
2352		ptlrpc_request_set_replen(req);
2353	}
2354
2355	/* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2356	*flags &= ~LDLM_FL_BLOCK_GRANTED;
2357
2358	rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2359			      sizeof(*lvb), LVB_T_OST, lockh, async);
2360	if (rqset) {
2361		if (!rc) {
2362			struct osc_enqueue_args *aa;
2363			CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2364			aa = ptlrpc_req_async_args(req);
2365			aa->oa_ei = einfo;
2366			aa->oa_exp = exp;
2367			aa->oa_flags  = flags;
2368			aa->oa_upcall = upcall;
2369			aa->oa_cookie = cookie;
2370			aa->oa_lvb    = lvb;
2371			aa->oa_lockh  = lockh;
2372			aa->oa_agl    = !!agl;
2373
2374			req->rq_interpret_reply =
2375				(ptlrpc_interpterer_t)osc_enqueue_interpret;
2376			if (rqset == PTLRPCD_SET)
2377				ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2378			else
2379				ptlrpc_set_add_req(rqset, req);
2380		} else if (intent) {
2381			ptlrpc_req_finished(req);
2382		}
2383		return rc;
2384	}
2385
2386	rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2387	if (intent)
2388		ptlrpc_req_finished(req);
2389
2390	return rc;
2391}
2392
2393int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2394		   __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2395		   __u64 *flags, void *data, struct lustre_handle *lockh,
2396		   int unref)
2397{
2398	struct obd_device *obd = exp->exp_obd;
2399	__u64 lflags = *flags;
2400	ldlm_mode_t rc;
2401
2402	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2403		return -EIO;
2404
2405	/* Filesystem lock extents are extended to page boundaries so that
2406	 * dealing with the page cache is a little smoother */
2407	policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2408	policy->l_extent.end |= ~CFS_PAGE_MASK;
2409
2410	/* Next, search for already existing extent locks that will cover us */
2411	/* If we're trying to read, we also search for an existing PW lock.  The
2412	 * VFS and page cache already protect us locally, so lots of readers/
2413	 * writers can share a single PW lock. */
2414	rc = mode;
2415	if (mode == LCK_PR)
2416		rc |= LCK_PW;
2417	rc = ldlm_lock_match(obd->obd_namespace, lflags,
2418			     res_id, type, policy, rc, lockh, unref);
2419	if (rc) {
2420		if (data != NULL) {
2421			if (!osc_set_data_with_check(lockh, data)) {
2422				if (!(lflags & LDLM_FL_TEST_LOCK))
2423					ldlm_lock_decref(lockh, rc);
2424				return 0;
2425			}
2426		}
2427		if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2428			ldlm_lock_addref(lockh, LCK_PR);
2429			ldlm_lock_decref(lockh, LCK_PW);
2430		}
2431		return rc;
2432	}
2433	return rc;
2434}
2435
2436int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2437{
2438	if (unlikely(mode == LCK_GROUP))
2439		ldlm_lock_decref_and_cancel(lockh, mode);
2440	else
2441		ldlm_lock_decref(lockh, mode);
2442
2443	return 0;
2444}
2445
2446static int osc_statfs_interpret(const struct lu_env *env,
2447				struct ptlrpc_request *req,
2448				struct osc_async_args *aa, int rc)
2449{
2450	struct obd_statfs *msfs;
2451
2452	if (rc == -EBADR)
2453		/* The request has in fact never been sent
2454		 * due to issues at a higher level (LOV).
2455		 * Exit immediately since the caller is
2456		 * aware of the problem and takes care
2457		 * of the cleanup. */
2458		return rc;
2459
2460	if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2461	    (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2462		rc = 0;
2463		goto out;
2464	}
2465
2466	if (rc != 0)
2467		goto out;
2468
2469	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2470	if (msfs == NULL) {
2471		rc = -EPROTO;
2472		goto out;
2473	}
2474
2475	*aa->aa_oi->oi_osfs = *msfs;
2476out:
2477	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2478	return rc;
2479}
2480
2481static int osc_statfs_async(struct obd_export *exp,
2482			    struct obd_info *oinfo, __u64 max_age,
2483			    struct ptlrpc_request_set *rqset)
2484{
2485	struct obd_device     *obd = class_exp2obd(exp);
2486	struct ptlrpc_request *req;
2487	struct osc_async_args *aa;
2488	int		    rc;
2489
2490	/* We could possibly pass max_age in the request (as an absolute
2491	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2492	 * extra calls into the filesystem if that isn't necessary (e.g.
2493	 * during mount that would help a bit).  Having relative timestamps
2494	 * is not so great if request processing is slow, while absolute
2495	 * timestamps are not ideal because they need time synchronization. */
2496	req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2497	if (req == NULL)
2498		return -ENOMEM;
2499
2500	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2501	if (rc) {
2502		ptlrpc_request_free(req);
2503		return rc;
2504	}
2505	ptlrpc_request_set_replen(req);
2506	req->rq_request_portal = OST_CREATE_PORTAL;
2507	ptlrpc_at_set_req_timeout(req);
2508
2509	if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2510		/* procfs requests should not wait for stat, to avoid deadlock */
2511		req->rq_no_resend = 1;
2512		req->rq_no_delay = 1;
2513	}
2514
2515	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2516	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2517	aa = ptlrpc_req_async_args(req);
2518	aa->aa_oi = oinfo;
2519
2520	ptlrpc_set_add_req(rqset, req);
2521	return 0;
2522}
2523
2524static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2525		      struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2526{
2527	struct obd_device     *obd = class_exp2obd(exp);
2528	struct obd_statfs     *msfs;
2529	struct ptlrpc_request *req;
2530	struct obd_import     *imp = NULL;
2531	int rc;
2532
2533	/* Since the request might also come from lprocfs, we need to
2534	 * sync this with client_disconnect_export() (bug 15684). */
2535	down_read(&obd->u.cli.cl_sem);
2536	if (obd->u.cli.cl_import)
2537		imp = class_import_get(obd->u.cli.cl_import);
2538	up_read(&obd->u.cli.cl_sem);
2539	if (!imp)
2540		return -ENODEV;
2541
2542	/* We could possibly pass max_age in the request (as an absolute
2543	 * timestamp or a "seconds.usec ago") so the target can avoid doing
2544	 * extra calls into the filesystem if that isn't necessary (e.g.
2545	 * during mount that would help a bit).  Having relative timestamps
2546	 * is not so great if request processing is slow, while absolute
2547	 * timestamps are not ideal because they need time synchronization. */
2548	req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2549
2550	class_import_put(imp);
2551
2552	if (req == NULL)
2553		return -ENOMEM;
2554
2555	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2556	if (rc) {
2557		ptlrpc_request_free(req);
2558		return rc;
2559	}
2560	ptlrpc_request_set_replen(req);
2561	req->rq_request_portal = OST_CREATE_PORTAL;
2562	ptlrpc_at_set_req_timeout(req);
2563
2564	if (flags & OBD_STATFS_NODELAY) {
2565		/* procfs requests should not wait for stat, to avoid deadlock */
2566		req->rq_no_resend = 1;
2567		req->rq_no_delay = 1;
2568	}
2569
2570	rc = ptlrpc_queue_wait(req);
2571	if (rc)
2572		goto out;
2573
2574	msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2575	if (msfs == NULL) {
2576		rc = -EPROTO;
2577		goto out;
2578	}
2579
2580	*osfs = *msfs;
2581
2582 out:
2583	ptlrpc_req_finished(req);
2584	return rc;
2585}
2586
2587/* Retrieve object striping information.
2588 *
2589 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2590 * the maximum number of OST indices which will fit in the user buffer.
2591 * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2592 */
2593static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2594{
2595	/* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2596	struct lov_user_md_v3 lum, *lumk;
2597	struct lov_user_ost_data_v1 *lmm_objects;
2598	int rc = 0, lum_size;
2599
2600	if (!lsm)
2601		return -ENODATA;
2602
2603	/* we only need the header part from user space to get lmm_magic and
2604	 * lmm_stripe_count (the header part is common to v1 and v3) */
2605	lum_size = sizeof(struct lov_user_md_v1);
2606	if (copy_from_user(&lum, lump, lum_size))
2607		return -EFAULT;
2608
2609	if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2610	    (lum.lmm_magic != LOV_USER_MAGIC_V3))
2611		return -EINVAL;
2612
2613	/* lov_user_md_vX and lov_mds_md_vX must have the same size */
2614	LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2615	LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2616	LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2617
2618	/* we can use lov_mds_md_size() to compute lum_size
2619	 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2620	if (lum.lmm_stripe_count > 0) {
2621		lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2622		OBD_ALLOC(lumk, lum_size);
2623		if (!lumk)
2624			return -ENOMEM;
2625
2626		if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2627			lmm_objects =
2628			    &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2629		else
2630			lmm_objects = &(lumk->lmm_objects[0]);
2631		lmm_objects->l_ost_oi = lsm->lsm_oi;
2632	} else {
2633		lum_size = lov_mds_md_size(0, lum.lmm_magic);
2634		lumk = &lum;
2635	}
2636
2637	lumk->lmm_oi = lsm->lsm_oi;
2638	lumk->lmm_stripe_count = 1;
2639
2640	if (copy_to_user(lump, lumk, lum_size))
2641		rc = -EFAULT;
2642
2643	if (lumk != &lum)
2644		OBD_FREE(lumk, lum_size);
2645
2646	return rc;
2647}
2648
2649
2651			 void *karg, void *uarg)
2652{
2653	struct obd_device *obd = exp->exp_obd;
2654	struct obd_ioctl_data *data = karg;
2655	int err = 0;
2656
2657	if (!try_module_get(THIS_MODULE)) {
2658		CERROR("Can't get module. Is it alive?\n");
2659		return -EINVAL;
2660	}
2661	switch (cmd) {
2662	case OBD_IOC_LOV_GET_CONFIG: {
2663		char *buf;
2664		struct lov_desc *desc;
2665		struct obd_uuid uuid;
2666
2667		buf = NULL;
2668		len = 0;
2669		if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2670			err = -EINVAL;
2671			goto out;
2672		}
2673
2674		data = (struct obd_ioctl_data *)buf;
2675
2676		if (sizeof(*desc) > data->ioc_inllen1) {
2677			obd_ioctl_freedata(buf, len);
2678			err = -EINVAL;
2679			goto out;
2680		}
2681
2682		if (data->ioc_inllen2 < sizeof(uuid)) {
2683			obd_ioctl_freedata(buf, len);
2684			err = -EINVAL;
2685			goto out;
2686		}
2687
2688		desc = (struct lov_desc *)data->ioc_inlbuf1;
2689		desc->ld_tgt_count = 1;
2690		desc->ld_active_tgt_count = 1;
2691		desc->ld_default_stripe_count = 1;
2692		desc->ld_default_stripe_size = 0;
2693		desc->ld_default_stripe_offset = 0;
2694		desc->ld_pattern = 0;
2695		memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2696
2697		memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2698
2699		err = copy_to_user((void *)uarg, buf, len);
2700		if (err)
2701			err = -EFAULT;
2702		obd_ioctl_freedata(buf, len);
2703		goto out;
2704	}
2705	case LL_IOC_LOV_SETSTRIPE:
2706		err = obd_alloc_memmd(exp, karg);
2707		if (err > 0)
2708			err = 0;
2709		goto out;
2710	case LL_IOC_LOV_GETSTRIPE:
2711		err = osc_getstripe(karg, uarg);
2712		goto out;
2713	case OBD_IOC_CLIENT_RECOVER:
2714		err = ptlrpc_recover_import(obd->u.cli.cl_import,
2715					    data->ioc_inlbuf1, 0);
2716		if (err > 0)
2717			err = 0;
2718		goto out;
2719	case IOC_OSC_SET_ACTIVE:
2720		err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2721					       data->ioc_offset);
2722		goto out;
2723	case OBD_IOC_POLL_QUOTACHECK:
2724		err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2725		goto out;
2726	case OBD_IOC_PING_TARGET:
2727		err = ptlrpc_obd_ping(obd);
2728		goto out;
2729	default:
2730		CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2731		       cmd, current_comm());
2732		err = -ENOTTY;
2733		goto out;
2734	}
2735out:
2736	module_put(THIS_MODULE);
2737	return err;
2738}
2739
2740static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2741			u32 keylen, void *key, __u32 *vallen, void *val,
2742			struct lov_stripe_md *lsm)
2743{
2744	if (!vallen || !val)
2745		return -EFAULT;
2746
2747	if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2748		__u32 *stripe = val;
2749		*vallen = sizeof(*stripe);
2750		*stripe = 0;
2751		return 0;
2752	} else if (KEY_IS(KEY_LAST_ID)) {
2753		struct ptlrpc_request *req;
2754		u64		*reply;
2755		char		  *tmp;
2756		int		    rc;
2757
2758		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2759					   &RQF_OST_GET_INFO_LAST_ID);
2760		if (req == NULL)
2761			return -ENOMEM;
2762
2763		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2764				     RCL_CLIENT, keylen);
2765		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2766		if (rc) {
2767			ptlrpc_request_free(req);
2768			return rc;
2769		}
2770
2771		tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2772		memcpy(tmp, key, keylen);
2773
2774		req->rq_no_delay = req->rq_no_resend = 1;
2775		ptlrpc_request_set_replen(req);
2776		rc = ptlrpc_queue_wait(req);
2777		if (rc)
2778			goto out;
2779
2780		reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2781		if (reply == NULL) {
2782			rc = -EPROTO;
2783			goto out;
2784		}
2785
2786		*((u64 *)val) = *reply;
2787	out:
2788		ptlrpc_req_finished(req);
2789		return rc;
2790	} else if (KEY_IS(KEY_FIEMAP)) {
2791		struct ll_fiemap_info_key *fm_key =
2792				(struct ll_fiemap_info_key *)key;
2793		struct ldlm_res_id	 res_id;
2794		ldlm_policy_data_t	 policy;
2795		struct lustre_handle	 lockh;
2796		ldlm_mode_t		 mode = 0;
2797		struct ptlrpc_request	*req;
2798		struct ll_user_fiemap	*reply;
2799		char			*tmp;
2800		int			 rc;
2801
2802		if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2803			goto skip_locking;
2804
2805		policy.l_extent.start = fm_key->fiemap.fm_start &
2806						CFS_PAGE_MASK;
2807
2808		if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2809		    fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2810			policy.l_extent.end = OBD_OBJECT_EOF;
2811		else
2812			policy.l_extent.end = (fm_key->fiemap.fm_start +
2813				fm_key->fiemap.fm_length +
2814				PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2815
2816		ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2817		mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2818				       LDLM_FL_BLOCK_GRANTED |
2819				       LDLM_FL_LVB_READY,
2820				       &res_id, LDLM_EXTENT, &policy,
2821				       LCK_PR | LCK_PW, &lockh, 0);
2822		if (mode) { /* lock is cached on client */
2823			if (mode != LCK_PR) {
2824				ldlm_lock_addref(&lockh, LCK_PR);
2825				ldlm_lock_decref(&lockh, LCK_PW);
2826			}
2827		} else { /* no cached lock, acquire the lock on the server side */
2828			fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2829			fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2830		}
2831
2832skip_locking:
2833		req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2834					   &RQF_OST_GET_INFO_FIEMAP);
2835		if (req == NULL) {
2836			rc = -ENOMEM;
2837			goto drop_lock;
2838		}
2839
2840		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2841				     RCL_CLIENT, keylen);
2842		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843				     RCL_CLIENT, *vallen);
2844		req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2845				     RCL_SERVER, *vallen);
2846
2847		rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2848		if (rc) {
2849			ptlrpc_request_free(req);
2850			goto drop_lock;
2851		}
2852
2853		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2854		memcpy(tmp, key, keylen);
2855		tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2856		memcpy(tmp, val, *vallen);
2857
2858		ptlrpc_request_set_replen(req);
2859		rc = ptlrpc_queue_wait(req);
2860		if (rc)
2861			goto fini_req;
2862
2863		reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2864		if (reply == NULL) {
2865			rc = -EPROTO;
2866			goto fini_req;
2867		}
2868
2869		memcpy(val, reply, *vallen);
2870fini_req:
2871		ptlrpc_req_finished(req);
2872drop_lock:
2873		if (mode)
2874			ldlm_lock_decref(&lockh, LCK_PR);
2875		return rc;
2876	}
2877
2878	return -EINVAL;
2879}
2880
2881static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2882			      u32 keylen, void *key, u32 vallen,
2883			      void *val, struct ptlrpc_request_set *set)
2884{
2885	struct ptlrpc_request *req;
2886	struct obd_device     *obd = exp->exp_obd;
2887	struct obd_import     *imp = class_exp2cliimp(exp);
2888	char		  *tmp;
2889	int		    rc;
2890
2891	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2892
2893	if (KEY_IS(KEY_CHECKSUM)) {
2894		if (vallen != sizeof(int))
2895			return -EINVAL;
2896		exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2897		return 0;
2898	}
2899
2900	if (KEY_IS(KEY_SPTLRPC_CONF)) {
2901		sptlrpc_conf_client_adapt(obd);
2902		return 0;
2903	}
2904
2905	if (KEY_IS(KEY_FLUSH_CTX)) {
2906		sptlrpc_import_flush_my_ctx(imp);
2907		return 0;
2908	}
2909
2910	if (KEY_IS(KEY_CACHE_SET)) {
2911		struct client_obd *cli = &obd->u.cli;
2912
2913		LASSERT(cli->cl_cache == NULL); /* only once */
2914		cli->cl_cache = (struct cl_client_cache *)val;
2915		atomic_inc(&cli->cl_cache->ccc_users);
2916		cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2917
2918		/* add this osc into entity list */
2919		LASSERT(list_empty(&cli->cl_lru_osc));
2920		spin_lock(&cli->cl_cache->ccc_lru_lock);
2921		list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2922		spin_unlock(&cli->cl_cache->ccc_lru_lock);
2923
2924		return 0;
2925	}
2926
2927	if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2928		struct client_obd *cli = &obd->u.cli;
2929		int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2930		int target = *(int *)val;
2931
2932		nr = osc_lru_shrink(cli, min(nr, target));
2933		*(int *)val -= nr;
2934		return 0;
2935	}
2936
2937	if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2938		return -EINVAL;
2939
2940	/* We pass all other commands directly to OST. Since nobody calls osc
2941	   methods directly and everybody is supposed to go through LOV, we
2942	   assume lov checked invalid values for us.
2943	   The only recognised values so far are evict_by_nid and mds_conn.
2944	   Even if something bad goes through, we'd get a -EINVAL from OST
2945	   anyway. */
2946
2947	req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2948						&RQF_OST_SET_GRANT_INFO :
2949						&RQF_OBD_SET_INFO);
2950	if (req == NULL)
2951		return -ENOMEM;
2952
2953	req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2954			     RCL_CLIENT, keylen);
2955	if (!KEY_IS(KEY_GRANT_SHRINK))
2956		req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2957				     RCL_CLIENT, vallen);
2958	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2959	if (rc) {
2960		ptlrpc_request_free(req);
2961		return rc;
2962	}
2963
2964	tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2965	memcpy(tmp, key, keylen);
2966	tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2967							&RMF_OST_BODY :
2968							&RMF_SETINFO_VAL);
2969	memcpy(tmp, val, vallen);
2970
2971	if (KEY_IS(KEY_GRANT_SHRINK)) {
2972		struct osc_brw_async_args *aa;
2973		struct obdo *oa;
2974
2975		CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2976		aa = ptlrpc_req_async_args(req);
2977		OBDO_ALLOC(oa);
2978		if (!oa) {
2979			ptlrpc_req_finished(req);
2980			return -ENOMEM;
2981		}
2982		*oa = ((struct ost_body *)val)->oa;
2983		aa->aa_oa = oa;
2984		req->rq_interpret_reply = osc_shrink_grant_interpret;
2985	}
2986
2987	ptlrpc_request_set_replen(req);
2988	if (!KEY_IS(KEY_GRANT_SHRINK)) {
2989		LASSERT(set != NULL);
2990		ptlrpc_set_add_req(set, req);
2991		ptlrpc_check_set(NULL, set);
2992	} else
2993		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2994
2995	return 0;
2996}
2997
2998static int osc_reconnect(const struct lu_env *env,
2999			 struct obd_export *exp, struct obd_device *obd,
3000			 struct obd_uuid *cluuid,
3001			 struct obd_connect_data *data,
3002			 void *localdata)
3003{
3004	struct client_obd *cli = &obd->u.cli;
3005
3006	if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3007		long lost_grant;
3008
3009		client_obd_list_lock(&cli->cl_loi_list_lock);
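		/*
		 * Editor's sketch: the GNU "?:" shorthand below means "use
		 * cl_avail_grant + cl_dirty if that sum is non-zero, else
		 * fall back to 2 * cli_brw_size(obd)", i.e. a reconnecting
		 * client asks for at least two full BRW RPCs worth of grant
		 * (assuming cli_brw_size() returns the maximum BRW size).
		 */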
3010		data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3011				2 * cli_brw_size(obd);
3012		lost_grant = cli->cl_lost_grant;
3013		cli->cl_lost_grant = 0;
3014		client_obd_list_unlock(&cli->cl_loi_list_lock);
3015
3016		CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d ocd_grant: %d, lost: %ld.\n",
3017		       data->ocd_connect_flags, data->ocd_version,
3018		       data->ocd_grant, lost_grant);
3019	}
3020
3021	return 0;
3022}
3023
3024static int osc_disconnect(struct obd_export *exp)
3025{
3026	struct obd_device *obd = class_exp2obd(exp);
3027	int rc;
3028
3029	rc = client_disconnect_export(exp);
3030	/**
3031	 * Initially we put del_shrink_grant before disconnect_export, but it
3032	 * causes the following problem if setup (connect) and cleanup
3033	 * (disconnect) are tangled together.
3034	 *      connect p1		     disconnect p2
3035	 *   ptlrpc_connect_import
3036	 *     ...............	       class_manual_cleanup
3037	 *				     osc_disconnect
3038	 *				     del_shrink_grant
3039	 *   ptlrpc_connect_interrupt
3040	 *     init_grant_shrink
3041	 *   add this client to shrink list
3042	 *				      cleanup_osc
3043	 * Bang! The pinger triggers the shrink.
3044	 * So the osc should be disconnected from the shrink list, after we
3045	 * are sure the import has been destroyed. BUG18662
3046	 */
3047	if (obd->u.cli.cl_import == NULL)
3048		osc_del_shrink_grant(&obd->u.cli);
3049	return rc;
3050}
3051
3052static int osc_import_event(struct obd_device *obd,
3053			    struct obd_import *imp,
3054			    enum obd_import_event event)
3055{
3056	struct client_obd *cli;
3057	int rc = 0;
3058
3059	LASSERT(imp->imp_obd == obd);
3060
3061	switch (event) {
3062	case IMP_EVENT_DISCON: {
3063		cli = &obd->u.cli;
3064		client_obd_list_lock(&cli->cl_loi_list_lock);
3065		cli->cl_avail_grant = 0;
3066		cli->cl_lost_grant = 0;
3067		client_obd_list_unlock(&cli->cl_loi_list_lock);
3068		break;
3069	}
3070	case IMP_EVENT_INACTIVE: {
3071		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3072		break;
3073	}
3074	case IMP_EVENT_INVALIDATE: {
3075		struct ldlm_namespace *ns = obd->obd_namespace;
3076		struct lu_env	 *env;
3077		int		    refcheck;
3078
3079		env = cl_env_get(&refcheck);
3080		if (!IS_ERR(env)) {
3081			/* Reset grants */
3082			cli = &obd->u.cli;
3083			/* All pages go to failing RPCs due to the invalid
3084			 * import. */
3085			osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3086
3087			ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3088			cl_env_put(env, &refcheck);
3089		} else
3090			rc = PTR_ERR(env);
3091		break;
3092	}
3093	case IMP_EVENT_ACTIVE: {
3094		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3095		break;
3096	}
3097	case IMP_EVENT_OCD: {
3098		struct obd_connect_data *ocd = &imp->imp_connect_data;
3099
3100		if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3101			osc_init_grant(&obd->u.cli, ocd);
3102
3103		/* See bug 7198 */
3104		if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3105			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3106
3107		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3108		break;
3109	}
3110	case IMP_EVENT_DEACTIVATE: {
3111		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3112		break;
3113	}
3114	case IMP_EVENT_ACTIVATE: {
3115		rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3116		break;
3117	}
3118	default:
3119		CERROR("Unknown import event %d\n", event);
3120		LBUG();
3121	}
3122	return rc;
3123}
3124
3125/**
3126 * Determine whether the lock can be canceled before replaying the lock
3127 * during recovery, see bug16774 for detailed information.
3128 *
3129 * \retval zero the lock can't be canceled
3130 * \retval other ok to cancel
3131 */
3132static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3133{
3134	check_res_locked(lock->l_resource);
3135
3136	/*
3137	 * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
3138	 *
3139	 * XXX as a future improvement, we could also cancel an unused write
3140	 * lock if it has no dirty data and no active mmaps.
3141	 */
3142	if (lock->l_resource->lr_type == LDLM_EXTENT &&
3143	    (lock->l_granted_mode == LCK_PR ||
3144	     lock->l_granted_mode == LCK_CR) &&
3145	    (osc_dlm_lock_pageref(lock) == 0))
3146		return 1;
3147
3148	return 0;
3149}
3150
3151static int brw_queue_work(const struct lu_env *env, void *data)
3152{
3153	struct client_obd *cli = data;
3154
3155	CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3156
3157	osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3158	return 0;
3159}
3160
3161int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3162{
3163	struct lprocfs_static_vars lvars = { NULL };
3164	struct client_obd	  *cli = &obd->u.cli;
3165	void		       *handler;
3166	int			rc;
3167
3168	rc = ptlrpcd_addref();
3169	if (rc)
3170		return rc;
3171
3172	rc = client_obd_setup(obd, lcfg);
3173	if (rc)
3174		goto out_ptlrpcd;
3175
3176	handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3177	if (IS_ERR(handler)) {
3178		rc = PTR_ERR(handler);
3179		goto out_client_setup;
3180	}
3181	cli->cl_writeback_work = handler;
3182
3183	rc = osc_quota_setup(obd);
3184	if (rc)
3185		goto out_ptlrpcd_work;
3186
3187	cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3188	lprocfs_osc_init_vars(&lvars);
3189	if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3190		lproc_osc_attach_seqstat(obd);
3191		sptlrpc_lprocfs_cliobd_attach(obd);
3192		ptlrpc_lprocfs_register_obd(obd);
3193	}
3194
3195	/* We need to allocate a few more requests, because
3196	 * brw_interpret tries to create new requests before freeing
3197	 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3198	 * reserved, but I'm afraid that might be too much wasted RAM
3199	 * in fact, so 2 is just my guess and should still work. */
3200	cli->cl_import->imp_rq_pool =
3201		ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3202				    OST_MAXREQSIZE,
3203				    ptlrpc_add_rqs_to_pool);
3204
3205	INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3206	ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3207	return rc;
3208
3209out_ptlrpcd_work:
3210	ptlrpcd_destroy_work(handler);
3211out_client_setup:
3212	client_obd_cleanup(obd);
3213out_ptlrpcd:
3214	ptlrpcd_decref();
3215	return rc;
3216}
3217
3218static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3219{
3220	int rc = 0;
3221
3222	switch (stage) {
3223	case OBD_CLEANUP_EARLY: {
3224		struct obd_import *imp;
3225		imp = obd->u.cli.cl_import;
3226		CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3227		/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3228		ptlrpc_deactivate_import(imp);
3229		spin_lock(&imp->imp_lock);
3230		imp->imp_pingable = 0;
3231		spin_unlock(&imp->imp_lock);
3232		break;
3233	}
3234	case OBD_CLEANUP_EXPORTS: {
3235		struct client_obd *cli = &obd->u.cli;
3236		/* LU-464
3237		 * for echo client, export may be on zombie list, wait for
3238		 * zombie thread to cull it, because cli.cl_import will be
3239		 * cleared in client_disconnect_export():
3240		 *   class_export_destroy() -> obd_cleanup() ->
3241		 *   echo_device_free() -> echo_client_cleanup() ->
3242		 *   obd_disconnect() -> osc_disconnect() ->
3243		 *   client_disconnect_export()
3244		 */
3245		obd_zombie_barrier();
3246		if (cli->cl_writeback_work) {
3247			ptlrpcd_destroy_work(cli->cl_writeback_work);
3248			cli->cl_writeback_work = NULL;
3249		}
3250		obd_cleanup_client_import(obd);
3251		ptlrpc_lprocfs_unregister_obd(obd);
3252		lprocfs_obd_cleanup(obd);
3253		break;
3254		}
3255	}
3256	return rc;
3257}
3258
3259int osc_cleanup(struct obd_device *obd)
3260{
3261	struct client_obd *cli = &obd->u.cli;
3262	int rc;
3263
3264	/* lru cleanup */
3265	if (cli->cl_cache != NULL) {
3266		LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3267		spin_lock(&cli->cl_cache->ccc_lru_lock);
3268		list_del_init(&cli->cl_lru_osc);
3269		spin_unlock(&cli->cl_cache->ccc_lru_lock);
3270		cli->cl_lru_left = NULL;
3271		atomic_dec(&cli->cl_cache->ccc_users);
3272		cli->cl_cache = NULL;
3273	}
3274
3275	/* free memory of osc quota cache */
3276	osc_quota_cleanup(obd);
3277
3278	rc = client_obd_cleanup(obd);
3279
3280	ptlrpcd_decref();
3281	return rc;
3282}
3283
3284int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3285{
3286	struct lprocfs_static_vars lvars = { NULL };
3287	int rc = 0;
3288
3289	lprocfs_osc_init_vars(&lvars);
3290
3291	switch (lcfg->lcfg_command) {
3292	default:
3293		rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3294					      lcfg, obd);
3295		if (rc > 0)
3296			rc = 0;
3297		break;
3298	}
3299
3300	return rc;
3301}
3302
3303static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3304{
3305	return osc_process_config_base(obd, buf);
3306}
3307
3308struct obd_ops osc_obd_ops = {
3309	.o_owner		= THIS_MODULE,
3310	.o_setup		= osc_setup,
3311	.o_precleanup		= osc_precleanup,
3312	.o_cleanup		= osc_cleanup,
3313	.o_add_conn		= client_import_add_conn,
3314	.o_del_conn		= client_import_del_conn,
3315	.o_connect		= client_connect_import,
3316	.o_reconnect		= osc_reconnect,
3317	.o_disconnect		= osc_disconnect,
3318	.o_statfs		= osc_statfs,
3319	.o_statfs_async		= osc_statfs_async,
3320	.o_packmd		= osc_packmd,
3321	.o_unpackmd		= osc_unpackmd,
3322	.o_create		= osc_create,
3323	.o_destroy		= osc_destroy,
3324	.o_getattr		= osc_getattr,
3325	.o_getattr_async	= osc_getattr_async,
3326	.o_setattr		= osc_setattr,
3327	.o_setattr_async	= osc_setattr_async,
3328	.o_find_cbdata		= osc_find_cbdata,
3329	.o_iocontrol		= osc_iocontrol,
3330	.o_get_info		= osc_get_info,
3331	.o_set_info_async	= osc_set_info_async,
3332	.o_import_event		= osc_import_event,
3333	.o_process_config	= osc_process_config,
3334	.o_quotactl		= osc_quotactl,
3335	.o_quotacheck		= osc_quotacheck,
3336};
3337
3338extern struct lu_kmem_descr osc_caches[];
3339extern spinlock_t osc_ast_guard;
3340extern struct lock_class_key osc_ast_guard_class;
3341
3342int __init osc_init(void)
3343{
3344	struct lprocfs_static_vars lvars = { NULL };
3345	int rc;
3346
3347	/* Print the address of _any_ initialized kernel symbol from this
3348	 * module, to allow debugging with a gdb that doesn't support data
3349	 * symbols from modules. */
3350	CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3351
3352	rc = lu_kmem_init(osc_caches);
3353	if (rc)
3354		return rc;
3355
3356	lprocfs_osc_init_vars(&lvars);
3357
3358	rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3359				 LUSTRE_OSC_NAME, &osc_device_type);
3360	if (rc) {
3361		lu_kmem_fini(osc_caches);
3362		return rc;
3363	}
3364
3365	spin_lock_init(&osc_ast_guard);
3366	lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3367
3368	return rc;
3369}
3370
3371static void /*__exit*/ osc_exit(void)
3372{
3373	class_unregister_type(LUSTRE_OSC_NAME);
3374	lu_kmem_fini(osc_caches);
3375}
3376
3377MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3378MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3379MODULE_LICENSE("GPL");
3380MODULE_VERSION(LUSTRE_VERSION_STRING);
3381
3382module_init(osc_init);
3383module_exit(osc_exit);
3384