rw.c revision 65fb55d19421b9862beea398063816d2cb047907
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * lustre/llite/rw.c 37 * 38 * Lustre Lite I/O page cache routines shared by different kernel revs 39 */ 40 41#include <linux/kernel.h> 42#include <linux/mm.h> 43#include <linux/string.h> 44#include <linux/stat.h> 45#include <linux/errno.h> 46#include <linux/unistd.h> 47#include <linux/writeback.h> 48#include <asm/uaccess.h> 49 50#include <linux/fs.h> 51#include <linux/stat.h> 52#include <asm/uaccess.h> 53#include <linux/mm.h> 54#include <linux/pagemap.h> 55/* current_is_kswapd() */ 56#include <linux/swap.h> 57 58#define DEBUG_SUBSYSTEM S_LLITE 59 60#include <lustre_lite.h> 61#include <obd_cksum.h> 62#include "llite_internal.h" 63#include <linux/lustre_compat25.h> 64 65/** 66 * Finalizes cl-data before exiting typical address_space operation. Dual to 67 * ll_cl_init(). 68 */ 69static void ll_cl_fini(struct ll_cl_context *lcc) 70{ 71 struct lu_env *env = lcc->lcc_env; 72 struct cl_io *io = lcc->lcc_io; 73 struct cl_page *page = lcc->lcc_page; 74 75 LASSERT(lcc->lcc_cookie == current); 76 LASSERT(env != NULL); 77 78 if (page != NULL) { 79 lu_ref_del(&page->cp_reference, "cl_io", io); 80 cl_page_put(env, page); 81 } 82 83 if (io && lcc->lcc_created) { 84 cl_io_end(env, io); 85 cl_io_unlock(env, io); 86 cl_io_iter_fini(env, io); 87 cl_io_fini(env, io); 88 } 89 cl_env_put(env, &lcc->lcc_refcheck); 90} 91 92/** 93 * Initializes common cl-data at the typical address_space operation entry 94 * point. 95 */ 96static struct ll_cl_context *ll_cl_init(struct file *file, 97 struct page *vmpage, int create) 98{ 99 struct ll_cl_context *lcc; 100 struct lu_env *env; 101 struct cl_io *io; 102 struct cl_object *clob; 103 struct ccc_io *cio; 104 105 int refcheck; 106 int result = 0; 107 108 clob = ll_i2info(vmpage->mapping->host)->lli_clob; 109 LASSERT(clob != NULL); 110 111 env = cl_env_get(&refcheck); 112 if (IS_ERR(env)) 113 return ERR_PTR(PTR_ERR(env)); 114 115 lcc = &vvp_env_info(env)->vti_io_ctx; 116 memset(lcc, 0, sizeof(*lcc)); 117 lcc->lcc_env = env; 118 lcc->lcc_refcheck = refcheck; 119 lcc->lcc_cookie = current; 120 121 cio = ccc_env_io(env); 122 io = cio->cui_cl.cis_io; 123 if (io == NULL && create) { 124 struct inode *inode = vmpage->mapping->host; 125 loff_t pos; 126 127 if (mutex_trylock(&inode->i_mutex)) { 128 mutex_unlock(&(inode)->i_mutex); 129 130 /* this is too bad. Someone is trying to write the 131 * page w/o holding inode mutex. This means we can 132 * add dirty pages into cache during truncate */ 133 CERROR("Proc %s is dirting page w/o inode lock, this" 134 "will break truncate.\n", current->comm); 135 libcfs_debug_dumpstack(NULL); 136 LBUG(); 137 return ERR_PTR(-EIO); 138 } 139 140 /* 141 * Loop-back driver calls ->prepare_write() and ->sendfile() 142 * methods directly, bypassing file system ->write() operation, 143 * so cl_io has to be created here. 144 */ 145 io = ccc_env_thread_io(env); 146 ll_io_init(io, file, 1); 147 148 /* No lock at all for this kind of IO - we can't do it because 149 * we have held page lock, it would cause deadlock. 150 * XXX: This causes poor performance to loop device - One page 151 * per RPC. 152 * In order to get better performance, users should use 153 * lloop driver instead. 154 */ 155 io->ci_lockreq = CILR_NEVER; 156 157 pos = (vmpage->index << PAGE_CACHE_SHIFT); 158 159 /* Create a temp IO to serve write. */ 160 result = cl_io_rw_init(env, io, CIT_WRITE, pos, PAGE_CACHE_SIZE); 161 if (result == 0) { 162 cio->cui_fd = LUSTRE_FPRIVATE(file); 163 cio->cui_iov = NULL; 164 cio->cui_nrsegs = 0; 165 result = cl_io_iter_init(env, io); 166 if (result == 0) { 167 result = cl_io_lock(env, io); 168 if (result == 0) 169 result = cl_io_start(env, io); 170 } 171 } else 172 result = io->ci_result; 173 lcc->lcc_created = 1; 174 } 175 176 lcc->lcc_io = io; 177 if (io == NULL) 178 result = -EIO; 179 if (result == 0) { 180 struct cl_page *page; 181 182 LASSERT(io != NULL); 183 LASSERT(io->ci_state == CIS_IO_GOING); 184 LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file)); 185 page = cl_page_find(env, clob, vmpage->index, vmpage, 186 CPT_CACHEABLE); 187 if (!IS_ERR(page)) { 188 lcc->lcc_page = page; 189 lu_ref_add(&page->cp_reference, "cl_io", io); 190 result = 0; 191 } else 192 result = PTR_ERR(page); 193 } 194 if (result) { 195 ll_cl_fini(lcc); 196 lcc = ERR_PTR(result); 197 } 198 199 CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n", 200 vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result, 201 env, io); 202 return lcc; 203} 204 205static struct ll_cl_context *ll_cl_get(void) 206{ 207 struct ll_cl_context *lcc; 208 struct lu_env *env; 209 int refcheck; 210 211 env = cl_env_get(&refcheck); 212 LASSERT(!IS_ERR(env)); 213 lcc = &vvp_env_info(env)->vti_io_ctx; 214 LASSERT(env == lcc->lcc_env); 215 LASSERT(current == lcc->lcc_cookie); 216 cl_env_put(env, &refcheck); 217 218 /* env has got in ll_cl_init, so it is still usable. */ 219 return lcc; 220} 221 222/** 223 * ->prepare_write() address space operation called by generic_file_write() 224 * for every page during write. 225 */ 226int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from, 227 unsigned to) 228{ 229 struct ll_cl_context *lcc; 230 int result; 231 ENTRY; 232 233 lcc = ll_cl_init(file, vmpage, 1); 234 if (!IS_ERR(lcc)) { 235 struct lu_env *env = lcc->lcc_env; 236 struct cl_io *io = lcc->lcc_io; 237 struct cl_page *page = lcc->lcc_page; 238 239 cl_page_assume(env, io, page); 240 241 result = cl_io_prepare_write(env, io, page, from, to); 242 if (result == 0) { 243 /* 244 * Add a reference, so that page is not evicted from 245 * the cache until ->commit_write() is called. 246 */ 247 cl_page_get(page); 248 lu_ref_add(&page->cp_reference, "prepare_write", 249 current); 250 } else { 251 cl_page_unassume(env, io, page); 252 ll_cl_fini(lcc); 253 } 254 /* returning 0 in prepare assumes commit must be called 255 * afterwards */ 256 } else { 257 result = PTR_ERR(lcc); 258 } 259 RETURN(result); 260} 261 262int ll_commit_write(struct file *file, struct page *vmpage, unsigned from, 263 unsigned to) 264{ 265 struct ll_cl_context *lcc; 266 struct lu_env *env; 267 struct cl_io *io; 268 struct cl_page *page; 269 int result = 0; 270 ENTRY; 271 272 lcc = ll_cl_get(); 273 env = lcc->lcc_env; 274 page = lcc->lcc_page; 275 io = lcc->lcc_io; 276 277 LASSERT(cl_page_is_owned(page, io)); 278 LASSERT(from <= to); 279 if (from != to) /* handle short write case. */ 280 result = cl_io_commit_write(env, io, page, from, to); 281 if (cl_page_is_owned(page, io)) 282 cl_page_unassume(env, io, page); 283 284 /* 285 * Release reference acquired by ll_prepare_write(). 286 */ 287 lu_ref_del(&page->cp_reference, "prepare_write", current); 288 cl_page_put(env, page); 289 ll_cl_fini(lcc); 290 RETURN(result); 291} 292 293struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt) 294{ 295 __u64 opc; 296 297 opc = crt == CRT_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW; 298 return ll_osscapa_get(inode, opc); 299} 300 301static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which); 302 303/** 304 * Get readahead pages from the filesystem readahead pool of the client for a 305 * thread. 306 * 307 * /param sbi superblock for filesystem readahead state ll_ra_info 308 * /param ria per-thread readahead state 309 * /param pages number of pages requested for readahead for the thread. 310 * 311 * WARNING: This algorithm is used to reduce contention on sbi->ll_lock. 312 * It should work well if the ra_max_pages is much greater than the single 313 * file's read-ahead window, and not too many threads contending for 314 * these readahead pages. 315 * 316 * TODO: There may be a 'global sync problem' if many threads are trying 317 * to get an ra budget that is larger than the remaining readahead pages 318 * and reach here at exactly the same time. They will compute /a ret to 319 * consume the remaining pages, but will fail at atomic_add_return() and 320 * get a zero ra window, although there is still ra space remaining. - Jay */ 321 322static unsigned long ll_ra_count_get(struct ll_sb_info *sbi, 323 struct ra_io_arg *ria, 324 unsigned long pages) 325{ 326 struct ll_ra_info *ra = &sbi->ll_ra_info; 327 long ret; 328 ENTRY; 329 330 /* If read-ahead pages left are less than 1M, do not do read-ahead, 331 * otherwise it will form small read RPC(< 1M), which hurt server 332 * performance a lot. */ 333 ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages); 334 if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages)) 335 GOTO(out, ret = 0); 336 337 /* If the non-strided (ria_pages == 0) readahead window 338 * (ria_start + ret) has grown across an RPC boundary, then trim 339 * readahead size by the amount beyond the RPC so it ends on an 340 * RPC boundary. If the readahead window is already ending on 341 * an RPC boundary (beyond_rpc == 0), or smaller than a full 342 * RPC (beyond_rpc < ret) the readahead size is unchanged. 343 * The (beyond_rpc != 0) check is skipped since the conditional 344 * branch is more expensive than subtracting zero from the result. 345 * 346 * Strided read is left unaligned to avoid small fragments beyond 347 * the RPC boundary from needing an extra read RPC. */ 348 if (ria->ria_pages == 0) { 349 long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES; 350 if (/* beyond_rpc != 0 && */ beyond_rpc < ret) 351 ret -= beyond_rpc; 352 } 353 354 if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) { 355 atomic_sub(ret, &ra->ra_cur_pages); 356 ret = 0; 357 } 358 359out: 360 RETURN(ret); 361} 362 363void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len) 364{ 365 struct ll_ra_info *ra = &sbi->ll_ra_info; 366 atomic_sub(len, &ra->ra_cur_pages); 367} 368 369static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which) 370{ 371 LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which); 372 lprocfs_counter_incr(sbi->ll_ra_stats, which); 373} 374 375void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which) 376{ 377 struct ll_sb_info *sbi = ll_i2sbi(mapping->host); 378 ll_ra_stats_inc_sbi(sbi, which); 379} 380 381#define RAS_CDEBUG(ras) \ 382 CDEBUG(D_READA, \ 383 "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu" \ 384 "csr %lu sf %lu sp %lu sl %lu \n", \ 385 ras->ras_last_readpage, ras->ras_consecutive_requests, \ 386 ras->ras_consecutive_pages, ras->ras_window_start, \ 387 ras->ras_window_len, ras->ras_next_readahead, \ 388 ras->ras_requests, ras->ras_request_index, \ 389 ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \ 390 ras->ras_stride_pages, ras->ras_stride_length) 391 392static int index_in_window(unsigned long index, unsigned long point, 393 unsigned long before, unsigned long after) 394{ 395 unsigned long start = point - before, end = point + after; 396 397 if (start > point) 398 start = 0; 399 if (end < point) 400 end = ~0; 401 402 return start <= index && index <= end; 403} 404 405static struct ll_readahead_state *ll_ras_get(struct file *f) 406{ 407 struct ll_file_data *fd; 408 409 fd = LUSTRE_FPRIVATE(f); 410 return &fd->fd_ras; 411} 412 413void ll_ra_read_in(struct file *f, struct ll_ra_read *rar) 414{ 415 struct ll_readahead_state *ras; 416 417 ras = ll_ras_get(f); 418 419 spin_lock(&ras->ras_lock); 420 ras->ras_requests++; 421 ras->ras_request_index = 0; 422 ras->ras_consecutive_requests++; 423 rar->lrr_reader = current; 424 425 list_add(&rar->lrr_linkage, &ras->ras_read_beads); 426 spin_unlock(&ras->ras_lock); 427} 428 429void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar) 430{ 431 struct ll_readahead_state *ras; 432 433 ras = ll_ras_get(f); 434 435 spin_lock(&ras->ras_lock); 436 list_del_init(&rar->lrr_linkage); 437 spin_unlock(&ras->ras_lock); 438} 439 440static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras) 441{ 442 struct ll_ra_read *scan; 443 444 list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) { 445 if (scan->lrr_reader == current) 446 return scan; 447 } 448 return NULL; 449} 450 451struct ll_ra_read *ll_ra_read_get(struct file *f) 452{ 453 struct ll_readahead_state *ras; 454 struct ll_ra_read *bead; 455 456 ras = ll_ras_get(f); 457 458 spin_lock(&ras->ras_lock); 459 bead = ll_ra_read_get_locked(ras); 460 spin_unlock(&ras->ras_lock); 461 return bead; 462} 463 464static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io, 465 struct cl_page_list *queue, struct cl_page *page, 466 struct page *vmpage) 467{ 468 struct ccc_page *cp; 469 int rc; 470 471 ENTRY; 472 473 rc = 0; 474 cl_page_assume(env, io, page); 475 lu_ref_add(&page->cp_reference, "ra", current); 476 cp = cl2ccc_page(cl_page_at(page, &vvp_device_type)); 477 if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) { 478 rc = cl_page_is_under_lock(env, io, page); 479 if (rc == -EBUSY) { 480 cp->cpg_defer_uptodate = 1; 481 cp->cpg_ra_used = 0; 482 cl_page_list_add(queue, page); 483 rc = 1; 484 } else { 485 cl_page_delete(env, page); 486 rc = -ENOLCK; 487 } 488 } else { 489 /* skip completed pages */ 490 cl_page_unassume(env, io, page); 491 } 492 lu_ref_del(&page->cp_reference, "ra", current); 493 cl_page_put(env, page); 494 RETURN(rc); 495} 496 497/** 498 * Initiates read-ahead of a page with given index. 499 * 500 * \retval +ve: page was added to \a queue. 501 * 502 * \retval -ENOLCK: there is no extent lock for this part of a file, stop 503 * read-ahead. 504 * 505 * \retval -ve, 0: page wasn't added to \a queue for other reason. 506 */ 507static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io, 508 struct cl_page_list *queue, 509 pgoff_t index, struct address_space *mapping) 510{ 511 struct page *vmpage; 512 struct cl_object *clob = ll_i2info(mapping->host)->lli_clob; 513 struct cl_page *page; 514 enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */ 515 unsigned int gfp_mask; 516 int rc = 0; 517 const char *msg = NULL; 518 519 ENTRY; 520 521 gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT; 522#ifdef __GFP_NOWARN 523 gfp_mask |= __GFP_NOWARN; 524#endif 525 vmpage = grab_cache_page_nowait(mapping, index); 526 if (vmpage != NULL) { 527 /* Check if vmpage was truncated or reclaimed */ 528 if (vmpage->mapping == mapping) { 529 page = cl_page_find(env, clob, vmpage->index, 530 vmpage, CPT_CACHEABLE); 531 if (!IS_ERR(page)) { 532 rc = cl_read_ahead_page(env, io, queue, 533 page, vmpage); 534 if (rc == -ENOLCK) { 535 which = RA_STAT_FAILED_MATCH; 536 msg = "lock match failed"; 537 } 538 } else { 539 which = RA_STAT_FAILED_GRAB_PAGE; 540 msg = "cl_page_find failed"; 541 } 542 } else { 543 which = RA_STAT_WRONG_GRAB_PAGE; 544 msg = "g_c_p_n returned invalid page"; 545 } 546 if (rc != 1) 547 unlock_page(vmpage); 548 page_cache_release(vmpage); 549 } else { 550 which = RA_STAT_FAILED_GRAB_PAGE; 551 msg = "g_c_p_n failed"; 552 } 553 if (msg != NULL) { 554 ll_ra_stats_inc(mapping, which); 555 CDEBUG(D_READA, "%s\n", msg); 556 } 557 RETURN(rc); 558} 559 560#define RIA_DEBUG(ria) \ 561 CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n", \ 562 ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length,\ 563 ria->ria_pages) 564 565/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't 566 * know what the actual RPC size is. If this needs to change, it makes more 567 * sense to tune the i_blkbits value for the file based on the OSTs it is 568 * striped over, rather than having a constant value for all files here. */ 569 570/* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - PAGE_CACHE_SHIFT)). 571 * Temprarily set RAS_INCREASE_STEP to 1MB. After 4MB RPC is enabled 572 * by default, this should be adjusted corresponding with max_read_ahead_mb 573 * and max_read_ahead_per_file_mb otherwise the readahead budget can be used 574 * up quickly which will affect read performance siginificantly. See LU-2816 */ 575#define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> PAGE_CACHE_SHIFT) 576 577static inline int stride_io_mode(struct ll_readahead_state *ras) 578{ 579 return ras->ras_consecutive_stride_requests > 1; 580} 581/* The function calculates how much pages will be read in 582 * [off, off + length], in such stride IO area, 583 * stride_offset = st_off, stride_lengh = st_len, 584 * stride_pages = st_pgs 585 * 586 * |------------------|*****|------------------|*****|------------|*****|.... 587 * st_off 588 * |--- st_pgs ---| 589 * |----- st_len -----| 590 * 591 * How many pages it should read in such pattern 592 * |-------------------------------------------------------------| 593 * off 594 * |<------ length ------->| 595 * 596 * = |<----->| + |-------------------------------------| + |---| 597 * start_left st_pgs * i end_left 598 */ 599static unsigned long 600stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs, 601 unsigned long off, unsigned long length) 602{ 603 __u64 start = off > st_off ? off - st_off : 0; 604 __u64 end = off + length > st_off ? off + length - st_off : 0; 605 unsigned long start_left = 0; 606 unsigned long end_left = 0; 607 unsigned long pg_count; 608 609 if (st_len == 0 || length == 0 || end == 0) 610 return length; 611 612 start_left = do_div(start, st_len); 613 if (start_left < st_pgs) 614 start_left = st_pgs - start_left; 615 else 616 start_left = 0; 617 618 end_left = do_div(end, st_len); 619 if (end_left > st_pgs) 620 end_left = st_pgs; 621 622 CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu \n", 623 start, end, start_left, end_left); 624 625 if (start == end) 626 pg_count = end_left - (st_pgs - start_left); 627 else 628 pg_count = start_left + st_pgs * (end - start - 1) + end_left; 629 630 CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu" 631 "pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count); 632 633 return pg_count; 634} 635 636static int ria_page_count(struct ra_io_arg *ria) 637{ 638 __u64 length = ria->ria_end >= ria->ria_start ? 639 ria->ria_end - ria->ria_start + 1 : 0; 640 641 return stride_pg_count(ria->ria_stoff, ria->ria_length, 642 ria->ria_pages, ria->ria_start, 643 length); 644} 645 646/*Check whether the index is in the defined ra-window */ 647static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria) 648{ 649 /* If ria_length == ria_pages, it means non-stride I/O mode, 650 * idx should always inside read-ahead window in this case 651 * For stride I/O mode, just check whether the idx is inside 652 * the ria_pages. */ 653 return ria->ria_length == 0 || ria->ria_length == ria->ria_pages || 654 (idx >= ria->ria_stoff && (idx - ria->ria_stoff) % 655 ria->ria_length < ria->ria_pages); 656} 657 658static int ll_read_ahead_pages(const struct lu_env *env, 659 struct cl_io *io, struct cl_page_list *queue, 660 struct ra_io_arg *ria, 661 unsigned long *reserved_pages, 662 struct address_space *mapping, 663 unsigned long *ra_end) 664{ 665 int rc, count = 0, stride_ria; 666 unsigned long page_idx; 667 668 LASSERT(ria != NULL); 669 RIA_DEBUG(ria); 670 671 stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0; 672 for (page_idx = ria->ria_start; page_idx <= ria->ria_end && 673 *reserved_pages > 0; page_idx++) { 674 if (ras_inside_ra_window(page_idx, ria)) { 675 /* If the page is inside the read-ahead window*/ 676 rc = ll_read_ahead_page(env, io, queue, 677 page_idx, mapping); 678 if (rc == 1) { 679 (*reserved_pages)--; 680 count ++; 681 } else if (rc == -ENOLCK) 682 break; 683 } else if (stride_ria) { 684 /* If it is not in the read-ahead window, and it is 685 * read-ahead mode, then check whether it should skip 686 * the stride gap */ 687 pgoff_t offset; 688 /* FIXME: This assertion only is valid when it is for 689 * forward read-ahead, it will be fixed when backward 690 * read-ahead is implemented */ 691 LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu" 692 "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx, 693 ria->ria_start, ria->ria_end, ria->ria_stoff, 694 ria->ria_length, ria->ria_pages); 695 offset = page_idx - ria->ria_stoff; 696 offset = offset % (ria->ria_length); 697 if (offset > ria->ria_pages) { 698 page_idx += ria->ria_length - offset; 699 CDEBUG(D_READA, "i %lu skip %lu \n", page_idx, 700 ria->ria_length - offset); 701 continue; 702 } 703 } 704 } 705 *ra_end = page_idx; 706 return count; 707} 708 709int ll_readahead(const struct lu_env *env, struct cl_io *io, 710 struct ll_readahead_state *ras, struct address_space *mapping, 711 struct cl_page_list *queue, int flags) 712{ 713 struct vvp_io *vio = vvp_env_io(env); 714 struct vvp_thread_info *vti = vvp_env_info(env); 715 struct cl_attr *attr = ccc_env_thread_attr(env); 716 unsigned long start = 0, end = 0, reserved; 717 unsigned long ra_end, len; 718 struct inode *inode; 719 struct ll_ra_read *bead; 720 struct ra_io_arg *ria = &vti->vti_ria; 721 struct ll_inode_info *lli; 722 struct cl_object *clob; 723 int ret = 0; 724 __u64 kms; 725 ENTRY; 726 727 inode = mapping->host; 728 lli = ll_i2info(inode); 729 clob = lli->lli_clob; 730 731 memset(ria, 0, sizeof *ria); 732 733 cl_object_attr_lock(clob); 734 ret = cl_object_attr_get(env, clob, attr); 735 cl_object_attr_unlock(clob); 736 737 if (ret != 0) 738 RETURN(ret); 739 kms = attr->cat_kms; 740 if (kms == 0) { 741 ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN); 742 RETURN(0); 743 } 744 745 spin_lock(&ras->ras_lock); 746 if (vio->cui_ra_window_set) 747 bead = &vio->cui_bead; 748 else 749 bead = NULL; 750 751 /* Enlarge the RA window to encompass the full read */ 752 if (bead != NULL && ras->ras_window_start + ras->ras_window_len < 753 bead->lrr_start + bead->lrr_count) { 754 ras->ras_window_len = bead->lrr_start + bead->lrr_count - 755 ras->ras_window_start; 756 } 757 /* Reserve a part of the read-ahead window that we'll be issuing */ 758 if (ras->ras_window_len) { 759 start = ras->ras_next_readahead; 760 end = ras->ras_window_start + ras->ras_window_len - 1; 761 } 762 if (end != 0) { 763 unsigned long rpc_boundary; 764 /* 765 * Align RA window to an optimal boundary. 766 * 767 * XXX This would be better to align to cl_max_pages_per_rpc 768 * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may 769 * be aligned to the RAID stripe size in the future and that 770 * is more important than the RPC size. 771 */ 772 /* Note: we only trim the RPC, instead of extending the RPC 773 * to the boundary, so to avoid reading too much pages during 774 * random reading. */ 775 rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1))); 776 if (rpc_boundary > 0) 777 rpc_boundary--; 778 779 if (rpc_boundary > start) 780 end = rpc_boundary; 781 782 /* Truncate RA window to end of file */ 783 end = min(end, (unsigned long)((kms - 1) >> PAGE_CACHE_SHIFT)); 784 785 ras->ras_next_readahead = max(end, end + 1); 786 RAS_CDEBUG(ras); 787 } 788 ria->ria_start = start; 789 ria->ria_end = end; 790 /* If stride I/O mode is detected, get stride window*/ 791 if (stride_io_mode(ras)) { 792 ria->ria_stoff = ras->ras_stride_offset; 793 ria->ria_length = ras->ras_stride_length; 794 ria->ria_pages = ras->ras_stride_pages; 795 } 796 spin_unlock(&ras->ras_lock); 797 798 if (end == 0) { 799 ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW); 800 RETURN(0); 801 } 802 len = ria_page_count(ria); 803 if (len == 0) 804 RETURN(0); 805 806 reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len); 807 if (reserved < len) 808 ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT); 809 810 CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved, 811 atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages), 812 ll_i2sbi(inode)->ll_ra_info.ra_max_pages); 813 814 ret = ll_read_ahead_pages(env, io, queue, 815 ria, &reserved, mapping, &ra_end); 816 817 LASSERTF(reserved >= 0, "reserved %lu\n", reserved); 818 if (reserved != 0) 819 ll_ra_count_put(ll_i2sbi(inode), reserved); 820 821 if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT)) 822 ll_ra_stats_inc(mapping, RA_STAT_EOF); 823 824 /* if we didn't get to the end of the region we reserved from 825 * the ras we need to go back and update the ras so that the 826 * next read-ahead tries from where we left off. we only do so 827 * if the region we failed to issue read-ahead on is still ahead 828 * of the app and behind the next index to start read-ahead from */ 829 CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu \n", 830 ra_end, end, ria->ria_end); 831 832 if (ra_end != end + 1) { 833 spin_lock(&ras->ras_lock); 834 if (ra_end < ras->ras_next_readahead && 835 index_in_window(ra_end, ras->ras_window_start, 0, 836 ras->ras_window_len)) { 837 ras->ras_next_readahead = ra_end; 838 RAS_CDEBUG(ras); 839 } 840 spin_unlock(&ras->ras_lock); 841 } 842 843 RETURN(ret); 844} 845 846static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras, 847 unsigned long index) 848{ 849 ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1)); 850} 851 852/* called with the ras_lock held or from places where it doesn't matter */ 853static void ras_reset(struct inode *inode, struct ll_readahead_state *ras, 854 unsigned long index) 855{ 856 ras->ras_last_readpage = index; 857 ras->ras_consecutive_requests = 0; 858 ras->ras_consecutive_pages = 0; 859 ras->ras_window_len = 0; 860 ras_set_start(inode, ras, index); 861 ras->ras_next_readahead = max(ras->ras_window_start, index); 862 863 RAS_CDEBUG(ras); 864} 865 866/* called with the ras_lock held or from places where it doesn't matter */ 867static void ras_stride_reset(struct ll_readahead_state *ras) 868{ 869 ras->ras_consecutive_stride_requests = 0; 870 ras->ras_stride_length = 0; 871 ras->ras_stride_pages = 0; 872 RAS_CDEBUG(ras); 873} 874 875void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras) 876{ 877 spin_lock_init(&ras->ras_lock); 878 ras_reset(inode, ras, 0); 879 ras->ras_requests = 0; 880 INIT_LIST_HEAD(&ras->ras_read_beads); 881} 882 883/* 884 * Check whether the read request is in the stride window. 885 * If it is in the stride window, return 1, otherwise return 0. 886 */ 887static int index_in_stride_window(struct ll_readahead_state *ras, 888 unsigned long index) 889{ 890 unsigned long stride_gap; 891 892 if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 || 893 ras->ras_stride_pages == ras->ras_stride_length) 894 return 0; 895 896 stride_gap = index - ras->ras_last_readpage - 1; 897 898 /* If it is contiguous read */ 899 if (stride_gap == 0) 900 return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages; 901 902 /* Otherwise check the stride by itself */ 903 return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap && 904 ras->ras_consecutive_pages == ras->ras_stride_pages; 905} 906 907static void ras_update_stride_detector(struct ll_readahead_state *ras, 908 unsigned long index) 909{ 910 unsigned long stride_gap = index - ras->ras_last_readpage - 1; 911 912 if (!stride_io_mode(ras) && (stride_gap != 0 || 913 ras->ras_consecutive_stride_requests == 0)) { 914 ras->ras_stride_pages = ras->ras_consecutive_pages; 915 ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages; 916 } 917 LASSERT(ras->ras_request_index == 0); 918 LASSERT(ras->ras_consecutive_stride_requests == 0); 919 920 if (index <= ras->ras_last_readpage) { 921 /*Reset stride window for forward read*/ 922 ras_stride_reset(ras); 923 return; 924 } 925 926 ras->ras_stride_pages = ras->ras_consecutive_pages; 927 ras->ras_stride_length = stride_gap +ras->ras_consecutive_pages; 928 929 RAS_CDEBUG(ras); 930 return; 931} 932 933static unsigned long 934stride_page_count(struct ll_readahead_state *ras, unsigned long len) 935{ 936 return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length, 937 ras->ras_stride_pages, ras->ras_stride_offset, 938 len); 939} 940 941/* Stride Read-ahead window will be increased inc_len according to 942 * stride I/O pattern */ 943static void ras_stride_increase_window(struct ll_readahead_state *ras, 944 struct ll_ra_info *ra, 945 unsigned long inc_len) 946{ 947 unsigned long left, step, window_len; 948 unsigned long stride_len; 949 950 LASSERT(ras->ras_stride_length > 0); 951 LASSERTF(ras->ras_window_start + ras->ras_window_len 952 >= ras->ras_stride_offset, "window_start %lu, window_len %lu" 953 " stride_offset %lu\n", ras->ras_window_start, 954 ras->ras_window_len, ras->ras_stride_offset); 955 956 stride_len = ras->ras_window_start + ras->ras_window_len - 957 ras->ras_stride_offset; 958 959 left = stride_len % ras->ras_stride_length; 960 window_len = ras->ras_window_len - left; 961 962 if (left < ras->ras_stride_pages) 963 left += inc_len; 964 else 965 left = ras->ras_stride_pages + inc_len; 966 967 LASSERT(ras->ras_stride_pages != 0); 968 969 step = left / ras->ras_stride_pages; 970 left %= ras->ras_stride_pages; 971 972 window_len += step * ras->ras_stride_length + left; 973 974 if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file) 975 ras->ras_window_len = window_len; 976 977 RAS_CDEBUG(ras); 978} 979 980static void ras_increase_window(struct inode *inode, 981 struct ll_readahead_state *ras, 982 struct ll_ra_info *ra) 983{ 984 /* The stretch of ra-window should be aligned with max rpc_size 985 * but current clio architecture does not support retrieve such 986 * information from lower layer. FIXME later 987 */ 988 if (stride_io_mode(ras)) 989 ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode)); 990 else 991 ras->ras_window_len = min(ras->ras_window_len + 992 RAS_INCREASE_STEP(inode), 993 ra->ra_max_pages_per_file); 994} 995 996void ras_update(struct ll_sb_info *sbi, struct inode *inode, 997 struct ll_readahead_state *ras, unsigned long index, 998 unsigned hit) 999{ 1000 struct ll_ra_info *ra = &sbi->ll_ra_info; 1001 int zero = 0, stride_detect = 0, ra_miss = 0; 1002 ENTRY; 1003 1004 spin_lock(&ras->ras_lock); 1005 1006 ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS); 1007 1008 /* reset the read-ahead window in two cases. First when the app seeks 1009 * or reads to some other part of the file. Secondly if we get a 1010 * read-ahead miss that we think we've previously issued. This can 1011 * be a symptom of there being so many read-ahead pages that the VM is 1012 * reclaiming it before we get to it. */ 1013 if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) { 1014 zero = 1; 1015 ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE); 1016 } else if (!hit && ras->ras_window_len && 1017 index < ras->ras_next_readahead && 1018 index_in_window(index, ras->ras_window_start, 0, 1019 ras->ras_window_len)) { 1020 ra_miss = 1; 1021 ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW); 1022 } 1023 1024 /* On the second access to a file smaller than the tunable 1025 * ra_max_read_ahead_whole_pages trigger RA on all pages in the 1026 * file up to ra_max_pages_per_file. This is simply a best effort 1027 * and only occurs once per open file. Normal RA behavior is reverted 1028 * to for subsequent IO. The mmap case does not increment 1029 * ras_requests and thus can never trigger this behavior. */ 1030 if (ras->ras_requests == 2 && !ras->ras_request_index) { 1031 __u64 kms_pages; 1032 1033 kms_pages = (i_size_read(inode) + PAGE_CACHE_SIZE - 1) >> 1034 PAGE_CACHE_SHIFT; 1035 1036 CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages, 1037 ra->ra_max_read_ahead_whole_pages, ra->ra_max_pages_per_file); 1038 1039 if (kms_pages && 1040 kms_pages <= ra->ra_max_read_ahead_whole_pages) { 1041 ras->ras_window_start = 0; 1042 ras->ras_last_readpage = 0; 1043 ras->ras_next_readahead = 0; 1044 ras->ras_window_len = min(ra->ra_max_pages_per_file, 1045 ra->ra_max_read_ahead_whole_pages); 1046 GOTO(out_unlock, 0); 1047 } 1048 } 1049 if (zero) { 1050 /* check whether it is in stride I/O mode*/ 1051 if (!index_in_stride_window(ras, index)) { 1052 if (ras->ras_consecutive_stride_requests == 0 && 1053 ras->ras_request_index == 0) { 1054 ras_update_stride_detector(ras, index); 1055 ras->ras_consecutive_stride_requests++; 1056 } else { 1057 ras_stride_reset(ras); 1058 } 1059 ras_reset(inode, ras, index); 1060 ras->ras_consecutive_pages++; 1061 GOTO(out_unlock, 0); 1062 } else { 1063 ras->ras_consecutive_pages = 0; 1064 ras->ras_consecutive_requests = 0; 1065 if (++ras->ras_consecutive_stride_requests > 1) 1066 stride_detect = 1; 1067 RAS_CDEBUG(ras); 1068 } 1069 } else { 1070 if (ra_miss) { 1071 if (index_in_stride_window(ras, index) && 1072 stride_io_mode(ras)) { 1073 /*If stride-RA hit cache miss, the stride dector 1074 *will not be reset to avoid the overhead of 1075 *redetecting read-ahead mode */ 1076 if (index != ras->ras_last_readpage + 1) 1077 ras->ras_consecutive_pages = 0; 1078 ras_reset(inode, ras, index); 1079 RAS_CDEBUG(ras); 1080 } else { 1081 /* Reset both stride window and normal RA 1082 * window */ 1083 ras_reset(inode, ras, index); 1084 ras->ras_consecutive_pages++; 1085 ras_stride_reset(ras); 1086 GOTO(out_unlock, 0); 1087 } 1088 } else if (stride_io_mode(ras)) { 1089 /* If this is contiguous read but in stride I/O mode 1090 * currently, check whether stride step still is valid, 1091 * if invalid, it will reset the stride ra window*/ 1092 if (!index_in_stride_window(ras, index)) { 1093 /* Shrink stride read-ahead window to be zero */ 1094 ras_stride_reset(ras); 1095 ras->ras_window_len = 0; 1096 ras->ras_next_readahead = index; 1097 } 1098 } 1099 } 1100 ras->ras_consecutive_pages++; 1101 ras->ras_last_readpage = index; 1102 ras_set_start(inode, ras, index); 1103 1104 if (stride_io_mode(ras)) 1105 /* Since stride readahead is sentivite to the offset 1106 * of read-ahead, so we use original offset here, 1107 * instead of ras_window_start, which is RPC aligned */ 1108 ras->ras_next_readahead = max(index, ras->ras_next_readahead); 1109 else 1110 ras->ras_next_readahead = max(ras->ras_window_start, 1111 ras->ras_next_readahead); 1112 RAS_CDEBUG(ras); 1113 1114 /* Trigger RA in the mmap case where ras_consecutive_requests 1115 * is not incremented and thus can't be used to trigger RA */ 1116 if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) { 1117 ras->ras_window_len = RAS_INCREASE_STEP(inode); 1118 GOTO(out_unlock, 0); 1119 } 1120 1121 /* Initially reset the stride window offset to next_readahead*/ 1122 if (ras->ras_consecutive_stride_requests == 2 && stride_detect) { 1123 /** 1124 * Once stride IO mode is detected, next_readahead should be 1125 * reset to make sure next_readahead > stride offset 1126 */ 1127 ras->ras_next_readahead = max(index, ras->ras_next_readahead); 1128 ras->ras_stride_offset = index; 1129 ras->ras_window_len = RAS_INCREASE_STEP(inode); 1130 } 1131 1132 /* The initial ras_window_len is set to the request size. To avoid 1133 * uselessly reading and discarding pages for random IO the window is 1134 * only increased once per consecutive request received. */ 1135 if ((ras->ras_consecutive_requests > 1 || stride_detect) && 1136 !ras->ras_request_index) 1137 ras_increase_window(inode, ras, ra); 1138 EXIT; 1139out_unlock: 1140 RAS_CDEBUG(ras); 1141 ras->ras_request_index++; 1142 spin_unlock(&ras->ras_lock); 1143 return; 1144} 1145 1146int ll_writepage(struct page *vmpage, struct writeback_control *wbc) 1147{ 1148 struct inode *inode = vmpage->mapping->host; 1149 struct ll_inode_info *lli = ll_i2info(inode); 1150 struct lu_env *env; 1151 struct cl_io *io; 1152 struct cl_page *page; 1153 struct cl_object *clob; 1154 struct cl_env_nest nest; 1155 bool redirtied = false; 1156 bool unlocked = false; 1157 int result; 1158 ENTRY; 1159 1160 LASSERT(PageLocked(vmpage)); 1161 LASSERT(!PageWriteback(vmpage)); 1162 1163 LASSERT(ll_i2dtexp(inode) != NULL); 1164 1165 env = cl_env_nested_get(&nest); 1166 if (IS_ERR(env)) 1167 GOTO(out, result = PTR_ERR(env)); 1168 1169 clob = ll_i2info(inode)->lli_clob; 1170 LASSERT(clob != NULL); 1171 1172 io = ccc_env_thread_io(env); 1173 io->ci_obj = clob; 1174 io->ci_ignore_layout = 1; 1175 result = cl_io_init(env, io, CIT_MISC, clob); 1176 if (result == 0) { 1177 page = cl_page_find(env, clob, vmpage->index, 1178 vmpage, CPT_CACHEABLE); 1179 if (!IS_ERR(page)) { 1180 lu_ref_add(&page->cp_reference, "writepage", 1181 current); 1182 cl_page_assume(env, io, page); 1183 result = cl_page_flush(env, io, page); 1184 if (result != 0) { 1185 /* 1186 * Re-dirty page on error so it retries write, 1187 * but not in case when IO has actually 1188 * occurred and completed with an error. 1189 */ 1190 if (!PageError(vmpage)) { 1191 redirty_page_for_writepage(wbc, vmpage); 1192 result = 0; 1193 redirtied = true; 1194 } 1195 } 1196 cl_page_disown(env, io, page); 1197 unlocked = true; 1198 lu_ref_del(&page->cp_reference, 1199 "writepage", current); 1200 cl_page_put(env, page); 1201 } else { 1202 result = PTR_ERR(page); 1203 } 1204 } 1205 cl_io_fini(env, io); 1206 1207 if (redirtied && wbc->sync_mode == WB_SYNC_ALL) { 1208 loff_t offset = cl_offset(clob, vmpage->index); 1209 1210 /* Flush page failed because the extent is being written out. 1211 * Wait for the write of extent to be finished to avoid 1212 * breaking kernel which assumes ->writepage should mark 1213 * PageWriteback or clean the page. */ 1214 result = cl_sync_file_range(inode, offset, 1215 offset + PAGE_CACHE_SIZE - 1, 1216 CL_FSYNC_LOCAL, 1); 1217 if (result > 0) { 1218 /* actually we may have written more than one page. 1219 * decreasing this page because the caller will count 1220 * it. */ 1221 wbc->nr_to_write -= result - 1; 1222 result = 0; 1223 } 1224 } 1225 1226 cl_env_nested_put(&nest, env); 1227 GOTO(out, result); 1228 1229out: 1230 if (result < 0) { 1231 if (!lli->lli_async_rc) 1232 lli->lli_async_rc = result; 1233 SetPageError(vmpage); 1234 if (!unlocked) 1235 unlock_page(vmpage); 1236 } 1237 return result; 1238} 1239 1240int ll_writepages(struct address_space *mapping, struct writeback_control *wbc) 1241{ 1242 struct inode *inode = mapping->host; 1243 struct ll_sb_info *sbi = ll_i2sbi(inode); 1244 loff_t start; 1245 loff_t end; 1246 enum cl_fsync_mode mode; 1247 int range_whole = 0; 1248 int result; 1249 int ignore_layout = 0; 1250 ENTRY; 1251 1252 if (wbc->range_cyclic) { 1253 start = mapping->writeback_index << PAGE_CACHE_SHIFT; 1254 end = OBD_OBJECT_EOF; 1255 } else { 1256 start = wbc->range_start; 1257 end = wbc->range_end; 1258 if (end == LLONG_MAX) { 1259 end = OBD_OBJECT_EOF; 1260 range_whole = start == 0; 1261 } 1262 } 1263 1264 mode = CL_FSYNC_NONE; 1265 if (wbc->sync_mode == WB_SYNC_ALL) 1266 mode = CL_FSYNC_LOCAL; 1267 1268 if (sbi->ll_umounting) 1269 /* if the mountpoint is being umounted, all pages have to be 1270 * evicted to avoid hitting LBUG when truncate_inode_pages() 1271 * is called later on. */ 1272 ignore_layout = 1; 1273 result = cl_sync_file_range(inode, start, end, mode, ignore_layout); 1274 if (result > 0) { 1275 wbc->nr_to_write -= result; 1276 result = 0; 1277 } 1278 1279 if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) { 1280 if (end == OBD_OBJECT_EOF) 1281 end = i_size_read(inode); 1282 mapping->writeback_index = (end >> PAGE_CACHE_SHIFT) + 1; 1283 } 1284 RETURN(result); 1285} 1286 1287int ll_readpage(struct file *file, struct page *vmpage) 1288{ 1289 struct ll_cl_context *lcc; 1290 int result; 1291 ENTRY; 1292 1293 lcc = ll_cl_init(file, vmpage, 0); 1294 if (!IS_ERR(lcc)) { 1295 struct lu_env *env = lcc->lcc_env; 1296 struct cl_io *io = lcc->lcc_io; 1297 struct cl_page *page = lcc->lcc_page; 1298 1299 LASSERT(page->cp_type == CPT_CACHEABLE); 1300 if (likely(!PageUptodate(vmpage))) { 1301 cl_page_assume(env, io, page); 1302 result = cl_io_read_page(env, io, page); 1303 } else { 1304 /* Page from a non-object file. */ 1305 unlock_page(vmpage); 1306 result = 0; 1307 } 1308 ll_cl_fini(lcc); 1309 } else { 1310 unlock_page(vmpage); 1311 result = PTR_ERR(lcc); 1312 } 1313 RETURN(result); 1314} 1315