/* dlmglue.c revision 2a45f2d13e1dd91bc110801f5818379f2699509c */
1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26#include <linux/types.h> 27#include <linux/slab.h> 28#include <linux/highmem.h> 29#include <linux/mm.h> 30#include <linux/smp_lock.h> 31#include <linux/crc32.h> 32#include <linux/kthread.h> 33#include <linux/pagemap.h> 34#include <linux/debugfs.h> 35#include <linux/seq_file.h> 36 37#include <cluster/heartbeat.h> 38#include <cluster/nodemanager.h> 39#include <cluster/tcp.h> 40 41#include <dlm/dlmapi.h> 42 43#define MLOG_MASK_PREFIX ML_DLM_GLUE 44#include <cluster/masklog.h> 45 46#include "ocfs2.h" 47 48#include "alloc.h" 49#include "dcache.h" 50#include "dlmglue.h" 51#include "extent_map.h" 52#include "heartbeat.h" 53#include "inode.h" 54#include "journal.h" 55#include "slot_map.h" 56#include "super.h" 57#include "uptodate.h" 58#include "vote.h" 59 60#include "buffer_head_io.h" 61 62struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68}; 69 70static void ocfs2_inode_bast_func(void *opaque, 
71 int level); 72static void ocfs2_dentry_bast_func(void *opaque, 73 int level); 74static void ocfs2_super_bast_func(void *opaque, 75 int level); 76static void ocfs2_rename_bast_func(void *opaque, 77 int level); 78 79/* 80 * Return value from ocfs2_convert_worker_t functions. 81 * 82 * These control the precise actions of ocfs2_generic_unblock_lock() 83 * and ocfs2_process_blocked_lock() 84 * 85 */ 86enum ocfs2_unblock_action { 87 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 88 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 89 * ->post_unlock callback */ 90 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 91 * ->post_unlock() callback. */ 92}; 93 94struct ocfs2_unblock_ctl { 95 int requeue; 96 enum ocfs2_unblock_action unblock_action; 97}; 98 99static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 100 struct ocfs2_unblock_ctl *ctl); 101static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 102 struct ocfs2_unblock_ctl *ctl); 103static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 104 struct ocfs2_unblock_ctl *ctl); 105static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, 106 struct ocfs2_unblock_ctl *ctl); 107static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 108 struct ocfs2_unblock_ctl *ctl); 109 110static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 111 struct ocfs2_lock_res *lockres); 112 113/* 114 * OCFS2 Lock Resource Operations 115 * 116 * These fine tune the behavior of the generic dlmglue locking infrastructure. 117 */ 118struct ocfs2_lock_res_ops { 119 void (*bast)(void *, int); 120 int (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *); 121 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 122 123 /* 124 * LOCK_TYPE_* flags which describe the specific requirements 125 * of a lock type. Descriptions of each individual flag follow. 
126 */ 127 int flags; 128}; 129 130/* 131 * Some locks want to "refresh" potentially stale data when a 132 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 133 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 134 * individual lockres l_flags member from the ast function. It is 135 * expected that the locking wrapper will clear the 136 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 137 */ 138#define LOCK_TYPE_REQUIRES_REFRESH 0x1 139 140typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int); 141static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 142 struct ocfs2_lock_res *lockres, 143 struct ocfs2_unblock_ctl *ctl, 144 ocfs2_convert_worker_t *worker); 145 146static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 147 .bast = ocfs2_inode_bast_func, 148 .unblock = ocfs2_unblock_inode_lock, 149 .flags = 0, 150}; 151 152static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = { 153 .bast = ocfs2_inode_bast_func, 154 .unblock = ocfs2_unblock_meta, 155 .flags = LOCK_TYPE_REQUIRES_REFRESH, 156}; 157 158static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { 159 .bast = ocfs2_inode_bast_func, 160 .unblock = ocfs2_unblock_data, 161 .flags = 0, 162}; 163 164static struct ocfs2_lock_res_ops ocfs2_super_lops = { 165 .bast = ocfs2_super_bast_func, 166 .unblock = ocfs2_unblock_osb_lock, 167 .flags = LOCK_TYPE_REQUIRES_REFRESH, 168}; 169 170static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 171 .bast = ocfs2_rename_bast_func, 172 .unblock = ocfs2_unblock_osb_lock, 173 .flags = 0, 174}; 175 176static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 177 .bast = ocfs2_dentry_bast_func, 178 .unblock = ocfs2_unblock_dentry_lock, 179 .post_unlock = ocfs2_dentry_post_unlock, 180 .flags = 0, 181}; 182 183static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 184{ 185 return lockres->l_type == OCFS2_LOCK_TYPE_META || 186 lockres->l_type == OCFS2_LOCK_TYPE_DATA || 187 lockres->l_type == OCFS2_LOCK_TYPE_RW; 188} 
189 190static inline int ocfs2_is_super_lock(struct ocfs2_lock_res *lockres) 191{ 192 return lockres->l_type == OCFS2_LOCK_TYPE_SUPER; 193} 194 195static inline int ocfs2_is_rename_lock(struct ocfs2_lock_res *lockres) 196{ 197 return lockres->l_type == OCFS2_LOCK_TYPE_RENAME; 198} 199 200static inline struct ocfs2_super *ocfs2_lock_res_super(struct ocfs2_lock_res *lockres) 201{ 202 BUG_ON(!ocfs2_is_super_lock(lockres) 203 && !ocfs2_is_rename_lock(lockres)); 204 205 return (struct ocfs2_super *) lockres->l_priv; 206} 207 208static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 209{ 210 BUG_ON(!ocfs2_is_inode_lock(lockres)); 211 212 return (struct inode *) lockres->l_priv; 213} 214 215static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 216{ 217 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 218 219 return (struct ocfs2_dentry_lock *)lockres->l_priv; 220} 221 222static int ocfs2_lock_create(struct ocfs2_super *osb, 223 struct ocfs2_lock_res *lockres, 224 int level, 225 int dlm_flags); 226static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 227 int wanted); 228static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 229 struct ocfs2_lock_res *lockres, 230 int level); 231static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 232static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 233static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 234static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 235static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 236 struct ocfs2_lock_res *lockres); 237static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 238 int convert); 239#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 240 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 241 "resource %s: %s\n", 
dlm_errname(_stat), _func, \ 242 _lockres->l_name, dlm_errmsg(_stat)); \ 243} while (0) 244static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 245 struct ocfs2_lock_res *lockres); 246static int ocfs2_meta_lock_update(struct inode *inode, 247 struct buffer_head **bh); 248static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 249static inline int ocfs2_highest_compat_lock_level(int level); 250static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode, 251 struct ocfs2_lock_res *lockres, 252 int new_level); 253 254static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 255 u64 blkno, 256 u32 generation, 257 char *name) 258{ 259 int len; 260 261 mlog_entry_void(); 262 263 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 264 265 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 266 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 267 (long long)blkno, generation); 268 269 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 270 271 mlog(0, "built lock resource with name: %s\n", name); 272 273 mlog_exit_void(); 274} 275 276static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 277 278static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 279 struct ocfs2_dlm_debug *dlm_debug) 280{ 281 mlog(0, "Add tracking for lockres %s\n", res->l_name); 282 283 spin_lock(&ocfs2_dlm_tracking_lock); 284 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 285 spin_unlock(&ocfs2_dlm_tracking_lock); 286} 287 288static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 289{ 290 spin_lock(&ocfs2_dlm_tracking_lock); 291 if (!list_empty(&res->l_debug_list)) 292 list_del_init(&res->l_debug_list); 293 spin_unlock(&ocfs2_dlm_tracking_lock); 294} 295 296static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 297 struct ocfs2_lock_res *res, 298 enum ocfs2_lock_type type, 299 struct ocfs2_lock_res_ops *ops, 300 void *priv) 301{ 302 res->l_type = type; 303 res->l_ops = ops; 304 res->l_priv = priv; 305 306 res->l_level = LKM_IVMODE; 307 
res->l_requested = LKM_IVMODE; 308 res->l_blocking = LKM_IVMODE; 309 res->l_action = OCFS2_AST_INVALID; 310 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 311 312 res->l_flags = OCFS2_LOCK_INITIALIZED; 313 314 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 315} 316 317void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 318{ 319 /* This also clears out the lock status block */ 320 memset(res, 0, sizeof(struct ocfs2_lock_res)); 321 spin_lock_init(&res->l_lock); 322 init_waitqueue_head(&res->l_event); 323 INIT_LIST_HEAD(&res->l_blocked_list); 324 INIT_LIST_HEAD(&res->l_mask_waiters); 325} 326 327void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 328 enum ocfs2_lock_type type, 329 unsigned int generation, 330 struct inode *inode) 331{ 332 struct ocfs2_lock_res_ops *ops; 333 334 switch(type) { 335 case OCFS2_LOCK_TYPE_RW: 336 ops = &ocfs2_inode_rw_lops; 337 break; 338 case OCFS2_LOCK_TYPE_META: 339 ops = &ocfs2_inode_meta_lops; 340 break; 341 case OCFS2_LOCK_TYPE_DATA: 342 ops = &ocfs2_inode_data_lops; 343 break; 344 default: 345 mlog_bug_on_msg(1, "type: %d\n", type); 346 ops = NULL; /* thanks, gcc */ 347 break; 348 }; 349 350 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 351 generation, res->l_name); 352 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 353} 354 355static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 356{ 357 __be64 inode_blkno_be; 358 359 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 360 sizeof(__be64)); 361 362 return be64_to_cpu(inode_blkno_be); 363} 364 365void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 366 u64 parent, struct inode *inode) 367{ 368 int len; 369 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 370 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 371 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 372 373 ocfs2_lock_res_init_once(lockres); 374 375 /* 376 * Unfortunately, the standard lock naming scheme won't work 377 * here 
because we have two 16 byte values to use. Instead, 378 * we'll stuff the inode number as a binary value. We still 379 * want error prints to show something without garbling the 380 * display, so drop a null byte in there before the inode 381 * number. A future version of OCFS2 will likely use all 382 * binary lock names. The stringified names have been a 383 * tremendous aid in debugging, but now that the debugfs 384 * interface exists, we can mangle things there if need be. 385 * 386 * NOTE: We also drop the standard "pad" value (the total lock 387 * name size stays the same though - the last part is all 388 * zeros due to the memset in ocfs2_lock_res_init_once() 389 */ 390 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 391 "%c%016llx", 392 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 393 (long long)parent); 394 395 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 396 397 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 398 sizeof(__be64)); 399 400 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 401 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 402 dl); 403} 404 405static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 406 struct ocfs2_super *osb) 407{ 408 /* Superblock lockres doesn't come from a slab so we call init 409 * once on it manually. */ 410 ocfs2_lock_res_init_once(res); 411 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 412 0, res->l_name); 413 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 414 &ocfs2_super_lops, osb); 415} 416 417static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 418 struct ocfs2_super *osb) 419{ 420 /* Rename lockres doesn't come from a slab so we call init 421 * once on it manually. 
*/ 422 ocfs2_lock_res_init_once(res); 423 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 424 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 425 &ocfs2_rename_lops, osb); 426} 427 428void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 429{ 430 mlog_entry_void(); 431 432 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 433 return; 434 435 ocfs2_remove_lockres_tracking(res); 436 437 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 438 "Lockres %s is on the blocked list\n", 439 res->l_name); 440 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 441 "Lockres %s has mask waiters pending\n", 442 res->l_name); 443 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 444 "Lockres %s is locked\n", 445 res->l_name); 446 mlog_bug_on_msg(res->l_ro_holders, 447 "Lockres %s has %u ro holders\n", 448 res->l_name, res->l_ro_holders); 449 mlog_bug_on_msg(res->l_ex_holders, 450 "Lockres %s has %u ex holders\n", 451 res->l_name, res->l_ex_holders); 452 453 /* Need to clear out the lock status block for the dlm */ 454 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 455 456 res->l_flags = 0UL; 457 mlog_exit_void(); 458} 459 460static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 461 int level) 462{ 463 mlog_entry_void(); 464 465 BUG_ON(!lockres); 466 467 switch(level) { 468 case LKM_EXMODE: 469 lockres->l_ex_holders++; 470 break; 471 case LKM_PRMODE: 472 lockres->l_ro_holders++; 473 break; 474 default: 475 BUG(); 476 } 477 478 mlog_exit_void(); 479} 480 481static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 482 int level) 483{ 484 mlog_entry_void(); 485 486 BUG_ON(!lockres); 487 488 switch(level) { 489 case LKM_EXMODE: 490 BUG_ON(!lockres->l_ex_holders); 491 lockres->l_ex_holders--; 492 break; 493 case LKM_PRMODE: 494 BUG_ON(!lockres->l_ro_holders); 495 lockres->l_ro_holders--; 496 break; 497 default: 498 BUG(); 499 } 500 mlog_exit_void(); 501} 502 503/* WARNING: This function lives in a world where the only three lock 504 * 
levels are EX, PR, and NL. It *will* have to be adjusted when more 505 * lock types are added. */ 506static inline int ocfs2_highest_compat_lock_level(int level) 507{ 508 int new_level = LKM_EXMODE; 509 510 if (level == LKM_EXMODE) 511 new_level = LKM_NLMODE; 512 else if (level == LKM_PRMODE) 513 new_level = LKM_PRMODE; 514 return new_level; 515} 516 517static void lockres_set_flags(struct ocfs2_lock_res *lockres, 518 unsigned long newflags) 519{ 520 struct list_head *pos, *tmp; 521 struct ocfs2_mask_waiter *mw; 522 523 assert_spin_locked(&lockres->l_lock); 524 525 lockres->l_flags = newflags; 526 527 list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) { 528 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item); 529 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 530 continue; 531 532 list_del_init(&mw->mw_item); 533 mw->mw_status = 0; 534 complete(&mw->mw_complete); 535 } 536} 537static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 538{ 539 lockres_set_flags(lockres, lockres->l_flags | or); 540} 541static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 542 unsigned long clear) 543{ 544 lockres_set_flags(lockres, lockres->l_flags & ~clear); 545} 546 547static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 548{ 549 mlog_entry_void(); 550 551 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 552 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 553 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 554 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 555 556 lockres->l_level = lockres->l_requested; 557 if (lockres->l_level <= 558 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 559 lockres->l_blocking = LKM_NLMODE; 560 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 561 } 562 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 563 564 mlog_exit_void(); 565} 566 567static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 568{ 569 mlog_entry_void(); 570 571 
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 572 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 573 574 /* Convert from RO to EX doesn't really need anything as our 575 * information is already up to data. Convert from NL to 576 * *anything* however should mark ourselves as needing an 577 * update */ 578 if (lockres->l_level == LKM_NLMODE && 579 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 580 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 581 582 lockres->l_level = lockres->l_requested; 583 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 584 585 mlog_exit_void(); 586} 587 588static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 589{ 590 mlog_entry_void(); 591 592 BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY)); 593 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 594 595 if (lockres->l_requested > LKM_NLMODE && 596 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 597 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 598 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 599 600 lockres->l_level = lockres->l_requested; 601 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 602 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 603 604 mlog_exit_void(); 605} 606 607static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 608 int level) 609{ 610 int needs_downconvert = 0; 611 mlog_entry_void(); 612 613 assert_spin_locked(&lockres->l_lock); 614 615 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 616 617 if (level > lockres->l_blocking) { 618 /* only schedule a downconvert if we haven't already scheduled 619 * one that goes low enough to satisfy the level we're 620 * blocking. 
this also catches the case where we get 621 * duplicate BASTs */ 622 if (ocfs2_highest_compat_lock_level(level) < 623 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 624 needs_downconvert = 1; 625 626 lockres->l_blocking = level; 627 } 628 629 mlog_exit(needs_downconvert); 630 return needs_downconvert; 631} 632 633static void ocfs2_generic_bast_func(struct ocfs2_super *osb, 634 struct ocfs2_lock_res *lockres, 635 int level) 636{ 637 int needs_downconvert; 638 unsigned long flags; 639 640 mlog_entry_void(); 641 642 BUG_ON(level <= LKM_NLMODE); 643 644 spin_lock_irqsave(&lockres->l_lock, flags); 645 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 646 if (needs_downconvert) 647 ocfs2_schedule_blocked_lock(osb, lockres); 648 spin_unlock_irqrestore(&lockres->l_lock, flags); 649 650 wake_up(&lockres->l_event); 651 652 ocfs2_kick_vote_thread(osb); 653 654 mlog_exit_void(); 655} 656 657static void ocfs2_inode_bast_func(void *opaque, int level) 658{ 659 struct ocfs2_lock_res *lockres = opaque; 660 struct inode *inode; 661 struct ocfs2_super *osb; 662 663 mlog_entry_void(); 664 665 BUG_ON(!ocfs2_is_inode_lock(lockres)); 666 667 inode = ocfs2_lock_res_inode(lockres); 668 osb = OCFS2_SB(inode->i_sb); 669 670 mlog(0, "BAST fired for inode %llu, blocking %d, level %d type %s\n", 671 (unsigned long long)OCFS2_I(inode)->ip_blkno, level, 672 lockres->l_level, ocfs2_lock_type_string(lockres->l_type)); 673 674 ocfs2_generic_bast_func(osb, lockres, level); 675 676 mlog_exit_void(); 677} 678 679static void ocfs2_locking_ast(void *opaque) 680{ 681 struct ocfs2_lock_res *lockres = opaque; 682 struct dlm_lockstatus *lksb = &lockres->l_lksb; 683 unsigned long flags; 684 685 spin_lock_irqsave(&lockres->l_lock, flags); 686 687 if (lksb->status != DLM_NORMAL) { 688 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", 689 lockres->l_name, lksb->status); 690 spin_unlock_irqrestore(&lockres->l_lock, flags); 691 return; 692 } 693 694 switch(lockres->l_action) { 695 case 
OCFS2_AST_ATTACH: 696 ocfs2_generic_handle_attach_action(lockres); 697 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 698 break; 699 case OCFS2_AST_CONVERT: 700 ocfs2_generic_handle_convert_action(lockres); 701 break; 702 case OCFS2_AST_DOWNCONVERT: 703 ocfs2_generic_handle_downconvert_action(lockres); 704 break; 705 default: 706 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 707 "lockres flags = 0x%lx, unlock action: %u\n", 708 lockres->l_name, lockres->l_action, lockres->l_flags, 709 lockres->l_unlock_action); 710 BUG(); 711 } 712 713 /* set it to something invalid so if we get called again we 714 * can catch it. */ 715 lockres->l_action = OCFS2_AST_INVALID; 716 717 wake_up(&lockres->l_event); 718 spin_unlock_irqrestore(&lockres->l_lock, flags); 719} 720 721static void ocfs2_super_bast_func(void *opaque, 722 int level) 723{ 724 struct ocfs2_lock_res *lockres = opaque; 725 struct ocfs2_super *osb; 726 727 mlog_entry_void(); 728 mlog(0, "Superblock BAST fired\n"); 729 730 BUG_ON(!ocfs2_is_super_lock(lockres)); 731 osb = ocfs2_lock_res_super(lockres); 732 ocfs2_generic_bast_func(osb, lockres, level); 733 734 mlog_exit_void(); 735} 736 737static void ocfs2_rename_bast_func(void *opaque, 738 int level) 739{ 740 struct ocfs2_lock_res *lockres = opaque; 741 struct ocfs2_super *osb; 742 743 mlog_entry_void(); 744 745 mlog(0, "Rename BAST fired\n"); 746 747 BUG_ON(!ocfs2_is_rename_lock(lockres)); 748 749 osb = ocfs2_lock_res_super(lockres); 750 ocfs2_generic_bast_func(osb, lockres, level); 751 752 mlog_exit_void(); 753} 754 755static void ocfs2_dentry_bast_func(void *opaque, int level) 756{ 757 struct ocfs2_lock_res *lockres = opaque; 758 struct ocfs2_dentry_lock *dl = lockres->l_priv; 759 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb); 760 761 mlog(0, "Dentry bast: level: %d, name: %s\n", level, 762 lockres->l_name); 763 764 ocfs2_generic_bast_func(osb, lockres, level); 765} 766 767static inline void ocfs2_recover_from_dlm_error(struct 
ocfs2_lock_res *lockres, 768 int convert) 769{ 770 unsigned long flags; 771 772 mlog_entry_void(); 773 spin_lock_irqsave(&lockres->l_lock, flags); 774 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 775 if (convert) 776 lockres->l_action = OCFS2_AST_INVALID; 777 else 778 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 779 spin_unlock_irqrestore(&lockres->l_lock, flags); 780 781 wake_up(&lockres->l_event); 782 mlog_exit_void(); 783} 784 785/* Note: If we detect another process working on the lock (i.e., 786 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 787 * to do the right thing in that case. 788 */ 789static int ocfs2_lock_create(struct ocfs2_super *osb, 790 struct ocfs2_lock_res *lockres, 791 int level, 792 int dlm_flags) 793{ 794 int ret = 0; 795 enum dlm_status status; 796 unsigned long flags; 797 798 mlog_entry_void(); 799 800 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 801 dlm_flags); 802 803 spin_lock_irqsave(&lockres->l_lock, flags); 804 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 805 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 806 spin_unlock_irqrestore(&lockres->l_lock, flags); 807 goto bail; 808 } 809 810 lockres->l_action = OCFS2_AST_ATTACH; 811 lockres->l_requested = level; 812 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 813 spin_unlock_irqrestore(&lockres->l_lock, flags); 814 815 status = dlmlock(osb->dlm, 816 level, 817 &lockres->l_lksb, 818 dlm_flags, 819 lockres->l_name, 820 OCFS2_LOCK_ID_MAX_LEN - 1, 821 ocfs2_locking_ast, 822 lockres, 823 lockres->l_ops->bast); 824 if (status != DLM_NORMAL) { 825 ocfs2_log_dlm_error("dlmlock", status, lockres); 826 ret = -EINVAL; 827 ocfs2_recover_from_dlm_error(lockres, 1); 828 } 829 830 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 831 832bail: 833 mlog_exit(ret); 834 return ret; 835} 836 837static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 838 int flag) 839{ 840 unsigned long flags; 841 int ret; 842 843 
spin_lock_irqsave(&lockres->l_lock, flags); 844 ret = lockres->l_flags & flag; 845 spin_unlock_irqrestore(&lockres->l_lock, flags); 846 847 return ret; 848} 849 850static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 851 852{ 853 wait_event(lockres->l_event, 854 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 855} 856 857static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 858 859{ 860 wait_event(lockres->l_event, 861 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 862} 863 864/* predict what lock level we'll be dropping down to on behalf 865 * of another node, and return true if the currently wanted 866 * level will be compatible with it. */ 867static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 868 int wanted) 869{ 870 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 871 872 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 873} 874 875static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 876{ 877 INIT_LIST_HEAD(&mw->mw_item); 878 init_completion(&mw->mw_complete); 879} 880 881static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 882{ 883 wait_for_completion(&mw->mw_complete); 884 /* Re-arm the completion in case we want to wait on it again */ 885 INIT_COMPLETION(mw->mw_complete); 886 return mw->mw_status; 887} 888 889static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 890 struct ocfs2_mask_waiter *mw, 891 unsigned long mask, 892 unsigned long goal) 893{ 894 BUG_ON(!list_empty(&mw->mw_item)); 895 896 assert_spin_locked(&lockres->l_lock); 897 898 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 899 mw->mw_mask = mask; 900 mw->mw_goal = goal; 901} 902 903/* returns 0 if the mw that was removed was already satisfied, -EBUSY 904 * if the mask still hadn't reached its goal */ 905static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 906 struct ocfs2_mask_waiter *mw) 907{ 908 unsigned long flags; 909 
int ret = 0; 910 911 spin_lock_irqsave(&lockres->l_lock, flags); 912 if (!list_empty(&mw->mw_item)) { 913 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 914 ret = -EBUSY; 915 916 list_del_init(&mw->mw_item); 917 init_completion(&mw->mw_complete); 918 } 919 spin_unlock_irqrestore(&lockres->l_lock, flags); 920 921 return ret; 922 923} 924 925static int ocfs2_cluster_lock(struct ocfs2_super *osb, 926 struct ocfs2_lock_res *lockres, 927 int level, 928 int lkm_flags, 929 int arg_flags) 930{ 931 struct ocfs2_mask_waiter mw; 932 enum dlm_status status; 933 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 934 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 935 unsigned long flags; 936 937 mlog_entry_void(); 938 939 ocfs2_init_mask_waiter(&mw); 940 941again: 942 wait = 0; 943 944 if (catch_signals && signal_pending(current)) { 945 ret = -ERESTARTSYS; 946 goto out; 947 } 948 949 spin_lock_irqsave(&lockres->l_lock, flags); 950 951 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 952 "Cluster lock called on freeing lockres %s! flags " 953 "0x%lx\n", lockres->l_name, lockres->l_flags); 954 955 /* We only compare against the currently granted level 956 * here. If the lock is blocked waiting on a downconvert, 957 * we'll get caught below. */ 958 if (lockres->l_flags & OCFS2_LOCK_BUSY && 959 level > lockres->l_level) { 960 /* is someone sitting in dlm_lock? If so, wait on 961 * them. */ 962 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 963 wait = 1; 964 goto unlock; 965 } 966 967 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 968 /* lock has not been created yet. 
*/ 969 spin_unlock_irqrestore(&lockres->l_lock, flags); 970 971 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); 972 if (ret < 0) { 973 mlog_errno(ret); 974 goto out; 975 } 976 goto again; 977 } 978 979 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 980 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 981 /* is the lock is currently blocked on behalf of 982 * another node */ 983 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 984 wait = 1; 985 goto unlock; 986 } 987 988 if (level > lockres->l_level) { 989 if (lockres->l_action != OCFS2_AST_INVALID) 990 mlog(ML_ERROR, "lockres %s has action %u pending\n", 991 lockres->l_name, lockres->l_action); 992 993 lockres->l_action = OCFS2_AST_CONVERT; 994 lockres->l_requested = level; 995 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 996 spin_unlock_irqrestore(&lockres->l_lock, flags); 997 998 BUG_ON(level == LKM_IVMODE); 999 BUG_ON(level == LKM_NLMODE); 1000 1001 mlog(0, "lock %s, convert from %d to level = %d\n", 1002 lockres->l_name, lockres->l_level, level); 1003 1004 /* call dlm_lock to upgrade lock now */ 1005 status = dlmlock(osb->dlm, 1006 level, 1007 &lockres->l_lksb, 1008 lkm_flags|LKM_CONVERT|LKM_VALBLK, 1009 lockres->l_name, 1010 OCFS2_LOCK_ID_MAX_LEN - 1, 1011 ocfs2_locking_ast, 1012 lockres, 1013 lockres->l_ops->bast); 1014 if (status != DLM_NORMAL) { 1015 if ((lkm_flags & LKM_NOQUEUE) && 1016 (status == DLM_NOTQUEUED)) 1017 ret = -EAGAIN; 1018 else { 1019 ocfs2_log_dlm_error("dlmlock", status, 1020 lockres); 1021 ret = -EINVAL; 1022 } 1023 ocfs2_recover_from_dlm_error(lockres, 1); 1024 goto out; 1025 } 1026 1027 mlog(0, "lock %s, successfull return from dlmlock\n", 1028 lockres->l_name); 1029 1030 /* At this point we've gone inside the dlm and need to 1031 * complete our work regardless. */ 1032 catch_signals = 0; 1033 1034 /* wait for busy to clear and carry on */ 1035 goto again; 1036 } 1037 1038 /* Ok, if we get here then we're good to go. 
*/ 1039 ocfs2_inc_holders(lockres, level); 1040 1041 ret = 0; 1042unlock: 1043 spin_unlock_irqrestore(&lockres->l_lock, flags); 1044out: 1045 /* 1046 * This is helping work around a lock inversion between the page lock 1047 * and dlm locks. One path holds the page lock while calling aops 1048 * which block acquiring dlm locks. The voting thread holds dlm 1049 * locks while acquiring page locks while down converting data locks. 1050 * This block is helping an aop path notice the inversion and back 1051 * off to unlock its page lock before trying the dlm lock again. 1052 */ 1053 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1054 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1055 wait = 0; 1056 if (lockres_remove_mask_waiter(lockres, &mw)) 1057 ret = -EAGAIN; 1058 else 1059 goto again; 1060 } 1061 if (wait) { 1062 ret = ocfs2_wait_for_mask(&mw); 1063 if (ret == 0) 1064 goto again; 1065 mlog_errno(ret); 1066 } 1067 1068 mlog_exit(ret); 1069 return ret; 1070} 1071 1072static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1073 struct ocfs2_lock_res *lockres, 1074 int level) 1075{ 1076 unsigned long flags; 1077 1078 mlog_entry_void(); 1079 spin_lock_irqsave(&lockres->l_lock, flags); 1080 ocfs2_dec_holders(lockres, level); 1081 ocfs2_vote_on_unlock(osb, lockres); 1082 spin_unlock_irqrestore(&lockres->l_lock, flags); 1083 mlog_exit_void(); 1084} 1085 1086int ocfs2_create_new_lock(struct ocfs2_super *osb, 1087 struct ocfs2_lock_res *lockres, 1088 int ex, 1089 int local) 1090{ 1091 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1092 unsigned long flags; 1093 int lkm_flags = local ? 
LKM_LOCAL : 0; 1094 1095 spin_lock_irqsave(&lockres->l_lock, flags); 1096 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1097 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1098 spin_unlock_irqrestore(&lockres->l_lock, flags); 1099 1100 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1101} 1102 1103/* Grants us an EX lock on the data and metadata resources, skipping 1104 * the normal cluster directory lookup. Use this ONLY on newly created 1105 * inodes which other nodes can't possibly see, and which haven't been 1106 * hashed in the inode hash yet. This can give us a good performance 1107 * increase as it'll skip the network broadcast normally associated 1108 * with creating a new lock resource. */ 1109int ocfs2_create_new_inode_locks(struct inode *inode) 1110{ 1111 int ret; 1112 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1113 1114 BUG_ON(!inode); 1115 BUG_ON(!ocfs2_inode_is_new(inode)); 1116 1117 mlog_entry_void(); 1118 1119 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1120 1121 /* NOTE: That we don't increment any of the holder counts, nor 1122 * do we add anything to a journal handle. Since this is 1123 * supposed to be a new inode which the cluster doesn't know 1124 * about yet, there is no need to. As far as the LVB handling 1125 * is concerned, this is basically like acquiring an EX lock 1126 * on a resource which has an invalid one -- we'll set it 1127 * valid when we release the EX. */ 1128 1129 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1130 if (ret) { 1131 mlog_errno(ret); 1132 goto bail; 1133 } 1134 1135 /* 1136 * We don't want to use LKM_LOCAL on a meta data lock as they 1137 * don't use a generation in their lock names. 
1138 */ 1139 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0); 1140 if (ret) { 1141 mlog_errno(ret); 1142 goto bail; 1143 } 1144 1145 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1); 1146 if (ret) { 1147 mlog_errno(ret); 1148 goto bail; 1149 } 1150 1151bail: 1152 mlog_exit(ret); 1153 return ret; 1154} 1155 1156int ocfs2_rw_lock(struct inode *inode, int write) 1157{ 1158 int status, level; 1159 struct ocfs2_lock_res *lockres; 1160 1161 BUG_ON(!inode); 1162 1163 mlog_entry_void(); 1164 1165 mlog(0, "inode %llu take %s RW lock\n", 1166 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1167 write ? "EXMODE" : "PRMODE"); 1168 1169 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1170 1171 level = write ? LKM_EXMODE : LKM_PRMODE; 1172 1173 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1174 0); 1175 if (status < 0) 1176 mlog_errno(status); 1177 1178 mlog_exit(status); 1179 return status; 1180} 1181 1182void ocfs2_rw_unlock(struct inode *inode, int write) 1183{ 1184 int level = write ? LKM_EXMODE : LKM_PRMODE; 1185 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1186 1187 mlog_entry_void(); 1188 1189 mlog(0, "inode %llu drop %s RW lock\n", 1190 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1191 write ? "EXMODE" : "PRMODE"); 1192 1193 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1194 1195 mlog_exit_void(); 1196} 1197 1198int ocfs2_data_lock_full(struct inode *inode, 1199 int write, 1200 int arg_flags) 1201{ 1202 int status = 0, level; 1203 struct ocfs2_lock_res *lockres; 1204 1205 BUG_ON(!inode); 1206 1207 mlog_entry_void(); 1208 1209 mlog(0, "inode %llu take %s DATA lock\n", 1210 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1211 write ? "EXMODE" : "PRMODE"); 1212 1213 /* We'll allow faking a readonly data lock for 1214 * rodevices. 
*/ 1215 if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) { 1216 if (write) { 1217 status = -EROFS; 1218 mlog_errno(status); 1219 } 1220 goto out; 1221 } 1222 1223 lockres = &OCFS2_I(inode)->ip_data_lockres; 1224 1225 level = write ? LKM_EXMODE : LKM_PRMODE; 1226 1227 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 1228 0, arg_flags); 1229 if (status < 0 && status != -EAGAIN) 1230 mlog_errno(status); 1231 1232out: 1233 mlog_exit(status); 1234 return status; 1235} 1236 1237/* see ocfs2_meta_lock_with_page() */ 1238int ocfs2_data_lock_with_page(struct inode *inode, 1239 int write, 1240 struct page *page) 1241{ 1242 int ret; 1243 1244 ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK); 1245 if (ret == -EAGAIN) { 1246 unlock_page(page); 1247 if (ocfs2_data_lock(inode, write) == 0) 1248 ocfs2_data_unlock(inode, write); 1249 ret = AOP_TRUNCATED_PAGE; 1250 } 1251 1252 return ret; 1253} 1254 1255static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 1256 struct ocfs2_lock_res *lockres) 1257{ 1258 int kick = 0; 1259 1260 mlog_entry_void(); 1261 1262 /* If we know that another node is waiting on our lock, kick 1263 * the vote thread * pre-emptively when we reach a release 1264 * condition. */ 1265 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1266 switch(lockres->l_blocking) { 1267 case LKM_EXMODE: 1268 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1269 kick = 1; 1270 break; 1271 case LKM_PRMODE: 1272 if (!lockres->l_ex_holders) 1273 kick = 1; 1274 break; 1275 default: 1276 BUG(); 1277 } 1278 } 1279 1280 if (kick) 1281 ocfs2_kick_vote_thread(osb); 1282 1283 mlog_exit_void(); 1284} 1285 1286void ocfs2_data_unlock(struct inode *inode, 1287 int write) 1288{ 1289 int level = write ? LKM_EXMODE : LKM_PRMODE; 1290 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres; 1291 1292 mlog_entry_void(); 1293 1294 mlog(0, "inode %llu drop %s DATA lock\n", 1295 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1296 write ? 
"EXMODE" : "PRMODE"); 1297 1298 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) 1299 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1300 1301 mlog_exit_void(); 1302} 1303 1304#define OCFS2_SEC_BITS 34 1305#define OCFS2_SEC_SHIFT (64 - 34) 1306#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1307 1308/* LVB only has room for 64 bits of time here so we pack it for 1309 * now. */ 1310static u64 ocfs2_pack_timespec(struct timespec *spec) 1311{ 1312 u64 res; 1313 u64 sec = spec->tv_sec; 1314 u32 nsec = spec->tv_nsec; 1315 1316 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1317 1318 return res; 1319} 1320 1321/* Call this with the lockres locked. I am reasonably sure we don't 1322 * need ip_lock in this function as anyone who would be changing those 1323 * values is supposed to be blocked in ocfs2_meta_lock right now. */ 1324static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1325{ 1326 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1327 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; 1328 struct ocfs2_meta_lvb *lvb; 1329 1330 mlog_entry_void(); 1331 1332 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1333 1334 /* 1335 * Invalidate the LVB of a deleted inode - this way other 1336 * nodes are forced to go to disk and discover the new inode 1337 * status. 
1338 */ 1339 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1340 lvb->lvb_version = 0; 1341 goto out; 1342 } 1343 1344 lvb->lvb_version = OCFS2_LVB_VERSION; 1345 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1346 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1347 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1348 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1349 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1350 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1351 lvb->lvb_iatime_packed = 1352 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1353 lvb->lvb_ictime_packed = 1354 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1355 lvb->lvb_imtime_packed = 1356 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1357 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1358 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1359 1360out: 1361 mlog_meta_lvb(0, lockres); 1362 1363 mlog_exit_void(); 1364} 1365 1366static void ocfs2_unpack_timespec(struct timespec *spec, 1367 u64 packed_time) 1368{ 1369 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1370 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1371} 1372 1373static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1374{ 1375 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1376 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; 1377 struct ocfs2_meta_lvb *lvb; 1378 1379 mlog_entry_void(); 1380 1381 mlog_meta_lvb(0, lockres); 1382 1383 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1384 1385 /* We're safe here without the lockres lock... 
*/ 1386 spin_lock(&oi->ip_lock); 1387 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1388 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1389 1390 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1391 ocfs2_set_inode_flags(inode); 1392 1393 /* fast-symlinks are a special case */ 1394 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1395 inode->i_blocks = 0; 1396 else 1397 inode->i_blocks = 1398 ocfs2_align_bytes_to_sectors(i_size_read(inode)); 1399 1400 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1401 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1402 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1403 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1404 ocfs2_unpack_timespec(&inode->i_atime, 1405 be64_to_cpu(lvb->lvb_iatime_packed)); 1406 ocfs2_unpack_timespec(&inode->i_mtime, 1407 be64_to_cpu(lvb->lvb_imtime_packed)); 1408 ocfs2_unpack_timespec(&inode->i_ctime, 1409 be64_to_cpu(lvb->lvb_ictime_packed)); 1410 spin_unlock(&oi->ip_lock); 1411 1412 mlog_exit_void(); 1413} 1414 1415static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1416 struct ocfs2_lock_res *lockres) 1417{ 1418 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1419 1420 if (lvb->lvb_version == OCFS2_LVB_VERSION 1421 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1422 return 1; 1423 return 0; 1424} 1425 1426/* Determine whether a lock resource needs to be refreshed, and 1427 * arbitrate who gets to refresh it. 1428 * 1429 * 0 means no refresh needed. 1430 * 1431 * > 0 means you need to refresh this and you MUST call 1432 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1433static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1434{ 1435 unsigned long flags; 1436 int status = 0; 1437 1438 mlog_entry_void(); 1439 1440refresh_check: 1441 spin_lock_irqsave(&lockres->l_lock, flags); 1442 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1443 spin_unlock_irqrestore(&lockres->l_lock, flags); 1444 goto bail; 1445 } 1446 1447 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1448 spin_unlock_irqrestore(&lockres->l_lock, flags); 1449 1450 ocfs2_wait_on_refreshing_lock(lockres); 1451 goto refresh_check; 1452 } 1453 1454 /* Ok, I'll be the one to refresh this lock. */ 1455 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1456 spin_unlock_irqrestore(&lockres->l_lock, flags); 1457 1458 status = 1; 1459bail: 1460 mlog_exit(status); 1461 return status; 1462} 1463 1464/* If status is non zero, I'll mark it as not being in refresh 1465 * anymroe, but i won't clear the needs refresh flag. */ 1466static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1467 int status) 1468{ 1469 unsigned long flags; 1470 mlog_entry_void(); 1471 1472 spin_lock_irqsave(&lockres->l_lock, flags); 1473 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1474 if (!status) 1475 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1476 spin_unlock_irqrestore(&lockres->l_lock, flags); 1477 1478 wake_up(&lockres->l_event); 1479 1480 mlog_exit_void(); 1481} 1482 1483/* may or may not return a bh if it went to disk. */ 1484static int ocfs2_meta_lock_update(struct inode *inode, 1485 struct buffer_head **bh) 1486{ 1487 int status = 0; 1488 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1489 struct ocfs2_lock_res *lockres; 1490 struct ocfs2_dinode *fe; 1491 1492 mlog_entry_void(); 1493 1494 spin_lock(&oi->ip_lock); 1495 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1496 mlog(0, "Orphaned inode %llu was deleted while we " 1497 "were waiting on a lock. 
ip_flags = 0x%x\n", 1498 (unsigned long long)oi->ip_blkno, oi->ip_flags); 1499 spin_unlock(&oi->ip_lock); 1500 status = -ENOENT; 1501 goto bail; 1502 } 1503 spin_unlock(&oi->ip_lock); 1504 1505 lockres = &oi->ip_meta_lockres; 1506 1507 if (!ocfs2_should_refresh_lock_res(lockres)) 1508 goto bail; 1509 1510 /* This will discard any caching information we might have had 1511 * for the inode metadata. */ 1512 ocfs2_metadata_cache_purge(inode); 1513 1514 /* will do nothing for inode types that don't use the extent 1515 * map (directories, bitmap files, etc) */ 1516 ocfs2_extent_map_trunc(inode, 0); 1517 1518 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 1519 mlog(0, "Trusting LVB on inode %llu\n", 1520 (unsigned long long)oi->ip_blkno); 1521 ocfs2_refresh_inode_from_lvb(inode); 1522 } else { 1523 /* Boo, we have to go to disk. */ 1524 /* read bh, cast, ocfs2_refresh_inode */ 1525 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno, 1526 bh, OCFS2_BH_CACHED, inode); 1527 if (status < 0) { 1528 mlog_errno(status); 1529 goto bail_refresh; 1530 } 1531 fe = (struct ocfs2_dinode *) (*bh)->b_data; 1532 1533 /* This is a good chance to make sure we're not 1534 * locking an invalid object. 1535 * 1536 * We bug on a stale inode here because we checked 1537 * above whether it was wiped from disk. The wiping 1538 * node provides a guarantee that we receive that 1539 * message and can mark the inode before dropping any 1540 * locks associated with it. 
*/ 1541 if (!OCFS2_IS_VALID_DINODE(fe)) { 1542 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 1543 status = -EIO; 1544 goto bail_refresh; 1545 } 1546 mlog_bug_on_msg(inode->i_generation != 1547 le32_to_cpu(fe->i_generation), 1548 "Invalid dinode %llu disk generation: %u " 1549 "inode->i_generation: %u\n", 1550 (unsigned long long)oi->ip_blkno, 1551 le32_to_cpu(fe->i_generation), 1552 inode->i_generation); 1553 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 1554 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 1555 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 1556 (unsigned long long)oi->ip_blkno, 1557 (unsigned long long)le64_to_cpu(fe->i_dtime), 1558 le32_to_cpu(fe->i_flags)); 1559 1560 ocfs2_refresh_inode(inode, fe); 1561 } 1562 1563 status = 0; 1564bail_refresh: 1565 ocfs2_complete_lock_res_refresh(lockres, status); 1566bail: 1567 mlog_exit(status); 1568 return status; 1569} 1570 1571static int ocfs2_assign_bh(struct inode *inode, 1572 struct buffer_head **ret_bh, 1573 struct buffer_head *passed_bh) 1574{ 1575 int status; 1576 1577 if (passed_bh) { 1578 /* Ok, the update went to disk for us, use the 1579 * returned bh. */ 1580 *ret_bh = passed_bh; 1581 get_bh(*ret_bh); 1582 1583 return 0; 1584 } 1585 1586 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 1587 OCFS2_I(inode)->ip_blkno, 1588 ret_bh, 1589 OCFS2_BH_CACHED, 1590 inode); 1591 if (status < 0) 1592 mlog_errno(status); 1593 1594 return status; 1595} 1596 1597/* 1598 * returns < 0 error if the callback will never be called, otherwise 1599 * the result of the lock will be communicated via the callback. 
1600 */ 1601int ocfs2_meta_lock_full(struct inode *inode, 1602 struct ocfs2_journal_handle *handle, 1603 struct buffer_head **ret_bh, 1604 int ex, 1605 int arg_flags) 1606{ 1607 int status, level, dlm_flags, acquired; 1608 struct ocfs2_lock_res *lockres; 1609 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1610 struct buffer_head *local_bh = NULL; 1611 1612 BUG_ON(!inode); 1613 1614 mlog_entry_void(); 1615 1616 mlog(0, "inode %llu, take %s META lock\n", 1617 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1618 ex ? "EXMODE" : "PRMODE"); 1619 1620 status = 0; 1621 acquired = 0; 1622 /* We'll allow faking a readonly metadata lock for 1623 * rodevices. */ 1624 if (ocfs2_is_hard_readonly(osb)) { 1625 if (ex) 1626 status = -EROFS; 1627 goto bail; 1628 } 1629 1630 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1631 wait_event(osb->recovery_event, 1632 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1633 1634 acquired = 0; 1635 lockres = &OCFS2_I(inode)->ip_meta_lockres; 1636 level = ex ? LKM_EXMODE : LKM_PRMODE; 1637 dlm_flags = 0; 1638 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 1639 dlm_flags |= LKM_NOQUEUE; 1640 1641 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 1642 if (status < 0) { 1643 if (status != -EAGAIN && status != -EIOCBRETRY) 1644 mlog_errno(status); 1645 goto bail; 1646 } 1647 1648 /* Notify the error cleanup path to drop the cluster lock. */ 1649 acquired = 1; 1650 1651 /* We wait twice because a node may have died while we were in 1652 * the lower dlm layers. The second time though, we've 1653 * committed to owning this lock so we don't allow signals to 1654 * abort the operation. */ 1655 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1656 wait_event(osb->recovery_event, 1657 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1658 1659 /* 1660 * We only see this flag if we're being called from 1661 * ocfs2_read_locked_inode(). 
It means we're locking an inode 1662 * which hasn't been populated yet, so clear the refresh flag 1663 * and let the caller handle it. 1664 */ 1665 if (inode->i_state & I_NEW) { 1666 status = 0; 1667 ocfs2_complete_lock_res_refresh(lockres, 0); 1668 goto bail; 1669 } 1670 1671 /* This is fun. The caller may want a bh back, or it may 1672 * not. ocfs2_meta_lock_update definitely wants one in, but 1673 * may or may not read one, depending on what's in the 1674 * LVB. The result of all of this is that we've *only* gone to 1675 * disk if we have to, so the complexity is worthwhile. */ 1676 status = ocfs2_meta_lock_update(inode, &local_bh); 1677 if (status < 0) { 1678 if (status != -ENOENT) 1679 mlog_errno(status); 1680 goto bail; 1681 } 1682 1683 if (ret_bh) { 1684 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 1685 if (status < 0) { 1686 mlog_errno(status); 1687 goto bail; 1688 } 1689 } 1690 1691 if (handle) { 1692 status = ocfs2_handle_add_lock(handle, inode); 1693 if (status < 0) 1694 mlog_errno(status); 1695 } 1696 1697bail: 1698 if (status < 0) { 1699 if (ret_bh && (*ret_bh)) { 1700 brelse(*ret_bh); 1701 *ret_bh = NULL; 1702 } 1703 if (acquired) 1704 ocfs2_meta_unlock(inode, ex); 1705 } 1706 1707 if (local_bh) 1708 brelse(local_bh); 1709 1710 mlog_exit(status); 1711 return status; 1712} 1713 1714/* 1715 * This is working around a lock inversion between tasks acquiring DLM locks 1716 * while holding a page lock and the vote thread which blocks dlm lock acquiry 1717 * while acquiring page locks. 1718 * 1719 * ** These _with_page variantes are only intended to be called from aop 1720 * methods that hold page locks and return a very specific *positive* error 1721 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 1722 * 1723 * The DLM is called such that it returns -EAGAIN if it would have blocked 1724 * waiting for the vote thread. In that case we unlock our page so the vote 1725 * thread can make progress. 
Once we've done this we have to return 1726 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up 1727 * into the VFS who will then immediately retry the aop call. 1728 * 1729 * We do a blocking lock and immediate unlock before returning, though, so that 1730 * the lock has a great chance of being cached on this node by the time the VFS 1731 * calls back to retry the aop. This has a potential to livelock as nodes 1732 * ping locks back and forth, but that's a risk we're willing to take to avoid 1733 * the lock inversion simply. 1734 */ 1735int ocfs2_meta_lock_with_page(struct inode *inode, 1736 struct ocfs2_journal_handle *handle, 1737 struct buffer_head **ret_bh, 1738 int ex, 1739 struct page *page) 1740{ 1741 int ret; 1742 1743 ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex, 1744 OCFS2_LOCK_NONBLOCK); 1745 if (ret == -EAGAIN) { 1746 unlock_page(page); 1747 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0) 1748 ocfs2_meta_unlock(inode, ex); 1749 ret = AOP_TRUNCATED_PAGE; 1750 } 1751 1752 return ret; 1753} 1754 1755void ocfs2_meta_unlock(struct inode *inode, 1756 int ex) 1757{ 1758 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1759 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres; 1760 1761 mlog_entry_void(); 1762 1763 mlog(0, "inode %llu drop %s META lock\n", 1764 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1765 ex ? "EXMODE" : "PRMODE"); 1766 1767 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) 1768 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1769 1770 mlog_exit_void(); 1771} 1772 1773int ocfs2_super_lock(struct ocfs2_super *osb, 1774 int ex) 1775{ 1776 int status; 1777 int level = ex ? 
LKM_EXMODE : LKM_PRMODE; 1778 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 1779 struct buffer_head *bh; 1780 struct ocfs2_slot_info *si = osb->slot_info; 1781 1782 mlog_entry_void(); 1783 1784 if (ocfs2_is_hard_readonly(osb)) 1785 return -EROFS; 1786 1787 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1788 if (status < 0) { 1789 mlog_errno(status); 1790 goto bail; 1791 } 1792 1793 /* The super block lock path is really in the best position to 1794 * know when resources covered by the lock need to be 1795 * refreshed, so we do it here. Of course, making sense of 1796 * everything is up to the caller :) */ 1797 status = ocfs2_should_refresh_lock_res(lockres); 1798 if (status < 0) { 1799 mlog_errno(status); 1800 goto bail; 1801 } 1802 if (status) { 1803 bh = si->si_bh; 1804 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0, 1805 si->si_inode); 1806 if (status == 0) 1807 ocfs2_update_slot_info(si); 1808 1809 ocfs2_complete_lock_res_refresh(lockres, status); 1810 1811 if (status < 0) 1812 mlog_errno(status); 1813 } 1814bail: 1815 mlog_exit(status); 1816 return status; 1817} 1818 1819void ocfs2_super_unlock(struct ocfs2_super *osb, 1820 int ex) 1821{ 1822 int level = ex ? 
LKM_EXMODE : LKM_PRMODE; 1823 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 1824 1825 ocfs2_cluster_unlock(osb, lockres, level); 1826} 1827 1828int ocfs2_rename_lock(struct ocfs2_super *osb) 1829{ 1830 int status; 1831 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 1832 1833 if (ocfs2_is_hard_readonly(osb)) 1834 return -EROFS; 1835 1836 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0); 1837 if (status < 0) 1838 mlog_errno(status); 1839 1840 return status; 1841} 1842 1843void ocfs2_rename_unlock(struct ocfs2_super *osb) 1844{ 1845 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 1846 1847 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); 1848} 1849 1850int ocfs2_dentry_lock(struct dentry *dentry, int ex) 1851{ 1852 int ret; 1853 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1854 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 1855 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 1856 1857 BUG_ON(!dl); 1858 1859 if (ocfs2_is_hard_readonly(osb)) 1860 return -EROFS; 1861 1862 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 1863 if (ret < 0) 1864 mlog_errno(ret); 1865 1866 return ret; 1867} 1868 1869void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 1870{ 1871 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1872 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 1873 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 1874 1875 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 1876} 1877 1878/* Reference counting of the dlm debug structure. We want this because 1879 * open references on the debug inodes can live on after a mount, so 1880 * we can't rely on the ocfs2_super to always exist. 
*/ 1881static void ocfs2_dlm_debug_free(struct kref *kref) 1882{ 1883 struct ocfs2_dlm_debug *dlm_debug; 1884 1885 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 1886 1887 kfree(dlm_debug); 1888} 1889 1890void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 1891{ 1892 if (dlm_debug) 1893 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 1894} 1895 1896static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 1897{ 1898 kref_get(&debug->d_refcnt); 1899} 1900 1901struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 1902{ 1903 struct ocfs2_dlm_debug *dlm_debug; 1904 1905 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 1906 if (!dlm_debug) { 1907 mlog_errno(-ENOMEM); 1908 goto out; 1909 } 1910 1911 kref_init(&dlm_debug->d_refcnt); 1912 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 1913 dlm_debug->d_locking_state = NULL; 1914out: 1915 return dlm_debug; 1916} 1917 1918/* Access to this is arbitrated for us via seq_file->sem. */ 1919struct ocfs2_dlm_seq_priv { 1920 struct ocfs2_dlm_debug *p_dlm_debug; 1921 struct ocfs2_lock_res p_iter_res; 1922 struct ocfs2_lock_res p_tmp_res; 1923}; 1924 1925static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 1926 struct ocfs2_dlm_seq_priv *priv) 1927{ 1928 struct ocfs2_lock_res *iter, *ret = NULL; 1929 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 1930 1931 assert_spin_locked(&ocfs2_dlm_tracking_lock); 1932 1933 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 1934 /* discover the head of the list */ 1935 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 1936 mlog(0, "End of list found, %p\n", ret); 1937 break; 1938 } 1939 1940 /* We track our "dummy" iteration lockres' by a NULL 1941 * l_ops field. 
*/ 1942 if (iter->l_ops != NULL) { 1943 ret = iter; 1944 break; 1945 } 1946 } 1947 1948 return ret; 1949} 1950 1951static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 1952{ 1953 struct ocfs2_dlm_seq_priv *priv = m->private; 1954 struct ocfs2_lock_res *iter; 1955 1956 spin_lock(&ocfs2_dlm_tracking_lock); 1957 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 1958 if (iter) { 1959 /* Since lockres' have the lifetime of their container 1960 * (which can be inodes, ocfs2_supers, etc) we want to 1961 * copy this out to a temporary lockres while still 1962 * under the spinlock. Obviously after this we can't 1963 * trust any pointers on the copy returned, but that's 1964 * ok as the information we want isn't typically held 1965 * in them. */ 1966 priv->p_tmp_res = *iter; 1967 iter = &priv->p_tmp_res; 1968 } 1969 spin_unlock(&ocfs2_dlm_tracking_lock); 1970 1971 return iter; 1972} 1973 1974static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 1975{ 1976} 1977 1978static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 1979{ 1980 struct ocfs2_dlm_seq_priv *priv = m->private; 1981 struct ocfs2_lock_res *iter = v; 1982 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 1983 1984 spin_lock(&ocfs2_dlm_tracking_lock); 1985 iter = ocfs2_dlm_next_res(iter, priv); 1986 list_del_init(&dummy->l_debug_list); 1987 if (iter) { 1988 list_add(&dummy->l_debug_list, &iter->l_debug_list); 1989 priv->p_tmp_res = *iter; 1990 iter = &priv->p_tmp_res; 1991 } 1992 spin_unlock(&ocfs2_dlm_tracking_lock); 1993 1994 return iter; 1995} 1996 1997/* So that debugfs.ocfs2 can determine which format is being used */ 1998#define OCFS2_DLM_DEBUG_STR_VERSION 1 1999static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2000{ 2001 int i; 2002 char *lvb; 2003 struct ocfs2_lock_res *lockres = v; 2004 2005 if (!lockres) 2006 return -EINVAL; 2007 2008 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2009 2010 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2011 
seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2012 lockres->l_name, 2013 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2014 else 2015 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2016 2017 seq_printf(m, "%d\t" 2018 "0x%lx\t" 2019 "0x%x\t" 2020 "0x%x\t" 2021 "%u\t" 2022 "%u\t" 2023 "%d\t" 2024 "%d\t", 2025 lockres->l_level, 2026 lockres->l_flags, 2027 lockres->l_action, 2028 lockres->l_unlock_action, 2029 lockres->l_ro_holders, 2030 lockres->l_ex_holders, 2031 lockres->l_requested, 2032 lockres->l_blocking); 2033 2034 /* Dump the raw LVB */ 2035 lvb = lockres->l_lksb.lvb; 2036 for(i = 0; i < DLM_LVB_LEN; i++) 2037 seq_printf(m, "0x%x\t", lvb[i]); 2038 2039 /* End the line */ 2040 seq_printf(m, "\n"); 2041 return 0; 2042} 2043 2044static struct seq_operations ocfs2_dlm_seq_ops = { 2045 .start = ocfs2_dlm_seq_start, 2046 .stop = ocfs2_dlm_seq_stop, 2047 .next = ocfs2_dlm_seq_next, 2048 .show = ocfs2_dlm_seq_show, 2049}; 2050 2051static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2052{ 2053 struct seq_file *seq = (struct seq_file *) file->private_data; 2054 struct ocfs2_dlm_seq_priv *priv = seq->private; 2055 struct ocfs2_lock_res *res = &priv->p_iter_res; 2056 2057 ocfs2_remove_lockres_tracking(res); 2058 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2059 return seq_release_private(inode, file); 2060} 2061 2062static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2063{ 2064 int ret; 2065 struct ocfs2_dlm_seq_priv *priv; 2066 struct seq_file *seq; 2067 struct ocfs2_super *osb; 2068 2069 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2070 if (!priv) { 2071 ret = -ENOMEM; 2072 mlog_errno(ret); 2073 goto out; 2074 } 2075 osb = (struct ocfs2_super *) inode->u.generic_ip; 2076 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2077 priv->p_dlm_debug = osb->osb_dlm_debug; 2078 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2079 2080 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2081 if (ret) { 
2082 kfree(priv); 2083 mlog_errno(ret); 2084 goto out; 2085 } 2086 2087 seq = (struct seq_file *) file->private_data; 2088 seq->private = priv; 2089 2090 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2091 priv->p_dlm_debug); 2092 2093out: 2094 return ret; 2095} 2096 2097static const struct file_operations ocfs2_dlm_debug_fops = { 2098 .open = ocfs2_dlm_debug_open, 2099 .release = ocfs2_dlm_debug_release, 2100 .read = seq_read, 2101 .llseek = seq_lseek, 2102}; 2103 2104static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2105{ 2106 int ret = 0; 2107 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2108 2109 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2110 S_IFREG|S_IRUSR, 2111 osb->osb_debug_root, 2112 osb, 2113 &ocfs2_dlm_debug_fops); 2114 if (!dlm_debug->d_locking_state) { 2115 ret = -EINVAL; 2116 mlog(ML_ERROR, 2117 "Unable to create locking state debugfs file.\n"); 2118 goto out; 2119 } 2120 2121 ocfs2_get_dlm_debug(dlm_debug); 2122out: 2123 return ret; 2124} 2125 2126static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2127{ 2128 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2129 2130 if (dlm_debug) { 2131 debugfs_remove(dlm_debug->d_locking_state); 2132 ocfs2_put_dlm_debug(dlm_debug); 2133 } 2134} 2135 2136int ocfs2_dlm_init(struct ocfs2_super *osb) 2137{ 2138 int status; 2139 u32 dlm_key; 2140 struct dlm_ctxt *dlm; 2141 2142 mlog_entry_void(); 2143 2144 status = ocfs2_dlm_init_debug(osb); 2145 if (status < 0) { 2146 mlog_errno(status); 2147 goto bail; 2148 } 2149 2150 /* launch vote thread */ 2151 osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote"); 2152 if (IS_ERR(osb->vote_task)) { 2153 status = PTR_ERR(osb->vote_task); 2154 osb->vote_task = NULL; 2155 mlog_errno(status); 2156 goto bail; 2157 } 2158 2159 /* used by the dlm code to make message headers unique, each 2160 * node in this domain must agree on this. 
*/ 2161 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); 2162 2163 /* for now, uuid == domain */ 2164 dlm = dlm_register_domain(osb->uuid_str, dlm_key); 2165 if (IS_ERR(dlm)) { 2166 status = PTR_ERR(dlm); 2167 mlog_errno(status); 2168 goto bail; 2169 } 2170 2171 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2172 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2173 2174 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2175 2176 osb->dlm = dlm; 2177 2178 status = 0; 2179bail: 2180 if (status < 0) { 2181 ocfs2_dlm_shutdown_debug(osb); 2182 if (osb->vote_task) 2183 kthread_stop(osb->vote_task); 2184 } 2185 2186 mlog_exit(status); 2187 return status; 2188} 2189 2190void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2191{ 2192 mlog_entry_void(); 2193 2194 dlm_unregister_eviction_cb(&osb->osb_eviction_cb); 2195 2196 ocfs2_drop_osb_locks(osb); 2197 2198 if (osb->vote_task) { 2199 kthread_stop(osb->vote_task); 2200 osb->vote_task = NULL; 2201 } 2202 2203 ocfs2_lock_res_free(&osb->osb_super_lockres); 2204 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2205 2206 dlm_unregister_domain(osb->dlm); 2207 osb->dlm = NULL; 2208 2209 ocfs2_dlm_shutdown_debug(osb); 2210 2211 mlog_exit_void(); 2212} 2213 2214static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) 2215{ 2216 struct ocfs2_lock_res *lockres = opaque; 2217 unsigned long flags; 2218 2219 mlog_entry_void(); 2220 2221 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2222 lockres->l_unlock_action); 2223 2224 spin_lock_irqsave(&lockres->l_lock, flags); 2225 /* We tried to cancel a convert request, but it was already 2226 * granted. All we want to do here is clear our unlock 2227 * state. 
The wake_up call done at the bottom is redundant 2228 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't 2229 * hurt anything anyway */ 2230 if (status == DLM_CANCELGRANT && 2231 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2232 mlog(0, "Got cancelgrant for %s\n", lockres->l_name); 2233 2234 /* We don't clear the busy flag in this case as it 2235 * should have been cleared by the ast which the dlm 2236 * has called. */ 2237 goto complete_unlock; 2238 } 2239 2240 if (status != DLM_NORMAL) { 2241 mlog(ML_ERROR, "Dlm passes status %d for lock %s, " 2242 "unlock_action %d\n", status, lockres->l_name, 2243 lockres->l_unlock_action); 2244 spin_unlock_irqrestore(&lockres->l_lock, flags); 2245 return; 2246 } 2247 2248 switch(lockres->l_unlock_action) { 2249 case OCFS2_UNLOCK_CANCEL_CONVERT: 2250 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2251 lockres->l_action = OCFS2_AST_INVALID; 2252 break; 2253 case OCFS2_UNLOCK_DROP_LOCK: 2254 lockres->l_level = LKM_IVMODE; 2255 break; 2256 default: 2257 BUG(); 2258 } 2259 2260 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2261complete_unlock: 2262 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2263 spin_unlock_irqrestore(&lockres->l_lock, flags); 2264 2265 wake_up(&lockres->l_event); 2266 2267 mlog_exit_void(); 2268} 2269 2270typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *); 2271 2272struct drop_lock_cb { 2273 ocfs2_pre_drop_cb_t *drop_func; 2274 void *drop_data; 2275}; 2276 2277static int ocfs2_drop_lock(struct ocfs2_super *osb, 2278 struct ocfs2_lock_res *lockres, 2279 struct drop_lock_cb *dcb) 2280{ 2281 enum dlm_status status; 2282 unsigned long flags; 2283 2284 /* We didn't get anywhere near actually using this lockres. 
*/ 2285 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2286 goto out; 2287 2288 spin_lock_irqsave(&lockres->l_lock, flags); 2289 2290 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2291 "lockres %s, flags 0x%lx\n", 2292 lockres->l_name, lockres->l_flags); 2293 2294 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2295 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2296 "%u, unlock_action = %u\n", 2297 lockres->l_name, lockres->l_flags, lockres->l_action, 2298 lockres->l_unlock_action); 2299 2300 spin_unlock_irqrestore(&lockres->l_lock, flags); 2301 2302 /* XXX: Today we just wait on any busy 2303 * locks... Perhaps we need to cancel converts in the 2304 * future? */ 2305 ocfs2_wait_on_busy_lock(lockres); 2306 2307 spin_lock_irqsave(&lockres->l_lock, flags); 2308 } 2309 2310 if (dcb) 2311 dcb->drop_func(lockres, dcb->drop_data); 2312 2313 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2314 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2315 lockres->l_name); 2316 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2317 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2318 2319 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2320 spin_unlock_irqrestore(&lockres->l_lock, flags); 2321 goto out; 2322 } 2323 2324 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2325 2326 /* make sure we never get here while waiting for an ast to 2327 * fire. */ 2328 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2329 2330 /* is this necessary? 
*/ 2331 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2332 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2333 spin_unlock_irqrestore(&lockres->l_lock, flags); 2334 2335 mlog(0, "lock %s\n", lockres->l_name); 2336 2337 status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK, 2338 ocfs2_unlock_ast, lockres); 2339 if (status != DLM_NORMAL) { 2340 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2341 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2342 dlm_print_one_lock(lockres->l_lksb.lockid); 2343 BUG(); 2344 } 2345 mlog(0, "lock %s, successfull return from dlmunlock\n", 2346 lockres->l_name); 2347 2348 ocfs2_wait_on_busy_lock(lockres); 2349out: 2350 mlog_exit(0); 2351 return 0; 2352} 2353 2354/* Mark the lockres as being dropped. It will no longer be 2355 * queued if blocking, but we still may have to wait on it 2356 * being dequeued from the vote thread before we can consider 2357 * it safe to drop. 2358 * 2359 * You can *not* attempt to call cluster_lock on this lockres anymore. 
*/ 2360void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2361{ 2362 int status; 2363 struct ocfs2_mask_waiter mw; 2364 unsigned long flags; 2365 2366 ocfs2_init_mask_waiter(&mw); 2367 2368 spin_lock_irqsave(&lockres->l_lock, flags); 2369 lockres->l_flags |= OCFS2_LOCK_FREEING; 2370 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2371 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2372 spin_unlock_irqrestore(&lockres->l_lock, flags); 2373 2374 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2375 2376 status = ocfs2_wait_for_mask(&mw); 2377 if (status) 2378 mlog_errno(status); 2379 2380 spin_lock_irqsave(&lockres->l_lock, flags); 2381 } 2382 spin_unlock_irqrestore(&lockres->l_lock, flags); 2383} 2384 2385void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2386 struct ocfs2_lock_res *lockres) 2387{ 2388 int ret; 2389 2390 ocfs2_mark_lockres_freeing(lockres); 2391 ret = ocfs2_drop_lock(osb, lockres, NULL); 2392 if (ret) 2393 mlog_errno(ret); 2394} 2395 2396static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2397{ 2398 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2399 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2400} 2401 2402static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data) 2403{ 2404 struct inode *inode = data; 2405 2406 /* the metadata lock requires a bit more work as we have an 2407 * LVB to worry about. */ 2408 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2409 lockres->l_level == LKM_EXMODE && 2410 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2411 __ocfs2_stuff_meta_lvb(inode); 2412} 2413 2414int ocfs2_drop_inode_locks(struct inode *inode) 2415{ 2416 int status, err; 2417 struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, }; 2418 2419 mlog_entry_void(); 2420 2421 /* No need to call ocfs2_mark_lockres_freeing here - 2422 * ocfs2_clear_inode has done it for us. 
*/ 2423 2424 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2425 &OCFS2_I(inode)->ip_data_lockres, 2426 NULL); 2427 if (err < 0) 2428 mlog_errno(err); 2429 2430 status = err; 2431 2432 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2433 &OCFS2_I(inode)->ip_meta_lockres, 2434 &meta_dcb); 2435 if (err < 0) 2436 mlog_errno(err); 2437 if (err < 0 && !status) 2438 status = err; 2439 2440 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2441 &OCFS2_I(inode)->ip_rw_lockres, 2442 NULL); 2443 if (err < 0) 2444 mlog_errno(err); 2445 if (err < 0 && !status) 2446 status = err; 2447 2448 mlog_exit(status); 2449 return status; 2450} 2451 2452static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2453 int new_level) 2454{ 2455 assert_spin_locked(&lockres->l_lock); 2456 2457 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 2458 2459 if (lockres->l_level <= new_level) { 2460 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n", 2461 lockres->l_level, new_level); 2462 BUG(); 2463 } 2464 2465 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 2466 lockres->l_name, new_level, lockres->l_blocking); 2467 2468 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2469 lockres->l_requested = new_level; 2470 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2471} 2472 2473static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2474 struct ocfs2_lock_res *lockres, 2475 int new_level, 2476 int lvb) 2477{ 2478 int ret, dlm_flags = LKM_CONVERT; 2479 enum dlm_status status; 2480 2481 mlog_entry_void(); 2482 2483 if (lvb) 2484 dlm_flags |= LKM_VALBLK; 2485 2486 status = dlmlock(osb->dlm, 2487 new_level, 2488 &lockres->l_lksb, 2489 dlm_flags, 2490 lockres->l_name, 2491 OCFS2_LOCK_ID_MAX_LEN - 1, 2492 ocfs2_locking_ast, 2493 lockres, 2494 lockres->l_ops->bast); 2495 if (status != DLM_NORMAL) { 2496 ocfs2_log_dlm_error("dlmlock", status, lockres); 2497 ret = -EINVAL; 2498 ocfs2_recover_from_dlm_error(lockres, 1); 2499 goto bail; 2500 } 2501 2502 ret = 0; 2503bail: 2504 mlog_exit(ret); 2505 return ret; 
2506} 2507 2508/* returns 1 when the caller should unlock and call dlmunlock */ 2509static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2510 struct ocfs2_lock_res *lockres) 2511{ 2512 assert_spin_locked(&lockres->l_lock); 2513 2514 mlog_entry_void(); 2515 mlog(0, "lock %s\n", lockres->l_name); 2516 2517 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2518 /* If we're already trying to cancel a lock conversion 2519 * then just drop the spinlock and allow the caller to 2520 * requeue this lock. */ 2521 2522 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2523 return 0; 2524 } 2525 2526 /* were we in a convert when we got the bast fire? */ 2527 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2528 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2529 /* set things up for the unlockast to know to just 2530 * clear out the ast_action and unset busy, etc. */ 2531 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2532 2533 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2534 "lock %s, invalid flags: 0x%lx\n", 2535 lockres->l_name, lockres->l_flags); 2536 2537 return 1; 2538} 2539 2540static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2541 struct ocfs2_lock_res *lockres) 2542{ 2543 int ret; 2544 enum dlm_status status; 2545 2546 mlog_entry_void(); 2547 mlog(0, "lock %s\n", lockres->l_name); 2548 2549 ret = 0; 2550 status = dlmunlock(osb->dlm, 2551 &lockres->l_lksb, 2552 LKM_CANCEL, 2553 ocfs2_unlock_ast, 2554 lockres); 2555 if (status != DLM_NORMAL) { 2556 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2557 ret = -EINVAL; 2558 ocfs2_recover_from_dlm_error(lockres, 0); 2559 } 2560 2561 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); 2562 2563 mlog_exit(ret); 2564 return ret; 2565} 2566 2567static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode, 2568 struct ocfs2_lock_res *lockres, 2569 int new_level) 2570{ 2571 int ret; 2572 2573 mlog_entry_void(); 2574 2575 BUG_ON(new_level != LKM_NLMODE && 
new_level != LKM_PRMODE); 2576 2577 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2578 ret = 0; 2579 mlog(0, "lockres %s currently being refreshed -- backing " 2580 "off!\n", lockres->l_name); 2581 } else if (new_level == LKM_PRMODE) 2582 ret = !lockres->l_ex_holders && 2583 ocfs2_inode_fully_checkpointed(inode); 2584 else /* Must be NLMODE we're converting to. */ 2585 ret = !lockres->l_ro_holders && !lockres->l_ex_holders && 2586 ocfs2_inode_fully_checkpointed(inode); 2587 2588 mlog_exit(ret); 2589 return ret; 2590} 2591 2592static int ocfs2_do_unblock_meta(struct inode *inode, 2593 int *requeue) 2594{ 2595 int new_level; 2596 int set_lvb = 0; 2597 int ret = 0; 2598 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres; 2599 unsigned long flags; 2600 2601 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2602 2603 mlog_entry_void(); 2604 2605 spin_lock_irqsave(&lockres->l_lock, flags); 2606 2607 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2608 2609 mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level, 2610 lockres->l_blocking); 2611 2612 BUG_ON(lockres->l_level != LKM_EXMODE && 2613 lockres->l_level != LKM_PRMODE); 2614 2615 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2616 *requeue = 1; 2617 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2618 spin_unlock_irqrestore(&lockres->l_lock, flags); 2619 if (ret) { 2620 ret = ocfs2_cancel_convert(osb, lockres); 2621 if (ret < 0) 2622 mlog_errno(ret); 2623 } 2624 goto leave; 2625 } 2626 2627 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2628 2629 mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n", 2630 lockres->l_level, lockres->l_blocking, new_level); 2631 2632 if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) { 2633 if (lockres->l_level == LKM_EXMODE) 2634 set_lvb = 1; 2635 2636 /* If the lock hasn't been refreshed yet (rare), then 2637 * our memory inode values are old and we skip 2638 * stuffing the lvb. 
There's no need to actually clear 2639 * out the lvb here as it's value is still valid. */ 2640 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2641 if (set_lvb) 2642 __ocfs2_stuff_meta_lvb(inode); 2643 } else 2644 mlog(0, "lockres %s: downconverting stale lock!\n", 2645 lockres->l_name); 2646 2647 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, " 2648 "l_blocking=%d, new_level=%d\n", 2649 lockres->l_level, lockres->l_blocking, new_level); 2650 2651 ocfs2_prepare_downconvert(lockres, new_level); 2652 spin_unlock_irqrestore(&lockres->l_lock, flags); 2653 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 2654 goto leave; 2655 } 2656 if (!ocfs2_inode_fully_checkpointed(inode)) 2657 ocfs2_start_checkpoint(osb); 2658 2659 *requeue = 1; 2660 spin_unlock_irqrestore(&lockres->l_lock, flags); 2661 ret = 0; 2662leave: 2663 mlog_exit(ret); 2664 return ret; 2665} 2666 2667static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 2668 struct ocfs2_lock_res *lockres, 2669 struct ocfs2_unblock_ctl *ctl, 2670 ocfs2_convert_worker_t *worker) 2671{ 2672 unsigned long flags; 2673 int blocking; 2674 int new_level; 2675 int ret = 0; 2676 2677 mlog_entry_void(); 2678 2679 spin_lock_irqsave(&lockres->l_lock, flags); 2680 2681 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2682 2683recheck: 2684 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2685 ctl->requeue = 1; 2686 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2687 spin_unlock_irqrestore(&lockres->l_lock, flags); 2688 if (ret) { 2689 ret = ocfs2_cancel_convert(osb, lockres); 2690 if (ret < 0) 2691 mlog_errno(ret); 2692 } 2693 goto leave; 2694 } 2695 2696 /* if we're blocking an exclusive and we have *any* holders, 2697 * then requeue. 
*/ 2698 if ((lockres->l_blocking == LKM_EXMODE) 2699 && (lockres->l_ex_holders || lockres->l_ro_holders)) { 2700 spin_unlock_irqrestore(&lockres->l_lock, flags); 2701 ctl->requeue = 1; 2702 ret = 0; 2703 goto leave; 2704 } 2705 2706 /* If it's a PR we're blocking, then only 2707 * requeue if we've got any EX holders */ 2708 if (lockres->l_blocking == LKM_PRMODE && 2709 lockres->l_ex_holders) { 2710 spin_unlock_irqrestore(&lockres->l_lock, flags); 2711 ctl->requeue = 1; 2712 ret = 0; 2713 goto leave; 2714 } 2715 2716 /* If we get here, then we know that there are no more 2717 * incompatible holders (and anyone asking for an incompatible 2718 * lock is blocked). We can now downconvert the lock */ 2719 if (!worker) 2720 goto downconvert; 2721 2722 /* Some lockres types want to do a bit of work before 2723 * downconverting a lock. Allow that here. The worker function 2724 * may sleep, so we save off a copy of what we're blocking as 2725 * it may change while we're not holding the spin lock. */ 2726 blocking = lockres->l_blocking; 2727 spin_unlock_irqrestore(&lockres->l_lock, flags); 2728 2729 ctl->unblock_action = worker(lockres, blocking); 2730 2731 if (ctl->unblock_action == UNBLOCK_STOP_POST) 2732 goto leave; 2733 2734 spin_lock_irqsave(&lockres->l_lock, flags); 2735 if (blocking != lockres->l_blocking) { 2736 /* If this changed underneath us, then we can't drop 2737 * it just yet. 
*/ 2738 goto recheck; 2739 } 2740 2741downconvert: 2742 ctl->requeue = 0; 2743 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2744 2745 ocfs2_prepare_downconvert(lockres, new_level); 2746 spin_unlock_irqrestore(&lockres->l_lock, flags); 2747 ret = ocfs2_downconvert_lock(osb, lockres, new_level, 0); 2748leave: 2749 mlog_exit(ret); 2750 return ret; 2751} 2752 2753static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 2754 int blocking) 2755{ 2756 struct inode *inode; 2757 struct address_space *mapping; 2758 2759 inode = ocfs2_lock_res_inode(lockres); 2760 mapping = inode->i_mapping; 2761 2762 if (filemap_fdatawrite(mapping)) { 2763 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 2764 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2765 } 2766 sync_mapping_buffers(mapping); 2767 if (blocking == LKM_EXMODE) { 2768 truncate_inode_pages(mapping, 0); 2769 unmap_mapping_range(mapping, 0, 0, 0); 2770 } else { 2771 /* We only need to wait on the I/O if we're not also 2772 * truncating pages because truncate_inode_pages waits 2773 * for us above. We don't truncate pages if we're 2774 * blocking anything < EXMODE because we want to keep 2775 * them around in that case. 
*/ 2776 filemap_fdatawait(mapping); 2777 } 2778 2779 return UNBLOCK_CONTINUE; 2780} 2781 2782int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 2783 struct ocfs2_unblock_ctl *ctl) 2784{ 2785 int status; 2786 struct inode *inode; 2787 struct ocfs2_super *osb; 2788 2789 mlog_entry_void(); 2790 2791 inode = ocfs2_lock_res_inode(lockres); 2792 osb = OCFS2_SB(inode->i_sb); 2793 2794 mlog(0, "unblock inode %llu\n", 2795 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2796 2797 status = ocfs2_generic_unblock_lock(osb, lockres, ctl, 2798 ocfs2_data_convert_worker); 2799 if (status < 0) 2800 mlog_errno(status); 2801 2802 mlog(0, "inode %llu, requeue = %d\n", 2803 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); 2804 2805 mlog_exit(status); 2806 return status; 2807} 2808 2809static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 2810 struct ocfs2_unblock_ctl *ctl) 2811{ 2812 int status; 2813 struct inode *inode; 2814 2815 mlog_entry_void(); 2816 2817 mlog(0, "Unblock lockres %s\n", lockres->l_name); 2818 2819 inode = ocfs2_lock_res_inode(lockres); 2820 2821 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb), 2822 lockres, ctl, NULL); 2823 if (status < 0) 2824 mlog_errno(status); 2825 2826 mlog_exit(status); 2827 return status; 2828} 2829 2830static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 2831 struct ocfs2_unblock_ctl *ctl) 2832{ 2833 int status; 2834 struct inode *inode; 2835 2836 mlog_entry_void(); 2837 2838 inode = ocfs2_lock_res_inode(lockres); 2839 2840 mlog(0, "unblock inode %llu\n", 2841 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2842 2843 status = ocfs2_do_unblock_meta(inode, &ctl->requeue); 2844 if (status < 0) 2845 mlog_errno(status); 2846 2847 mlog(0, "inode %llu, requeue = %d\n", 2848 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); 2849 2850 mlog_exit(status); 2851 return status; 2852} 2853 2854/* 2855 * Does the final reference drop on our dentry lock. 
Right now this 2856 * happens in the vote thread, but we could choose to simplify the 2857 * dlmglue API and push these off to the ocfs2_wq in the future. 2858 */ 2859static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 2860 struct ocfs2_lock_res *lockres) 2861{ 2862 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 2863 ocfs2_dentry_lock_put(osb, dl); 2864} 2865 2866/* 2867 * d_delete() matching dentries before the lock downconvert. 2868 * 2869 * At this point, any process waiting to destroy the 2870 * dentry_lock due to last ref count is stopped by the 2871 * OCFS2_LOCK_QUEUED flag. 2872 * 2873 * We have two potential problems 2874 * 2875 * 1) If we do the last reference drop on our dentry_lock (via dput) 2876 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 2877 * the downconvert to finish. Instead we take an elevated 2878 * reference and push the drop until after we've completed our 2879 * unblock processing. 2880 * 2881 * 2) There might be another process with a final reference, 2882 * waiting on us to finish processing. If this is the case, we 2883 * detect it and exit out - there's no more dentries anyway. 2884 */ 2885static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 2886 int blocking) 2887{ 2888 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 2889 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 2890 struct dentry *dentry; 2891 unsigned long flags; 2892 int extra_ref = 0; 2893 2894 /* 2895 * This node is blocking another node from getting a read 2896 * lock. This happens when we've renamed within a 2897 * directory. We've forced the other nodes to d_delete(), but 2898 * we never actually dropped our lock because it's still 2899 * valid. The downconvert code will retain a PR for this node, 2900 * so there's no further work to do. 2901 */ 2902 if (blocking == LKM_PRMODE) 2903 return UNBLOCK_CONTINUE; 2904 2905 /* 2906 * Mark this inode as potentially orphaned. 
The code in 2907 * ocfs2_delete_inode() will figure out whether it actually 2908 * needs to be freed or not. 2909 */ 2910 spin_lock(&oi->ip_lock); 2911 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 2912 spin_unlock(&oi->ip_lock); 2913 2914 /* 2915 * Yuck. We need to make sure however that the check of 2916 * OCFS2_LOCK_FREEING and the extra reference are atomic with 2917 * respect to a reference decrement or the setting of that 2918 * flag. 2919 */ 2920 spin_lock_irqsave(&lockres->l_lock, flags); 2921 spin_lock(&dentry_attach_lock); 2922 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 2923 && dl->dl_count) { 2924 dl->dl_count++; 2925 extra_ref = 1; 2926 } 2927 spin_unlock(&dentry_attach_lock); 2928 spin_unlock_irqrestore(&lockres->l_lock, flags); 2929 2930 mlog(0, "extra_ref = %d\n", extra_ref); 2931 2932 /* 2933 * We have a process waiting on us in ocfs2_dentry_iput(), 2934 * which means we can't have any more outstanding 2935 * aliases. There's no need to do any more work. 2936 */ 2937 if (!extra_ref) 2938 return UNBLOCK_CONTINUE; 2939 2940 spin_lock(&dentry_attach_lock); 2941 while (1) { 2942 dentry = ocfs2_find_local_alias(dl->dl_inode, 2943 dl->dl_parent_blkno, 1); 2944 if (!dentry) 2945 break; 2946 spin_unlock(&dentry_attach_lock); 2947 2948 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 2949 dentry->d_name.name); 2950 2951 /* 2952 * The following dcache calls may do an 2953 * iput(). Normally we don't want that from the 2954 * downconverting thread, but in this case it's ok 2955 * because the requesting node already has an 2956 * exclusive lock on the inode, so it can't be queued 2957 * for a downconvert. 2958 */ 2959 d_delete(dentry); 2960 dput(dentry); 2961 2962 spin_lock(&dentry_attach_lock); 2963 } 2964 spin_unlock(&dentry_attach_lock); 2965 2966 /* 2967 * If we are the last holder of this dentry lock, there is no 2968 * reason to downconvert so skip straight to the unlock. 
2969 */ 2970 if (dl->dl_count == 1) 2971 return UNBLOCK_STOP_POST; 2972 2973 return UNBLOCK_CONTINUE_POST; 2974} 2975 2976static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, 2977 struct ocfs2_unblock_ctl *ctl) 2978{ 2979 int ret; 2980 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 2981 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb); 2982 2983 mlog(0, "unblock dentry lock: %llu\n", 2984 (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno); 2985 2986 ret = ocfs2_generic_unblock_lock(osb, 2987 lockres, 2988 ctl, 2989 ocfs2_dentry_convert_worker); 2990 if (ret < 0) 2991 mlog_errno(ret); 2992 2993 mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action); 2994 2995 return ret; 2996} 2997 2998/* Generic unblock function for any lockres whose private data is an 2999 * ocfs2_super pointer. */ 3000static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 3001 struct ocfs2_unblock_ctl *ctl) 3002{ 3003 int status; 3004 struct ocfs2_super *osb; 3005 3006 mlog_entry_void(); 3007 3008 mlog(0, "Unblock lockres %s\n", lockres->l_name); 3009 3010 osb = ocfs2_lock_res_super(lockres); 3011 3012 status = ocfs2_generic_unblock_lock(osb, 3013 lockres, 3014 ctl, 3015 NULL); 3016 if (status < 0) 3017 mlog_errno(status); 3018 3019 mlog_exit(status); 3020 return status; 3021} 3022 3023void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3024 struct ocfs2_lock_res *lockres) 3025{ 3026 int status; 3027 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3028 unsigned long flags; 3029 3030 /* Our reference to the lockres in this function can be 3031 * considered valid until we remove the OCFS2_LOCK_QUEUED 3032 * flag. */ 3033 3034 mlog_entry_void(); 3035 3036 BUG_ON(!lockres); 3037 BUG_ON(!lockres->l_ops); 3038 BUG_ON(!lockres->l_ops->unblock); 3039 3040 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3041 3042 /* Detect whether a lock has been marked as going away while 3043 * the vote thread was processing other things. 
A lock can 3044 * still be marked with OCFS2_LOCK_FREEING after this check, 3045 * but short circuiting here will still save us some 3046 * performance. */ 3047 spin_lock_irqsave(&lockres->l_lock, flags); 3048 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3049 goto unqueue; 3050 spin_unlock_irqrestore(&lockres->l_lock, flags); 3051 3052 status = lockres->l_ops->unblock(lockres, &ctl); 3053 if (status < 0) 3054 mlog_errno(status); 3055 3056 spin_lock_irqsave(&lockres->l_lock, flags); 3057unqueue: 3058 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3059 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3060 } else 3061 ocfs2_schedule_blocked_lock(osb, lockres); 3062 3063 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3064 ctl.requeue ? "yes" : "no"); 3065 spin_unlock_irqrestore(&lockres->l_lock, flags); 3066 3067 if (ctl.unblock_action != UNBLOCK_CONTINUE 3068 && lockres->l_ops->post_unlock) 3069 lockres->l_ops->post_unlock(osb, lockres); 3070 3071 mlog_exit_void(); 3072} 3073 3074static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3075 struct ocfs2_lock_res *lockres) 3076{ 3077 mlog_entry_void(); 3078 3079 assert_spin_locked(&lockres->l_lock); 3080 3081 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3082 /* Do not schedule a lock for downconvert when it's on 3083 * the way to destruction - any nodes wanting access 3084 * to the resource will get it soon. */ 3085 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3086 lockres->l_name, lockres->l_flags); 3087 return; 3088 } 3089 3090 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3091 3092 spin_lock(&osb->vote_task_lock); 3093 if (list_empty(&lockres->l_blocked_list)) { 3094 list_add_tail(&lockres->l_blocked_list, 3095 &osb->blocked_lock_list); 3096 osb->blocked_lock_count++; 3097 } 3098 spin_unlock(&osb->vote_task_lock); 3099 3100 mlog_exit_void(); 3101} 3102 3103/* This aids in debugging situations where a bad LVB might be involved. 
*/
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb;
	u32 clusters, generation, uid, gid, iattr;
	u16 mode, nlink;
	u64 isize, atime, ctime, mtime;

	/* The meta LVB lives in the lksb's raw value block. */
	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* Bring every big-endian field into CPU byte order up front. */
	clusters = be32_to_cpu(lvb->lvb_iclusters);
	generation = be32_to_cpu(lvb->lvb_igeneration);
	isize = be64_to_cpu(lvb->lvb_isize);
	uid = be32_to_cpu(lvb->lvb_iuid);
	gid = be32_to_cpu(lvb->lvb_igid);
	mode = be16_to_cpu(lvb->lvb_imode);
	nlink = be16_to_cpu(lvb->lvb_inlink);
	atime = be64_to_cpu(lvb->lvb_iatime_packed);
	ctime = be64_to_cpu(lvb->lvb_ictime_packed);
	mtime = be64_to_cpu(lvb->lvb_imtime_packed);
	iattr = be32_to_cpu(lvb->lvb_iattr);

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, clusters, generation);
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)isize, uid, gid, mode);
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", nlink,
	     (long long)atime, (long long)ctime, (long long)mtime,
	     iattr);
}