osc_io.c revision b0f5aad587ea1fc3563d056609ee54a961ee1256
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * Implementation of cl_io for OSC layer. 37 * 38 * Author: Nikita Danilov <nikita.danilov@sun.com> 39 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com> 40 */ 41 42#define DEBUG_SUBSYSTEM S_OSC 43 44#include "osc_cl_internal.h" 45 46/** \addtogroup osc 47 * @{ 48 */ 49 50/***************************************************************************** 51 * 52 * Type conversions. 53 * 54 */ 55 56static struct osc_req *cl2osc_req(const struct cl_req_slice *slice) 57{ 58 LINVRNT(slice->crs_dev->cd_lu_dev.ld_type == &osc_device_type); 59 return container_of0(slice, struct osc_req, or_cl); 60} 61 62static struct osc_io *cl2osc_io(const struct lu_env *env, 63 const struct cl_io_slice *slice) 64{ 65 struct osc_io *oio = container_of0(slice, struct osc_io, oi_cl); 66 LINVRNT(oio == osc_env_io(env)); 67 return oio; 68} 69 70static struct osc_page *osc_cl_page_osc(struct cl_page *page) 71{ 72 const struct cl_page_slice *slice; 73 74 slice = cl_page_at(page, &osc_device_type); 75 LASSERT(slice != NULL); 76 77 return cl2osc_page(slice); 78} 79 80 81/***************************************************************************** 82 * 83 * io operations. 84 * 85 */ 86 87static void osc_io_fini(const struct lu_env *env, const struct cl_io_slice *io) 88{ 89} 90 91/** 92 * An implementation of cl_io_operations::cio_io_submit() method for osc 93 * layer. Iterates over pages in the in-queue, prepares each for io by calling 94 * cl_page_prep() and then either submits them through osc_io_submit_page() 95 * or, if page is already submitted, changes osc flags through 96 * osc_set_async_flags(). 97 */ 98static int osc_io_submit(const struct lu_env *env, 99 const struct cl_io_slice *ios, 100 enum cl_req_type crt, struct cl_2queue *queue) 101{ 102 struct cl_page *page; 103 struct cl_page *tmp; 104 struct client_obd *cli = NULL; 105 struct osc_object *osc = NULL; /* to keep gcc happy */ 106 struct osc_page *opg; 107 struct cl_io *io; 108 LIST_HEAD(list); 109 110 struct cl_page_list *qin = &queue->c2_qin; 111 struct cl_page_list *qout = &queue->c2_qout; 112 int queued = 0; 113 int result = 0; 114 int cmd; 115 int brw_flags; 116 int max_pages; 117 118 LASSERT(qin->pl_nr > 0); 119 120 CDEBUG(D_CACHE, "%d %d\n", qin->pl_nr, crt); 121 122 osc = cl2osc(ios->cis_obj); 123 cli = osc_cli(osc); 124 max_pages = cli->cl_max_pages_per_rpc; 125 126 cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ; 127 brw_flags = osc_io_srvlock(cl2osc_io(env, ios)) ? OBD_BRW_SRVLOCK : 0; 128 129 /* 130 * NOTE: here @page is a top-level page. This is done to avoid 131 * creation of sub-page-list. 132 */ 133 cl_page_list_for_each_safe(page, tmp, qin) { 134 struct osc_async_page *oap; 135 136 /* Top level IO. */ 137 io = page->cp_owner; 138 LASSERT(io != NULL); 139 140 opg = osc_cl_page_osc(page); 141 oap = &opg->ops_oap; 142 LASSERT(osc == oap->oap_obj); 143 144 if (!list_empty(&oap->oap_pending_item) || 145 !list_empty(&oap->oap_rpc_item)) { 146 CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n", 147 oap, opg); 148 result = -EBUSY; 149 break; 150 } 151 152 result = cl_page_prep(env, io, page, crt); 153 if (result != 0) { 154 LASSERT(result < 0); 155 if (result != -EALREADY) 156 break; 157 /* 158 * Handle -EALREADY error: for read case, the page is 159 * already in UPTODATE state; for write, the page 160 * is not dirty. 161 */ 162 result = 0; 163 continue; 164 } 165 166 cl_page_list_move(qout, qin, page); 167 oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY; 168 oap->oap_async_flags |= ASYNC_COUNT_STABLE; 169 170 osc_page_submit(env, opg, crt, brw_flags); 171 list_add_tail(&oap->oap_pending_item, &list); 172 if (++queued == max_pages) { 173 queued = 0; 174 result = osc_queue_sync_pages(env, osc, &list, cmd, 175 brw_flags); 176 if (result < 0) 177 break; 178 } 179 } 180 181 if (queued > 0) 182 result = osc_queue_sync_pages(env, osc, &list, cmd, brw_flags); 183 184 CDEBUG(D_INFO, "%d/%d %d\n", qin->pl_nr, qout->pl_nr, result); 185 return qout->pl_nr > 0 ? 0 : result; 186} 187 188static void osc_page_touch_at(const struct lu_env *env, 189 struct cl_object *obj, pgoff_t idx, unsigned to) 190{ 191 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; 192 struct cl_attr *attr = &osc_env_info(env)->oti_attr; 193 int valid; 194 __u64 kms; 195 196 /* offset within stripe */ 197 kms = cl_offset(obj, idx) + to; 198 199 cl_object_attr_lock(obj); 200 /* 201 * XXX old code used 202 * 203 * ll_inode_size_lock(inode, 0); lov_stripe_lock(lsm); 204 * 205 * here 206 */ 207 CDEBUG(D_INODE, "stripe KMS %sincreasing %llu->%llu %llu\n", 208 kms > loi->loi_kms ? "" : "not ", loi->loi_kms, kms, 209 loi->loi_lvb.lvb_size); 210 211 valid = 0; 212 if (kms > loi->loi_kms) { 213 attr->cat_kms = kms; 214 valid |= CAT_KMS; 215 } 216 if (kms > loi->loi_lvb.lvb_size) { 217 attr->cat_size = kms; 218 valid |= CAT_SIZE; 219 } 220 cl_object_attr_set(env, obj, attr, valid); 221 cl_object_attr_unlock(obj); 222} 223 224/** 225 * This is called when a page is accessed within file in a way that creates 226 * new page, if one were missing (i.e., if there were a hole at that place in 227 * the file, or accessed page is beyond the current file size). Examples: 228 * ->commit_write() and ->nopage() methods. 229 * 230 * Expand stripe KMS if necessary. 231 */ 232static void osc_page_touch(const struct lu_env *env, 233 struct osc_page *opage, unsigned to) 234{ 235 struct cl_page *page = opage->ops_cl.cpl_page; 236 struct cl_object *obj = opage->ops_cl.cpl_obj; 237 238 osc_page_touch_at(env, obj, page->cp_index, to); 239} 240 241/** 242 * Implements cl_io_operations::cio_prepare_write() method for osc layer. 243 * 244 * \retval -EIO transfer initiated against this osc will most likely fail 245 * \retval 0 transfer initiated against this osc will most likely succeed. 246 * 247 * The reason for this check is to immediately return an error to the caller 248 * in the case of a deactivated import. Note, that import can be deactivated 249 * later, while pages, dirtied by this IO, are still in the cache, but this is 250 * irrelevant, because that would still return an error to the application (if 251 * it does fsync), but many applications don't do fsync because of performance 252 * issues, and we wanted to return an -EIO at write time to notify the 253 * application. 254 */ 255static int osc_io_prepare_write(const struct lu_env *env, 256 const struct cl_io_slice *ios, 257 const struct cl_page_slice *slice, 258 unsigned from, unsigned to) 259{ 260 struct osc_device *dev = lu2osc_dev(slice->cpl_obj->co_lu.lo_dev); 261 struct obd_import *imp = class_exp2cliimp(dev->od_exp); 262 struct osc_io *oio = cl2osc_io(env, ios); 263 int result = 0; 264 265 /* 266 * This implements OBD_BRW_CHECK logic from old client. 267 */ 268 269 if (imp == NULL || imp->imp_invalid) 270 result = -EIO; 271 if (result == 0 && oio->oi_lockless) 272 /* this page contains `invalid' data, but who cares? 273 * nobody can access the invalid data. 274 * in osc_io_commit_write(), we're going to write exact 275 * [from, to) bytes of this page to OST. -jay */ 276 cl_page_export(env, slice->cpl_page, 1); 277 278 return result; 279} 280 281static int osc_io_commit_write(const struct lu_env *env, 282 const struct cl_io_slice *ios, 283 const struct cl_page_slice *slice, 284 unsigned from, unsigned to) 285{ 286 struct osc_io *oio = cl2osc_io(env, ios); 287 struct osc_page *opg = cl2osc_page(slice); 288 struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj); 289 struct osc_async_page *oap = &opg->ops_oap; 290 291 LASSERT(to > 0); 292 /* 293 * XXX instead of calling osc_page_touch() here and in 294 * osc_io_fault_start() it might be more logical to introduce 295 * cl_page_touch() method, that generic cl_io_commit_write() and page 296 * fault code calls. 297 */ 298 osc_page_touch(env, cl2osc_page(slice), to); 299 if (!client_is_remote(osc_export(obj)) && 300 capable(CFS_CAP_SYS_RESOURCE)) 301 oap->oap_brw_flags |= OBD_BRW_NOQUOTA; 302 303 if (oio->oi_lockless) 304 /* see osc_io_prepare_write() for lockless io handling. */ 305 cl_page_clip(env, slice->cpl_page, from, to); 306 307 return 0; 308} 309 310static int osc_io_fault_start(const struct lu_env *env, 311 const struct cl_io_slice *ios) 312{ 313 struct cl_io *io; 314 struct cl_fault_io *fio; 315 316 io = ios->cis_io; 317 fio = &io->u.ci_fault; 318 CDEBUG(D_INFO, "%lu %d %d\n", 319 fio->ft_index, fio->ft_writable, fio->ft_nob); 320 /* 321 * If mapping is writeable, adjust kms to cover this page, 322 * but do not extend kms beyond actual file size. 323 * See bug 10919. 324 */ 325 if (fio->ft_writable) 326 osc_page_touch_at(env, ios->cis_obj, 327 fio->ft_index, fio->ft_nob); 328 return 0; 329} 330 331static int osc_async_upcall(void *a, int rc) 332{ 333 struct osc_async_cbargs *args = a; 334 335 args->opc_rc = rc; 336 complete(&args->opc_sync); 337 return 0; 338} 339 340/** 341 * Checks that there are no pages being written in the extent being truncated. 342 */ 343static int trunc_check_cb(const struct lu_env *env, struct cl_io *io, 344 struct cl_page *page, void *cbdata) 345{ 346 const struct cl_page_slice *slice; 347 struct osc_page *ops; 348 struct osc_async_page *oap; 349 __u64 start = *(__u64 *)cbdata; 350 351 slice = cl_page_at(page, &osc_device_type); 352 LASSERT(slice != NULL); 353 ops = cl2osc_page(slice); 354 oap = &ops->ops_oap; 355 356 if (oap->oap_cmd & OBD_BRW_WRITE && 357 !list_empty(&oap->oap_pending_item)) 358 CL_PAGE_DEBUG(D_ERROR, env, page, "exists %llu/%s.\n", 359 start, current->comm); 360 361 { 362 struct page *vmpage = cl_page_vmpage(env, page); 363 if (PageLocked(vmpage)) 364 CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n", 365 ops, page->cp_index, 366 (oap->oap_cmd & OBD_BRW_RWMASK)); 367 } 368 369 return CLP_GANG_OKAY; 370} 371 372static void osc_trunc_check(const struct lu_env *env, struct cl_io *io, 373 struct osc_io *oio, __u64 size) 374{ 375 struct cl_object *clob; 376 int partial; 377 pgoff_t start; 378 379 clob = oio->oi_cl.cis_obj; 380 start = cl_index(clob, size); 381 partial = cl_offset(clob, start) < size; 382 383 /* 384 * Complain if there are pages in the truncated region. 385 */ 386 cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, 387 trunc_check_cb, (void *)&size); 388} 389 390static int osc_io_setattr_start(const struct lu_env *env, 391 const struct cl_io_slice *slice) 392{ 393 struct cl_io *io = slice->cis_io; 394 struct osc_io *oio = cl2osc_io(env, slice); 395 struct cl_object *obj = slice->cis_obj; 396 struct lov_oinfo *loi = cl2osc(obj)->oo_oinfo; 397 struct cl_attr *attr = &osc_env_info(env)->oti_attr; 398 struct obdo *oa = &oio->oi_oa; 399 struct osc_async_cbargs *cbargs = &oio->oi_cbarg; 400 __u64 size = io->u.ci_setattr.sa_attr.lvb_size; 401 unsigned int ia_valid = io->u.ci_setattr.sa_valid; 402 int result = 0; 403 struct obd_info oinfo = { { { 0 } } }; 404 405 /* truncate cache dirty pages first */ 406 if (cl_io_is_trunc(io)) 407 result = osc_cache_truncate_start(env, oio, cl2osc(obj), size); 408 409 if (result == 0 && oio->oi_lockless == 0) { 410 cl_object_attr_lock(obj); 411 result = cl_object_attr_get(env, obj, attr); 412 if (result == 0) { 413 struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr; 414 unsigned int cl_valid = 0; 415 416 if (ia_valid & ATTR_SIZE) { 417 attr->cat_size = attr->cat_kms = size; 418 cl_valid = (CAT_SIZE | CAT_KMS); 419 } 420 if (ia_valid & ATTR_MTIME_SET) { 421 attr->cat_mtime = lvb->lvb_mtime; 422 cl_valid |= CAT_MTIME; 423 } 424 if (ia_valid & ATTR_ATIME_SET) { 425 attr->cat_atime = lvb->lvb_atime; 426 cl_valid |= CAT_ATIME; 427 } 428 if (ia_valid & ATTR_CTIME_SET) { 429 attr->cat_ctime = lvb->lvb_ctime; 430 cl_valid |= CAT_CTIME; 431 } 432 result = cl_object_attr_set(env, obj, attr, cl_valid); 433 } 434 cl_object_attr_unlock(obj); 435 } 436 memset(oa, 0, sizeof(*oa)); 437 if (result == 0) { 438 oa->o_oi = loi->loi_oi; 439 oa->o_mtime = attr->cat_mtime; 440 oa->o_atime = attr->cat_atime; 441 oa->o_ctime = attr->cat_ctime; 442 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | 443 OBD_MD_FLCTIME | OBD_MD_FLMTIME; 444 if (ia_valid & ATTR_SIZE) { 445 oa->o_size = size; 446 oa->o_blocks = OBD_OBJECT_EOF; 447 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; 448 449 if (oio->oi_lockless) { 450 oa->o_flags = OBD_FL_SRVLOCK; 451 oa->o_valid |= OBD_MD_FLFLAGS; 452 } 453 } else { 454 LASSERT(oio->oi_lockless == 0); 455 } 456 457 oinfo.oi_oa = oa; 458 oinfo.oi_capa = io->u.ci_setattr.sa_capa; 459 init_completion(&cbargs->opc_sync); 460 461 if (ia_valid & ATTR_SIZE) 462 result = osc_punch_base(osc_export(cl2osc(obj)), 463 &oinfo, osc_async_upcall, 464 cbargs, PTLRPCD_SET); 465 else 466 result = osc_setattr_async_base(osc_export(cl2osc(obj)), 467 &oinfo, NULL, 468 osc_async_upcall, 469 cbargs, PTLRPCD_SET); 470 cbargs->opc_rpc_sent = result == 0; 471 } 472 return result; 473} 474 475static void osc_io_setattr_end(const struct lu_env *env, 476 const struct cl_io_slice *slice) 477{ 478 struct cl_io *io = slice->cis_io; 479 struct osc_io *oio = cl2osc_io(env, slice); 480 struct cl_object *obj = slice->cis_obj; 481 struct osc_async_cbargs *cbargs = &oio->oi_cbarg; 482 int result = 0; 483 484 if (cbargs->opc_rpc_sent) { 485 wait_for_completion(&cbargs->opc_sync); 486 result = io->ci_result = cbargs->opc_rc; 487 } 488 if (result == 0) { 489 if (oio->oi_lockless) { 490 /* lockless truncate */ 491 struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev); 492 493 LASSERT(cl_io_is_trunc(io)); 494 /* XXX: Need a lock. */ 495 osd->od_stats.os_lockless_truncates++; 496 } 497 } 498 499 if (cl_io_is_trunc(io)) { 500 __u64 size = io->u.ci_setattr.sa_attr.lvb_size; 501 osc_trunc_check(env, io, oio, size); 502 if (oio->oi_trunc != NULL) { 503 osc_cache_truncate_end(env, oio, cl2osc(obj)); 504 oio->oi_trunc = NULL; 505 } 506 } 507} 508 509static int osc_io_read_start(const struct lu_env *env, 510 const struct cl_io_slice *slice) 511{ 512 struct cl_object *obj = slice->cis_obj; 513 struct cl_attr *attr = &osc_env_info(env)->oti_attr; 514 int rc = 0; 515 516 if (!slice->cis_io->ci_noatime) { 517 cl_object_attr_lock(obj); 518 attr->cat_atime = LTIME_S(CURRENT_TIME); 519 rc = cl_object_attr_set(env, obj, attr, CAT_ATIME); 520 cl_object_attr_unlock(obj); 521 } 522 return rc; 523} 524 525static int osc_io_write_start(const struct lu_env *env, 526 const struct cl_io_slice *slice) 527{ 528 struct cl_object *obj = slice->cis_obj; 529 struct cl_attr *attr = &osc_env_info(env)->oti_attr; 530 int rc = 0; 531 532 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1); 533 cl_object_attr_lock(obj); 534 attr->cat_mtime = attr->cat_ctime = LTIME_S(CURRENT_TIME); 535 rc = cl_object_attr_set(env, obj, attr, CAT_MTIME | CAT_CTIME); 536 cl_object_attr_unlock(obj); 537 538 return rc; 539} 540 541static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj, 542 struct cl_fsync_io *fio) 543{ 544 struct osc_io *oio = osc_env_io(env); 545 struct obdo *oa = &oio->oi_oa; 546 struct obd_info *oinfo = &oio->oi_info; 547 struct lov_oinfo *loi = obj->oo_oinfo; 548 struct osc_async_cbargs *cbargs = &oio->oi_cbarg; 549 int rc = 0; 550 551 memset(oa, 0, sizeof(*oa)); 552 oa->o_oi = loi->loi_oi; 553 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP; 554 555 /* reload size abd blocks for start and end of sync range */ 556 oa->o_size = fio->fi_start; 557 oa->o_blocks = fio->fi_end; 558 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; 559 560 obdo_set_parent_fid(oa, fio->fi_fid); 561 562 memset(oinfo, 0, sizeof(*oinfo)); 563 oinfo->oi_oa = oa; 564 oinfo->oi_capa = fio->fi_capa; 565 init_completion(&cbargs->opc_sync); 566 567 rc = osc_sync_base(osc_export(obj), oinfo, osc_async_upcall, cbargs, 568 PTLRPCD_SET); 569 return rc; 570} 571 572static int osc_io_fsync_start(const struct lu_env *env, 573 const struct cl_io_slice *slice) 574{ 575 struct cl_io *io = slice->cis_io; 576 struct cl_fsync_io *fio = &io->u.ci_fsync; 577 struct cl_object *obj = slice->cis_obj; 578 struct osc_object *osc = cl2osc(obj); 579 pgoff_t start = cl_index(obj, fio->fi_start); 580 pgoff_t end = cl_index(obj, fio->fi_end); 581 int result = 0; 582 583 if (fio->fi_end == OBD_OBJECT_EOF) 584 end = CL_PAGE_EOF; 585 586 result = osc_cache_writeback_range(env, osc, start, end, 0, 587 fio->fi_mode == CL_FSYNC_DISCARD); 588 if (result > 0) { 589 fio->fi_nr_written += result; 590 result = 0; 591 } 592 if (fio->fi_mode == CL_FSYNC_ALL) { 593 int rc; 594 595 /* we have to wait for writeback to finish before we can 596 * send OST_SYNC RPC. This is bad because it causes extents 597 * to be written osc by osc. However, we usually start 598 * writeback before CL_FSYNC_ALL so this won't have any real 599 * problem. */ 600 rc = osc_cache_wait_range(env, osc, start, end); 601 if (result == 0) 602 result = rc; 603 rc = osc_fsync_ost(env, osc, fio); 604 if (result == 0) 605 result = rc; 606 } 607 608 return result; 609} 610 611static void osc_io_fsync_end(const struct lu_env *env, 612 const struct cl_io_slice *slice) 613{ 614 struct cl_fsync_io *fio = &slice->cis_io->u.ci_fsync; 615 struct cl_object *obj = slice->cis_obj; 616 pgoff_t start = cl_index(obj, fio->fi_start); 617 pgoff_t end = cl_index(obj, fio->fi_end); 618 int result = 0; 619 620 if (fio->fi_mode == CL_FSYNC_LOCAL) { 621 result = osc_cache_wait_range(env, cl2osc(obj), start, end); 622 } else if (fio->fi_mode == CL_FSYNC_ALL) { 623 struct osc_io *oio = cl2osc_io(env, slice); 624 struct osc_async_cbargs *cbargs = &oio->oi_cbarg; 625 626 wait_for_completion(&cbargs->opc_sync); 627 if (result == 0) 628 result = cbargs->opc_rc; 629 } 630 slice->cis_io->ci_result = result; 631} 632 633static void osc_io_end(const struct lu_env *env, 634 const struct cl_io_slice *slice) 635{ 636 struct osc_io *oio = cl2osc_io(env, slice); 637 638 if (oio->oi_active) { 639 osc_extent_release(env, oio->oi_active); 640 oio->oi_active = NULL; 641 } 642} 643 644static const struct cl_io_operations osc_io_ops = { 645 .op = { 646 [CIT_READ] = { 647 .cio_start = osc_io_read_start, 648 .cio_fini = osc_io_fini 649 }, 650 [CIT_WRITE] = { 651 .cio_start = osc_io_write_start, 652 .cio_end = osc_io_end, 653 .cio_fini = osc_io_fini 654 }, 655 [CIT_SETATTR] = { 656 .cio_start = osc_io_setattr_start, 657 .cio_end = osc_io_setattr_end 658 }, 659 [CIT_FAULT] = { 660 .cio_start = osc_io_fault_start, 661 .cio_end = osc_io_end, 662 .cio_fini = osc_io_fini 663 }, 664 [CIT_FSYNC] = { 665 .cio_start = osc_io_fsync_start, 666 .cio_end = osc_io_fsync_end, 667 .cio_fini = osc_io_fini 668 }, 669 [CIT_MISC] = { 670 .cio_fini = osc_io_fini 671 } 672 }, 673 .req_op = { 674 [CRT_READ] = { 675 .cio_submit = osc_io_submit 676 }, 677 [CRT_WRITE] = { 678 .cio_submit = osc_io_submit 679 } 680 }, 681 .cio_prepare_write = osc_io_prepare_write, 682 .cio_commit_write = osc_io_commit_write 683}; 684 685/***************************************************************************** 686 * 687 * Transfer operations. 688 * 689 */ 690 691static int osc_req_prep(const struct lu_env *env, 692 const struct cl_req_slice *slice) 693{ 694 return 0; 695} 696 697static void osc_req_completion(const struct lu_env *env, 698 const struct cl_req_slice *slice, int ioret) 699{ 700 struct osc_req *or; 701 702 or = cl2osc_req(slice); 703 OBD_SLAB_FREE_PTR(or, osc_req_kmem); 704} 705 706/** 707 * Implementation of struct cl_req_operations::cro_attr_set() for osc 708 * layer. osc is responsible for struct obdo::o_id and struct obdo::o_seq 709 * fields. 710 */ 711static void osc_req_attr_set(const struct lu_env *env, 712 const struct cl_req_slice *slice, 713 const struct cl_object *obj, 714 struct cl_req_attr *attr, obd_valid flags) 715{ 716 struct lov_oinfo *oinfo; 717 struct cl_req *clerq; 718 struct cl_page *apage; /* _some_ page in @clerq */ 719 struct cl_lock *lock; /* _some_ lock protecting @apage */ 720 struct osc_lock *olck; 721 struct osc_page *opg; 722 struct obdo *oa; 723 struct ost_lvb *lvb; 724 725 oinfo = cl2osc(obj)->oo_oinfo; 726 lvb = &oinfo->loi_lvb; 727 oa = attr->cra_oa; 728 729 if ((flags & OBD_MD_FLMTIME) != 0) { 730 oa->o_mtime = lvb->lvb_mtime; 731 oa->o_valid |= OBD_MD_FLMTIME; 732 } 733 if ((flags & OBD_MD_FLATIME) != 0) { 734 oa->o_atime = lvb->lvb_atime; 735 oa->o_valid |= OBD_MD_FLATIME; 736 } 737 if ((flags & OBD_MD_FLCTIME) != 0) { 738 oa->o_ctime = lvb->lvb_ctime; 739 oa->o_valid |= OBD_MD_FLCTIME; 740 } 741 if (flags & OBD_MD_FLGROUP) { 742 ostid_set_seq(&oa->o_oi, ostid_seq(&oinfo->loi_oi)); 743 oa->o_valid |= OBD_MD_FLGROUP; 744 } 745 if (flags & OBD_MD_FLID) { 746 ostid_set_id(&oa->o_oi, ostid_id(&oinfo->loi_oi)); 747 oa->o_valid |= OBD_MD_FLID; 748 } 749 if (flags & OBD_MD_FLHANDLE) { 750 clerq = slice->crs_req; 751 LASSERT(!list_empty(&clerq->crq_pages)); 752 apage = container_of(clerq->crq_pages.next, 753 struct cl_page, cp_flight); 754 opg = osc_cl_page_osc(apage); 755 apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */ 756 lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1); 757 if (lock == NULL) { 758 struct cl_object_header *head; 759 struct cl_lock *scan; 760 761 head = cl_object_header(apage->cp_obj); 762 list_for_each_entry(scan, &head->coh_locks, 763 cll_linkage) 764 CL_LOCK_DEBUG(D_ERROR, env, scan, 765 "no cover page!\n"); 766 CL_PAGE_DEBUG(D_ERROR, env, apage, 767 "dump uncover page!\n"); 768 dump_stack(); 769 LBUG(); 770 } 771 772 olck = osc_lock_at(lock); 773 LASSERT(olck != NULL); 774 LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL)); 775 /* check for lockless io. */ 776 if (olck->ols_lock != NULL) { 777 oa->o_handle = olck->ols_lock->l_remote_handle; 778 oa->o_valid |= OBD_MD_FLHANDLE; 779 } 780 cl_lock_put(env, lock); 781 } 782} 783 784static const struct cl_req_operations osc_req_ops = { 785 .cro_prep = osc_req_prep, 786 .cro_attr_set = osc_req_attr_set, 787 .cro_completion = osc_req_completion 788}; 789 790 791int osc_io_init(const struct lu_env *env, 792 struct cl_object *obj, struct cl_io *io) 793{ 794 struct osc_io *oio = osc_env_io(env); 795 796 CL_IO_SLICE_CLEAN(oio, oi_cl); 797 cl_io_slice_add(io, &oio->oi_cl, obj, &osc_io_ops); 798 return 0; 799} 800 801int osc_req_init(const struct lu_env *env, struct cl_device *dev, 802 struct cl_req *req) 803{ 804 struct osc_req *or; 805 int result; 806 807 OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS); 808 if (or != NULL) { 809 cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops); 810 result = 0; 811 } else 812 result = -ENOMEM; 813 return result; 814} 815 816/** @} osc */ 817