osc_io.c revision b0f5aad587ea1fc3563d056609ee54a961ee1256
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * Implementation of cl_io for OSC layer.
37 *
38 *   Author: Nikita Danilov <nikita.danilov@sun.com>
39 *   Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
40 */
41
42#define DEBUG_SUBSYSTEM S_OSC
43
44#include "osc_cl_internal.h"
45
46/** \addtogroup osc
47 *  @{
48 */
49
50/*****************************************************************************
51 *
52 * Type conversions.
53 *
54 */
55
56static struct osc_req *cl2osc_req(const struct cl_req_slice *slice)
57{
58	LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type);
59	return container_of0(slice, struct osc_req, or_cl);
60}
61
62static struct osc_io *cl2osc_io(const struct lu_env *env,
63				const struct cl_io_slice *slice)
64{
65	struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl);
66	LINVRNT(oio == osc_env_io(env));
67	return oio;
68}
69
70static struct osc_page *osc_cl_page_osc(struct cl_page *page)
71{
72	const struct cl_page_slice *slice;
73
74	slice = cl_page_at(page, &osc_device_type);
75	LASSERT(slice != NULL);
76
77	return cl2osc_page(slice);
78}
79
80
81/*****************************************************************************
82 *
83 * io operations.
84 *
85 */
86
/* No per-layer state to tear down for an osc io slice; the empty method
 * exists so generic cl_io finalization has a cio_fini to call. */
static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io)
{
}
90
/**
 * An implementation of cl_io_operations::cio_submit() method for osc
 * layer.
 *
 * Iterates over the top-level pages in the in-queue, prepares each for io
 * with cl_page_prep() and submits it through osc_page_submit().  The
 * corresponding async pages are collected on a local list which is flushed
 * to osc_queue_sync_pages() in batches of at most cl_max_pages_per_rpc
 * pages.  Pages reported -EALREADY by cl_page_prep() (already UPTODATE for
 * read, not dirty for write) are skipped.  Successfully submitted pages are
 * moved to the out-queue.
 */
static int osc_io_submit(const struct lu_env *env,
			 const struct cl_io_slice *ios,
			 enum cl_req_type crt, struct cl_2queue *queue)
{
	struct cl_page    *page;
	struct cl_page    *tmp;
	struct client_obd *cli  = NULL;
	struct osc_object *osc  = NULL; /* to keep gcc happy */
	struct osc_page   *opg;
	struct cl_io      *io;
	LIST_HEAD(list);

	struct cl_page_list *qin      = &queue->c2_qin;
	struct cl_page_list *qout     = &queue->c2_qout;
	int queued = 0;		/* pages accumulated on @list, not yet queued */
	int result = 0;
	int cmd;
	int brw_flags;
	int max_pages;		/* batch size: one RPC worth of pages */

	LASSERT(qin->pl_nr > 0);

	CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt);

	osc = cl2osc(ios->cis_obj);
	cli = osc_cli(osc);
	max_pages = cli->cl_max_pages_per_rpc;

	cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
	brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0;

	/*
	 * NOTE: here @page is a top-level page. This is done to avoid
	 *       creation of sub-page-list.
	 */
	cl_page_list_for_each_safe(page, tmp, qin) {
		struct osc_async_page *oap;

		/* Top level IO. */
		io = page->cp_owner;
		LASSERT(io != NULL);

		opg = osc_cl_page_osc(page);
		oap = &opg->ops_oap;
		LASSERT(osc == oap->oap_obj);

		/* the async page must not already be pending or in flight */
		if (!list_empty(&oap->oap_pending_item) ||
		    !list_empty(&oap->oap_rpc_item)) {
			CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
			       oap, opg);
			result = -EBUSY;
			break;
		}

		result = cl_page_prep(env, io, page, crt);
		if (result != 0) {
			LASSERT(result < 0);
			if (result != -EALREADY)
				break;
			/*
			 * Handle -EALREADY error: for read case, the page is
			 * already in UPTODATE state; for write, the page
			 * is not dirty.
			 */
			result = 0;
			continue;
		}

		cl_page_list_move(qout, qin, page);
		oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
		oap->oap_async_flags |= ASYNC_COUNT_STABLE;

		osc_page_submit(env, opg, crt, brw_flags);
		list_add_tail(&oap->oap_pending_item, &list);
		/* flush a full batch to the cache layer */
		if (++queued == max_pages) {
			queued = 0;
			result = osc_queue_sync_pages(env, osc, &list, cmd,
						      brw_flags);
			if (result < 0)
				break;
		}
	}

	/* flush the final, partial batch */
	if (queued > 0)
		result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags);

	CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result);
	/* if at least one page made it out, report success for this call */
	return qout->pl_nr > 0 ? 0 : result;
}
187
/**
 * Grow the stripe's known-minimum-size (KMS), and the cached size if
 * needed, to cover byte @to of page @idx of @obj.  Attributes are only
 * ever increased here, under cl_object_attr_lock().
 */
static void osc_page_touch_at(const struct lu_env *env,
			      struct cl_object *obj, pgoff_t idx, unsigned to)
{
	struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
	struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
	int valid;
	__u64 kms;

	/* offset within stripe */
	kms = cl_offset(obj, idx) + to;

	cl_object_attr_lock(obj);
	/*
	 * XXX old code used
	 *
	 *	 ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm);
	 *
	 * here
	 */
	CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n",
	       kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms,
	       loi->loi_lvb.lvb_size);

	valid = 0;
	/* only grow, never shrink, KMS and size */
	if (kms > loi->loi_kms) {
		attr->cat_kms = kms;
		valid |= CAT_KMS;
	}
	if (kms > loi->loi_lvb.lvb_size) {
		attr->cat_size = kms;
		valid |= CAT_SIZE;
	}
	cl_object_attr_set(env, obj, attr, valid);
	cl_object_attr_unlock(obj);
}
223
224/**
225 * This is called when a page is accessed within file in a way that creates
226 * new page, if one were missing (i.e., if there were a hole at that place in
227 * the file, or accessed page is beyond the current file size). Examples:
228 * ->commit_write() and ->nopage() methods.
229 *
230 * Expand stripe KMS if necessary.
231 */
232static void osc_page_touch(const struct lu_env *env,
233			   struct osc_page *opage, unsigned to)
234{
235	struct cl_page    *page = opage->ops_cl.cpl_page;
236	struct cl_object  *obj  = opage->ops_cl.cpl_obj;
237
238	osc_page_touch_at(env, obj, page->cp_index, to);
239}
240
241/**
242 * Implements cl_io_operations::cio_prepare_write() method for osc layer.
243 *
244 * \retval -EIO transfer initiated against this osc will most likely fail
245 * \retval 0    transfer initiated against this osc will most likely succeed.
246 *
247 * The reason for this check is to immediately return an error to the caller
248 * in the case of a deactivated import. Note, that import can be deactivated
249 * later, while pages, dirtied by this IO, are still in the cache, but this is
250 * irrelevant, because that would still return an error to the application (if
251 * it does fsync), but many applications don't do fsync because of performance
252 * issues, and we wanted to return an -EIO at write time to notify the
253 * application.
254 */
255static int osc_io_prepare_write(const struct lu_env *env,
256				const struct cl_io_slice *ios,
257				const struct cl_page_slice *slice,
258				unsigned from, unsigned to)
259{
260	struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev);
261	struct obd_import *imp = class_exp2cliimp(dev->od_exp);
262	struct osc_io     *oio = cl2osc_io(env, ios);
263	int result = 0;
264
265	/*
266	 * This implements OBD_BRW_CHECK logic from old client.
267	 */
268
269	if (imp == NULL || imp->imp_invalid)
270		result = -EIO;
271	if (result == 0 && oio->oi_lockless)
272		/* this page contains `invalid' data, but who cares?
273		 * nobody can access the invalid data.
274		 * in osc_io_commit_write(), we're going to write exact
275		 * [from, to) bytes of this page to OST. -jay */
276		cl_page_export(env, slice->cpl_page, 1);
277
278	return result;
279}
280
281static int osc_io_commit_write(const struct lu_env *env,
282			       const struct cl_io_slice *ios,
283			       const struct cl_page_slice *slice,
284			       unsigned from, unsigned to)
285{
286	struct osc_io	 *oio = cl2osc_io(env, ios);
287	struct osc_page       *opg = cl2osc_page(slice);
288	struct osc_object     *obj = cl2osc(opg->ops_cl.cpl_obj);
289	struct osc_async_page *oap = &opg->ops_oap;
290
291	LASSERT(to > 0);
292	/*
293	 * XXX instead of calling osc_page_touch() here and in
294	 * osc_io_fault_start() it might be more logical to introduce
295	 * cl_page_touch() method, that generic cl_io_commit_write() and page
296	 * fault code calls.
297	 */
298	osc_page_touch(env, cl2osc_page(slice), to);
299	if (!client_is_remote(osc_export(obj)) &&
300	    capable(CFS_CAP_SYS_RESOURCE))
301		oap->oap_brw_flags |= OBD_BRW_NOQUOTA;
302
303	if (oio->oi_lockless)
304		/* see osc_io_prepare_write() for lockless io handling. */
305		cl_page_clip(env, slice->cpl_page, from, to);
306
307	return 0;
308}
309
310static int osc_io_fault_start(const struct lu_env *env,
311			      const struct cl_io_slice *ios)
312{
313	struct cl_io       *io;
314	struct cl_fault_io *fio;
315
316	io  = ios->cis_io;
317	fio = &io->u.ci_fault;
318	CDEBUG(D_INFO, "%lu %d %d\n",
319	       fio->ft_index, fio->ft_writable, fio->ft_nob);
320	/*
321	 * If mapping is writeable, adjust kms to cover this page,
322	 * but do not extend kms beyond actual file size.
323	 * See bug 10919.
324	 */
325	if (fio->ft_writable)
326		osc_page_touch_at(env, ios->cis_obj,
327				  fio->ft_index, fio->ft_nob);
328	return 0;
329}
330
331static int osc_async_upcall(void *a, int rc)
332{
333	struct osc_async_cbargs *args = a;
334
335	args->opc_rc = rc;
336	complete(&args->opc_sync);
337	return 0;
338}
339
/**
 * Checks that there are no pages being written in the extent being
 * truncated.  Purely diagnostic: it logs offending pages but always
 * returns CLP_GANG_OKAY so the gang lookup continues.
 *
 * @cbdata points to the truncation start offset (__u64).
 */
static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
			  struct cl_page *page, void *cbdata)
{
	const struct cl_page_slice *slice;
	struct osc_page *ops;
	struct osc_async_page *oap;
	__u64 start = *(__u64 *)cbdata;

	slice = cl_page_at(page, &osc_device_type);
	LASSERT(slice != NULL);
	ops = cl2osc_page(slice);
	oap = &ops->ops_oap;

	/* a pending write inside the truncated region is a bug worth logging */
	if (oap->oap_cmd & OBD_BRW_WRITE &&
	    !list_empty(&oap->oap_pending_item))
		CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n",
				start, current->comm);

	{
		struct page *vmpage = cl_page_vmpage(env, page);
		if (PageLocked(vmpage))
			CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
			       ops, page->cp_index,
			       (oap->oap_cmd & OBD_BRW_RWMASK));
	}

	return CLP_GANG_OKAY;
}
371
372static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
373			    struct osc_io *oio, __u64 size)
374{
375	struct cl_object *clob;
376	int     partial;
377	pgoff_t start;
378
379	clob    = oio->oi_cl.cis_obj;
380	start   = cl_index(clob, size);
381	partial = cl_offset(clob, start) < size;
382
383	/*
384	 * Complain if there are pages in the truncated region.
385	 */
386	cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF,
387			    trunc_check_cb, (void *)&size);
388}
389
/**
 * cio_start handler for CIT_SETATTR io on the osc layer.
 *
 * For truncate, dirty cached pages in the dropped region are flushed
 * first.  Unless the io is lockless, the local cl_object attributes are
 * then updated under the attribute lock.  Finally an asynchronous RPC is
 * sent to the OST: a punch for size changes, a setattr otherwise; its
 * result is collected in osc_io_setattr_end() via osc_async_upcall().
 */
static int osc_io_setattr_start(const struct lu_env *env,
				const struct cl_io_slice *slice)
{
	struct cl_io	    *io     = slice->cis_io;
	struct osc_io	   *oio    = cl2osc_io(env, slice);
	struct cl_object	*obj    = slice->cis_obj;
	struct lov_oinfo	*loi    = cl2osc(obj)->oo_oinfo;
	struct cl_attr	  *attr   = &osc_env_info(env)->oti_attr;
	struct obdo	     *oa     = &oio->oi_oa;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	__u64		    size   = io->u.ci_setattr.sa_attr.lvb_size;
	unsigned int	     ia_valid = io->u.ci_setattr.sa_valid;
	int		      result = 0;
	struct obd_info	  oinfo = { { { 0 } } };

	/* truncate cache dirty pages first */
	if (cl_io_is_trunc(io))
		result = osc_cache_truncate_start(env, oio, cl2osc(obj), size);

	/* lockless setattr skips the local attribute update */
	if (result == 0 && oio->oi_lockless == 0) {
		cl_object_attr_lock(obj);
		result = cl_object_attr_get(env, obj, attr);
		if (result == 0) {
			struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr;
			unsigned int cl_valid = 0;

			if (ia_valid & ATTR_SIZE) {
				attr->cat_size = attr->cat_kms = size;
				cl_valid = (CAT_SIZE | CAT_KMS);
			}
			if (ia_valid & ATTR_MTIME_SET) {
				attr->cat_mtime = lvb->lvb_mtime;
				cl_valid |= CAT_MTIME;
			}
			if (ia_valid & ATTR_ATIME_SET) {
				attr->cat_atime = lvb->lvb_atime;
				cl_valid |= CAT_ATIME;
			}
			if (ia_valid & ATTR_CTIME_SET) {
				attr->cat_ctime = lvb->lvb_ctime;
				cl_valid |= CAT_CTIME;
			}
			result = cl_object_attr_set(env, obj, attr, cl_valid);
		}
		cl_object_attr_unlock(obj);
	}
	/* build the obdo for the server-side RPC */
	memset(oa, 0, sizeof(*oa));
	if (result == 0) {
		oa->o_oi = loi->loi_oi;
		oa->o_mtime = attr->cat_mtime;
		oa->o_atime = attr->cat_atime;
		oa->o_ctime = attr->cat_ctime;
		oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
			OBD_MD_FLCTIME | OBD_MD_FLMTIME;
		if (ia_valid & ATTR_SIZE) {
			oa->o_size = size;
			oa->o_blocks = OBD_OBJECT_EOF;
			oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

			/* lockless truncate is done with a server-side lock */
			if (oio->oi_lockless) {
				oa->o_flags = OBD_FL_SRVLOCK;
				oa->o_valid |= OBD_MD_FLFLAGS;
			}
		} else {
			LASSERT(oio->oi_lockless == 0);
		}

		oinfo.oi_oa = oa;
		oinfo.oi_capa = io->u.ci_setattr.sa_capa;
		init_completion(&cbargs->opc_sync);

		if (ia_valid & ATTR_SIZE)
			result = osc_punch_base(osc_export(cl2osc(obj)),
						&oinfo, osc_async_upcall,
						cbargs, PTLRPCD_SET);
		else
			result = osc_setattr_async_base(osc_export(cl2osc(obj)),
							&oinfo, NULL,
							osc_async_upcall,
							cbargs, PTLRPCD_SET);
		/* setattr_end only waits on the completion if the RPC went out */
		cbargs->opc_rpc_sent = result == 0;
	}
	return result;
}
474
/**
 * cio_end handler for CIT_SETATTR: wait for the RPC started in
 * osc_io_setattr_start() (if one was sent), account lockless truncates,
 * sanity-check the truncated region and release truncate state.
 */
static void osc_io_setattr_end(const struct lu_env *env,
			       const struct cl_io_slice *slice)
{
	struct cl_io     *io  = slice->cis_io;
	struct osc_io    *oio = cl2osc_io(env, slice);
	struct cl_object *obj = slice->cis_obj;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	int result = 0;

	if (cbargs->opc_rpc_sent) {
		/* collect the asynchronous RPC result */
		wait_for_completion(&cbargs->opc_sync);
		result = io->ci_result = cbargs->opc_rc;
	}
	if (result == 0) {
		if (oio->oi_lockless) {
			/* lockless truncate */
			struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);

			LASSERT(cl_io_is_trunc(io));
			/* XXX: Need a lock. */
			osd->od_stats.os_lockless_truncates++;
		}
	}

	if (cl_io_is_trunc(io)) {
		__u64 size = io->u.ci_setattr.sa_attr.lvb_size;
		/* complain about pages lingering past the new size */
		osc_trunc_check(env, io, oio, size);
		if (oio->oi_trunc != NULL) {
			osc_cache_truncate_end(env, oio, cl2osc(obj));
			oio->oi_trunc = NULL;
		}
	}
}
508
509static int osc_io_read_start(const struct lu_env *env,
510			     const struct cl_io_slice *slice)
511{
512	struct cl_object *obj   = slice->cis_obj;
513	struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
514	int rc = 0;
515
516	if (!slice->cis_io->ci_noatime) {
517		cl_object_attr_lock(obj);
518		attr->cat_atime = LTIME_S(CURRENT_TIME);
519		rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
520		cl_object_attr_unlock(obj);
521	}
522	return rc;
523}
524
525static int osc_io_write_start(const struct lu_env *env,
526			      const struct cl_io_slice *slice)
527{
528	struct cl_object *obj   = slice->cis_obj;
529	struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
530	int rc = 0;
531
532	OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
533	cl_object_attr_lock(obj);
534	attr->cat_mtime = attr->cat_ctime = LTIME_S(CURRENT_TIME);
535	rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME);
536	cl_object_attr_unlock(obj);
537
538	return rc;
539}
540
/**
 * Send an OST_SYNC RPC for the byte range described by @fio on @obj.
 * The RPC is asynchronous; its result is delivered through
 * osc_async_upcall() and collected in osc_io_fsync_end().
 */
static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
			 struct cl_fsync_io *fio)
{
	struct osc_io    *oio   = osc_env_io(env);
	struct obdo      *oa    = &oio->oi_oa;
	struct obd_info  *oinfo = &oio->oi_info;
	struct lov_oinfo *loi   = obj->oo_oinfo;
	struct osc_async_cbargs *cbargs = &oio->oi_cbarg;
	int rc = 0;

	memset(oa, 0, sizeof(*oa));
	oa->o_oi = loi->loi_oi;
	oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;

	/* reload size and blocks for start and end of sync range */
	oa->o_size = fio->fi_start;
	oa->o_blocks = fio->fi_end;
	oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;

	obdo_set_parent_fid(oa, fio->fi_fid);

	memset(oinfo, 0, sizeof(*oinfo));
	oinfo->oi_oa = oa;
	oinfo->oi_capa = fio->fi_capa;
	init_completion(&cbargs->opc_sync);

	rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs,
			   PTLRPCD_SET);
	return rc;
}
571
/**
 * cio_start for CIT_FSYNC: start writeback of the range covered by the
 * fsync io, and for CL_FSYNC_ALL additionally wait for that writeback and
 * send an OST_SYNC RPC.  The first error encountered is preserved in
 * @result; fi_nr_written accumulates the number of pages queued.
 */
static int osc_io_fsync_start(const struct lu_env *env,
			      const struct cl_io_slice *slice)
{
	struct cl_io       *io  = slice->cis_io;
	struct cl_fsync_io *fio = &io->u.ci_fsync;
	struct cl_object   *obj = slice->cis_obj;
	struct osc_object  *osc = cl2osc(obj);
	pgoff_t start  = cl_index(obj, fio->fi_start);
	pgoff_t end    = cl_index(obj, fio->fi_end);
	int     result = 0;

	if (fio->fi_end == OBD_OBJECT_EOF)
		end = CL_PAGE_EOF;

	/* CL_FSYNC_DISCARD drops pages instead of writing them back */
	result = osc_cache_writeback_range(env, osc, start, end, 0,
					   fio->fi_mode == CL_FSYNC_DISCARD);
	if (result > 0) {
		/* positive return is the count of pages queued for writeback */
		fio->fi_nr_written += result;
		result = 0;
	}
	if (fio->fi_mode == CL_FSYNC_ALL) {
		int rc;

		/* we have to wait for writeback to finish before we can
		 * send OST_SYNC RPC. This is bad because it causes extents
		 * to be written osc by osc. However, we usually start
		 * writeback before CL_FSYNC_ALL so this won't have any real
		 * problem. */
		rc = osc_cache_wait_range(env, osc, start, end);
		if (result == 0)
			result = rc;
		rc = osc_fsync_ost(env, osc, fio);
		if (result == 0)
			result = rc;
	}

	return result;
}
610
/**
 * cio_end for CIT_FSYNC: for CL_FSYNC_LOCAL wait for writeback of the
 * range to finish; for CL_FSYNC_ALL wait for the OST_SYNC RPC started in
 * osc_io_fsync_start() and propagate its result.
 */
static void osc_io_fsync_end(const struct lu_env *env,
			     const struct cl_io_slice *slice)
{
	struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync;
	struct cl_object   *obj = slice->cis_obj;
	pgoff_t start = cl_index(obj, fio->fi_start);
	pgoff_t end   = cl_index(obj, fio->fi_end);
	int result = 0;

	if (fio->fi_mode == CL_FSYNC_LOCAL) {
		result = osc_cache_wait_range(env, cl2osc(obj), start, end);
	} else if (fio->fi_mode == CL_FSYNC_ALL) {
		struct osc_io	   *oio    = cl2osc_io(env, slice);
		struct osc_async_cbargs *cbargs = &oio->oi_cbarg;

		/* collect the OST_SYNC RPC result */
		wait_for_completion(&cbargs->opc_sync);
		if (result == 0)
			result = cbargs->opc_rc;
	}
	slice->cis_io->ci_result = result;
}
632
633static void osc_io_end(const struct lu_env *env,
634		       const struct cl_io_slice *slice)
635{
636	struct osc_io *oio = cl2osc_io(env, slice);
637
638	if (oio->oi_active) {
639		osc_extent_release(env, oio->oi_active);
640		oio->oi_active = NULL;
641	}
642}
643
/* Method table connecting the osc io handlers above to the generic cl_io
 * machinery, per io type (read/write/setattr/fault/fsync/misc) and per
 * transfer direction for submission. */
static const struct cl_io_operations osc_io_ops = {
	.op = {
		[CIT_READ] = {
			.cio_start  = osc_io_read_start,
			.cio_fini   = osc_io_fini
		},
		[CIT_WRITE] = {
			.cio_start  = osc_io_write_start,
			.cio_end    = osc_io_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_SETATTR] = {
			.cio_start  = osc_io_setattr_start,
			.cio_end    = osc_io_setattr_end
		},
		[CIT_FAULT] = {
			.cio_start  = osc_io_fault_start,
			.cio_end    = osc_io_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_FSYNC] = {
			.cio_start  = osc_io_fsync_start,
			.cio_end    = osc_io_fsync_end,
			.cio_fini   = osc_io_fini
		},
		[CIT_MISC] = {
			.cio_fini   = osc_io_fini
		}
	},
	.req_op = {
		 [CRT_READ] = {
			 .cio_submit    = osc_io_submit
		 },
		 [CRT_WRITE] = {
			 .cio_submit    = osc_io_submit
		 }
	 },
	.cio_prepare_write = osc_io_prepare_write,
	.cio_commit_write  = osc_io_commit_write
};
684
685/*****************************************************************************
686 *
687 * Transfer operations.
688 *
689 */
690
/* Nothing to prepare at the osc layer before a transfer starts; present
 * so that the cl_req_operations table has a cro_prep method. */
static int osc_req_prep(const struct lu_env *env,
			const struct cl_req_slice *slice)
{
	return 0;
}
696
697static void osc_req_completion(const struct lu_env *env,
698			       const struct cl_req_slice *slice, int ioret)
699{
700	struct osc_req *or;
701
702	or = cl2osc_req(slice);
703	OBD_SLAB_FREE_PTR(or, osc_req_kmem);
704}
705
/**
 * Implementation of struct cl_req_operations::cro_attr_set() for osc
 * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq
 * fields.
 *
 * Copies the attributes requested by @flags from the cached lvb/oinfo
 * into @attr->cra_oa.  For OBD_MD_FLHANDLE the DLM lock covering one of
 * the request's pages is looked up so its remote handle can be sent to
 * the server; a missing cover lock is a fatal inconsistency (LBUG).
 */
static void osc_req_attr_set(const struct lu_env *env,
			     const struct cl_req_slice *slice,
			     const struct cl_object *obj,
			     struct cl_req_attr *attr, obd_valid flags)
{
	struct lov_oinfo *oinfo;
	struct cl_req    *clerq;
	struct cl_page   *apage; /* _some_ page in @clerq */
	struct cl_lock   *lock;  /* _some_ lock protecting @apage */
	struct osc_lock  *olck;
	struct osc_page  *opg;
	struct obdo      *oa;
	struct ost_lvb   *lvb;

	oinfo	= cl2osc(obj)->oo_oinfo;
	lvb	= &oinfo->loi_lvb;
	oa	= attr->cra_oa;

	/* timestamps come from the cached lvb */
	if ((flags & OBD_MD_FLMTIME) != 0) {
		oa->o_mtime = lvb->lvb_mtime;
		oa->o_valid |= OBD_MD_FLMTIME;
	}
	if ((flags & OBD_MD_FLATIME) != 0) {
		oa->o_atime = lvb->lvb_atime;
		oa->o_valid |= OBD_MD_FLATIME;
	}
	if ((flags & OBD_MD_FLCTIME) != 0) {
		oa->o_ctime = lvb->lvb_ctime;
		oa->o_valid |= OBD_MD_FLCTIME;
	}
	/* object identity (seq + id) comes from the lov oinfo */
	if (flags & OBD_MD_FLGROUP) {
		ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi));
		oa->o_valid |= OBD_MD_FLGROUP;
	}
	if (flags & OBD_MD_FLID) {
		ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi));
		oa->o_valid |= OBD_MD_FLID;
	}
	if (flags & OBD_MD_FLHANDLE) {
		/* pick any page of the request; they share the cover lock */
		clerq = slice->crs_req;
		LASSERT(!list_empty(&clerq->crq_pages));
		apage = container_of(clerq->crq_pages.next,
				     struct cl_page, cp_flight);
		opg = osc_cl_page_osc(apage);
		apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
		lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
		if (lock == NULL) {
			/* no lock covers the page: dump state and crash */
			struct cl_object_header *head;
			struct cl_lock	  *scan;

			head = cl_object_header(apage->cp_obj);
			list_for_each_entry(scan, &head->coh_locks,
						cll_linkage)
				CL_LOCK_DEBUG(D_ERROR, env, scan,
					      "no cover page!\n");
			CL_PAGE_DEBUG(D_ERROR, env, apage,
				      "dump uncover page!\n");
			dump_stack();
			LBUG();
		}

		olck = osc_lock_at(lock);
		LASSERT(olck != NULL);
		LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
		/* check for lockless io. */
		if (olck->ols_lock != NULL) {
			oa->o_handle = olck->ols_lock->l_remote_handle;
			oa->o_valid |= OBD_MD_FLHANDLE;
		}
		cl_lock_put(env, lock);
	}
}
783
/* Transfer-request method table for the osc layer. */
static const struct cl_req_operations osc_req_ops = {
	.cro_prep       = osc_req_prep,
	.cro_attr_set   = osc_req_attr_set,
	.cro_completion = osc_req_completion
};
789
790
791int osc_io_init(const struct lu_env *env,
792		struct cl_object *obj, struct cl_io *io)
793{
794	struct osc_io *oio = osc_env_io(env);
795
796	CL_IO_SLICE_CLEAN(oio, oi_cl);
797	cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops);
798	return 0;
799}
800
801int osc_req_init(const struct lu_env *env, struct cl_device *dev,
802		 struct cl_req *req)
803{
804	struct osc_req *or;
805	int result;
806
807	OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
808	if (or != NULL) {
809		cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
810		result = 0;
811	} else
812		result = -ENOMEM;
813	return result;
814}
815
816/** @} osc */
817