dlmglue.c revision 810d5aeba18825c754cf47db59eb83814a54bb27
1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26#include <linux/types.h> 27#include <linux/slab.h> 28#include <linux/highmem.h> 29#include <linux/mm.h> 30#include <linux/smp_lock.h> 31#include <linux/crc32.h> 32#include <linux/kthread.h> 33#include <linux/pagemap.h> 34#include <linux/debugfs.h> 35#include <linux/seq_file.h> 36 37#include <cluster/heartbeat.h> 38#include <cluster/nodemanager.h> 39#include <cluster/tcp.h> 40 41#include <dlm/dlmapi.h> 42 43#define MLOG_MASK_PREFIX ML_DLM_GLUE 44#include <cluster/masklog.h> 45 46#include "ocfs2.h" 47 48#include "alloc.h" 49#include "dcache.h" 50#include "dlmglue.h" 51#include "extent_map.h" 52#include "heartbeat.h" 53#include "inode.h" 54#include "journal.h" 55#include "slot_map.h" 56#include "super.h" 57#include "uptodate.h" 58#include "vote.h" 59 60#include "buffer_head_io.h" 61 62struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68}; 69 70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 72 73/* 74 * Return value from ocfs2_convert_worker_t functions. 75 * 76 * These control the precise actions of ocfs2_generic_unblock_lock() 77 * and ocfs2_process_blocked_lock() 78 * 79 */ 80enum ocfs2_unblock_action { 81 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 82 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 83 * ->post_unlock callback */ 84 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 85 * ->post_unlock() callback. 
*/ 86}; 87 88struct ocfs2_unblock_ctl { 89 int requeue; 90 enum ocfs2_unblock_action unblock_action; 91}; 92 93static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 94 struct ocfs2_unblock_ctl *ctl); 95static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 96 int new_level); 97static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 98 99static int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 100 struct ocfs2_unblock_ctl *ctl); 101static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 102 struct ocfs2_unblock_ctl *ctl); 103static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, 104 struct ocfs2_unblock_ctl *ctl); 105static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 106 struct ocfs2_unblock_ctl *ctl); 107 108static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 109 struct ocfs2_lock_res *lockres); 110 111/* 112 * OCFS2 Lock Resource Operations 113 * 114 * These fine tune the behavior of the generic dlmglue locking infrastructure. 115 */ 116struct ocfs2_lock_res_ops { 117 /* 118 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 119 * this callback if ->l_priv is not an ocfs2_super pointer 120 */ 121 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 122 int (*unblock)(struct ocfs2_lock_res *, struct ocfs2_unblock_ctl *); 123 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 124 125 /* 126 * Allow a lock type to add checks to determine whether it is 127 * safe to downconvert a lock. Return 0 to re-queue the 128 * downconvert at a later time, nonzero to continue. 129 * 130 * For most locks, the default checks that there are no 131 * incompatible holders are sufficient. 132 * 133 * Called with the lockres spinlock held. 134 */ 135 int (*check_downconvert)(struct ocfs2_lock_res *, int); 136 137 /* 138 * Allows a lock type to populate the lock value block. This 139 * is called on downconvert, and when we drop a lock. 140 * 141 * Locks that want to use this should set LOCK_TYPE_USES_LVB 142 * in the flags field. 143 * 144 * Called with the lockres spinlock held. 145 */ 146 void (*set_lvb)(struct ocfs2_lock_res *); 147 148 /* 149 * LOCK_TYPE_* flags which describe the specific requirements 150 * of a lock type. Descriptions of each individual flag follow. 151 */ 152 int flags; 153}; 154 155/* 156 * Some locks want to "refresh" potentially stale data when a 157 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 158 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 159 * individual lockres l_flags member from the ast function. It is 160 * expected that the locking wrapper will clear the 161 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 162 */ 163#define LOCK_TYPE_REQUIRES_REFRESH 0x1 164 165/* 166 * Indicate that a lock type makes use of the lock value block. The 167 * ->set_lvb lock type callback must be defined. 
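 * (Currently only ocfs2_inode_meta_lops does so: it pairs
 * LOCK_TYPE_USES_LVB with .set_lvb = ocfs2_set_meta_lvb.)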
168 */ 169#define LOCK_TYPE_USES_LVB 0x2 170 171typedef int (ocfs2_convert_worker_t)(struct ocfs2_lock_res *, int); 172static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 173 struct ocfs2_lock_res *lockres, 174 struct ocfs2_unblock_ctl *ctl, 175 ocfs2_convert_worker_t *worker); 176 177static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 178 .get_osb = ocfs2_get_inode_osb, 179 .unblock = ocfs2_unblock_inode_lock, 180 .flags = 0, 181}; 182 183static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = { 184 .get_osb = ocfs2_get_inode_osb, 185 .unblock = ocfs2_unblock_meta, 186 .check_downconvert = ocfs2_check_meta_downconvert, 187 .set_lvb = ocfs2_set_meta_lvb, 188 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 189}; 190 191static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { 192 .get_osb = ocfs2_get_inode_osb, 193 .unblock = ocfs2_unblock_data, 194 .flags = 0, 195}; 196 197static struct ocfs2_lock_res_ops ocfs2_super_lops = { 198 .unblock = ocfs2_unblock_osb_lock, 199 .flags = LOCK_TYPE_REQUIRES_REFRESH, 200}; 201 202static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 203 .unblock = ocfs2_unblock_osb_lock, 204 .flags = 0, 205}; 206 207static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 208 .get_osb = ocfs2_get_dentry_osb, 209 .unblock = ocfs2_unblock_dentry_lock, 210 .post_unlock = ocfs2_dentry_post_unlock, 211 .flags = 0, 212}; 213 214static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 215{ 216 return lockres->l_type == OCFS2_LOCK_TYPE_META || 217 lockres->l_type == OCFS2_LOCK_TYPE_DATA || 218 lockres->l_type == OCFS2_LOCK_TYPE_RW; 219} 220 221static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 222{ 223 BUG_ON(!ocfs2_is_inode_lock(lockres)); 224 225 return (struct inode *) lockres->l_priv; 226} 227 228static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 229{ 230 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 231 232 return (struct ocfs2_dentry_lock *)lockres->l_priv; 233} 234 235static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 236{ 237 if (lockres->l_ops->get_osb) 238 return lockres->l_ops->get_osb(lockres); 239 240 return (struct ocfs2_super *)lockres->l_priv; 241} 242 243static int ocfs2_lock_create(struct ocfs2_super *osb, 244 struct ocfs2_lock_res *lockres, 245 int level, 246 int dlm_flags); 247static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 248 int wanted); 249static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 250 struct ocfs2_lock_res *lockres, 251 int level); 252static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 253static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 254static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 255static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 256static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 257 struct ocfs2_lock_res *lockres); 258static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 259 int convert); 260#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 261 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 262 "resource %s: %s\n", dlm_errname(_stat), _func, \ 263 _lockres->l_name, dlm_errmsg(_stat)); \ 264} while (0) 265static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 266 struct ocfs2_lock_res *lockres); 267static int 
ocfs2_meta_lock_update(struct inode *inode, 268 struct buffer_head **bh); 269static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 270static inline int ocfs2_highest_compat_lock_level(int level); 271static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode, 272 struct ocfs2_lock_res *lockres, 273 int new_level); 274 275static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 276 u64 blkno, 277 u32 generation, 278 char *name) 279{ 280 int len; 281 282 mlog_entry_void(); 283 284 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 285 286 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 287 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 288 (long long)blkno, generation); 289 290 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 291 292 mlog(0, "built lock resource with name: %s\n", name); 293 294 mlog_exit_void(); 295} 296 297static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 298 299static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 300 struct ocfs2_dlm_debug *dlm_debug) 301{ 302 mlog(0, "Add tracking for lockres %s\n", res->l_name); 303 304 spin_lock(&ocfs2_dlm_tracking_lock); 305 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 306 spin_unlock(&ocfs2_dlm_tracking_lock); 307} 308 309static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 310{ 311 spin_lock(&ocfs2_dlm_tracking_lock); 312 if (!list_empty(&res->l_debug_list)) 313 list_del_init(&res->l_debug_list); 314 spin_unlock(&ocfs2_dlm_tracking_lock); 315} 316 317static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 318 struct ocfs2_lock_res *res, 319 enum ocfs2_lock_type type, 320 struct ocfs2_lock_res_ops *ops, 321 void *priv) 322{ 323 res->l_type = type; 324 res->l_ops = ops; 325 res->l_priv = priv; 326 327 res->l_level = LKM_IVMODE; 328 res->l_requested = LKM_IVMODE; 329 res->l_blocking = LKM_IVMODE; 330 res->l_action = OCFS2_AST_INVALID; 331 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 332 333 res->l_flags = OCFS2_LOCK_INITIALIZED; 334 335 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 336} 337 338void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 339{ 340 /* This also clears out the lock status block */ 341 memset(res, 0, sizeof(struct ocfs2_lock_res)); 342 spin_lock_init(&res->l_lock); 343 init_waitqueue_head(&res->l_event); 344 INIT_LIST_HEAD(&res->l_blocked_list); 345 INIT_LIST_HEAD(&res->l_mask_waiters); 346} 347 348void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 349 enum ocfs2_lock_type type, 350 unsigned int generation, 351 struct inode *inode) 352{ 353 struct ocfs2_lock_res_ops *ops; 354 355 switch(type) { 356 case OCFS2_LOCK_TYPE_RW: 357 ops = &ocfs2_inode_rw_lops; 358 break; 359 case OCFS2_LOCK_TYPE_META: 360 ops = &ocfs2_inode_meta_lops; 361 break; 362 case OCFS2_LOCK_TYPE_DATA: 363 ops = &ocfs2_inode_data_lops; 364 break; 365 default: 366 mlog_bug_on_msg(1, "type: %d\n", type); 367 ops = NULL; /* thanks, gcc */ 368 break; 369 }; 370 371 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 372 generation, res->l_name); 373 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 374} 375 376static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 377{ 378 struct inode *inode = ocfs2_lock_res_inode(lockres); 379 380 return OCFS2_SB(inode->i_sb); 381} 382 383static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 384{ 385 __be64 inode_blkno_be; 386 387 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 388 sizeof(__be64)); 389 390 return 
be64_to_cpu(inode_blkno_be); 391} 392 393static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 394{ 395 struct ocfs2_dentry_lock *dl = lockres->l_priv; 396 397 return OCFS2_SB(dl->dl_inode->i_sb); 398} 399 400void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 401 u64 parent, struct inode *inode) 402{ 403 int len; 404 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 405 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 406 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 407 408 ocfs2_lock_res_init_once(lockres); 409 410 /* 411 * Unfortunately, the standard lock naming scheme won't work 412 * here because we have two 16 byte values to use. Instead, 413 * we'll stuff the inode number as a binary value. We still 414 * want error prints to show something without garbling the 415 * display, so drop a null byte in there before the inode 416 * number. A future version of OCFS2 will likely use all 417 * binary lock names. The stringified names have been a 418 * tremendous aid in debugging, but now that the debugfs 419 * interface exists, we can mangle things there if need be. 420 * 421 * NOTE: We also drop the standard "pad" value (the total lock 422 * name size stays the same though - the last part is all 423 * zeros due to the memset in ocfs2_lock_res_init_once() 424 */ 425 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 426 "%c%016llx", 427 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 428 (long long)parent); 429 430 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 431 432 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 433 sizeof(__be64)); 434 435 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 436 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 437 dl); 438} 439 440static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 441 struct ocfs2_super *osb) 442{ 443 /* Superblock lockres doesn't come from a slab so we call init 444 * once on it manually. */ 445 ocfs2_lock_res_init_once(res); 446 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 447 0, res->l_name); 448 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 449 &ocfs2_super_lops, osb); 450} 451 452static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 453 struct ocfs2_super *osb) 454{ 455 /* Rename lockres doesn't come from a slab so we call init 456 * once on it manually. 
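	 * (Lockres embedded in inodes are expected to get
	 * ocfs2_lock_res_init_once() from the inode slab constructor
	 * instead.)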
*/ 457 ocfs2_lock_res_init_once(res); 458 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 459 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 460 &ocfs2_rename_lops, osb); 461} 462 463void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 464{ 465 mlog_entry_void(); 466 467 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 468 return; 469 470 ocfs2_remove_lockres_tracking(res); 471 472 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 473 "Lockres %s is on the blocked list\n", 474 res->l_name); 475 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 476 "Lockres %s has mask waiters pending\n", 477 res->l_name); 478 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 479 "Lockres %s is locked\n", 480 res->l_name); 481 mlog_bug_on_msg(res->l_ro_holders, 482 "Lockres %s has %u ro holders\n", 483 res->l_name, res->l_ro_holders); 484 mlog_bug_on_msg(res->l_ex_holders, 485 "Lockres %s has %u ex holders\n", 486 res->l_name, res->l_ex_holders); 487 488 /* Need to clear out the lock status block for the dlm */ 489 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 490 491 res->l_flags = 0UL; 492 mlog_exit_void(); 493} 494 495static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 496 int level) 497{ 498 mlog_entry_void(); 499 500 BUG_ON(!lockres); 501 502 switch(level) { 503 case LKM_EXMODE: 504 lockres->l_ex_holders++; 505 break; 506 case LKM_PRMODE: 507 lockres->l_ro_holders++; 508 break; 509 default: 510 BUG(); 511 } 512 513 mlog_exit_void(); 514} 515 516static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 517 int level) 518{ 519 mlog_entry_void(); 520 521 BUG_ON(!lockres); 522 523 switch(level) { 524 case LKM_EXMODE: 525 BUG_ON(!lockres->l_ex_holders); 526 lockres->l_ex_holders--; 527 break; 528 case LKM_PRMODE: 529 BUG_ON(!lockres->l_ro_holders); 530 lockres->l_ro_holders--; 531 break; 532 default: 533 BUG(); 534 } 535 mlog_exit_void(); 536} 537 538/* WARNING: This function lives in a world where the only three lock 539 * levels are EX, PR, and NL. It *will* have to be adjusted when more 540 * lock types are added. 
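 *
 * For reference, the current mapping is: EX is only compatible with
 * NL, PR is compatible with up to PR, and NL is compatible with up
 * to EX.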
 */
static inline int ocfs2_highest_compat_lock_level(int level)
{
	int new_level = LKM_EXMODE;

	if (level == LKM_EXMODE)
		new_level = LKM_NLMODE;
	else if (level == LKM_PRMODE)
		new_level = LKM_PRMODE;
	return new_level;
}

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
			      unsigned long newflags)
{
	struct list_head *pos, *tmp;
	struct ocfs2_mask_waiter *mw;

	assert_spin_locked(&lockres->l_lock);

	lockres->l_flags = newflags;

	list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
		mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			continue;

		list_del_init(&mw->mw_item);
		mw->mw_status = 0;
		complete(&mw->mw_complete);
	}
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to date. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

	if (lockres->l_requested > LKM_NLMODE &&
	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking.
this also catches the case where we get 656 * duplicate BASTs */ 657 if (ocfs2_highest_compat_lock_level(level) < 658 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 659 needs_downconvert = 1; 660 661 lockres->l_blocking = level; 662 } 663 664 mlog_exit(needs_downconvert); 665 return needs_downconvert; 666} 667 668static void ocfs2_blocking_ast(void *opaque, int level) 669{ 670 struct ocfs2_lock_res *lockres = opaque; 671 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 672 int needs_downconvert; 673 unsigned long flags; 674 675 BUG_ON(level <= LKM_NLMODE); 676 677 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 678 lockres->l_name, level, lockres->l_level, 679 ocfs2_lock_type_string(lockres->l_type)); 680 681 spin_lock_irqsave(&lockres->l_lock, flags); 682 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 683 if (needs_downconvert) 684 ocfs2_schedule_blocked_lock(osb, lockres); 685 spin_unlock_irqrestore(&lockres->l_lock, flags); 686 687 wake_up(&lockres->l_event); 688 689 ocfs2_kick_vote_thread(osb); 690} 691 692static void ocfs2_locking_ast(void *opaque) 693{ 694 struct ocfs2_lock_res *lockres = opaque; 695 struct dlm_lockstatus *lksb = &lockres->l_lksb; 696 unsigned long flags; 697 698 spin_lock_irqsave(&lockres->l_lock, flags); 699 700 if (lksb->status != DLM_NORMAL) { 701 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", 702 lockres->l_name, lksb->status); 703 spin_unlock_irqrestore(&lockres->l_lock, flags); 704 return; 705 } 706 707 switch(lockres->l_action) { 708 case OCFS2_AST_ATTACH: 709 ocfs2_generic_handle_attach_action(lockres); 710 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 711 break; 712 case OCFS2_AST_CONVERT: 713 ocfs2_generic_handle_convert_action(lockres); 714 break; 715 case OCFS2_AST_DOWNCONVERT: 716 ocfs2_generic_handle_downconvert_action(lockres); 717 break; 718 default: 719 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 720 "lockres flags = 0x%lx, unlock action: %u\n", 721 lockres->l_name, lockres->l_action, lockres->l_flags, 722 lockres->l_unlock_action); 723 BUG(); 724 } 725 726 /* set it to something invalid so if we get called again we 727 * can catch it. */ 728 lockres->l_action = OCFS2_AST_INVALID; 729 730 wake_up(&lockres->l_event); 731 spin_unlock_irqrestore(&lockres->l_lock, flags); 732} 733 734static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 735 int convert) 736{ 737 unsigned long flags; 738 739 mlog_entry_void(); 740 spin_lock_irqsave(&lockres->l_lock, flags); 741 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 742 if (convert) 743 lockres->l_action = OCFS2_AST_INVALID; 744 else 745 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 746 spin_unlock_irqrestore(&lockres->l_lock, flags); 747 748 wake_up(&lockres->l_event); 749 mlog_exit_void(); 750} 751 752/* Note: If we detect another process working on the lock (i.e., 753 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 754 * to do the right thing in that case. 
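 *
 * (ocfs2_cluster_lock(), for instance, simply retries from the top of
 * its loop after calling us to create the lock at NLMODE.)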
755 */ 756static int ocfs2_lock_create(struct ocfs2_super *osb, 757 struct ocfs2_lock_res *lockres, 758 int level, 759 int dlm_flags) 760{ 761 int ret = 0; 762 enum dlm_status status; 763 unsigned long flags; 764 765 mlog_entry_void(); 766 767 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 768 dlm_flags); 769 770 spin_lock_irqsave(&lockres->l_lock, flags); 771 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 772 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 773 spin_unlock_irqrestore(&lockres->l_lock, flags); 774 goto bail; 775 } 776 777 lockres->l_action = OCFS2_AST_ATTACH; 778 lockres->l_requested = level; 779 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 780 spin_unlock_irqrestore(&lockres->l_lock, flags); 781 782 status = dlmlock(osb->dlm, 783 level, 784 &lockres->l_lksb, 785 dlm_flags, 786 lockres->l_name, 787 OCFS2_LOCK_ID_MAX_LEN - 1, 788 ocfs2_locking_ast, 789 lockres, 790 ocfs2_blocking_ast); 791 if (status != DLM_NORMAL) { 792 ocfs2_log_dlm_error("dlmlock", status, lockres); 793 ret = -EINVAL; 794 ocfs2_recover_from_dlm_error(lockres, 1); 795 } 796 797 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 798 799bail: 800 mlog_exit(ret); 801 return ret; 802} 803 804static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 805 int flag) 806{ 807 unsigned long flags; 808 int ret; 809 810 spin_lock_irqsave(&lockres->l_lock, flags); 811 ret = lockres->l_flags & flag; 812 spin_unlock_irqrestore(&lockres->l_lock, flags); 813 814 return ret; 815} 816 817static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 818 819{ 820 wait_event(lockres->l_event, 821 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 822} 823 824static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 825 826{ 827 wait_event(lockres->l_event, 828 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 829} 830 831/* predict what lock level we'll be dropping down to on behalf 832 * of another node, and return true if the currently wanted 833 * level will be compatible with it. 
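 *
 * For example: if another node has us blocked at LKM_PRMODE we will
 * eventually downconvert to LKM_PRMODE ourselves, so a PRMODE request
 * can still proceed here while an EXMODE request cannot.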
*/ 834static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 835 int wanted) 836{ 837 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 838 839 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 840} 841 842static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 843{ 844 INIT_LIST_HEAD(&mw->mw_item); 845 init_completion(&mw->mw_complete); 846} 847 848static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 849{ 850 wait_for_completion(&mw->mw_complete); 851 /* Re-arm the completion in case we want to wait on it again */ 852 INIT_COMPLETION(mw->mw_complete); 853 return mw->mw_status; 854} 855 856static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 857 struct ocfs2_mask_waiter *mw, 858 unsigned long mask, 859 unsigned long goal) 860{ 861 BUG_ON(!list_empty(&mw->mw_item)); 862 863 assert_spin_locked(&lockres->l_lock); 864 865 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 866 mw->mw_mask = mask; 867 mw->mw_goal = goal; 868} 869 870/* returns 0 if the mw that was removed was already satisfied, -EBUSY 871 * if the mask still hadn't reached its goal */ 872static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 873 struct ocfs2_mask_waiter *mw) 874{ 875 unsigned long flags; 876 int ret = 0; 877 878 spin_lock_irqsave(&lockres->l_lock, flags); 879 if (!list_empty(&mw->mw_item)) { 880 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 881 ret = -EBUSY; 882 883 list_del_init(&mw->mw_item); 884 init_completion(&mw->mw_complete); 885 } 886 spin_unlock_irqrestore(&lockres->l_lock, flags); 887 888 return ret; 889 890} 891 892static int ocfs2_cluster_lock(struct ocfs2_super *osb, 893 struct ocfs2_lock_res *lockres, 894 int level, 895 int lkm_flags, 896 int arg_flags) 897{ 898 struct ocfs2_mask_waiter mw; 899 enum dlm_status status; 900 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 901 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 902 unsigned long flags; 903 904 mlog_entry_void(); 905 906 ocfs2_init_mask_waiter(&mw); 907 908 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 909 lkm_flags |= LKM_VALBLK; 910 911again: 912 wait = 0; 913 914 if (catch_signals && signal_pending(current)) { 915 ret = -ERESTARTSYS; 916 goto out; 917 } 918 919 spin_lock_irqsave(&lockres->l_lock, flags); 920 921 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 922 "Cluster lock called on freeing lockres %s! flags " 923 "0x%lx\n", lockres->l_name, lockres->l_flags); 924 925 /* We only compare against the currently granted level 926 * here. If the lock is blocked waiting on a downconvert, 927 * we'll get caught below. */ 928 if (lockres->l_flags & OCFS2_LOCK_BUSY && 929 level > lockres->l_level) { 930 /* is someone sitting in dlm_lock? If so, wait on 931 * them. */ 932 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 933 wait = 1; 934 goto unlock; 935 } 936 937 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 938 /* lock has not been created yet. 
*/ 939 spin_unlock_irqrestore(&lockres->l_lock, flags); 940 941 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); 942 if (ret < 0) { 943 mlog_errno(ret); 944 goto out; 945 } 946 goto again; 947 } 948 949 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 950 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 951 /* is the lock is currently blocked on behalf of 952 * another node */ 953 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 954 wait = 1; 955 goto unlock; 956 } 957 958 if (level > lockres->l_level) { 959 if (lockres->l_action != OCFS2_AST_INVALID) 960 mlog(ML_ERROR, "lockres %s has action %u pending\n", 961 lockres->l_name, lockres->l_action); 962 963 lockres->l_action = OCFS2_AST_CONVERT; 964 lockres->l_requested = level; 965 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 966 spin_unlock_irqrestore(&lockres->l_lock, flags); 967 968 BUG_ON(level == LKM_IVMODE); 969 BUG_ON(level == LKM_NLMODE); 970 971 mlog(0, "lock %s, convert from %d to level = %d\n", 972 lockres->l_name, lockres->l_level, level); 973 974 /* call dlm_lock to upgrade lock now */ 975 status = dlmlock(osb->dlm, 976 level, 977 &lockres->l_lksb, 978 lkm_flags|LKM_CONVERT, 979 lockres->l_name, 980 OCFS2_LOCK_ID_MAX_LEN - 1, 981 ocfs2_locking_ast, 982 lockres, 983 ocfs2_blocking_ast); 984 if (status != DLM_NORMAL) { 985 if ((lkm_flags & LKM_NOQUEUE) && 986 (status == DLM_NOTQUEUED)) 987 ret = -EAGAIN; 988 else { 989 ocfs2_log_dlm_error("dlmlock", status, 990 lockres); 991 ret = -EINVAL; 992 } 993 ocfs2_recover_from_dlm_error(lockres, 1); 994 goto out; 995 } 996 997 mlog(0, "lock %s, successfull return from dlmlock\n", 998 lockres->l_name); 999 1000 /* At this point we've gone inside the dlm and need to 1001 * complete our work regardless. */ 1002 catch_signals = 0; 1003 1004 /* wait for busy to clear and carry on */ 1005 goto again; 1006 } 1007 1008 /* Ok, if we get here then we're good to go. */ 1009 ocfs2_inc_holders(lockres, level); 1010 1011 ret = 0; 1012unlock: 1013 spin_unlock_irqrestore(&lockres->l_lock, flags); 1014out: 1015 /* 1016 * This is helping work around a lock inversion between the page lock 1017 * and dlm locks. One path holds the page lock while calling aops 1018 * which block acquiring dlm locks. The voting thread holds dlm 1019 * locks while acquiring page locks while down converting data locks. 1020 * This block is helping an aop path notice the inversion and back 1021 * off to unlock its page lock before trying the dlm lock again. 1022 */ 1023 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1024 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1025 wait = 0; 1026 if (lockres_remove_mask_waiter(lockres, &mw)) 1027 ret = -EAGAIN; 1028 else 1029 goto again; 1030 } 1031 if (wait) { 1032 ret = ocfs2_wait_for_mask(&mw); 1033 if (ret == 0) 1034 goto again; 1035 mlog_errno(ret); 1036 } 1037 1038 mlog_exit(ret); 1039 return ret; 1040} 1041 1042static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1043 struct ocfs2_lock_res *lockres, 1044 int level) 1045{ 1046 unsigned long flags; 1047 1048 mlog_entry_void(); 1049 spin_lock_irqsave(&lockres->l_lock, flags); 1050 ocfs2_dec_holders(lockres, level); 1051 ocfs2_vote_on_unlock(osb, lockres); 1052 spin_unlock_irqrestore(&lockres->l_lock, flags); 1053 mlog_exit_void(); 1054} 1055 1056int ocfs2_create_new_lock(struct ocfs2_super *osb, 1057 struct ocfs2_lock_res *lockres, 1058 int ex, 1059 int local) 1060{ 1061 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1062 unsigned long flags; 1063 int lkm_flags = local ? 
LKM_LOCAL : 0; 1064 1065 spin_lock_irqsave(&lockres->l_lock, flags); 1066 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1067 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1068 spin_unlock_irqrestore(&lockres->l_lock, flags); 1069 1070 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1071} 1072 1073/* Grants us an EX lock on the data and metadata resources, skipping 1074 * the normal cluster directory lookup. Use this ONLY on newly created 1075 * inodes which other nodes can't possibly see, and which haven't been 1076 * hashed in the inode hash yet. This can give us a good performance 1077 * increase as it'll skip the network broadcast normally associated 1078 * with creating a new lock resource. */ 1079int ocfs2_create_new_inode_locks(struct inode *inode) 1080{ 1081 int ret; 1082 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1083 1084 BUG_ON(!inode); 1085 BUG_ON(!ocfs2_inode_is_new(inode)); 1086 1087 mlog_entry_void(); 1088 1089 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1090 1091 /* NOTE: That we don't increment any of the holder counts, nor 1092 * do we add anything to a journal handle. Since this is 1093 * supposed to be a new inode which the cluster doesn't know 1094 * about yet, there is no need to. As far as the LVB handling 1095 * is concerned, this is basically like acquiring an EX lock 1096 * on a resource which has an invalid one -- we'll set it 1097 * valid when we release the EX. */ 1098 1099 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1100 if (ret) { 1101 mlog_errno(ret); 1102 goto bail; 1103 } 1104 1105 /* 1106 * We don't want to use LKM_LOCAL on a meta data lock as they 1107 * don't use a generation in their lock names. 1108 */ 1109 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0); 1110 if (ret) { 1111 mlog_errno(ret); 1112 goto bail; 1113 } 1114 1115 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1); 1116 if (ret) { 1117 mlog_errno(ret); 1118 goto bail; 1119 } 1120 1121bail: 1122 mlog_exit(ret); 1123 return ret; 1124} 1125 1126int ocfs2_rw_lock(struct inode *inode, int write) 1127{ 1128 int status, level; 1129 struct ocfs2_lock_res *lockres; 1130 1131 BUG_ON(!inode); 1132 1133 mlog_entry_void(); 1134 1135 mlog(0, "inode %llu take %s RW lock\n", 1136 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1137 write ? "EXMODE" : "PRMODE"); 1138 1139 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1140 1141 level = write ? LKM_EXMODE : LKM_PRMODE; 1142 1143 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1144 0); 1145 if (status < 0) 1146 mlog_errno(status); 1147 1148 mlog_exit(status); 1149 return status; 1150} 1151 1152void ocfs2_rw_unlock(struct inode *inode, int write) 1153{ 1154 int level = write ? LKM_EXMODE : LKM_PRMODE; 1155 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1156 1157 mlog_entry_void(); 1158 1159 mlog(0, "inode %llu drop %s RW lock\n", 1160 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1161 write ? "EXMODE" : "PRMODE"); 1162 1163 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1164 1165 mlog_exit_void(); 1166} 1167 1168int ocfs2_data_lock_full(struct inode *inode, 1169 int write, 1170 int arg_flags) 1171{ 1172 int status = 0, level; 1173 struct ocfs2_lock_res *lockres; 1174 1175 BUG_ON(!inode); 1176 1177 mlog_entry_void(); 1178 1179 mlog(0, "inode %llu take %s DATA lock\n", 1180 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1181 write ? 
"EXMODE" : "PRMODE"); 1182 1183 /* We'll allow faking a readonly data lock for 1184 * rodevices. */ 1185 if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) { 1186 if (write) { 1187 status = -EROFS; 1188 mlog_errno(status); 1189 } 1190 goto out; 1191 } 1192 1193 lockres = &OCFS2_I(inode)->ip_data_lockres; 1194 1195 level = write ? LKM_EXMODE : LKM_PRMODE; 1196 1197 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 1198 0, arg_flags); 1199 if (status < 0 && status != -EAGAIN) 1200 mlog_errno(status); 1201 1202out: 1203 mlog_exit(status); 1204 return status; 1205} 1206 1207/* see ocfs2_meta_lock_with_page() */ 1208int ocfs2_data_lock_with_page(struct inode *inode, 1209 int write, 1210 struct page *page) 1211{ 1212 int ret; 1213 1214 ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK); 1215 if (ret == -EAGAIN) { 1216 unlock_page(page); 1217 if (ocfs2_data_lock(inode, write) == 0) 1218 ocfs2_data_unlock(inode, write); 1219 ret = AOP_TRUNCATED_PAGE; 1220 } 1221 1222 return ret; 1223} 1224 1225static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 1226 struct ocfs2_lock_res *lockres) 1227{ 1228 int kick = 0; 1229 1230 mlog_entry_void(); 1231 1232 /* If we know that another node is waiting on our lock, kick 1233 * the vote thread * pre-emptively when we reach a release 1234 * condition. */ 1235 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1236 switch(lockres->l_blocking) { 1237 case LKM_EXMODE: 1238 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1239 kick = 1; 1240 break; 1241 case LKM_PRMODE: 1242 if (!lockres->l_ex_holders) 1243 kick = 1; 1244 break; 1245 default: 1246 BUG(); 1247 } 1248 } 1249 1250 if (kick) 1251 ocfs2_kick_vote_thread(osb); 1252 1253 mlog_exit_void(); 1254} 1255 1256void ocfs2_data_unlock(struct inode *inode, 1257 int write) 1258{ 1259 int level = write ? LKM_EXMODE : LKM_PRMODE; 1260 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres; 1261 1262 mlog_entry_void(); 1263 1264 mlog(0, "inode %llu drop %s DATA lock\n", 1265 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1266 write ? "EXMODE" : "PRMODE"); 1267 1268 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) 1269 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1270 1271 mlog_exit_void(); 1272} 1273 1274#define OCFS2_SEC_BITS 34 1275#define OCFS2_SEC_SHIFT (64 - 34) 1276#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1277 1278/* LVB only has room for 64 bits of time here so we pack it for 1279 * now. */ 1280static u64 ocfs2_pack_timespec(struct timespec *spec) 1281{ 1282 u64 res; 1283 u64 sec = spec->tv_sec; 1284 u32 nsec = spec->tv_nsec; 1285 1286 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1287 1288 return res; 1289} 1290 1291/* Call this with the lockres locked. I am reasonably sure we don't 1292 * need ip_lock in this function as anyone who would be changing those 1293 * values is supposed to be blocked in ocfs2_meta_lock right now. */ 1294static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1295{ 1296 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1297 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; 1298 struct ocfs2_meta_lvb *lvb; 1299 1300 mlog_entry_void(); 1301 1302 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1303 1304 /* 1305 * Invalidate the LVB of a deleted inode - this way other 1306 * nodes are forced to go to disk and discover the new inode 1307 * status. 
1308 */ 1309 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1310 lvb->lvb_version = 0; 1311 goto out; 1312 } 1313 1314 lvb->lvb_version = OCFS2_LVB_VERSION; 1315 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1316 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1317 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1318 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1319 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1320 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1321 lvb->lvb_iatime_packed = 1322 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1323 lvb->lvb_ictime_packed = 1324 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1325 lvb->lvb_imtime_packed = 1326 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1327 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1328 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1329 1330out: 1331 mlog_meta_lvb(0, lockres); 1332 1333 mlog_exit_void(); 1334} 1335 1336static void ocfs2_unpack_timespec(struct timespec *spec, 1337 u64 packed_time) 1338{ 1339 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1340 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1341} 1342 1343static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1344{ 1345 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1346 struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres; 1347 struct ocfs2_meta_lvb *lvb; 1348 1349 mlog_entry_void(); 1350 1351 mlog_meta_lvb(0, lockres); 1352 1353 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1354 1355 /* We're safe here without the lockres lock... */ 1356 spin_lock(&oi->ip_lock); 1357 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1358 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1359 1360 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1361 ocfs2_set_inode_flags(inode); 1362 1363 /* fast-symlinks are a special case */ 1364 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1365 inode->i_blocks = 0; 1366 else 1367 inode->i_blocks = 1368 ocfs2_align_bytes_to_sectors(i_size_read(inode)); 1369 1370 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1371 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1372 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1373 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1374 ocfs2_unpack_timespec(&inode->i_atime, 1375 be64_to_cpu(lvb->lvb_iatime_packed)); 1376 ocfs2_unpack_timespec(&inode->i_mtime, 1377 be64_to_cpu(lvb->lvb_imtime_packed)); 1378 ocfs2_unpack_timespec(&inode->i_ctime, 1379 be64_to_cpu(lvb->lvb_ictime_packed)); 1380 spin_unlock(&oi->ip_lock); 1381 1382 mlog_exit_void(); 1383} 1384 1385static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1386 struct ocfs2_lock_res *lockres) 1387{ 1388 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1389 1390 if (lvb->lvb_version == OCFS2_LVB_VERSION 1391 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1392 return 1; 1393 return 0; 1394} 1395 1396/* Determine whether a lock resource needs to be refreshed, and 1397 * arbitrate who gets to refresh it. 1398 * 1399 * 0 means no refresh needed. 1400 * 1401 * > 0 means you need to refresh this and you MUST call 1402 * ocfs2_complete_lock_res_refresh afterwards. 
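 *
 * A rough sketch of the expected caller pattern (ocfs2_meta_lock_update()
 * and ocfs2_super_lock() are the real examples):
 *
 *	if (ocfs2_should_refresh_lock_res(lockres)) {
 *		status = ... refresh from the LVB or from disk ...;
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}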
 */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}

/* If status is nonzero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/* may or may not return a bh if it went to disk. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it.
*/ 1511 if (!OCFS2_IS_VALID_DINODE(fe)) { 1512 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 1513 status = -EIO; 1514 goto bail_refresh; 1515 } 1516 mlog_bug_on_msg(inode->i_generation != 1517 le32_to_cpu(fe->i_generation), 1518 "Invalid dinode %llu disk generation: %u " 1519 "inode->i_generation: %u\n", 1520 (unsigned long long)oi->ip_blkno, 1521 le32_to_cpu(fe->i_generation), 1522 inode->i_generation); 1523 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 1524 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 1525 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 1526 (unsigned long long)oi->ip_blkno, 1527 (unsigned long long)le64_to_cpu(fe->i_dtime), 1528 le32_to_cpu(fe->i_flags)); 1529 1530 ocfs2_refresh_inode(inode, fe); 1531 } 1532 1533 status = 0; 1534bail_refresh: 1535 ocfs2_complete_lock_res_refresh(lockres, status); 1536bail: 1537 mlog_exit(status); 1538 return status; 1539} 1540 1541static int ocfs2_assign_bh(struct inode *inode, 1542 struct buffer_head **ret_bh, 1543 struct buffer_head *passed_bh) 1544{ 1545 int status; 1546 1547 if (passed_bh) { 1548 /* Ok, the update went to disk for us, use the 1549 * returned bh. */ 1550 *ret_bh = passed_bh; 1551 get_bh(*ret_bh); 1552 1553 return 0; 1554 } 1555 1556 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 1557 OCFS2_I(inode)->ip_blkno, 1558 ret_bh, 1559 OCFS2_BH_CACHED, 1560 inode); 1561 if (status < 0) 1562 mlog_errno(status); 1563 1564 return status; 1565} 1566 1567/* 1568 * returns < 0 error if the callback will never be called, otherwise 1569 * the result of the lock will be communicated via the callback. 1570 */ 1571int ocfs2_meta_lock_full(struct inode *inode, 1572 struct ocfs2_journal_handle *handle, 1573 struct buffer_head **ret_bh, 1574 int ex, 1575 int arg_flags) 1576{ 1577 int status, level, dlm_flags, acquired; 1578 struct ocfs2_lock_res *lockres; 1579 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1580 struct buffer_head *local_bh = NULL; 1581 1582 BUG_ON(!inode); 1583 1584 mlog_entry_void(); 1585 1586 mlog(0, "inode %llu, take %s META lock\n", 1587 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1588 ex ? "EXMODE" : "PRMODE"); 1589 1590 status = 0; 1591 acquired = 0; 1592 /* We'll allow faking a readonly metadata lock for 1593 * rodevices. */ 1594 if (ocfs2_is_hard_readonly(osb)) { 1595 if (ex) 1596 status = -EROFS; 1597 goto bail; 1598 } 1599 1600 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1601 wait_event(osb->recovery_event, 1602 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1603 1604 acquired = 0; 1605 lockres = &OCFS2_I(inode)->ip_meta_lockres; 1606 level = ex ? LKM_EXMODE : LKM_PRMODE; 1607 dlm_flags = 0; 1608 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 1609 dlm_flags |= LKM_NOQUEUE; 1610 1611 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 1612 if (status < 0) { 1613 if (status != -EAGAIN && status != -EIOCBRETRY) 1614 mlog_errno(status); 1615 goto bail; 1616 } 1617 1618 /* Notify the error cleanup path to drop the cluster lock. */ 1619 acquired = 1; 1620 1621 /* We wait twice because a node may have died while we were in 1622 * the lower dlm layers. The second time though, we've 1623 * committed to owning this lock so we don't allow signals to 1624 * abort the operation. */ 1625 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1626 wait_event(osb->recovery_event, 1627 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1628 1629 /* 1630 * We only see this flag if we're being called from 1631 * ocfs2_read_locked_inode(). 
It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	if (handle) {
		status = ocfs2_handle_add_lock(handle, inode);
		if (status < 0)
			mlog_errno(status);
	}

bail:
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}

/*
 * This is working around a lock inversion between tasks acquiring DLM locks
 * while holding a page lock and the vote thread which blocks dlm lock acquiry
 * while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have blocked
 * waiting for the vote thread. In that case we unlock our page so the vote
 * thread can make progress. Once we've done this we have to return
 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
 * into the VFS, which will then immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop. This has a potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
 */
int ocfs2_meta_lock_with_page(struct inode *inode,
			      struct ocfs2_journal_handle *handle,
			      struct buffer_head **ret_bh,
			      int ex,
			      struct page *page)
{
	int ret;

	ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
				   OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
			ocfs2_meta_unlock(inode, ex);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

void ocfs2_meta_unlock(struct inode *inode,
		       int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;

	mlog_entry_void();

	mlog(0, "inode %llu drop %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ?
"EXMODE" : "PRMODE"); 1736 1737 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) 1738 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1739 1740 mlog_exit_void(); 1741} 1742 1743int ocfs2_super_lock(struct ocfs2_super *osb, 1744 int ex) 1745{ 1746 int status; 1747 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1748 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 1749 struct buffer_head *bh; 1750 struct ocfs2_slot_info *si = osb->slot_info; 1751 1752 mlog_entry_void(); 1753 1754 if (ocfs2_is_hard_readonly(osb)) 1755 return -EROFS; 1756 1757 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 1758 if (status < 0) { 1759 mlog_errno(status); 1760 goto bail; 1761 } 1762 1763 /* The super block lock path is really in the best position to 1764 * know when resources covered by the lock need to be 1765 * refreshed, so we do it here. Of course, making sense of 1766 * everything is up to the caller :) */ 1767 status = ocfs2_should_refresh_lock_res(lockres); 1768 if (status < 0) { 1769 mlog_errno(status); 1770 goto bail; 1771 } 1772 if (status) { 1773 bh = si->si_bh; 1774 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0, 1775 si->si_inode); 1776 if (status == 0) 1777 ocfs2_update_slot_info(si); 1778 1779 ocfs2_complete_lock_res_refresh(lockres, status); 1780 1781 if (status < 0) 1782 mlog_errno(status); 1783 } 1784bail: 1785 mlog_exit(status); 1786 return status; 1787} 1788 1789void ocfs2_super_unlock(struct ocfs2_super *osb, 1790 int ex) 1791{ 1792 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1793 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 1794 1795 ocfs2_cluster_unlock(osb, lockres, level); 1796} 1797 1798int ocfs2_rename_lock(struct ocfs2_super *osb) 1799{ 1800 int status; 1801 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 1802 1803 if (ocfs2_is_hard_readonly(osb)) 1804 return -EROFS; 1805 1806 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0); 1807 if (status < 0) 1808 mlog_errno(status); 1809 1810 return status; 1811} 1812 1813void ocfs2_rename_unlock(struct ocfs2_super *osb) 1814{ 1815 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 1816 1817 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); 1818} 1819 1820int ocfs2_dentry_lock(struct dentry *dentry, int ex) 1821{ 1822 int ret; 1823 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1824 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 1825 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 1826 1827 BUG_ON(!dl); 1828 1829 if (ocfs2_is_hard_readonly(osb)) 1830 return -EROFS; 1831 1832 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 1833 if (ret < 0) 1834 mlog_errno(ret); 1835 1836 return ret; 1837} 1838 1839void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 1840{ 1841 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1842 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 1843 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 1844 1845 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 1846} 1847 1848/* Reference counting of the dlm debug structure. We want this because 1849 * open references on the debug inodes can live on after a mount, so 1850 * we can't rely on the ocfs2_super to always exist. 
*/ 1851static void ocfs2_dlm_debug_free(struct kref *kref) 1852{ 1853 struct ocfs2_dlm_debug *dlm_debug; 1854 1855 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 1856 1857 kfree(dlm_debug); 1858} 1859 1860void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 1861{ 1862 if (dlm_debug) 1863 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 1864} 1865 1866static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 1867{ 1868 kref_get(&debug->d_refcnt); 1869} 1870 1871struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 1872{ 1873 struct ocfs2_dlm_debug *dlm_debug; 1874 1875 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 1876 if (!dlm_debug) { 1877 mlog_errno(-ENOMEM); 1878 goto out; 1879 } 1880 1881 kref_init(&dlm_debug->d_refcnt); 1882 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 1883 dlm_debug->d_locking_state = NULL; 1884out: 1885 return dlm_debug; 1886} 1887 1888/* Access to this is arbitrated for us via seq_file->sem. */ 1889struct ocfs2_dlm_seq_priv { 1890 struct ocfs2_dlm_debug *p_dlm_debug; 1891 struct ocfs2_lock_res p_iter_res; 1892 struct ocfs2_lock_res p_tmp_res; 1893}; 1894 1895static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 1896 struct ocfs2_dlm_seq_priv *priv) 1897{ 1898 struct ocfs2_lock_res *iter, *ret = NULL; 1899 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 1900 1901 assert_spin_locked(&ocfs2_dlm_tracking_lock); 1902 1903 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 1904 /* discover the head of the list */ 1905 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 1906 mlog(0, "End of list found, %p\n", ret); 1907 break; 1908 } 1909 1910 /* We track our "dummy" iteration lockres' by a NULL 1911 * l_ops field. */ 1912 if (iter->l_ops != NULL) { 1913 ret = iter; 1914 break; 1915 } 1916 } 1917 1918 return ret; 1919} 1920 1921static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 1922{ 1923 struct ocfs2_dlm_seq_priv *priv = m->private; 1924 struct ocfs2_lock_res *iter; 1925 1926 spin_lock(&ocfs2_dlm_tracking_lock); 1927 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 1928 if (iter) { 1929 /* Since lockres' have the lifetime of their container 1930 * (which can be inodes, ocfs2_supers, etc) we want to 1931 * copy this out to a temporary lockres while still 1932 * under the spinlock. Obviously after this we can't 1933 * trust any pointers on the copy returned, but that's 1934 * ok as the information we want isn't typically held 1935 * in them. 
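 *
 * (In particular, l_priv and l_ops in the copy must not be dereferenced
 * once the tracking lock is dropped; the show function below only looks
 * at scalar fields and the embedded name/LVB buffers.)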
*/ 1936 priv->p_tmp_res = *iter; 1937 iter = &priv->p_tmp_res; 1938 } 1939 spin_unlock(&ocfs2_dlm_tracking_lock); 1940 1941 return iter; 1942} 1943 1944static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 1945{ 1946} 1947 1948static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 1949{ 1950 struct ocfs2_dlm_seq_priv *priv = m->private; 1951 struct ocfs2_lock_res *iter = v; 1952 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 1953 1954 spin_lock(&ocfs2_dlm_tracking_lock); 1955 iter = ocfs2_dlm_next_res(iter, priv); 1956 list_del_init(&dummy->l_debug_list); 1957 if (iter) { 1958 list_add(&dummy->l_debug_list, &iter->l_debug_list); 1959 priv->p_tmp_res = *iter; 1960 iter = &priv->p_tmp_res; 1961 } 1962 spin_unlock(&ocfs2_dlm_tracking_lock); 1963 1964 return iter; 1965} 1966 1967/* So that debugfs.ocfs2 can determine which format is being used */ 1968#define OCFS2_DLM_DEBUG_STR_VERSION 1 1969static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 1970{ 1971 int i; 1972 char *lvb; 1973 struct ocfs2_lock_res *lockres = v; 1974 1975 if (!lockres) 1976 return -EINVAL; 1977 1978 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 1979 1980 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 1981 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 1982 lockres->l_name, 1983 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 1984 else 1985 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 1986 1987 seq_printf(m, "%d\t" 1988 "0x%lx\t" 1989 "0x%x\t" 1990 "0x%x\t" 1991 "%u\t" 1992 "%u\t" 1993 "%d\t" 1994 "%d\t", 1995 lockres->l_level, 1996 lockres->l_flags, 1997 lockres->l_action, 1998 lockres->l_unlock_action, 1999 lockres->l_ro_holders, 2000 lockres->l_ex_holders, 2001 lockres->l_requested, 2002 lockres->l_blocking); 2003 2004 /* Dump the raw LVB */ 2005 lvb = lockres->l_lksb.lvb; 2006 for(i = 0; i < DLM_LVB_LEN; i++) 2007 seq_printf(m, "0x%x\t", lvb[i]); 2008 2009 /* End the line */ 2010 seq_printf(m, "\n"); 2011 return 0; 2012} 2013 2014static struct seq_operations ocfs2_dlm_seq_ops = { 2015 .start = ocfs2_dlm_seq_start, 2016 .stop = ocfs2_dlm_seq_stop, 2017 .next = ocfs2_dlm_seq_next, 2018 .show = ocfs2_dlm_seq_show, 2019}; 2020 2021static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2022{ 2023 struct seq_file *seq = (struct seq_file *) file->private_data; 2024 struct ocfs2_dlm_seq_priv *priv = seq->private; 2025 struct ocfs2_lock_res *res = &priv->p_iter_res; 2026 2027 ocfs2_remove_lockres_tracking(res); 2028 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2029 return seq_release_private(inode, file); 2030} 2031 2032static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2033{ 2034 int ret; 2035 struct ocfs2_dlm_seq_priv *priv; 2036 struct seq_file *seq; 2037 struct ocfs2_super *osb; 2038 2039 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2040 if (!priv) { 2041 ret = -ENOMEM; 2042 mlog_errno(ret); 2043 goto out; 2044 } 2045 osb = (struct ocfs2_super *) inode->u.generic_ip; 2046 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2047 priv->p_dlm_debug = osb->osb_dlm_debug; 2048 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2049 2050 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2051 if (ret) { 2052 kfree(priv); 2053 mlog_errno(ret); 2054 goto out; 2055 } 2056 2057 seq = (struct seq_file *) file->private_data; 2058 seq->private = priv; 2059 2060 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2061 priv->p_dlm_debug); 2062 2063out: 2064 return ret; 2065} 2066 2067static const struct file_operations 
ocfs2_dlm_debug_fops = { 2068 .open = ocfs2_dlm_debug_open, 2069 .release = ocfs2_dlm_debug_release, 2070 .read = seq_read, 2071 .llseek = seq_lseek, 2072}; 2073 2074static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2075{ 2076 int ret = 0; 2077 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2078 2079 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2080 S_IFREG|S_IRUSR, 2081 osb->osb_debug_root, 2082 osb, 2083 &ocfs2_dlm_debug_fops); 2084 if (!dlm_debug->d_locking_state) { 2085 ret = -EINVAL; 2086 mlog(ML_ERROR, 2087 "Unable to create locking state debugfs file.\n"); 2088 goto out; 2089 } 2090 2091 ocfs2_get_dlm_debug(dlm_debug); 2092out: 2093 return ret; 2094} 2095 2096static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2097{ 2098 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2099 2100 if (dlm_debug) { 2101 debugfs_remove(dlm_debug->d_locking_state); 2102 ocfs2_put_dlm_debug(dlm_debug); 2103 } 2104} 2105 2106int ocfs2_dlm_init(struct ocfs2_super *osb) 2107{ 2108 int status; 2109 u32 dlm_key; 2110 struct dlm_ctxt *dlm; 2111 2112 mlog_entry_void(); 2113 2114 status = ocfs2_dlm_init_debug(osb); 2115 if (status < 0) { 2116 mlog_errno(status); 2117 goto bail; 2118 } 2119 2120 /* launch vote thread */ 2121 osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote"); 2122 if (IS_ERR(osb->vote_task)) { 2123 status = PTR_ERR(osb->vote_task); 2124 osb->vote_task = NULL; 2125 mlog_errno(status); 2126 goto bail; 2127 } 2128 2129 /* used by the dlm code to make message headers unique, each 2130 * node in this domain must agree on this. */ 2131 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); 2132 2133 /* for now, uuid == domain */ 2134 dlm = dlm_register_domain(osb->uuid_str, dlm_key); 2135 if (IS_ERR(dlm)) { 2136 status = PTR_ERR(dlm); 2137 mlog_errno(status); 2138 goto bail; 2139 } 2140 2141 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2142 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2143 2144 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2145 2146 osb->dlm = dlm; 2147 2148 status = 0; 2149bail: 2150 if (status < 0) { 2151 ocfs2_dlm_shutdown_debug(osb); 2152 if (osb->vote_task) 2153 kthread_stop(osb->vote_task); 2154 } 2155 2156 mlog_exit(status); 2157 return status; 2158} 2159 2160void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2161{ 2162 mlog_entry_void(); 2163 2164 dlm_unregister_eviction_cb(&osb->osb_eviction_cb); 2165 2166 ocfs2_drop_osb_locks(osb); 2167 2168 if (osb->vote_task) { 2169 kthread_stop(osb->vote_task); 2170 osb->vote_task = NULL; 2171 } 2172 2173 ocfs2_lock_res_free(&osb->osb_super_lockres); 2174 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2175 2176 dlm_unregister_domain(osb->dlm); 2177 osb->dlm = NULL; 2178 2179 ocfs2_dlm_shutdown_debug(osb); 2180 2181 mlog_exit_void(); 2182} 2183 2184static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) 2185{ 2186 struct ocfs2_lock_res *lockres = opaque; 2187 unsigned long flags; 2188 2189 mlog_entry_void(); 2190 2191 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2192 lockres->l_unlock_action); 2193 2194 spin_lock_irqsave(&lockres->l_lock, flags); 2195 /* We tried to cancel a convert request, but it was already 2196 * granted. All we want to do here is clear our unlock 2197 * state. 
The wake_up call done at the bottom is redundant 2198 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't 2199 * hurt anything anyway */ 2200 if (status == DLM_CANCELGRANT && 2201 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2202 mlog(0, "Got cancelgrant for %s\n", lockres->l_name); 2203 2204 /* We don't clear the busy flag in this case as it 2205 * should have been cleared by the ast which the dlm 2206 * has called. */ 2207 goto complete_unlock; 2208 } 2209 2210 if (status != DLM_NORMAL) { 2211 mlog(ML_ERROR, "Dlm passes status %d for lock %s, " 2212 "unlock_action %d\n", status, lockres->l_name, 2213 lockres->l_unlock_action); 2214 spin_unlock_irqrestore(&lockres->l_lock, flags); 2215 return; 2216 } 2217 2218 switch(lockres->l_unlock_action) { 2219 case OCFS2_UNLOCK_CANCEL_CONVERT: 2220 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2221 lockres->l_action = OCFS2_AST_INVALID; 2222 break; 2223 case OCFS2_UNLOCK_DROP_LOCK: 2224 lockres->l_level = LKM_IVMODE; 2225 break; 2226 default: 2227 BUG(); 2228 } 2229 2230 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2231complete_unlock: 2232 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2233 spin_unlock_irqrestore(&lockres->l_lock, flags); 2234 2235 wake_up(&lockres->l_event); 2236 2237 mlog_exit_void(); 2238} 2239 2240typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *); 2241 2242struct drop_lock_cb { 2243 ocfs2_pre_drop_cb_t *drop_func; 2244 void *drop_data; 2245}; 2246 2247static int ocfs2_drop_lock(struct ocfs2_super *osb, 2248 struct ocfs2_lock_res *lockres, 2249 struct drop_lock_cb *dcb) 2250{ 2251 enum dlm_status status; 2252 unsigned long flags; 2253 int lkm_flags = 0; 2254 2255 /* We didn't get anywhere near actually using this lockres. */ 2256 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2257 goto out; 2258 2259 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2260 lkm_flags |= LKM_VALBLK; 2261 2262 spin_lock_irqsave(&lockres->l_lock, flags); 2263 2264 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2265 "lockres %s, flags 0x%lx\n", 2266 lockres->l_name, lockres->l_flags); 2267 2268 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2269 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2270 "%u, unlock_action = %u\n", 2271 lockres->l_name, lockres->l_flags, lockres->l_action, 2272 lockres->l_unlock_action); 2273 2274 spin_unlock_irqrestore(&lockres->l_lock, flags); 2275 2276 /* XXX: Today we just wait on any busy 2277 * locks... Perhaps we need to cancel converts in the 2278 * future? */ 2279 ocfs2_wait_on_busy_lock(lockres); 2280 2281 spin_lock_irqsave(&lockres->l_lock, flags); 2282 } 2283 2284 if (dcb) 2285 dcb->drop_func(lockres, dcb->drop_data); 2286 2287 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2288 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2289 lockres->l_name); 2290 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2291 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2292 2293 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2294 spin_unlock_irqrestore(&lockres->l_lock, flags); 2295 goto out; 2296 } 2297 2298 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2299 2300 /* make sure we never get here while waiting for an ast to 2301 * fire. */ 2302 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2303 2304 /* is this necessary? 
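 * It appears to be: ocfs2_wait_on_busy_lock() further down waits for
 * OCFS2_LOCK_BUSY to clear, and ocfs2_unlock_ast() only clears the flag
 * (and wakes l_event) once the drop has completed, so setting it here is
 * what lets us block until the unlock AST has fired.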
*/ 2305 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2306 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2307 spin_unlock_irqrestore(&lockres->l_lock, flags); 2308 2309 mlog(0, "lock %s\n", lockres->l_name); 2310 2311 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2312 ocfs2_unlock_ast, lockres); 2313 if (status != DLM_NORMAL) { 2314 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2315 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2316 dlm_print_one_lock(lockres->l_lksb.lockid); 2317 BUG(); 2318 } 2319 mlog(0, "lock %s, successfull return from dlmunlock\n", 2320 lockres->l_name); 2321 2322 ocfs2_wait_on_busy_lock(lockres); 2323out: 2324 mlog_exit(0); 2325 return 0; 2326} 2327 2328/* Mark the lockres as being dropped. It will no longer be 2329 * queued if blocking, but we still may have to wait on it 2330 * being dequeued from the vote thread before we can consider 2331 * it safe to drop. 2332 * 2333 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 2334void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2335{ 2336 int status; 2337 struct ocfs2_mask_waiter mw; 2338 unsigned long flags; 2339 2340 ocfs2_init_mask_waiter(&mw); 2341 2342 spin_lock_irqsave(&lockres->l_lock, flags); 2343 lockres->l_flags |= OCFS2_LOCK_FREEING; 2344 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2345 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2346 spin_unlock_irqrestore(&lockres->l_lock, flags); 2347 2348 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2349 2350 status = ocfs2_wait_for_mask(&mw); 2351 if (status) 2352 mlog_errno(status); 2353 2354 spin_lock_irqsave(&lockres->l_lock, flags); 2355 } 2356 spin_unlock_irqrestore(&lockres->l_lock, flags); 2357} 2358 2359void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2360 struct ocfs2_lock_res *lockres) 2361{ 2362 int ret; 2363 2364 ocfs2_mark_lockres_freeing(lockres); 2365 ret = ocfs2_drop_lock(osb, lockres, NULL); 2366 if (ret) 2367 mlog_errno(ret); 2368} 2369 2370static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2371{ 2372 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2373 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2374} 2375 2376static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data) 2377{ 2378 struct inode *inode = data; 2379 2380 /* the metadata lock requires a bit more work as we have an 2381 * LVB to worry about. */ 2382 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2383 lockres->l_level == LKM_EXMODE && 2384 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2385 __ocfs2_stuff_meta_lvb(inode); 2386} 2387 2388int ocfs2_drop_inode_locks(struct inode *inode) 2389{ 2390 int status, err; 2391 struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, }; 2392 2393 mlog_entry_void(); 2394 2395 /* No need to call ocfs2_mark_lockres_freeing here - 2396 * ocfs2_clear_inode has done it for us. 
*/ 2397 2398 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2399 &OCFS2_I(inode)->ip_data_lockres, 2400 NULL); 2401 if (err < 0) 2402 mlog_errno(err); 2403 2404 status = err; 2405 2406 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2407 &OCFS2_I(inode)->ip_meta_lockres, 2408 &meta_dcb); 2409 if (err < 0) 2410 mlog_errno(err); 2411 if (err < 0 && !status) 2412 status = err; 2413 2414 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2415 &OCFS2_I(inode)->ip_rw_lockres, 2416 NULL); 2417 if (err < 0) 2418 mlog_errno(err); 2419 if (err < 0 && !status) 2420 status = err; 2421 2422 mlog_exit(status); 2423 return status; 2424} 2425 2426static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2427 int new_level) 2428{ 2429 assert_spin_locked(&lockres->l_lock); 2430 2431 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 2432 2433 if (lockres->l_level <= new_level) { 2434 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n", 2435 lockres->l_level, new_level); 2436 BUG(); 2437 } 2438 2439 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 2440 lockres->l_name, new_level, lockres->l_blocking); 2441 2442 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2443 lockres->l_requested = new_level; 2444 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2445} 2446 2447static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2448 struct ocfs2_lock_res *lockres, 2449 int new_level, 2450 int lvb) 2451{ 2452 int ret, dlm_flags = LKM_CONVERT; 2453 enum dlm_status status; 2454 2455 mlog_entry_void(); 2456 2457 if (lvb) 2458 dlm_flags |= LKM_VALBLK; 2459 2460 status = dlmlock(osb->dlm, 2461 new_level, 2462 &lockres->l_lksb, 2463 dlm_flags, 2464 lockres->l_name, 2465 OCFS2_LOCK_ID_MAX_LEN - 1, 2466 ocfs2_locking_ast, 2467 lockres, 2468 ocfs2_blocking_ast); 2469 if (status != DLM_NORMAL) { 2470 ocfs2_log_dlm_error("dlmlock", status, lockres); 2471 ret = -EINVAL; 2472 ocfs2_recover_from_dlm_error(lockres, 1); 2473 goto bail; 2474 } 2475 2476 ret = 0; 2477bail: 2478 mlog_exit(ret); 2479 return ret; 2480} 2481 2482/* returns 1 when the caller should unlock and call dlmunlock */ 2483static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2484 struct ocfs2_lock_res *lockres) 2485{ 2486 assert_spin_locked(&lockres->l_lock); 2487 2488 mlog_entry_void(); 2489 mlog(0, "lock %s\n", lockres->l_name); 2490 2491 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2492 /* If we're already trying to cancel a lock conversion 2493 * then just drop the spinlock and allow the caller to 2494 * requeue this lock. */ 2495 2496 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2497 return 0; 2498 } 2499 2500 /* were we in a convert when we got the bast fire? */ 2501 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2502 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2503 /* set things up for the unlockast to know to just 2504 * clear out the ast_action and unset busy, etc. 
*/ 2505 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2506 2507 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2508 "lock %s, invalid flags: 0x%lx\n", 2509 lockres->l_name, lockres->l_flags); 2510 2511 return 1; 2512} 2513 2514static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2515 struct ocfs2_lock_res *lockres) 2516{ 2517 int ret; 2518 enum dlm_status status; 2519 2520 mlog_entry_void(); 2521 mlog(0, "lock %s\n", lockres->l_name); 2522 2523 ret = 0; 2524 status = dlmunlock(osb->dlm, 2525 &lockres->l_lksb, 2526 LKM_CANCEL, 2527 ocfs2_unlock_ast, 2528 lockres); 2529 if (status != DLM_NORMAL) { 2530 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2531 ret = -EINVAL; 2532 ocfs2_recover_from_dlm_error(lockres, 0); 2533 } 2534 2535 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); 2536 2537 mlog_exit(ret); 2538 return ret; 2539} 2540 2541static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode, 2542 struct ocfs2_lock_res *lockres, 2543 int new_level) 2544{ 2545 int ret; 2546 2547 mlog_entry_void(); 2548 2549 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); 2550 2551 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 2552 ret = 0; 2553 mlog(0, "lockres %s currently being refreshed -- backing " 2554 "off!\n", lockres->l_name); 2555 } else if (new_level == LKM_PRMODE) 2556 ret = !lockres->l_ex_holders && 2557 ocfs2_inode_fully_checkpointed(inode); 2558 else /* Must be NLMODE we're converting to. */ 2559 ret = !lockres->l_ro_holders && !lockres->l_ex_holders && 2560 ocfs2_inode_fully_checkpointed(inode); 2561 2562 mlog_exit(ret); 2563 return ret; 2564} 2565 2566static int ocfs2_do_unblock_meta(struct inode *inode, 2567 int *requeue) 2568{ 2569 int new_level; 2570 int set_lvb = 0; 2571 int ret = 0; 2572 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres; 2573 unsigned long flags; 2574 2575 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2576 2577 mlog_entry_void(); 2578 2579 spin_lock_irqsave(&lockres->l_lock, flags); 2580 2581 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2582 2583 mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level, 2584 lockres->l_blocking); 2585 2586 BUG_ON(lockres->l_level != LKM_EXMODE && 2587 lockres->l_level != LKM_PRMODE); 2588 2589 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2590 *requeue = 1; 2591 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2592 spin_unlock_irqrestore(&lockres->l_lock, flags); 2593 if (ret) { 2594 ret = ocfs2_cancel_convert(osb, lockres); 2595 if (ret < 0) 2596 mlog_errno(ret); 2597 } 2598 goto leave; 2599 } 2600 2601 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2602 2603 mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n", 2604 lockres->l_level, lockres->l_blocking, new_level); 2605 2606 if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) { 2607 if (lockres->l_level == LKM_EXMODE) 2608 set_lvb = 1; 2609 2610 /* If the lock hasn't been refreshed yet (rare), then 2611 * our memory inode values are old and we skip 2612 * stuffing the lvb. There's no need to actually clear 2613 * out the lvb here as it's value is still valid. 
*/ 2614 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 2615 if (set_lvb) 2616 __ocfs2_stuff_meta_lvb(inode); 2617 } else 2618 mlog(0, "lockres %s: downconverting stale lock!\n", 2619 lockres->l_name); 2620 2621 mlog(0, "calling ocfs2_downconvert_lock with l_level=%d, " 2622 "l_blocking=%d, new_level=%d\n", 2623 lockres->l_level, lockres->l_blocking, new_level); 2624 2625 ocfs2_prepare_downconvert(lockres, new_level); 2626 spin_unlock_irqrestore(&lockres->l_lock, flags); 2627 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 2628 goto leave; 2629 } 2630 if (!ocfs2_inode_fully_checkpointed(inode)) 2631 ocfs2_start_checkpoint(osb); 2632 2633 *requeue = 1; 2634 spin_unlock_irqrestore(&lockres->l_lock, flags); 2635 ret = 0; 2636leave: 2637 mlog_exit(ret); 2638 return ret; 2639} 2640 2641static int ocfs2_generic_unblock_lock(struct ocfs2_super *osb, 2642 struct ocfs2_lock_res *lockres, 2643 struct ocfs2_unblock_ctl *ctl, 2644 ocfs2_convert_worker_t *worker) 2645{ 2646 unsigned long flags; 2647 int blocking; 2648 int new_level; 2649 int ret = 0; 2650 int set_lvb = 0; 2651 2652 mlog_entry_void(); 2653 2654 spin_lock_irqsave(&lockres->l_lock, flags); 2655 2656 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2657 2658recheck: 2659 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2660 ctl->requeue = 1; 2661 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2662 spin_unlock_irqrestore(&lockres->l_lock, flags); 2663 if (ret) { 2664 ret = ocfs2_cancel_convert(osb, lockres); 2665 if (ret < 0) 2666 mlog_errno(ret); 2667 } 2668 goto leave; 2669 } 2670 2671 /* if we're blocking an exclusive and we have *any* holders, 2672 * then requeue. */ 2673 if ((lockres->l_blocking == LKM_EXMODE) 2674 && (lockres->l_ex_holders || lockres->l_ro_holders)) 2675 goto leave_requeue; 2676 2677 /* If it's a PR we're blocking, then only 2678 * requeue if we've got any EX holders */ 2679 if (lockres->l_blocking == LKM_PRMODE && 2680 lockres->l_ex_holders) 2681 goto leave_requeue; 2682 2683 /* 2684 * Can we get a lock in this state if the holder counts are 2685 * zero? The meta data unblock code used to check this. 2686 */ 2687 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 2688 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 2689 goto leave_requeue; 2690 2691 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2692 2693 if (lockres->l_ops->check_downconvert 2694 && !lockres->l_ops->check_downconvert(lockres, new_level)) 2695 goto leave_requeue; 2696 2697 /* If we get here, then we know that there are no more 2698 * incompatible holders (and anyone asking for an incompatible 2699 * lock is blocked). We can now downconvert the lock */ 2700 if (!worker) 2701 goto downconvert; 2702 2703 /* Some lockres types want to do a bit of work before 2704 * downconverting a lock. Allow that here. The worker function 2705 * may sleep, so we save off a copy of what we're blocking as 2706 * it may change while we're not holding the spin lock. */ 2707 blocking = lockres->l_blocking; 2708 spin_unlock_irqrestore(&lockres->l_lock, flags); 2709 2710 ctl->unblock_action = worker(lockres, blocking); 2711 2712 if (ctl->unblock_action == UNBLOCK_STOP_POST) 2713 goto leave; 2714 2715 spin_lock_irqsave(&lockres->l_lock, flags); 2716 if (blocking != lockres->l_blocking) { 2717 /* If this changed underneath us, then we can't drop 2718 * it just yet. 
*/ 2719 goto recheck; 2720 } 2721 2722downconvert: 2723 ctl->requeue = 0; 2724 2725 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2726 if (lockres->l_level == LKM_EXMODE) 2727 set_lvb = 1; 2728 2729 /* 2730 * We only set the lvb if the lock has been fully 2731 * refreshed - otherwise we risk setting stale 2732 * data. Otherwise, there's no need to actually clear 2733 * out the lvb here as it's value is still valid. 2734 */ 2735 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2736 lockres->l_ops->set_lvb(lockres); 2737 } 2738 2739 ocfs2_prepare_downconvert(lockres, new_level); 2740 spin_unlock_irqrestore(&lockres->l_lock, flags); 2741 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 2742leave: 2743 mlog_exit(ret); 2744 return ret; 2745 2746leave_requeue: 2747 spin_unlock_irqrestore(&lockres->l_lock, flags); 2748 ctl->requeue = 1; 2749 2750 mlog_exit(0); 2751 return 0; 2752} 2753 2754static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 2755 int blocking) 2756{ 2757 struct inode *inode; 2758 struct address_space *mapping; 2759 2760 inode = ocfs2_lock_res_inode(lockres); 2761 mapping = inode->i_mapping; 2762 2763 if (filemap_fdatawrite(mapping)) { 2764 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 2765 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2766 } 2767 sync_mapping_buffers(mapping); 2768 if (blocking == LKM_EXMODE) { 2769 truncate_inode_pages(mapping, 0); 2770 unmap_mapping_range(mapping, 0, 0, 0); 2771 } else { 2772 /* We only need to wait on the I/O if we're not also 2773 * truncating pages because truncate_inode_pages waits 2774 * for us above. We don't truncate pages if we're 2775 * blocking anything < EXMODE because we want to keep 2776 * them around in that case. */ 2777 filemap_fdatawait(mapping); 2778 } 2779 2780 return UNBLOCK_CONTINUE; 2781} 2782 2783int ocfs2_unblock_data(struct ocfs2_lock_res *lockres, 2784 struct ocfs2_unblock_ctl *ctl) 2785{ 2786 int status; 2787 struct inode *inode; 2788 struct ocfs2_super *osb; 2789 2790 mlog_entry_void(); 2791 2792 inode = ocfs2_lock_res_inode(lockres); 2793 osb = OCFS2_SB(inode->i_sb); 2794 2795 mlog(0, "unblock inode %llu\n", 2796 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2797 2798 status = ocfs2_generic_unblock_lock(osb, lockres, ctl, 2799 ocfs2_data_convert_worker); 2800 if (status < 0) 2801 mlog_errno(status); 2802 2803 mlog(0, "inode %llu, requeue = %d\n", 2804 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); 2805 2806 mlog_exit(status); 2807 return status; 2808} 2809 2810static int ocfs2_unblock_inode_lock(struct ocfs2_lock_res *lockres, 2811 struct ocfs2_unblock_ctl *ctl) 2812{ 2813 int status; 2814 struct inode *inode; 2815 2816 mlog_entry_void(); 2817 2818 mlog(0, "Unblock lockres %s\n", lockres->l_name); 2819 2820 inode = ocfs2_lock_res_inode(lockres); 2821 2822 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb), 2823 lockres, ctl, NULL); 2824 if (status < 0) 2825 mlog_errno(status); 2826 2827 mlog_exit(status); 2828 return status; 2829} 2830 2831static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 2832 int new_level) 2833{ 2834 struct inode *inode = ocfs2_lock_res_inode(lockres); 2835 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 2836 2837 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); 2838 BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed); 2839 2840 if (checkpointed) 2841 return 1; 2842 2843 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 2844 return 0; 2845} 2846 2847static void 
ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 2848{ 2849 struct inode *inode = ocfs2_lock_res_inode(lockres); 2850 2851 __ocfs2_stuff_meta_lvb(inode); 2852} 2853 2854static int ocfs2_unblock_meta(struct ocfs2_lock_res *lockres, 2855 struct ocfs2_unblock_ctl *ctl) 2856{ 2857 int status; 2858 struct inode *inode; 2859 2860 mlog_entry_void(); 2861 2862 inode = ocfs2_lock_res_inode(lockres); 2863 2864 mlog(0, "unblock inode %llu\n", 2865 (unsigned long long)OCFS2_I(inode)->ip_blkno); 2866 2867 status = ocfs2_generic_unblock_lock(OCFS2_SB(inode->i_sb), 2868 lockres, ctl, NULL); 2869 if (status < 0) 2870 mlog_errno(status); 2871 2872 mlog(0, "inode %llu, requeue = %d\n", 2873 (unsigned long long)OCFS2_I(inode)->ip_blkno, ctl->requeue); 2874 2875 mlog_exit(status); 2876 return status; 2877} 2878 2879/* 2880 * Does the final reference drop on our dentry lock. Right now this 2881 * happens in the vote thread, but we could choose to simplify the 2882 * dlmglue API and push these off to the ocfs2_wq in the future. 2883 */ 2884static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 2885 struct ocfs2_lock_res *lockres) 2886{ 2887 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 2888 ocfs2_dentry_lock_put(osb, dl); 2889} 2890 2891/* 2892 * d_delete() matching dentries before the lock downconvert. 2893 * 2894 * At this point, any process waiting to destroy the 2895 * dentry_lock due to last ref count is stopped by the 2896 * OCFS2_LOCK_QUEUED flag. 2897 * 2898 * We have two potential problems 2899 * 2900 * 1) If we do the last reference drop on our dentry_lock (via dput) 2901 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 2902 * the downconvert to finish. Instead we take an elevated 2903 * reference and push the drop until after we've completed our 2904 * unblock processing. 2905 * 2906 * 2) There might be another process with a final reference, 2907 * waiting on us to finish processing. If this is the case, we 2908 * detect it and exit out - there's no more dentries anyway. 2909 */ 2910static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 2911 int blocking) 2912{ 2913 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 2914 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 2915 struct dentry *dentry; 2916 unsigned long flags; 2917 int extra_ref = 0; 2918 2919 /* 2920 * This node is blocking another node from getting a read 2921 * lock. This happens when we've renamed within a 2922 * directory. We've forced the other nodes to d_delete(), but 2923 * we never actually dropped our lock because it's still 2924 * valid. The downconvert code will retain a PR for this node, 2925 * so there's no further work to do. 2926 */ 2927 if (blocking == LKM_PRMODE) 2928 return UNBLOCK_CONTINUE; 2929 2930 /* 2931 * Mark this inode as potentially orphaned. The code in 2932 * ocfs2_delete_inode() will figure out whether it actually 2933 * needs to be freed or not. 2934 */ 2935 spin_lock(&oi->ip_lock); 2936 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 2937 spin_unlock(&oi->ip_lock); 2938 2939 /* 2940 * Yuck. We need to make sure however that the check of 2941 * OCFS2_LOCK_FREEING and the extra reference are atomic with 2942 * respect to a reference decrement or the setting of that 2943 * flag. 
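 * That is why the code below takes both locks: l_lock guards the
 * OCFS2_LOCK_FREEING test on l_flags, while dentry_attach_lock guards
 * the dl_count manipulation.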
2944 */ 2945 spin_lock_irqsave(&lockres->l_lock, flags); 2946 spin_lock(&dentry_attach_lock); 2947 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 2948 && dl->dl_count) { 2949 dl->dl_count++; 2950 extra_ref = 1; 2951 } 2952 spin_unlock(&dentry_attach_lock); 2953 spin_unlock_irqrestore(&lockres->l_lock, flags); 2954 2955 mlog(0, "extra_ref = %d\n", extra_ref); 2956 2957 /* 2958 * We have a process waiting on us in ocfs2_dentry_iput(), 2959 * which means we can't have any more outstanding 2960 * aliases. There's no need to do any more work. 2961 */ 2962 if (!extra_ref) 2963 return UNBLOCK_CONTINUE; 2964 2965 spin_lock(&dentry_attach_lock); 2966 while (1) { 2967 dentry = ocfs2_find_local_alias(dl->dl_inode, 2968 dl->dl_parent_blkno, 1); 2969 if (!dentry) 2970 break; 2971 spin_unlock(&dentry_attach_lock); 2972 2973 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 2974 dentry->d_name.name); 2975 2976 /* 2977 * The following dcache calls may do an 2978 * iput(). Normally we don't want that from the 2979 * downconverting thread, but in this case it's ok 2980 * because the requesting node already has an 2981 * exclusive lock on the inode, so it can't be queued 2982 * for a downconvert. 2983 */ 2984 d_delete(dentry); 2985 dput(dentry); 2986 2987 spin_lock(&dentry_attach_lock); 2988 } 2989 spin_unlock(&dentry_attach_lock); 2990 2991 /* 2992 * If we are the last holder of this dentry lock, there is no 2993 * reason to downconvert so skip straight to the unlock. 2994 */ 2995 if (dl->dl_count == 1) 2996 return UNBLOCK_STOP_POST; 2997 2998 return UNBLOCK_CONTINUE_POST; 2999} 3000 3001static int ocfs2_unblock_dentry_lock(struct ocfs2_lock_res *lockres, 3002 struct ocfs2_unblock_ctl *ctl) 3003{ 3004 int ret; 3005 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3006 struct ocfs2_super *osb = OCFS2_SB(dl->dl_inode->i_sb); 3007 3008 mlog(0, "unblock dentry lock: %llu\n", 3009 (unsigned long long)OCFS2_I(dl->dl_inode)->ip_blkno); 3010 3011 ret = ocfs2_generic_unblock_lock(osb, 3012 lockres, 3013 ctl, 3014 ocfs2_dentry_convert_worker); 3015 if (ret < 0) 3016 mlog_errno(ret); 3017 3018 mlog(0, "requeue = %d, post = %d\n", ctl->requeue, ctl->unblock_action); 3019 3020 return ret; 3021} 3022 3023/* Generic unblock function for any lockres whose private data is an 3024 * ocfs2_super pointer. */ 3025static int ocfs2_unblock_osb_lock(struct ocfs2_lock_res *lockres, 3026 struct ocfs2_unblock_ctl *ctl) 3027{ 3028 int status; 3029 struct ocfs2_super *osb; 3030 3031 mlog_entry_void(); 3032 3033 mlog(0, "Unblock lockres %s\n", lockres->l_name); 3034 3035 osb = ocfs2_get_lockres_osb(lockres); 3036 3037 status = ocfs2_generic_unblock_lock(osb, 3038 lockres, 3039 ctl, 3040 NULL); 3041 if (status < 0) 3042 mlog_errno(status); 3043 3044 mlog_exit(status); 3045 return status; 3046} 3047 3048void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3049 struct ocfs2_lock_res *lockres) 3050{ 3051 int status; 3052 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3053 unsigned long flags; 3054 3055 /* Our reference to the lockres in this function can be 3056 * considered valid until we remove the OCFS2_LOCK_QUEUED 3057 * flag. */ 3058 3059 mlog_entry_void(); 3060 3061 BUG_ON(!lockres); 3062 BUG_ON(!lockres->l_ops); 3063 BUG_ON(!lockres->l_ops->unblock); 3064 3065 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3066 3067 /* Detect whether a lock has been marked as going away while 3068 * the vote thread was processing other things. 
A lock can 3069 * still be marked with OCFS2_LOCK_FREEING after this check, 3070 * but short circuiting here will still save us some 3071 * performance. */ 3072 spin_lock_irqsave(&lockres->l_lock, flags); 3073 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3074 goto unqueue; 3075 spin_unlock_irqrestore(&lockres->l_lock, flags); 3076 3077 status = lockres->l_ops->unblock(lockres, &ctl); 3078 if (status < 0) 3079 mlog_errno(status); 3080 3081 spin_lock_irqsave(&lockres->l_lock, flags); 3082unqueue: 3083 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3084 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3085 } else 3086 ocfs2_schedule_blocked_lock(osb, lockres); 3087 3088 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3089 ctl.requeue ? "yes" : "no"); 3090 spin_unlock_irqrestore(&lockres->l_lock, flags); 3091 3092 if (ctl.unblock_action != UNBLOCK_CONTINUE 3093 && lockres->l_ops->post_unlock) 3094 lockres->l_ops->post_unlock(osb, lockres); 3095 3096 mlog_exit_void(); 3097} 3098 3099static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3100 struct ocfs2_lock_res *lockres) 3101{ 3102 mlog_entry_void(); 3103 3104 assert_spin_locked(&lockres->l_lock); 3105 3106 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3107 /* Do not schedule a lock for downconvert when it's on 3108 * the way to destruction - any nodes wanting access 3109 * to the resource will get it soon. */ 3110 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3111 lockres->l_name, lockres->l_flags); 3112 return; 3113 } 3114 3115 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3116 3117 spin_lock(&osb->vote_task_lock); 3118 if (list_empty(&lockres->l_blocked_list)) { 3119 list_add_tail(&lockres->l_blocked_list, 3120 &osb->blocked_lock_list); 3121 osb->blocked_lock_count++; 3122 } 3123 spin_unlock(&osb->vote_task_lock); 3124 3125 mlog_exit_void(); 3126} 3127 3128/* This aids in debugging situations where a bad LVB might be involved. */ 3129void ocfs2_dump_meta_lvb_info(u64 level, 3130 const char *function, 3131 unsigned int line, 3132 struct ocfs2_lock_res *lockres) 3133{ 3134 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 3135 3136 mlog(level, "LVB information for %s (called from %s:%u):\n", 3137 lockres->l_name, function, line); 3138 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 3139 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 3140 be32_to_cpu(lvb->lvb_igeneration)); 3141 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 3142 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 3143 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 3144 be16_to_cpu(lvb->lvb_imode)); 3145 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 3146 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 3147 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 3148 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 3149 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 3150 be32_to_cpu(lvb->lvb_iattr)); 3151} 3152
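
/*
 * A minimal usage sketch (illustrative only, compiled out below): it
 * assumes an ocfs2_super that has already gone through ocfs2_dlm_init(),
 * and simply shows how callers pair the cluster lock helpers defined in
 * this file.  The function name is made up for the example.
 */
#if 0
static int example_super_locked_operation(struct ocfs2_super *osb)
{
	int status;

	/* Take the superblock lock exclusively.  When a meaningful lock
	 * level is first obtained this also refreshes the slot map, see
	 * ocfs2_super_lock() above. */
	status = ocfs2_super_lock(osb, 1);
	if (status < 0) {
		mlog_errno(status);
		return status;
	}

	/* ... work on resources covered by the superblock lock ... */

	ocfs2_super_unlock(osb, 1);
	return 0;
}
#endif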