rw.c revision 2d95f10e50da3eadd3f0a54f8b4b03db37ce879c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/llite/rw.c
 *
 * Lustre Lite I/O page cache routines shared by different kernel revs
 */

#include <linux/kernel.h>
#include <linux/mm.h>
#include <linux/string.h>
#include <linux/stat.h>
#include <linux/errno.h>
#include <linux/unistd.h>
#include <linux/writeback.h>
#include <asm/uaccess.h>

#include <linux/fs.h>
#include <linux/pagemap.h>
/* current_is_kswapd() */
#include <linux/swap.h>

#define DEBUG_SUBSYSTEM S_LLITE

#include <lustre_lite.h>
#include <obd_cksum.h>
#include "llite_internal.h"
#include <linux/lustre_compat25.h>

/**
 * Finalizes cl-data before exiting typical address_space operation. Dual to
 * ll_cl_init().
 */
static void ll_cl_fini(struct ll_cl_context *lcc)
{
	struct lu_env  *env  = lcc->lcc_env;
	struct cl_io   *io   = lcc->lcc_io;
	struct cl_page *page = lcc->lcc_page;

	LASSERT(lcc->lcc_cookie == current);
	LASSERT(env != NULL);

	if (page != NULL) {
		lu_ref_del(&page->cp_reference, "cl_io", io);
		cl_page_put(env, page);
	}

	cl_env_put(env, &lcc->lcc_refcheck);
}

/**
 * Initializes common cl-data at the typical address_space operation entry
 * point.
 */
static struct ll_cl_context *ll_cl_init(struct file *file,
					struct page *vmpage, int create)
{
	struct ll_cl_context *lcc;
	struct lu_env    *env;
	struct cl_io     *io;
	struct cl_object *clob;
	struct ccc_io    *cio;

	int refcheck;
	int result = 0;

	clob = ll_i2info(vmpage->mapping->host)->lli_clob;
	LASSERT(clob != NULL);

	env = cl_env_get(&refcheck);
	if (IS_ERR(env))
		return ERR_CAST(env);

	lcc = &vvp_env_info(env)->vti_io_ctx;
	memset(lcc, 0, sizeof(*lcc));
	lcc->lcc_env = env;
	lcc->lcc_refcheck = refcheck;
	lcc->lcc_cookie = current;

	cio = ccc_env_io(env);
	io = cio->cui_cl.cis_io;
	if (io == NULL && create) {
		struct inode *inode = vmpage->mapping->host;
		loff_t pos;

		if (mutex_trylock(&inode->i_mutex)) {
			mutex_unlock(&inode->i_mutex);

			/* this is too bad. Someone is trying to write the
			 * page w/o holding inode mutex.
			 * This means we could add dirty pages into the
			 * cache during truncate. */
			CERROR("Proc %s is dirtying page w/o inode lock, this "
			       "will break truncate.\n", current->comm);
			dump_stack();
			LBUG();
			return ERR_PTR(-EIO);
		}

		/*
		 * The loop-back driver calls ->prepare_write() directly,
		 * bypassing the file system ->write() operation, so the
		 * cl_io has to be created here.
		 */
		io = ccc_env_thread_io(env);
		ll_io_init(io, file, 1);

		/* No lock at all for this kind of IO - we cannot take locks
		 * because we already hold the page lock; doing so would cause
		 * a deadlock.
		 * XXX: This causes poor performance for the loop device - one
		 * page per RPC.
		 * In order to get better performance, users should use the
		 * lloop driver instead.
		 */
		io->ci_lockreq = CILR_NEVER;

		pos = (vmpage->index << PAGE_CACHE_SHIFT);

		/* Create a temp IO to serve write. */
		result = cl_io_rw_init(env, io, CIT_WRITE, pos, PAGE_CACHE_SIZE);
		if (result == 0) {
			cio->cui_fd = LUSTRE_FPRIVATE(file);
			cio->cui_iov = NULL;
			cio->cui_nrsegs = 0;
			result = cl_io_iter_init(env, io);
			if (result == 0) {
				result = cl_io_lock(env, io);
				if (result == 0)
					result = cl_io_start(env, io);
			}
		} else
			result = io->ci_result;
	}

	lcc->lcc_io = io;
	if (io == NULL)
		result = -EIO;
	if (result == 0) {
		struct cl_page *page;

		LASSERT(io != NULL);
		LASSERT(io->ci_state == CIS_IO_GOING);
		LASSERT(cio->cui_fd == LUSTRE_FPRIVATE(file));
		page = cl_page_find(env, clob, vmpage->index, vmpage,
				    CPT_CACHEABLE);
		if (!IS_ERR(page)) {
			lcc->lcc_page = page;
			lu_ref_add(&page->cp_reference, "cl_io", io);
			result = 0;
		} else
			result = PTR_ERR(page);
	}
	if (result) {
		ll_cl_fini(lcc);
		lcc = ERR_PTR(result);
	}

	CDEBUG(D_VFSTRACE, "%lu@"DFID" -> %d %p %p\n",
	       vmpage->index, PFID(lu_object_fid(&clob->co_lu)), result,
	       env, io);
	return lcc;
}

static struct ll_cl_context *ll_cl_get(void)
{
	struct ll_cl_context *lcc;
	struct lu_env *env;
	int refcheck;

	env = cl_env_get(&refcheck);
	LASSERT(!IS_ERR(env));
	lcc = &vvp_env_info(env)->vti_io_ctx;
	LASSERT(env == lcc->lcc_env);
	LASSERT(current == lcc->lcc_cookie);
	cl_env_put(env, &refcheck);

	/* env was acquired in ll_cl_init(), so it is still usable. */
	return lcc;
}

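/*
 * Typical pairing of the helpers above (an illustrative sketch only, not a
 * new API): ll_cl_init() takes the cl environment reference that ll_cl_fini()
 * later releases, while ll_cl_get() merely looks up the per-thread context
 * set up by ll_cl_init() and drops its own temporary reference immediately:
 *
 *	lcc = ll_cl_init(file, vmpage, create);
 *	if (!IS_ERR(lcc)) {
 *		... use lcc->lcc_env, lcc->lcc_io, lcc->lcc_page ...
 *		ll_cl_fini(lcc);
 *	}
 */
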
/**
 * ->prepare_write() address space operation called by generic_file_write()
 * for every page during write.
 */
int ll_prepare_write(struct file *file, struct page *vmpage, unsigned from,
		     unsigned to)
{
	struct ll_cl_context *lcc;
	int result;

	lcc = ll_cl_init(file, vmpage, 1);
	if (!IS_ERR(lcc)) {
		struct lu_env  *env  = lcc->lcc_env;
		struct cl_io   *io   = lcc->lcc_io;
		struct cl_page *page = lcc->lcc_page;

		cl_page_assume(env, io, page);

		result = cl_io_prepare_write(env, io, page, from, to);
		if (result == 0) {
			/*
			 * Add a reference, so that the page is not evicted
			 * from the cache until ->commit_write() is called.
			 */
			cl_page_get(page);
			lu_ref_add(&page->cp_reference, "prepare_write",
				   current);
		} else {
			cl_page_unassume(env, io, page);
			ll_cl_fini(lcc);
		}
		/* returning 0 in prepare assumes commit must be called
		 * afterwards */
	} else {
		result = PTR_ERR(lcc);
	}
	return result;
}

int ll_commit_write(struct file *file, struct page *vmpage, unsigned from,
		    unsigned to)
{
	struct ll_cl_context *lcc;
	struct lu_env  *env;
	struct cl_io   *io;
	struct cl_page *page;
	int result = 0;

	lcc = ll_cl_get();
	env = lcc->lcc_env;
	page = lcc->lcc_page;
	io = lcc->lcc_io;

	LASSERT(cl_page_is_owned(page, io));
	LASSERT(from <= to);
	if (from != to) /* handle short write case. */
		result = cl_io_commit_write(env, io, page, from, to);
	if (cl_page_is_owned(page, io))
		cl_page_unassume(env, io, page);

	/*
	 * Release reference acquired by ll_prepare_write().
	 */
	lu_ref_del(&page->cp_reference, "prepare_write", current);
	cl_page_put(env, page);
	ll_cl_fini(lcc);
	return result;
}

struct obd_capa *cl_capa_lookup(struct inode *inode, enum cl_req_type crt)
{
	__u64 opc;

	opc = crt == CRT_WRITE ? CAPA_OPC_OSS_WRITE : CAPA_OPC_OSS_RW;
	return ll_osscapa_get(inode, opc);
}

static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which);

/**
 * Get readahead pages from the filesystem readahead pool of the client for a
 * thread.
 *
 * \param sbi   superblock for filesystem readahead state ll_ra_info
 * \param ria   per-thread readahead state
 * \param pages number of pages requested for readahead for the thread.
 *
 * WARNING: This algorithm is used to reduce contention on sbi->ll_lock.
 * It should work well if ra_max_pages is much greater than the single
 * file's read-ahead window, and there are not too many threads contending
 * for these readahead pages.
 *
 * TODO: There may be a 'global sync problem' if many threads are trying
 * to get an ra budget that is larger than the remaining readahead pages
 * and reach here at exactly the same time. They will compute \a ret to
 * consume the remaining pages, but will fail at atomic_add_return() and
 * get a zero ra window, although there is still ra space remaining. - Jay */

static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
				     struct ra_io_arg *ria,
				     unsigned long pages)
{
	struct ll_ra_info *ra = &sbi->ll_ra_info;
	long ret;

	/* If fewer than 1M worth of read-ahead pages are left, do not do
	 * read-ahead; otherwise it will form small read RPCs (< 1M), which
	 * hurt server performance a lot. */
	ret = min(ra->ra_max_pages - atomic_read(&ra->ra_cur_pages), pages);
	if (ret < 0 || ret < min_t(long, PTLRPC_MAX_BRW_PAGES, pages))
		GOTO(out, ret = 0);

	/* If the non-strided (ria_pages == 0) readahead window
	 * (ria_start + ret) has grown across an RPC boundary, then trim
	 * the readahead size by the amount beyond the RPC so it ends on an
	 * RPC boundary.  If the readahead window already ends on an RPC
	 * boundary (beyond_rpc == 0), or does not extend past a boundary at
	 * all (beyond_rpc >= ret), the readahead size is effectively
	 * unchanged.  The (beyond_rpc != 0) check is skipped since the
	 * conditional branch is more expensive than subtracting zero from
	 * the result.
	 *
	 * Strided read is left unaligned to avoid small fragments beyond
	 * the RPC boundary from needing an extra read RPC.
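	 *
	 * For example (illustrative numbers only): if PTLRPC_MAX_BRW_PAGES
	 * were 256 pages, then with ria_start == 100 and ret == 200,
	 * beyond_rpc == (100 + 200) % 256 == 44 < 200, so ret would be
	 * trimmed to 156 and the window [100, 256) would end exactly on an
	 * RPC boundary.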
	 */
	if (ria->ria_pages == 0) {
		long beyond_rpc = (ria->ria_start + ret) % PTLRPC_MAX_BRW_PAGES;

		if (/* beyond_rpc != 0 && */ beyond_rpc < ret)
			ret -= beyond_rpc;
	}

	if (atomic_add_return(ret, &ra->ra_cur_pages) > ra->ra_max_pages) {
		atomic_sub(ret, &ra->ra_cur_pages);
		ret = 0;
	}

out:
	return ret;
}

void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len)
{
	struct ll_ra_info *ra = &sbi->ll_ra_info;

	atomic_sub(len, &ra->ra_cur_pages);
}

static void ll_ra_stats_inc_sbi(struct ll_sb_info *sbi, enum ra_stat which)
{
	LASSERTF(which >= 0 && which < _NR_RA_STAT, "which: %u\n", which);
	lprocfs_counter_incr(sbi->ll_ra_stats, which);
}

void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
{
	struct ll_sb_info *sbi = ll_i2sbi(mapping->host);

	ll_ra_stats_inc_sbi(sbi, which);
}

#define RAS_CDEBUG(ras) \
	CDEBUG(D_READA,							      \
	       "lrp %lu cr %lu cp %lu ws %lu wl %lu nra %lu r %lu ri %lu "    \
	       "csr %lu sf %lu sp %lu sl %lu\n",			      \
	       ras->ras_last_readpage, ras->ras_consecutive_requests,	      \
	       ras->ras_consecutive_pages, ras->ras_window_start,	      \
	       ras->ras_window_len, ras->ras_next_readahead,		      \
	       ras->ras_requests, ras->ras_request_index,		      \
	       ras->ras_consecutive_stride_requests, ras->ras_stride_offset, \
	       ras->ras_stride_pages, ras->ras_stride_length)

static int index_in_window(unsigned long index, unsigned long point,
			   unsigned long before, unsigned long after)
{
	unsigned long start = point - before, end = point + after;

	if (start > point)
		start = 0;
	if (end < point)
		end = ~0;

	return start <= index && index <= end;
}

static struct ll_readahead_state *ll_ras_get(struct file *f)
{
	struct ll_file_data *fd;

	fd = LUSTRE_FPRIVATE(f);
	return &fd->fd_ras;
}

void ll_ra_read_in(struct file *f, struct ll_ra_read *rar)
{
	struct ll_readahead_state *ras;

	ras = ll_ras_get(f);

	spin_lock(&ras->ras_lock);
	ras->ras_requests++;
	ras->ras_request_index = 0;
	ras->ras_consecutive_requests++;
	rar->lrr_reader = current;

	list_add(&rar->lrr_linkage, &ras->ras_read_beads);
	spin_unlock(&ras->ras_lock);
}

void ll_ra_read_ex(struct file *f, struct ll_ra_read *rar)
{
	struct ll_readahead_state *ras;

	ras = ll_ras_get(f);

	spin_lock(&ras->ras_lock);
	list_del_init(&rar->lrr_linkage);
	spin_unlock(&ras->ras_lock);
}

static struct ll_ra_read *ll_ra_read_get_locked(struct ll_readahead_state *ras)
{
	struct ll_ra_read *scan;

	list_for_each_entry(scan, &ras->ras_read_beads, lrr_linkage) {
		if (scan->lrr_reader == current)
			return scan;
	}
	return NULL;
}

struct ll_ra_read *ll_ra_read_get(struct file *f)
{
	struct ll_readahead_state *ras;
	struct ll_ra_read *bead;

	ras = ll_ras_get(f);

	spin_lock(&ras->ras_lock);
	bead = ll_ra_read_get_locked(ras);
	spin_unlock(&ras->ras_lock);
	return bead;
}

static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
			      struct cl_page_list *queue, struct cl_page *page,
			      struct page *vmpage)
{
	struct ccc_page *cp;
	int rc;

	rc = 0;
	cl_page_assume(env, io, page);
	lu_ref_add(&page->cp_reference, "ra", current);
	cp = cl2ccc_page(cl_page_at(page, &vvp_device_type));
	if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
		rc = cl_page_is_under_lock(env, io, page);
		if (rc == -EBUSY) {
			cp->cpg_defer_uptodate = 1;
			cp->cpg_ra_used = 0;
			cl_page_list_add(queue, page);
			rc = 1;
		} else {
			cl_page_delete(env, page);
			rc = -ENOLCK;
		}
	} else {
		/* skip completed pages */
		cl_page_unassume(env, io, page);
	}
	lu_ref_del(&page->cp_reference, "ra", current);
	cl_page_put(env, page);
	return rc;
}

/**
 * Initiates read-ahead of a page with given index.
 *
 * \retval +ve: page was added to \a queue.
 *
 * \retval -ENOLCK: there is no extent lock for this part of a file, stop
 *		    read-ahead.
 *
 * \retval -ve, 0: page wasn't added to \a queue for other reason.
 */
static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
			      struct cl_page_list *queue,
			      pgoff_t index, struct address_space *mapping)
{
	struct page *vmpage;
	struct cl_object *clob = ll_i2info(mapping->host)->lli_clob;
	struct cl_page *page;
	enum ra_stat which = _NR_RA_STAT; /* keep gcc happy */
	unsigned int gfp_mask;
	int rc = 0;
	const char *msg = NULL;

	gfp_mask = GFP_HIGHUSER & ~__GFP_WAIT;
#ifdef __GFP_NOWARN
	gfp_mask |= __GFP_NOWARN;
#endif
	vmpage = grab_cache_page_nowait(mapping, index);
	if (vmpage != NULL) {
		/* Check if vmpage was truncated or reclaimed */
		if (vmpage->mapping == mapping) {
			page = cl_page_find(env, clob, vmpage->index,
					    vmpage, CPT_CACHEABLE);
			if (!IS_ERR(page)) {
				rc = cl_read_ahead_page(env, io, queue,
							page, vmpage);
				if (rc == -ENOLCK) {
					which = RA_STAT_FAILED_MATCH;
					msg = "lock match failed";
				}
			} else {
				which = RA_STAT_FAILED_GRAB_PAGE;
				msg = "cl_page_find failed";
			}
		} else {
			which = RA_STAT_WRONG_GRAB_PAGE;
			msg = "g_c_p_n returned invalid page";
		}
		if (rc != 1)
			unlock_page(vmpage);
		page_cache_release(vmpage);
	} else {
		which = RA_STAT_FAILED_GRAB_PAGE;
		msg = "g_c_p_n failed";
	}
	if (msg != NULL) {
		ll_ra_stats_inc(mapping, which);
		CDEBUG(D_READA, "%s\n", msg);
	}
	return rc;
}

#define RIA_DEBUG(ria) \
	CDEBUG(D_READA, "rs %lu re %lu ro %lu rl %lu rp %lu\n",	       \
	       ria->ria_start, ria->ria_end, ria->ria_stoff, ria->ria_length, \
	       ria->ria_pages)

/* Limit this to the blocksize instead of PTLRPC_BRW_MAX_SIZE, since we don't
 * know what the actual RPC size is.  If this needs to change, it makes more
 * sense to tune the i_blkbits value for the file based on the OSTs it is
 * striped over, rather than having a constant value for all files here. */

/* RAS_INCREASE_STEP should be (1UL << (inode->i_blkbits - PAGE_CACHE_SHIFT)).
 * Temporarily set RAS_INCREASE_STEP to 1MB.  Once 4MB RPCs are enabled by
 * default, this should be adjusted to correspond with max_read_ahead_mb and
 * max_read_ahead_per_file_mb; otherwise the readahead budget can be used up
 * quickly, which will affect read performance significantly.  See LU-2816 */
#define RAS_INCREASE_STEP(inode) (ONE_MB_BRW_SIZE >> PAGE_CACHE_SHIFT)

static inline int stride_io_mode(struct ll_readahead_state *ras)
{
	return ras->ras_consecutive_stride_requests > 1;
}

/* The function calculates how many pages will be read in
 * [off, off + length], in such a stride IO area, where
 * stride_offset = st_off, stride_length = st_len,
 * stride_pages = st_pgs
 *
 *	|------------------|*****|------------------|*****|------------|*****|....
 *	st_off
 *	|--- st_pgs ---|
 *	|-----   st_len   -----|
 *
 *	How many pages it should read in such a pattern:
 *	|-------------------------------------------------------------|
 *	off
 *	|<------		length			------->|
 *
 *	=   |<----->|  +  |-------------------------------------|  +  |---|
 *	   start_left		    st_pgs * i			   end_left
 */
static unsigned long
stride_pg_count(pgoff_t st_off, unsigned long st_len, unsigned long st_pgs,
		unsigned long off, unsigned long length)
{
	__u64 start = off > st_off ? off - st_off : 0;
	__u64 end = off + length > st_off ? off + length - st_off : 0;
	unsigned long start_left = 0;
	unsigned long end_left = 0;
	unsigned long pg_count;

	if (st_len == 0 || length == 0 || end == 0)
		return length;

	start_left = do_div(start, st_len);
	if (start_left < st_pgs)
		start_left = st_pgs - start_left;
	else
		start_left = 0;

	end_left = do_div(end, st_len);
	if (end_left > st_pgs)
		end_left = st_pgs;

	CDEBUG(D_READA, "start "LPU64", end "LPU64" start_left %lu end_left %lu\n",
	       start, end, start_left, end_left);

	if (start == end)
		pg_count = end_left - (st_pgs - start_left);
	else
		pg_count = start_left + st_pgs * (end - start - 1) + end_left;

	CDEBUG(D_READA, "st_off %lu, st_len %lu st_pgs %lu off %lu length %lu "
	       "pgcount %lu\n", st_off, st_len, st_pgs, off, length, pg_count);

	return pg_count;
}

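/*
 * Worked example for stride_pg_count() (illustrative numbers only): with
 * st_off = 0, st_len = 16, st_pgs = 4, off = 2 and length = 40, the range
 * [2, 42) overlaps the stride chunks [0, 4), [16, 20) and [32, 36), so
 * start_left = 2, end_left = 4, end - start - 1 = 1 and
 * pg_count = 2 + 4 * 1 + 4 = 10 pages of actual stride data.
 */
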
static int ria_page_count(struct ra_io_arg *ria)
{
	__u64 length = ria->ria_end >= ria->ria_start ?
		       ria->ria_end - ria->ria_start + 1 : 0;

	return stride_pg_count(ria->ria_stoff, ria->ria_length,
			       ria->ria_pages, ria->ria_start,
			       length);
}

/* Check whether the index is in the defined read-ahead window */
static int ras_inside_ra_window(unsigned long idx, struct ra_io_arg *ria)
{
	/* If ria_length == ria_pages, it means non-stride I/O mode;
	 * idx should always be inside the read-ahead window in this case.
	 * For stride I/O mode, just check whether the idx is inside
	 * the ria_pages. */
	return ria->ria_length == 0 || ria->ria_length == ria->ria_pages ||
	       (idx >= ria->ria_stoff && (idx - ria->ria_stoff) %
		ria->ria_length < ria->ria_pages);
}

static int ll_read_ahead_pages(const struct lu_env *env,
			       struct cl_io *io, struct cl_page_list *queue,
			       struct ra_io_arg *ria,
			       unsigned long *reserved_pages,
			       struct address_space *mapping,
			       unsigned long *ra_end)
{
	int rc, count = 0, stride_ria;
	unsigned long page_idx;

	LASSERT(ria != NULL);
	RIA_DEBUG(ria);

	stride_ria = ria->ria_length > ria->ria_pages && ria->ria_pages > 0;
	for (page_idx = ria->ria_start; page_idx <= ria->ria_end &&
			*reserved_pages > 0; page_idx++) {
		if (ras_inside_ra_window(page_idx, ria)) {
			/* If the page is inside the read-ahead window */
			rc = ll_read_ahead_page(env, io, queue,
						page_idx, mapping);
			if (rc == 1) {
				(*reserved_pages)--;
				count++;
			} else if (rc == -ENOLCK)
				break;
		} else if (stride_ria) {
			/* If it is not in the read-ahead window, and we are
			 * in stride I/O mode, then check whether it should
			 * skip the stride gap */
			pgoff_t offset;

			/* FIXME: This assertion is only valid for forward
			 * read-ahead; it will be fixed when backward
			 * read-ahead is implemented */
			LASSERTF(page_idx > ria->ria_stoff, "Invalid page_idx %lu "
				 "rs %lu re %lu ro %lu rl %lu rp %lu\n", page_idx,
				 ria->ria_start, ria->ria_end, ria->ria_stoff,
				 ria->ria_length, ria->ria_pages);
			offset = page_idx - ria->ria_stoff;
			offset = offset % (ria->ria_length);
			if (offset > ria->ria_pages) {
				page_idx += ria->ria_length - offset;
				CDEBUG(D_READA, "i %lu skip %lu\n", page_idx,
				       ria->ria_length - offset);
				continue;
			}
		}
	}
	*ra_end = page_idx;
	return count;
}

int ll_readahead(const struct lu_env *env, struct cl_io *io,
		 struct ll_readahead_state *ras, struct address_space *mapping,
		 struct cl_page_list *queue, int flags)
{
	struct vvp_io *vio = vvp_env_io(env);
	struct vvp_thread_info *vti = vvp_env_info(env);
	struct cl_attr *attr = ccc_env_thread_attr(env);
	unsigned long start = 0, end = 0, reserved;
	unsigned long ra_end, len;
	struct inode *inode;
	struct ll_ra_read *bead;
	struct ra_io_arg *ria = &vti->vti_ria;
	struct ll_inode_info *lli;
	struct cl_object *clob;
	int ret = 0;
	__u64 kms;

	inode = mapping->host;
	lli = ll_i2info(inode);
	clob = lli->lli_clob;

	memset(ria, 0, sizeof(*ria));

	cl_object_attr_lock(clob);
	ret = cl_object_attr_get(env, clob, attr);
	cl_object_attr_unlock(clob);

	if (ret != 0)
		return ret;
	kms = attr->cat_kms;
	if (kms == 0) {
		ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
		return 0;
	}

	spin_lock(&ras->ras_lock);
	if (vio->cui_ra_window_set)
		bead = &vio->cui_bead;
	else
		bead = NULL;

	/* Enlarge the RA window to encompass the full read */
	if (bead != NULL && ras->ras_window_start + ras->ras_window_len <
	    bead->lrr_start + bead->lrr_count) {
		ras->ras_window_len = bead->lrr_start + bead->lrr_count -
				      ras->ras_window_start;
	}
	/* Reserve a part of the read-ahead window that we'll be issuing */
	if (ras->ras_window_len) {
		start = ras->ras_next_readahead;
		end = ras->ras_window_start + ras->ras_window_len - 1;
	}
	if (end != 0) {
		unsigned long rpc_boundary;
		/*
		 * Align RA window to an optimal boundary.
		 *
		 * XXX It would be better to align to cl_max_pages_per_rpc
		 * instead of PTLRPC_MAX_BRW_PAGES, because the RPC size may
		 * be aligned to the RAID stripe size in the future and that
		 * is more important than the RPC size.
		 */
		/* Note: we only trim the RPC, instead of extending the RPC
		 * to the boundary, so as to avoid reading too many pages
		 * during random reads. */
		rpc_boundary = ((end + 1) & (~(PTLRPC_MAX_BRW_PAGES - 1)));
		if (rpc_boundary > 0)
			rpc_boundary--;

		if (rpc_boundary > start)
			end = rpc_boundary;

		/* Truncate RA window to end of file */
		end = min(end, (unsigned long)((kms - 1) >> PAGE_CACHE_SHIFT));

		ras->ras_next_readahead = max(end, end + 1);
		RAS_CDEBUG(ras);
	}
	ria->ria_start = start;
	ria->ria_end = end;
	/* If stride I/O mode is detected, get the stride window */
	if (stride_io_mode(ras)) {
		ria->ria_stoff = ras->ras_stride_offset;
		ria->ria_length = ras->ras_stride_length;
		ria->ria_pages = ras->ras_stride_pages;
	}
	spin_unlock(&ras->ras_lock);

	if (end == 0) {
		ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
		return 0;
	}
	len = ria_page_count(ria);
	if (len == 0)
		return 0;

	reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
	if (reserved < len)
		ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);

	CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
	       atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
	       ll_i2sbi(inode)->ll_ra_info.ra_max_pages);

	ret = ll_read_ahead_pages(env, io, queue,
				  ria, &reserved, mapping, &ra_end);

	LASSERTF(reserved >= 0, "reserved %lu\n", reserved);
	if (reserved != 0)
		ll_ra_count_put(ll_i2sbi(inode), reserved);

	if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
		ll_ra_stats_inc(mapping, RA_STAT_EOF);

	/* If we didn't get to the end of the region we reserved from
	 * the ras, we need to go back and update the ras so that the
	 * next read-ahead tries from where we left off.
	 * We only do so if the region we failed to issue read-ahead on
	 * is still ahead of the app and behind the next index to start
	 * read-ahead from. */
	CDEBUG(D_READA, "ra_end %lu end %lu stride end %lu\n",
	       ra_end, end, ria->ria_end);

	if (ra_end != end + 1) {
		spin_lock(&ras->ras_lock);
		if (ra_end < ras->ras_next_readahead &&
		    index_in_window(ra_end, ras->ras_window_start, 0,
				    ras->ras_window_len)) {
			ras->ras_next_readahead = ra_end;
			RAS_CDEBUG(ras);
		}
		spin_unlock(&ras->ras_lock);
	}

	return ret;
}

static void ras_set_start(struct inode *inode, struct ll_readahead_state *ras,
			  unsigned long index)
{
	ras->ras_window_start = index & (~(RAS_INCREASE_STEP(inode) - 1));
}

/* called with the ras_lock held or from places where it doesn't matter */
static void ras_reset(struct inode *inode, struct ll_readahead_state *ras,
		      unsigned long index)
{
	ras->ras_last_readpage = index;
	ras->ras_consecutive_requests = 0;
	ras->ras_consecutive_pages = 0;
	ras->ras_window_len = 0;
	ras_set_start(inode, ras, index);
	ras->ras_next_readahead = max(ras->ras_window_start, index);

	RAS_CDEBUG(ras);
}

/* called with the ras_lock held or from places where it doesn't matter */
static void ras_stride_reset(struct ll_readahead_state *ras)
{
	ras->ras_consecutive_stride_requests = 0;
	ras->ras_stride_length = 0;
	ras->ras_stride_pages = 0;
	RAS_CDEBUG(ras);
}

void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras)
{
	spin_lock_init(&ras->ras_lock);
	ras_reset(inode, ras, 0);
	ras->ras_requests = 0;
	INIT_LIST_HEAD(&ras->ras_read_beads);
}

/*
 * Check whether the read request is in the stride window.
 * If it is in the stride window, return 1; otherwise return 0.
 */
static int index_in_stride_window(struct ll_readahead_state *ras,
				  unsigned long index)
{
	unsigned long stride_gap;

	if (ras->ras_stride_length == 0 || ras->ras_stride_pages == 0 ||
	    ras->ras_stride_pages == ras->ras_stride_length)
		return 0;

	stride_gap = index - ras->ras_last_readpage - 1;

	/* If it is a contiguous read */
	if (stride_gap == 0)
		return ras->ras_consecutive_pages + 1 <= ras->ras_stride_pages;

	/* Otherwise check the stride by itself */
	return (ras->ras_stride_length - ras->ras_stride_pages) == stride_gap &&
	       ras->ras_consecutive_pages == ras->ras_stride_pages;
}

static void ras_update_stride_detector(struct ll_readahead_state *ras,
				       unsigned long index)
{
	unsigned long stride_gap = index - ras->ras_last_readpage - 1;

	if (!stride_io_mode(ras) && (stride_gap != 0 ||
	    ras->ras_consecutive_stride_requests == 0)) {
		ras->ras_stride_pages = ras->ras_consecutive_pages;
		ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;
	}
	LASSERT(ras->ras_request_index == 0);
	LASSERT(ras->ras_consecutive_stride_requests == 0);

	if (index <= ras->ras_last_readpage) {
		/* Reset stride window for forward read */
		ras_stride_reset(ras);
		return;
	}

	ras->ras_stride_pages = ras->ras_consecutive_pages;
	ras->ras_stride_length = stride_gap + ras->ras_consecutive_pages;

	RAS_CDEBUG(ras);
	return;
}

static unsigned long
stride_page_count(struct ll_readahead_state *ras, unsigned long len)
{
	return stride_pg_count(ras->ras_stride_offset, ras->ras_stride_length,
			       ras->ras_stride_pages, ras->ras_stride_offset,
			       len);
}

/* The stride read-ahead window will be increased by inc_len according to
 * the stride I/O pattern */
static void ras_stride_increase_window(struct ll_readahead_state *ras,
				       struct ll_ra_info *ra,
				       unsigned long inc_len)
{
	unsigned long left, step, window_len;
	unsigned long stride_len;

	LASSERT(ras->ras_stride_length > 0);
	LASSERTF(ras->ras_window_start + ras->ras_window_len
		 >= ras->ras_stride_offset, "window_start %lu, window_len %lu"
		 " stride_offset %lu\n", ras->ras_window_start,
		 ras->ras_window_len, ras->ras_stride_offset);

	stride_len = ras->ras_window_start + ras->ras_window_len -
		     ras->ras_stride_offset;

	left = stride_len % ras->ras_stride_length;
	window_len = ras->ras_window_len - left;

	if (left < ras->ras_stride_pages)
		left += inc_len;
	else
		left = ras->ras_stride_pages + inc_len;

	LASSERT(ras->ras_stride_pages != 0);

	step = left / ras->ras_stride_pages;
	left %= ras->ras_stride_pages;

	window_len += step * ras->ras_stride_length + left;

	if (stride_page_count(ras, window_len) <= ra->ra_max_pages_per_file)
		ras->ras_window_len = window_len;

	RAS_CDEBUG(ras);
}

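/*
 * Worked example for ras_stride_increase_window() (illustrative numbers
 * only): with ras_stride_offset = ras_window_start = 0, ras_stride_length
 * = 16, ras_stride_pages = 4, ras_window_len = 16 and inc_len = 8, the
 * computation yields a new window_len of 48 pages, i.e. the window grows
 * from one stride chunk (4 useful pages) to three chunks (12 useful pages),
 * an increase of inc_len useful pages.
 */
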
static void ras_increase_window(struct inode *inode,
				struct ll_readahead_state *ras,
				struct ll_ra_info *ra)
{
	/* The stretch of the ra-window should be aligned with the max
	 * rpc_size, but the current clio architecture does not support
	 * retrieving such information from the lower layer.  FIXME later
	 */
	if (stride_io_mode(ras))
		ras_stride_increase_window(ras, ra, RAS_INCREASE_STEP(inode));
	else
		ras->ras_window_len = min(ras->ras_window_len +
					  RAS_INCREASE_STEP(inode),
					  ra->ra_max_pages_per_file);
}

void ras_update(struct ll_sb_info *sbi, struct inode *inode,
		struct ll_readahead_state *ras, unsigned long index,
		unsigned hit)
{
	struct ll_ra_info *ra = &sbi->ll_ra_info;
	int zero = 0, stride_detect = 0, ra_miss = 0;

	spin_lock(&ras->ras_lock);

	ll_ra_stats_inc_sbi(sbi, hit ? RA_STAT_HIT : RA_STAT_MISS);

	/* Reset the read-ahead window in two cases.  First when the app seeks
	 * or reads to some other part of the file.  Secondly if we get a
	 * read-ahead miss that we think we've previously issued.  This can
	 * be a symptom of there being so many read-ahead pages that the VM
	 * is reclaiming them before we get to them. */
	if (!index_in_window(index, ras->ras_last_readpage, 8, 8)) {
		zero = 1;
		ll_ra_stats_inc_sbi(sbi, RA_STAT_DISTANT_READPAGE);
	} else if (!hit && ras->ras_window_len &&
		   index < ras->ras_next_readahead &&
		   index_in_window(index, ras->ras_window_start, 0,
				   ras->ras_window_len)) {
		ra_miss = 1;
		ll_ra_stats_inc_sbi(sbi, RA_STAT_MISS_IN_WINDOW);
	}

	/* On the second access to a file smaller than the tunable
	 * ra_max_read_ahead_whole_pages, trigger RA on all pages in the
	 * file up to ra_max_pages_per_file.  This is simply a best effort
	 * and only occurs once per open file.  Normal RA behavior is
	 * reverted to for subsequent IO.  The mmap case does not increment
	 * ras_requests and thus can never trigger this behavior. */
	if (ras->ras_requests == 2 && !ras->ras_request_index) {
		__u64 kms_pages;

		kms_pages = (i_size_read(inode) + PAGE_CACHE_SIZE - 1) >>
			    PAGE_CACHE_SHIFT;

		CDEBUG(D_READA, "kmsp "LPU64" mwp %lu mp %lu\n", kms_pages,
		       ra->ra_max_read_ahead_whole_pages,
		       ra->ra_max_pages_per_file);

		if (kms_pages &&
		    kms_pages <= ra->ra_max_read_ahead_whole_pages) {
			ras->ras_window_start = 0;
			ras->ras_last_readpage = 0;
			ras->ras_next_readahead = 0;
			ras->ras_window_len = min(ra->ra_max_pages_per_file,
						  ra->ra_max_read_ahead_whole_pages);
			GOTO(out_unlock, 0);
		}
	}
	if (zero) {
		/* check whether it is in stride I/O mode */
		if (!index_in_stride_window(ras, index)) {
			if (ras->ras_consecutive_stride_requests == 0 &&
			    ras->ras_request_index == 0) {
				ras_update_stride_detector(ras, index);
				ras->ras_consecutive_stride_requests++;
			} else {
				ras_stride_reset(ras);
			}
			ras_reset(inode, ras, index);
			ras->ras_consecutive_pages++;
			GOTO(out_unlock, 0);
		} else {
			ras->ras_consecutive_pages = 0;
			ras->ras_consecutive_requests = 0;
			if (++ras->ras_consecutive_stride_requests > 1)
				stride_detect = 1;
			RAS_CDEBUG(ras);
		}
	} else {
		if (ra_miss) {
			if (index_in_stride_window(ras, index) &&
			    stride_io_mode(ras)) {
				/* If stride-RA hit a cache miss, the stride
				 * detector will not be reset, to avoid the
				 * overhead of redetecting read-ahead mode */
				if (index != ras->ras_last_readpage + 1)
					ras->ras_consecutive_pages = 0;
				ras_reset(inode, ras, index);
				RAS_CDEBUG(ras);
			} else {
				/* Reset both stride window and normal RA
				 * window */
				ras_reset(inode, ras, index);
				ras->ras_consecutive_pages++;
				ras_stride_reset(ras);
				GOTO(out_unlock, 0);
			}
		} else if (stride_io_mode(ras)) {
			/* If this is a contiguous read but we are currently
			 * in stride I/O mode, check whether the stride step
			 * is still valid; if not, reset the stride ra
			 * window */
			if (!index_in_stride_window(ras, index)) {
				/* Shrink stride read-ahead window to zero */
				ras_stride_reset(ras);
				ras->ras_window_len = 0;
				ras->ras_next_readahead = index;
			}
		}
	}
	ras->ras_consecutive_pages++;
	ras->ras_last_readpage = index;
	ras_set_start(inode, ras, index);

	if (stride_io_mode(ras))
		/* Since stride readahead is sensitive to the offset of the
		 * read-ahead, we use the original offset here instead of
		 * ras_window_start, which is RPC aligned */
		ras->ras_next_readahead = max(index, ras->ras_next_readahead);
	else
		ras->ras_next_readahead = max(ras->ras_window_start,
					      ras->ras_next_readahead);
	RAS_CDEBUG(ras);

	/* Trigger RA in the mmap case where ras_consecutive_requests
	 * is not incremented and thus can't be used to trigger RA */
	if (!ras->ras_window_len && ras->ras_consecutive_pages == 4) {
		ras->ras_window_len = RAS_INCREASE_STEP(inode);
		GOTO(out_unlock, 0);
	}

	/* Initially reset the stride window offset to next_readahead */
	if (ras->ras_consecutive_stride_requests == 2 && stride_detect) {
		/**
		 * Once stride IO mode is detected, next_readahead should be
		 * reset to make sure next_readahead > stride offset
		 */
		ras->ras_next_readahead = max(index, ras->ras_next_readahead);
		ras->ras_stride_offset = index;
		ras->ras_window_len = RAS_INCREASE_STEP(inode);
	}

	/* The initial ras_window_len is set to the request size.  To avoid
	 * uselessly reading and discarding pages for random IO the window is
	 * only increased once per consecutive request received. */
	if ((ras->ras_consecutive_requests > 1 || stride_detect) &&
	    !ras->ras_request_index)
		ras_increase_window(inode, ras, ra);
out_unlock:
	RAS_CDEBUG(ras);
	ras->ras_request_index++;
	spin_unlock(&ras->ras_lock);
	return;
}

int ll_writepage(struct page *vmpage, struct writeback_control *wbc)
{
	struct inode *inode = vmpage->mapping->host;
	struct ll_inode_info *lli = ll_i2info(inode);
	struct lu_env *env;
	struct cl_io *io;
	struct cl_page *page;
	struct cl_object *clob;
	struct cl_env_nest nest;
	bool redirtied = false;
	bool unlocked = false;
	int result;

	LASSERT(PageLocked(vmpage));
	LASSERT(!PageWriteback(vmpage));

	LASSERT(ll_i2dtexp(inode) != NULL);

	env = cl_env_nested_get(&nest);
	if (IS_ERR(env))
		GOTO(out, result = PTR_ERR(env));

	clob = ll_i2info(inode)->lli_clob;
	LASSERT(clob != NULL);

	io = ccc_env_thread_io(env);
	io->ci_obj = clob;
	io->ci_ignore_layout = 1;
	result = cl_io_init(env, io, CIT_MISC, clob);
	if (result == 0) {
		page = cl_page_find(env, clob, vmpage->index,
				    vmpage, CPT_CACHEABLE);
		if (!IS_ERR(page)) {
			lu_ref_add(&page->cp_reference, "writepage",
				   current);
			cl_page_assume(env, io, page);
			result = cl_page_flush(env, io, page);
			if (result != 0) {
				/*
				 * Re-dirty page on error so it retries write,
				 * but not in case when IO has actually
				 * occurred and completed with an error.
				 */
				if (!PageError(vmpage)) {
					redirty_page_for_writepage(wbc, vmpage);
					result = 0;
					redirtied = true;
				}
			}
			cl_page_disown(env, io, page);
			unlocked = true;
			lu_ref_del(&page->cp_reference,
				   "writepage", current);
			cl_page_put(env, page);
		} else {
			result = PTR_ERR(page);
		}
	}
	cl_io_fini(env, io);

	if (redirtied && wbc->sync_mode == WB_SYNC_ALL) {
		loff_t offset = cl_offset(clob, vmpage->index);

		/* Flushing the page failed because the extent is being
		 * written out.  Wait for the write of the extent to finish
		 * to avoid breaking the kernel, which assumes ->writepage
		 * should mark PageWriteback or clean the page. */
		result = cl_sync_file_range(inode, offset,
					    offset + PAGE_CACHE_SIZE - 1,
					    CL_FSYNC_LOCAL, 1);
		if (result > 0) {
			/* Actually we may have written more than one page.
			 * Decrease by this page because the caller will
			 * count it. */
			wbc->nr_to_write -= result - 1;
			result = 0;
		}
	}

	cl_env_nested_put(&nest, env);
	GOTO(out, result);

out:
	if (result < 0) {
		if (!lli->lli_async_rc)
			lli->lli_async_rc = result;
		SetPageError(vmpage);
		if (!unlocked)
			unlock_page(vmpage);
	}
	return result;
}

int ll_writepages(struct address_space *mapping, struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ll_sb_info *sbi = ll_i2sbi(inode);
	loff_t start;
	loff_t end;
	enum cl_fsync_mode mode;
	int range_whole = 0;
	int result;
	int ignore_layout = 0;

	if (wbc->range_cyclic) {
		start = mapping->writeback_index << PAGE_CACHE_SHIFT;
		end = OBD_OBJECT_EOF;
	} else {
		start = wbc->range_start;
		end = wbc->range_end;
		if (end == LLONG_MAX) {
			end = OBD_OBJECT_EOF;
			range_whole = start == 0;
		}
	}

	mode = CL_FSYNC_NONE;
	if (wbc->sync_mode == WB_SYNC_ALL)
		mode = CL_FSYNC_LOCAL;

	if (sbi->ll_umounting)
		/* if the mountpoint is being umounted, all pages have to be
		 * evicted to avoid hitting LBUG when truncate_inode_pages()
		 * is called later on. */
		ignore_layout = 1;
	result = cl_sync_file_range(inode, start, end, mode, ignore_layout);
	if (result > 0) {
		wbc->nr_to_write -= result;
		result = 0;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0)) {
		if (end == OBD_OBJECT_EOF)
			end = i_size_read(inode);
		mapping->writeback_index = (end >> PAGE_CACHE_SHIFT) + 1;
	}
	return result;
}

int ll_readpage(struct file *file, struct page *vmpage)
{
	struct ll_cl_context *lcc;
	int result;

	lcc = ll_cl_init(file, vmpage, 0);
	if (!IS_ERR(lcc)) {
		struct lu_env  *env  = lcc->lcc_env;
		struct cl_io   *io   = lcc->lcc_io;
		struct cl_page *page = lcc->lcc_page;

		LASSERT(page->cp_type == CPT_CACHEABLE);
		if (likely(!PageUptodate(vmpage))) {
			cl_page_assume(env, io, page);
			result = cl_io_read_page(env, io, page);
		} else {
			/* Page from a non-object file. */
			unlock_page(vmpage);
			result = 0;
		}
		ll_cl_fini(lcc);
	} else {
		unlock_page(vmpage);
		result = PTR_ERR(lcc);
	}
	return result;
}
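
/*
 * Note: the struct address_space_operations table that wires up these entry
 * points (ll_readpage, ll_writepage, ll_writepages, ll_prepare_write,
 * ll_commit_write) lives elsewhere in llite, not in this file.  A minimal,
 * purely illustrative sketch of such wiring might look like:
 *
 *	const struct address_space_operations ll_aops = {
 *		.readpage	= ll_readpage,
 *		.writepage	= ll_writepage,
 *		.writepages	= ll_writepages,
 *		...
 *	};
 */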