lov_object.c revision 081b726563585f3d55dedb2649b17f0c15a36f82
1/* 2 * GPL HEADER START 3 * 4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License version 2 only, 8 * as published by the Free Software Foundation. 9 * 10 * This program is distributed in the hope that it will be useful, but 11 * WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 13 * General Public License version 2 for more details (a copy is included 14 * in the LICENSE file that accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License 17 * version 2 along with this program; If not, see 18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf 19 * 20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, 21 * CA 95054 USA or visit www.sun.com if you need additional information or 22 * have any questions. 23 * 24 * GPL HEADER END 25 */ 26/* 27 * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. 28 * Use is subject to license terms. 29 * 30 * Copyright (c) 2011, 2012, Intel Corporation. 31 */ 32/* 33 * This file is part of Lustre, http://www.lustre.org/ 34 * Lustre is a trademark of Sun Microsystems, Inc. 35 * 36 * Implementation of cl_object for LOV layer. 37 * 38 * Author: Nikita Danilov <nikita.danilov@sun.com> 39 * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com> 40 */ 41 42#define DEBUG_SUBSYSTEM S_LOV 43 44#include "lov_cl_internal.h" 45 46/** \addtogroup lov 47 * @{ 48 */ 49 50/***************************************************************************** 51 * 52 * Layout operations. 
53 * 54 */ 55 56struct lov_layout_operations { 57 int (*llo_init)(const struct lu_env *env, struct lov_device *dev, 58 struct lov_object *lov, 59 const struct cl_object_conf *conf, 60 union lov_layout_state *state); 61 int (*llo_delete)(const struct lu_env *env, struct lov_object *lov, 62 union lov_layout_state *state); 63 void (*llo_fini)(const struct lu_env *env, struct lov_object *lov, 64 union lov_layout_state *state); 65 void (*llo_install)(const struct lu_env *env, struct lov_object *lov, 66 union lov_layout_state *state); 67 int (*llo_print)(const struct lu_env *env, void *cookie, 68 lu_printer_t p, const struct lu_object *o); 69 int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj, 70 struct cl_page *page, struct page *vmpage); 71 int (*llo_lock_init)(const struct lu_env *env, 72 struct cl_object *obj, struct cl_lock *lock, 73 const struct cl_io *io); 74 int (*llo_io_init)(const struct lu_env *env, 75 struct cl_object *obj, struct cl_io *io); 76 int (*llo_getattr)(const struct lu_env *env, struct cl_object *obj, 77 struct cl_attr *attr); 78}; 79 80static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov); 81 82/***************************************************************************** 83 * 84 * Lov object layout operations. 85 * 86 */ 87 88static void lov_install_empty(const struct lu_env *env, 89 struct lov_object *lov, 90 union lov_layout_state *state) 91{ 92 /* 93 * File without objects. 
94 */ 95} 96 97static int lov_init_empty(const struct lu_env *env, 98 struct lov_device *dev, struct lov_object *lov, 99 const struct cl_object_conf *conf, 100 union lov_layout_state *state) 101{ 102 return 0; 103} 104 105static void lov_install_raid0(const struct lu_env *env, 106 struct lov_object *lov, 107 union lov_layout_state *state) 108{ 109} 110 111static struct cl_object *lov_sub_find(const struct lu_env *env, 112 struct cl_device *dev, 113 const struct lu_fid *fid, 114 const struct cl_object_conf *conf) 115{ 116 struct lu_object *o; 117 118 o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu); 119 LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type)); 120 return lu2cl(o); 121} 122 123static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, 124 struct cl_object *stripe, struct lov_layout_raid0 *r0, 125 int idx) 126{ 127 struct cl_object_header *hdr; 128 struct cl_object_header *subhdr; 129 struct cl_object_header *parent; 130 struct lov_oinfo *oinfo; 131 int result; 132 133 if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) { 134 /* For sanity:test_206. 135 * Do not leave the object in cache to avoid accessing 136 * freed memory. This is because osc_object is referring to 137 * lov_oinfo of lsm_stripe_data which will be freed due to 138 * this failure. 
*/ 139 cl_object_kill(env, stripe); 140 cl_object_put(env, stripe); 141 return -EIO; 142 } 143 144 hdr = cl_object_header(lov2cl(lov)); 145 subhdr = cl_object_header(stripe); 146 147 oinfo = lov->lo_lsm->lsm_oinfo[idx]; 148 CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID 149 " idx: %d gen: %d\n", 150 PFID(&subhdr->coh_lu.loh_fid), subhdr, idx, 151 PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi), 152 oinfo->loi_ost_idx, oinfo->loi_ost_gen); 153 154 /* reuse ->coh_attr_guard to protect coh_parent change */ 155 spin_lock(&subhdr->coh_attr_guard); 156 parent = subhdr->coh_parent; 157 if (parent == NULL) { 158 subhdr->coh_parent = hdr; 159 spin_unlock(&subhdr->coh_attr_guard); 160 subhdr->coh_nesting = hdr->coh_nesting + 1; 161 lu_object_ref_add(&stripe->co_lu, "lov-parent", lov); 162 r0->lo_sub[idx] = cl2lovsub(stripe); 163 r0->lo_sub[idx]->lso_super = lov; 164 r0->lo_sub[idx]->lso_index = idx; 165 result = 0; 166 } else { 167 struct lu_object *old_obj; 168 struct lov_object *old_lov; 169 unsigned int mask = D_INODE; 170 171 spin_unlock(&subhdr->coh_attr_guard); 172 old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type); 173 LASSERT(old_obj != NULL); 174 old_lov = cl2lov(lu2cl(old_obj)); 175 if (old_lov->lo_layout_invalid) { 176 /* the object's layout has already changed but isn't 177 * refreshed */ 178 lu_object_unhash(env, &stripe->co_lu); 179 result = -EAGAIN; 180 } else { 181 mask = D_ERROR; 182 result = -EIO; 183 } 184 185 LU_OBJECT_DEBUG(mask, env, &stripe->co_lu, 186 "stripe %d is already owned.\n", idx); 187 LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n"); 188 LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n"); 189 cl_object_put(env, stripe); 190 } 191 return result; 192} 193 194static int lov_init_raid0(const struct lu_env *env, 195 struct lov_device *dev, struct lov_object *lov, 196 const struct cl_object_conf *conf, 197 union lov_layout_state *state) 198{ 199 int result; 200 int i; 201 202 struct cl_object *stripe; 203 struct 
lov_thread_info *lti = lov_env_info(env); 204 struct cl_object_conf *subconf = <i->lti_stripe_conf; 205 struct lov_stripe_md *lsm = conf->u.coc_md->lsm; 206 struct lu_fid *ofid = <i->lti_fid; 207 struct lov_layout_raid0 *r0 = &state->raid0; 208 209 if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) { 210 dump_lsm(D_ERROR, lsm); 211 LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n", 212 LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic); 213 } 214 215 LASSERT(lov->lo_lsm == NULL); 216 lov->lo_lsm = lsm_addref(lsm); 217 r0->lo_nr = lsm->lsm_stripe_count; 218 LASSERT(r0->lo_nr <= lov_targets_nr(dev)); 219 220 OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof(r0->lo_sub[0])); 221 if (r0->lo_sub != NULL) { 222 result = 0; 223 subconf->coc_inode = conf->coc_inode; 224 spin_lock_init(&r0->lo_sub_lock); 225 /* 226 * Create stripe cl_objects. 227 */ 228 for (i = 0; i < r0->lo_nr && result == 0; ++i) { 229 struct cl_device *subdev; 230 struct lov_oinfo *oinfo = lsm->lsm_oinfo[i]; 231 int ost_idx = oinfo->loi_ost_idx; 232 233 result = ostid_to_fid(ofid, &oinfo->loi_oi, 234 oinfo->loi_ost_idx); 235 if (result != 0) 236 GOTO(out, result); 237 238 subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); 239 subconf->u.coc_oinfo = oinfo; 240 LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); 241 /* In the function below, .hs_keycmp resolves to 242 * lu_obj_hop_keycmp() */ 243 /* coverity[overrun-buffer-val] */ 244 stripe = lov_sub_find(env, subdev, ofid, subconf); 245 if (!IS_ERR(stripe)) { 246 result = lov_init_sub(env, lov, stripe, r0, i); 247 if (result == -EAGAIN) { /* try again */ 248 --i; 249 result = 0; 250 } 251 } else { 252 result = PTR_ERR(stripe); 253 } 254 } 255 } else 256 result = -ENOMEM; 257out: 258 return result; 259} 260 261static int lov_init_released(const struct lu_env *env, 262 struct lov_device *dev, struct lov_object *lov, 263 const struct cl_object_conf *conf, 264 union lov_layout_state *state) 265{ 266 struct lov_stripe_md *lsm = 
conf->u.coc_md->lsm; 267 268 LASSERT(lsm != NULL); 269 LASSERT(lsm_is_released(lsm)); 270 LASSERT(lov->lo_lsm == NULL); 271 272 lov->lo_lsm = lsm_addref(lsm); 273 return 0; 274} 275 276static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, 277 union lov_layout_state *state) 278{ 279 LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); 280 281 lov_layout_wait(env, lov); 282 283 cl_object_prune(env, &lov->lo_cl); 284 return 0; 285} 286 287static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, 288 struct lovsub_object *los, int idx) 289{ 290 struct cl_object *sub; 291 struct lov_layout_raid0 *r0; 292 struct lu_site *site; 293 struct lu_site_bkt_data *bkt; 294 wait_queue_t *waiter; 295 296 r0 = &lov->u.raid0; 297 LASSERT(r0->lo_sub[idx] == los); 298 299 sub = lovsub2cl(los); 300 site = sub->co_lu.lo_dev->ld_site; 301 bkt = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid); 302 303 cl_object_kill(env, sub); 304 /* release a reference to the sub-object and ... */ 305 lu_object_ref_del(&sub->co_lu, "lov-parent", lov); 306 cl_object_put(env, sub); 307 308 /* ... wait until it is actually destroyed---sub-object clears its 309 * ->lo_sub[] slot in lovsub_object_fini() */ 310 if (r0->lo_sub[idx] == los) { 311 waiter = &lov_env_info(env)->lti_waiter; 312 init_waitqueue_entry(waiter, current); 313 add_wait_queue(&bkt->lsb_marche_funebre, waiter); 314 set_current_state(TASK_UNINTERRUPTIBLE); 315 while (1) { 316 /* this wait-queue is signaled at the end of 317 * lu_object_free(). 
*/ 318 set_current_state(TASK_UNINTERRUPTIBLE); 319 spin_lock(&r0->lo_sub_lock); 320 if (r0->lo_sub[idx] == los) { 321 spin_unlock(&r0->lo_sub_lock); 322 schedule(); 323 } else { 324 spin_unlock(&r0->lo_sub_lock); 325 set_current_state(TASK_RUNNING); 326 break; 327 } 328 } 329 remove_wait_queue(&bkt->lsb_marche_funebre, waiter); 330 } 331 LASSERT(r0->lo_sub[idx] == NULL); 332} 333 334static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, 335 union lov_layout_state *state) 336{ 337 struct lov_layout_raid0 *r0 = &state->raid0; 338 struct lov_stripe_md *lsm = lov->lo_lsm; 339 int i; 340 341 dump_lsm(D_INODE, lsm); 342 343 lov_layout_wait(env, lov); 344 if (r0->lo_sub != NULL) { 345 for (i = 0; i < r0->lo_nr; ++i) { 346 struct lovsub_object *los = r0->lo_sub[i]; 347 348 if (los != NULL) { 349 cl_locks_prune(env, &los->lso_cl, 1); 350 /* 351 * If top-level object is to be evicted from 352 * the cache, so are its sub-objects. 353 */ 354 lov_subobject_kill(env, lov, los, i); 355 } 356 } 357 } 358 cl_object_prune(env, &lov->lo_cl); 359 return 0; 360} 361 362static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov, 363 union lov_layout_state *state) 364{ 365 LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); 366} 367 368static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, 369 union lov_layout_state *state) 370{ 371 struct lov_layout_raid0 *r0 = &state->raid0; 372 373 if (r0->lo_sub != NULL) { 374 OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof(r0->lo_sub[0])); 375 r0->lo_sub = NULL; 376 } 377 378 dump_lsm(D_INODE, lov->lo_lsm); 379 lov_free_memmd(&lov->lo_lsm); 380} 381 382static void lov_fini_released(const struct lu_env *env, struct lov_object *lov, 383 union lov_layout_state *state) 384{ 385 dump_lsm(D_INODE, lov->lo_lsm); 386 lov_free_memmd(&lov->lo_lsm); 387} 388 389static int lov_print_empty(const struct lu_env *env, void *cookie, 390 lu_printer_t p, const struct lu_object *o) 391{ 392 
(*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid); 393 return 0; 394} 395 396static int lov_print_raid0(const struct lu_env *env, void *cookie, 397 lu_printer_t p, const struct lu_object *o) 398{ 399 struct lov_object *lov = lu2lov(o); 400 struct lov_layout_raid0 *r0 = lov_r0(lov); 401 struct lov_stripe_md *lsm = lov->lo_lsm; 402 int i; 403 404 (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n", 405 r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm, 406 lsm->lsm_magic, atomic_read(&lsm->lsm_refc), 407 lsm->lsm_stripe_count, lsm->lsm_layout_gen); 408 for (i = 0; i < r0->lo_nr; ++i) { 409 struct lu_object *sub; 410 411 if (r0->lo_sub[i] != NULL) { 412 sub = lovsub2lu(r0->lo_sub[i]); 413 lu_object_print(env, cookie, p, sub); 414 } else { 415 (*p)(env, cookie, "sub %d absent\n", i); 416 } 417 } 418 return 0; 419} 420 421static int lov_print_released(const struct lu_env *env, void *cookie, 422 lu_printer_t p, const struct lu_object *o) 423{ 424 struct lov_object *lov = lu2lov(o); 425 struct lov_stripe_md *lsm = lov->lo_lsm; 426 427 (*p)(env, cookie, 428 "released: %s, lsm{%p 0x%08X %d %u %u}:\n", 429 lov->lo_layout_invalid ? "invalid" : "valid", lsm, 430 lsm->lsm_magic, atomic_read(&lsm->lsm_refc), 431 lsm->lsm_stripe_count, lsm->lsm_layout_gen); 432 return 0; 433} 434 435/** 436 * Implements cl_object_operations::coo_attr_get() method for an object 437 * without stripes (LLT_EMPTY layout type). 438 * 439 * The only attributes this layer is authoritative in this case is 440 * cl_attr::cat_blocks---it's 0. 
441 */ 442static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj, 443 struct cl_attr *attr) 444{ 445 attr->cat_blocks = 0; 446 return 0; 447} 448 449static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, 450 struct cl_attr *attr) 451{ 452 struct lov_object *lov = cl2lov(obj); 453 struct lov_layout_raid0 *r0 = lov_r0(lov); 454 struct cl_attr *lov_attr = &r0->lo_attr; 455 int result = 0; 456 457 /* this is called w/o holding type guard mutex, so it must be inside 458 * an on going IO otherwise lsm may be replaced. 459 * LU-2117: it turns out there exists one exception. For mmaped files, 460 * the lock of those files may be requested in the other file's IO 461 * context, and this function is called in ccc_lock_state(), it will 462 * hit this assertion. 463 * Anyway, it's still okay to call attr_get w/o type guard as layout 464 * can't go if locks exist. */ 465 /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */ 466 467 if (!r0->lo_attr_valid) { 468 struct lov_stripe_md *lsm = lov->lo_lsm; 469 struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; 470 __u64 kms = 0; 471 472 memset(lvb, 0, sizeof(*lvb)); 473 /* XXX: timestamps can be negative by sanity:test_39m, 474 * how can it be? */ 475 lvb->lvb_atime = LLONG_MIN; 476 lvb->lvb_ctime = LLONG_MIN; 477 lvb->lvb_mtime = LLONG_MIN; 478 479 /* 480 * XXX that should be replaced with a loop over sub-objects, 481 * doing cl_object_attr_get() on them. But for now, let's 482 * reuse old lov code. 483 */ 484 485 /* 486 * XXX take lsm spin-lock to keep lov_merge_lvb_kms() 487 * happy. It's not needed, because new code uses 488 * ->coh_attr_guard spin-lock to protect consistency of 489 * sub-object attributes. 
490 */ 491 lov_stripe_lock(lsm); 492 result = lov_merge_lvb_kms(lsm, lvb, &kms); 493 lov_stripe_unlock(lsm); 494 if (result == 0) { 495 cl_lvb2attr(lov_attr, lvb); 496 lov_attr->cat_kms = kms; 497 r0->lo_attr_valid = 1; 498 } 499 } 500 if (result == 0) { /* merge results */ 501 attr->cat_blocks = lov_attr->cat_blocks; 502 attr->cat_size = lov_attr->cat_size; 503 attr->cat_kms = lov_attr->cat_kms; 504 if (attr->cat_atime < lov_attr->cat_atime) 505 attr->cat_atime = lov_attr->cat_atime; 506 if (attr->cat_ctime < lov_attr->cat_ctime) 507 attr->cat_ctime = lov_attr->cat_ctime; 508 if (attr->cat_mtime < lov_attr->cat_mtime) 509 attr->cat_mtime = lov_attr->cat_mtime; 510 } 511 return result; 512} 513 514static const struct lov_layout_operations lov_dispatch[] = { 515 [LLT_EMPTY] = { 516 .llo_init = lov_init_empty, 517 .llo_delete = lov_delete_empty, 518 .llo_fini = lov_fini_empty, 519 .llo_install = lov_install_empty, 520 .llo_print = lov_print_empty, 521 .llo_page_init = lov_page_init_empty, 522 .llo_lock_init = lov_lock_init_empty, 523 .llo_io_init = lov_io_init_empty, 524 .llo_getattr = lov_attr_get_empty 525 }, 526 [LLT_RAID0] = { 527 .llo_init = lov_init_raid0, 528 .llo_delete = lov_delete_raid0, 529 .llo_fini = lov_fini_raid0, 530 .llo_install = lov_install_raid0, 531 .llo_print = lov_print_raid0, 532 .llo_page_init = lov_page_init_raid0, 533 .llo_lock_init = lov_lock_init_raid0, 534 .llo_io_init = lov_io_init_raid0, 535 .llo_getattr = lov_attr_get_raid0 536 }, 537 [LLT_RELEASED] = { 538 .llo_init = lov_init_released, 539 .llo_delete = lov_delete_empty, 540 .llo_fini = lov_fini_released, 541 .llo_install = lov_install_empty, 542 .llo_print = lov_print_released, 543 .llo_page_init = lov_page_init_empty, 544 .llo_lock_init = lov_lock_init_empty, 545 .llo_io_init = lov_io_init_released, 546 .llo_getattr = lov_attr_get_empty 547 } 548}; 549 550/** 551 * Performs a double-dispatch based on the layout type of an object. 
552 */ 553#define LOV_2DISPATCH_NOLOCK(obj, op, ...) \ 554({ \ 555 struct lov_object *__obj = (obj); \ 556 enum lov_layout_type __llt; \ 557 \ 558 __llt = __obj->lo_type; \ 559 LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch)); \ 560 lov_dispatch[__llt].op(__VA_ARGS__); \ 561}) 562 563/** 564 * Return lov_layout_type associated with a given lsm 565 */ 566enum lov_layout_type lov_type(struct lov_stripe_md *lsm) 567{ 568 if (lsm == NULL) 569 return LLT_EMPTY; 570 if (lsm_is_released(lsm)) 571 return LLT_RELEASED; 572 return LLT_RAID0; 573} 574 575static inline void lov_conf_freeze(struct lov_object *lov) 576{ 577 if (lov->lo_owner != current) 578 down_read(&lov->lo_type_guard); 579} 580 581static inline void lov_conf_thaw(struct lov_object *lov) 582{ 583 if (lov->lo_owner != current) 584 up_read(&lov->lo_type_guard); 585} 586 587#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...) \ 588({ \ 589 struct lov_object *__obj = (obj); \ 590 int __lock = !!(lock); \ 591 typeof(lov_dispatch[0].op(__VA_ARGS__)) __result; \ 592 \ 593 if (__lock) \ 594 lov_conf_freeze(__obj); \ 595 __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__); \ 596 if (__lock) \ 597 lov_conf_thaw(__obj); \ 598 __result; \ 599}) 600 601/** 602 * Performs a locked double-dispatch based on the layout type of an object. 603 */ 604#define LOV_2DISPATCH(obj, op, ...) \ 605 LOV_2DISPATCH_MAYLOCK(obj, op, 1, __VA_ARGS__) 606 607#define LOV_2DISPATCH_VOID(obj, op, ...) 
\ 608do { \ 609 struct lov_object *__obj = (obj); \ 610 enum lov_layout_type __llt; \ 611 \ 612 lov_conf_freeze(__obj); \ 613 __llt = __obj->lo_type; \ 614 LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch)); \ 615 lov_dispatch[__llt].op(__VA_ARGS__); \ 616 lov_conf_thaw(__obj); \ 617} while (0) 618 619static void lov_conf_lock(struct lov_object *lov) 620{ 621 LASSERT(lov->lo_owner != current); 622 down_write(&lov->lo_type_guard); 623 LASSERT(lov->lo_owner == NULL); 624 lov->lo_owner = current; 625} 626 627static void lov_conf_unlock(struct lov_object *lov) 628{ 629 lov->lo_owner = NULL; 630 up_write(&lov->lo_type_guard); 631} 632 633static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) 634{ 635 struct l_wait_info lwi = { 0 }; 636 637 while (atomic_read(&lov->lo_active_ios) > 0) { 638 CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n", 639 PFID(lu_object_fid(lov2lu(lov))), 640 atomic_read(&lov->lo_active_ios)); 641 642 l_wait_event(lov->lo_waitq, 643 atomic_read(&lov->lo_active_ios) == 0, &lwi); 644 } 645 return 0; 646} 647 648static int lov_layout_change(const struct lu_env *unused, 649 struct lov_object *lov, 650 const struct cl_object_conf *conf) 651{ 652 int result; 653 enum lov_layout_type llt = LLT_EMPTY; 654 union lov_layout_state *state = &lov->u; 655 const struct lov_layout_operations *old_ops; 656 const struct lov_layout_operations *new_ops; 657 658 struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); 659 void *cookie; 660 struct lu_env *env; 661 int refcheck; 662 663 LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch)); 664 665 if (conf->u.coc_md != NULL) 666 llt = lov_type(conf->u.coc_md->lsm); 667 LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch)); 668 669 cookie = cl_env_reenter(); 670 env = cl_env_get(&refcheck); 671 if (IS_ERR(env)) { 672 cl_env_reexit(cookie); 673 return PTR_ERR(env); 674 } 675 676 CDEBUG(D_INODE, DFID" from %s to %s\n", 677 PFID(lu_object_fid(lov2lu(lov))), 678 
llt2str(lov->lo_type), llt2str(llt)); 679 680 old_ops = &lov_dispatch[lov->lo_type]; 681 new_ops = &lov_dispatch[llt]; 682 683 result = old_ops->llo_delete(env, lov, &lov->u); 684 if (result == 0) { 685 old_ops->llo_fini(env, lov, &lov->u); 686 687 LASSERT(atomic_read(&lov->lo_active_ios) == 0); 688 LASSERT(hdr->coh_tree.rnode == NULL); 689 LASSERT(hdr->coh_pages == 0); 690 691 lov->lo_type = LLT_EMPTY; 692 result = new_ops->llo_init(env, 693 lu2lov_dev(lov->lo_cl.co_lu.lo_dev), 694 lov, conf, state); 695 if (result == 0) { 696 new_ops->llo_install(env, lov, state); 697 lov->lo_type = llt; 698 } else { 699 new_ops->llo_delete(env, lov, state); 700 new_ops->llo_fini(env, lov, state); 701 /* this file becomes an EMPTY file. */ 702 } 703 } 704 705 cl_env_put(env, &refcheck); 706 cl_env_reexit(cookie); 707 return result; 708} 709 710/***************************************************************************** 711 * 712 * Lov object operations. 713 * 714 */ 715int lov_object_init(const struct lu_env *env, struct lu_object *obj, 716 const struct lu_object_conf *conf) 717{ 718 struct lov_device *dev = lu2lov_dev(obj->lo_dev); 719 struct lov_object *lov = lu2lov(obj); 720 const struct cl_object_conf *cconf = lu2cl_conf(conf); 721 union lov_layout_state *set = &lov->u; 722 const struct lov_layout_operations *ops; 723 int result; 724 725 init_rwsem(&lov->lo_type_guard); 726 atomic_set(&lov->lo_active_ios, 0); 727 init_waitqueue_head(&lov->lo_waitq); 728 729 cl_object_page_init(lu2cl(obj), sizeof(struct lov_page)); 730 731 /* no locking is necessary, as object is being created */ 732 lov->lo_type = lov_type(cconf->u.coc_md->lsm); 733 ops = &lov_dispatch[lov->lo_type]; 734 result = ops->llo_init(env, dev, lov, cconf, set); 735 if (result == 0) 736 ops->llo_install(env, lov, set); 737 return result; 738} 739 740static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, 741 const struct cl_object_conf *conf) 742{ 743 struct lov_stripe_md *lsm = NULL; 744 struct 
lov_object *lov = cl2lov(obj); 745 int result = 0; 746 747 lov_conf_lock(lov); 748 if (conf->coc_opc == OBJECT_CONF_INVALIDATE) { 749 lov->lo_layout_invalid = true; 750 GOTO(out, result = 0); 751 } 752 753 if (conf->coc_opc == OBJECT_CONF_WAIT) { 754 if (lov->lo_layout_invalid && 755 atomic_read(&lov->lo_active_ios) > 0) { 756 lov_conf_unlock(lov); 757 result = lov_layout_wait(env, lov); 758 lov_conf_lock(lov); 759 } 760 GOTO(out, result); 761 } 762 763 LASSERT(conf->coc_opc == OBJECT_CONF_SET); 764 765 if (conf->u.coc_md != NULL) 766 lsm = conf->u.coc_md->lsm; 767 if ((lsm == NULL && lov->lo_lsm == NULL) || 768 ((lsm != NULL && lov->lo_lsm != NULL) && 769 (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) && 770 (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) { 771 /* same version of layout */ 772 lov->lo_layout_invalid = false; 773 GOTO(out, result = 0); 774 } 775 776 /* will change layout - check if there still exists active IO. */ 777 if (atomic_read(&lov->lo_active_ios) > 0) { 778 lov->lo_layout_invalid = true; 779 GOTO(out, result = -EBUSY); 780 } 781 782 lov->lo_layout_invalid = lov_layout_change(env, lov, conf); 783 784out: 785 lov_conf_unlock(lov); 786 CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n", 787 PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid); 788 return result; 789} 790 791static void lov_object_delete(const struct lu_env *env, struct lu_object *obj) 792{ 793 struct lov_object *lov = lu2lov(obj); 794 795 LOV_2DISPATCH_VOID(lov, llo_delete, env, lov, &lov->u); 796} 797 798static void lov_object_free(const struct lu_env *env, struct lu_object *obj) 799{ 800 struct lov_object *lov = lu2lov(obj); 801 802 LOV_2DISPATCH_VOID(lov, llo_fini, env, lov, &lov->u); 803 lu_object_fini(obj); 804 OBD_SLAB_FREE_PTR(lov, lov_object_kmem); 805} 806 807static int lov_object_print(const struct lu_env *env, void *cookie, 808 lu_printer_t p, const struct lu_object *o) 809{ 810 return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o); 811} 812 
/* coo_page_init: dispatch to the layout-specific page initializer. */
int lov_page_init(const struct lu_env *env, struct cl_object *obj,
		struct cl_page *page, struct page *vmpage)
{
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
				    llo_page_init, env, obj, page, vmpage);
}

/**
 * Implements cl_object_operations::clo_io_init() method for lov
 * layer. Dispatches to the appropriate layout io initialization method.
 */
int lov_io_init(const struct lu_env *env, struct cl_object *obj,
		struct cl_io *io)
{
	CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
	/* Only take the layout lock when the IO cares about the layout;
	 * ci_ignore_layout IOs run regardless of layout changes. */
	return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
				     !io->ci_ignore_layout, env, obj, io);
}

/**
 * An implementation of cl_object_operations::clo_attr_get() method for lov
 * layer. For raid0 layout this collects and merges attributes of all
 * sub-objects.
 */
static int lov_attr_get(const struct lu_env *env, struct cl_object *obj,
			struct cl_attr *attr)
{
	/* do not take lock, as this function is called under a
	 * spin-lock. Layout is protected from changing by ongoing IO. */
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
}

/* coo_attr_set: lov stores no attributes of its own, so this is a no-op. */
static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
			const struct cl_attr *attr, unsigned valid)
{
	/*
	 * No dispatch is required here, as no layout implements this.
	 */
	return 0;
}

/* coo_lock_init: dispatch to the layout-specific lock initializer. */
int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
		  struct cl_lock *lock, const struct cl_io *io)
{
	/* No need to lock because we've taken one refcount of layout.  */
	return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_lock_init, env, obj, lock,
				    io);
}

/* cl_object operation vector for the lov layer. */
static const struct cl_object_operations lov_ops = {
	.coo_page_init = lov_page_init,
	.coo_lock_init = lov_lock_init,
	.coo_io_init   = lov_io_init,
	.coo_attr_get  = lov_attr_get,
	.coo_attr_set  = lov_attr_set,
	.coo_conf_set  = lov_conf_set
};

/* lu_object operation vector for the lov layer. */
static const struct lu_object_operations lov_lu_obj_ops = {
	.loo_object_init      = lov_object_init,
	.loo_object_delete    = lov_object_delete,
	.loo_object_release   = NULL,
	.loo_object_free      = lov_object_free,
	.loo_object_print     = lov_object_print,
	.loo_object_invariant = NULL
};

/*
 * Allocate a new lov object from the slab. The layout type is left
 * invalid (-1) until lov_object_init() configures it. Returns NULL on
 * allocation failure.
 */
struct lu_object *lov_object_alloc(const struct lu_env *env,
				   const struct lu_object_header *unused,
				   struct lu_device *dev)
{
	struct lov_object *lov;
	struct lu_object  *obj;

	OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
	if (lov != NULL) {
		obj = lov2lu(lov);
		lu_object_init(obj, NULL, dev);
		lov->lo_cl.co_ops = &lov_ops;
		lov->lo_type = -1; /* invalid, to catch uninitialized type */
		/*
		 * object io operation vector (cl_object::co_iop) is installed
		 * later in lov_object_init(), as different vectors are used
		 * for object with different layouts.
		 */
		obj->lo_ops = &lov_lu_obj_ops;
	} else
		obj = NULL;
	return obj;
}

/*
 * Return the object's stripe metadata with an extra reference, or NULL if
 * the object has none. The caller must drop the reference with
 * lov_lsm_decref()/lov_lsm_put().
 */
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
{
	struct lov_stripe_md *lsm = NULL;

	lov_conf_freeze(lov);
	if (lov->lo_lsm != NULL) {
		lsm = lsm_addref(lov->lo_lsm);
		CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
			lsm, atomic_read(&lsm->lsm_refc),
			lov->lo_layout_invalid, current);
	}
	lov_conf_thaw(lov);
	return lsm;
}

/* Drop one reference on @lsm (paired with lov_lsm_addref()). */
void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
{
	if (lsm == NULL)
		return;

	CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
		lsm, atomic_read(&lsm->lsm_refc), current);

	lov_free_memmd(&lsm);
}

/*
 * Locate the lov slice of @clobj and return its stripe metadata with an
 * extra reference, or NULL when there is no lov layer / no lsm.
 */
struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
{
	struct lu_object *luobj;
	struct lov_stripe_md *lsm = NULL;

	if (clobj == NULL)
		return NULL;

	luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
				 &lov_device_type);
	if (luobj != NULL)
		lsm = lov_lsm_addref(lu2lov(luobj));
	return lsm;
}
EXPORT_SYMBOL(lov_lsm_get);

/* Release a reference obtained via lov_lsm_get(). NULL is a no-op. */
void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
{
	if (lsm != NULL)
		lov_free_memmd(&lsm);
}
EXPORT_SYMBOL(lov_lsm_put);

/*
 * Collect and clear the per-stripe async write error codes of @clob.
 * Returns the first non-zero error found (0 if none).
 */
int lov_read_and_clear_async_rc(struct cl_object *clob)
{
	struct lu_object *luobj;
	int rc = 0;

	luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
				 &lov_device_type);
	if (luobj != NULL) {
		struct lov_object *lov = lu2lov(luobj);

		lov_conf_freeze(lov);
		switch (lov->lo_type) {
		case LLT_RAID0: {
			struct lov_stripe_md *lsm;
			int i;

			lsm = lov->lo_lsm;
			LASSERT(lsm != NULL);
			for (i = 0; i < lsm->lsm_stripe_count; i++) {
				struct lov_oinfo *loi = lsm->lsm_oinfo[i];
				/* remember the first error, clear them all */
				if (loi->loi_ar.ar_rc && !rc)
					rc = loi->loi_ar.ar_rc;
				loi->loi_ar.ar_rc = 0;
			}
		}
		/* fallthrough - nothing more to do for RAID0 */
		case LLT_RELEASED:
		case LLT_EMPTY:
			break;
		default:
			LBUG();
		}
		lov_conf_thaw(lov);
	}
	return rc;
}
EXPORT_SYMBOL(lov_read_and_clear_async_rc);

/** @} lov */