dlmglue.c revision c271c5c22b0a7ca45fda15f1f4d258bca36a5b94
1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26#include <linux/types.h> 27#include <linux/slab.h> 28#include <linux/highmem.h> 29#include <linux/mm.h> 30#include <linux/smp_lock.h> 31#include <linux/crc32.h> 32#include <linux/kthread.h> 33#include <linux/pagemap.h> 34#include <linux/debugfs.h> 35#include <linux/seq_file.h> 36 37#include <cluster/heartbeat.h> 38#include <cluster/nodemanager.h> 39#include <cluster/tcp.h> 40 41#include <dlm/dlmapi.h> 42 43#define MLOG_MASK_PREFIX ML_DLM_GLUE 44#include <cluster/masklog.h> 45 46#include "ocfs2.h" 47 48#include "alloc.h" 49#include "dcache.h" 50#include "dlmglue.h" 51#include "extent_map.h" 52#include "file.h" 53#include "heartbeat.h" 54#include "inode.h" 55#include "journal.h" 56#include "slot_map.h" 57#include "super.h" 58#include "uptodate.h" 59#include "vote.h" 60 61#include "buffer_head_io.h" 62 63struct ocfs2_mask_waiter { 64 struct list_head mw_item; 65 int mw_status; 66 struct completion mw_complete; 67 unsigned long mw_mask; 68 unsigned long mw_goal; 69}; 70 71static struct ocfs2_super 
*ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 72static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 73 74/* 75 * Return value from ->downconvert_worker functions. 76 * 77 * These control the precise actions of ocfs2_unblock_lock() 78 * and ocfs2_process_blocked_lock() 79 * 80 */ 81enum ocfs2_unblock_action { 82 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 83 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 84 * ->post_unlock callback */ 85 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 86 * ->post_unlock() callback. */ 87}; 88 89struct ocfs2_unblock_ctl { 90 int requeue; 91 enum ocfs2_unblock_action unblock_action; 92}; 93 94static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 95 int new_level); 96static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 97 98static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 99 int blocking); 100 101static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 102 int blocking); 103 104static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 105 struct ocfs2_lock_res *lockres); 106 107/* 108 * OCFS2 Lock Resource Operations 109 * 110 * These fine tune the behavior of the generic dlmglue locking infrastructure. 111 * 112 * The most basic of lock types can point ->l_priv to their respective 113 * struct ocfs2_super and allow the default actions to manage things. 114 * 115 * Right now, each lock type also needs to implement an init function, 116 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 117 * should be called when the lock is no longer needed (i.e., object 118 * destruction time). 119 */ 120struct ocfs2_lock_res_ops { 121 /* 122 * Translate an ocfs2_lock_res * into an ocfs2_super *. 
Define 123 * this callback if ->l_priv is not an ocfs2_super pointer 124 */ 125 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 126 127 /* 128 * Optionally called in the downconvert (or "vote") thread 129 * after a successful downconvert. The lockres will not be 130 * referenced after this callback is called, so it is safe to 131 * free memory, etc. 132 * 133 * The exact semantics of when this is called are controlled 134 * by ->downconvert_worker() 135 */ 136 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 137 138 /* 139 * Allow a lock type to add checks to determine whether it is 140 * safe to downconvert a lock. Return 0 to re-queue the 141 * downconvert at a later time, nonzero to continue. 142 * 143 * For most locks, the default checks that there are no 144 * incompatible holders are sufficient. 145 * 146 * Called with the lockres spinlock held. 147 */ 148 int (*check_downconvert)(struct ocfs2_lock_res *, int); 149 150 /* 151 * Allows a lock type to populate the lock value block. This 152 * is called on downconvert, and when we drop a lock. 153 * 154 * Locks that want to use this should set LOCK_TYPE_USES_LVB 155 * in the flags field. 156 * 157 * Called with the lockres spinlock held. 158 */ 159 void (*set_lvb)(struct ocfs2_lock_res *); 160 161 /* 162 * Called from the downconvert thread when it is determined 163 * that a lock will be downconverted. This is called without 164 * any locks held so the function can do work that might 165 * schedule (syncing out data, etc). 166 * 167 * This should return any one of the ocfs2_unblock_action 168 * values, depending on what it wants the thread to do. 169 */ 170 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 171 172 /* 173 * LOCK_TYPE_* flags which describe the specific requirements 174 * of a lock type. Descriptions of each individual flag follow. 
175 */ 176 int flags; 177}; 178 179/* 180 * Some locks want to "refresh" potentially stale data when a 181 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 182 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 183 * individual lockres l_flags member from the ast function. It is 184 * expected that the locking wrapper will clear the 185 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 186 */ 187#define LOCK_TYPE_REQUIRES_REFRESH 0x1 188 189/* 190 * Indicate that a lock type makes use of the lock value block. The 191 * ->set_lvb lock type callback must be defined. 192 */ 193#define LOCK_TYPE_USES_LVB 0x2 194 195static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 196 .get_osb = ocfs2_get_inode_osb, 197 .flags = 0, 198}; 199 200static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = { 201 .get_osb = ocfs2_get_inode_osb, 202 .check_downconvert = ocfs2_check_meta_downconvert, 203 .set_lvb = ocfs2_set_meta_lvb, 204 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 205}; 206 207static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = { 208 .get_osb = ocfs2_get_inode_osb, 209 .downconvert_worker = ocfs2_data_convert_worker, 210 .flags = 0, 211}; 212 213static struct ocfs2_lock_res_ops ocfs2_super_lops = { 214 .flags = LOCK_TYPE_REQUIRES_REFRESH, 215}; 216 217static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 218 .flags = 0, 219}; 220 221static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 222 .get_osb = ocfs2_get_dentry_osb, 223 .post_unlock = ocfs2_dentry_post_unlock, 224 .downconvert_worker = ocfs2_dentry_convert_worker, 225 .flags = 0, 226}; 227 228static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 229{ 230 return lockres->l_type == OCFS2_LOCK_TYPE_META || 231 lockres->l_type == OCFS2_LOCK_TYPE_DATA || 232 lockres->l_type == OCFS2_LOCK_TYPE_RW; 233} 234 235static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 236{ 237 BUG_ON(!ocfs2_is_inode_lock(lockres)); 238 239 return 
(struct inode *) lockres->l_priv; 240} 241 242static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 243{ 244 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 245 246 return (struct ocfs2_dentry_lock *)lockres->l_priv; 247} 248 249static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 250{ 251 if (lockres->l_ops->get_osb) 252 return lockres->l_ops->get_osb(lockres); 253 254 return (struct ocfs2_super *)lockres->l_priv; 255} 256 257static int ocfs2_lock_create(struct ocfs2_super *osb, 258 struct ocfs2_lock_res *lockres, 259 int level, 260 int dlm_flags); 261static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 262 int wanted); 263static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 264 struct ocfs2_lock_res *lockres, 265 int level); 266static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 267static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 268static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 269static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 270static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 271 struct ocfs2_lock_res *lockres); 272static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 273 int convert); 274#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 275 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 276 "resource %s: %s\n", dlm_errname(_stat), _func, \ 277 _lockres->l_name, dlm_errmsg(_stat)); \ 278} while (0) 279static void ocfs2_vote_on_unlock(struct ocfs2_super *osb, 280 struct ocfs2_lock_res *lockres); 281static int ocfs2_meta_lock_update(struct inode *inode, 282 struct buffer_head **bh); 283static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 284static inline int ocfs2_highest_compat_lock_level(int level); 285 286static void 
ocfs2_build_lock_name(enum ocfs2_lock_type type, 287 u64 blkno, 288 u32 generation, 289 char *name) 290{ 291 int len; 292 293 mlog_entry_void(); 294 295 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 296 297 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 298 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 299 (long long)blkno, generation); 300 301 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 302 303 mlog(0, "built lock resource with name: %s\n", name); 304 305 mlog_exit_void(); 306} 307 308static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 309 310static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 311 struct ocfs2_dlm_debug *dlm_debug) 312{ 313 mlog(0, "Add tracking for lockres %s\n", res->l_name); 314 315 spin_lock(&ocfs2_dlm_tracking_lock); 316 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 317 spin_unlock(&ocfs2_dlm_tracking_lock); 318} 319 320static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 321{ 322 spin_lock(&ocfs2_dlm_tracking_lock); 323 if (!list_empty(&res->l_debug_list)) 324 list_del_init(&res->l_debug_list); 325 spin_unlock(&ocfs2_dlm_tracking_lock); 326} 327 328static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 329 struct ocfs2_lock_res *res, 330 enum ocfs2_lock_type type, 331 struct ocfs2_lock_res_ops *ops, 332 void *priv) 333{ 334 res->l_type = type; 335 res->l_ops = ops; 336 res->l_priv = priv; 337 338 res->l_level = LKM_IVMODE; 339 res->l_requested = LKM_IVMODE; 340 res->l_blocking = LKM_IVMODE; 341 res->l_action = OCFS2_AST_INVALID; 342 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 343 344 res->l_flags = OCFS2_LOCK_INITIALIZED; 345 346 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 347} 348 349void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 350{ 351 /* This also clears out the lock status block */ 352 memset(res, 0, sizeof(struct ocfs2_lock_res)); 353 spin_lock_init(&res->l_lock); 354 init_waitqueue_head(&res->l_event); 355 INIT_LIST_HEAD(&res->l_blocked_list); 356 
INIT_LIST_HEAD(&res->l_mask_waiters); 357} 358 359void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 360 enum ocfs2_lock_type type, 361 unsigned int generation, 362 struct inode *inode) 363{ 364 struct ocfs2_lock_res_ops *ops; 365 366 switch(type) { 367 case OCFS2_LOCK_TYPE_RW: 368 ops = &ocfs2_inode_rw_lops; 369 break; 370 case OCFS2_LOCK_TYPE_META: 371 ops = &ocfs2_inode_meta_lops; 372 break; 373 case OCFS2_LOCK_TYPE_DATA: 374 ops = &ocfs2_inode_data_lops; 375 break; 376 default: 377 mlog_bug_on_msg(1, "type: %d\n", type); 378 ops = NULL; /* thanks, gcc */ 379 break; 380 }; 381 382 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 383 generation, res->l_name); 384 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 385} 386 387static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 388{ 389 struct inode *inode = ocfs2_lock_res_inode(lockres); 390 391 return OCFS2_SB(inode->i_sb); 392} 393 394static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 395{ 396 __be64 inode_blkno_be; 397 398 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 399 sizeof(__be64)); 400 401 return be64_to_cpu(inode_blkno_be); 402} 403 404static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 405{ 406 struct ocfs2_dentry_lock *dl = lockres->l_priv; 407 408 return OCFS2_SB(dl->dl_inode->i_sb); 409} 410 411void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 412 u64 parent, struct inode *inode) 413{ 414 int len; 415 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 416 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 417 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 418 419 ocfs2_lock_res_init_once(lockres); 420 421 /* 422 * Unfortunately, the standard lock naming scheme won't work 423 * here because we have two 16 byte values to use. Instead, 424 * we'll stuff the inode number as a binary value. 
We still 425 * want error prints to show something without garbling the 426 * display, so drop a null byte in there before the inode 427 * number. A future version of OCFS2 will likely use all 428 * binary lock names. The stringified names have been a 429 * tremendous aid in debugging, but now that the debugfs 430 * interface exists, we can mangle things there if need be. 431 * 432 * NOTE: We also drop the standard "pad" value (the total lock 433 * name size stays the same though - the last part is all 434 * zeros due to the memset in ocfs2_lock_res_init_once() 435 */ 436 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 437 "%c%016llx", 438 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 439 (long long)parent); 440 441 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 442 443 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 444 sizeof(__be64)); 445 446 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 447 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 448 dl); 449} 450 451static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 452 struct ocfs2_super *osb) 453{ 454 /* Superblock lockres doesn't come from a slab so we call init 455 * once on it manually. */ 456 ocfs2_lock_res_init_once(res); 457 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 458 0, res->l_name); 459 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 460 &ocfs2_super_lops, osb); 461} 462 463static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 464 struct ocfs2_super *osb) 465{ 466 /* Rename lockres doesn't come from a slab so we call init 467 * once on it manually. 
*/ 468 ocfs2_lock_res_init_once(res); 469 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 470 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 471 &ocfs2_rename_lops, osb); 472} 473 474void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 475{ 476 mlog_entry_void(); 477 478 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 479 return; 480 481 ocfs2_remove_lockres_tracking(res); 482 483 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 484 "Lockres %s is on the blocked list\n", 485 res->l_name); 486 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 487 "Lockres %s has mask waiters pending\n", 488 res->l_name); 489 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 490 "Lockres %s is locked\n", 491 res->l_name); 492 mlog_bug_on_msg(res->l_ro_holders, 493 "Lockres %s has %u ro holders\n", 494 res->l_name, res->l_ro_holders); 495 mlog_bug_on_msg(res->l_ex_holders, 496 "Lockres %s has %u ex holders\n", 497 res->l_name, res->l_ex_holders); 498 499 /* Need to clear out the lock status block for the dlm */ 500 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 501 502 res->l_flags = 0UL; 503 mlog_exit_void(); 504} 505 506static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 507 int level) 508{ 509 mlog_entry_void(); 510 511 BUG_ON(!lockres); 512 513 switch(level) { 514 case LKM_EXMODE: 515 lockres->l_ex_holders++; 516 break; 517 case LKM_PRMODE: 518 lockres->l_ro_holders++; 519 break; 520 default: 521 BUG(); 522 } 523 524 mlog_exit_void(); 525} 526 527static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 528 int level) 529{ 530 mlog_entry_void(); 531 532 BUG_ON(!lockres); 533 534 switch(level) { 535 case LKM_EXMODE: 536 BUG_ON(!lockres->l_ex_holders); 537 lockres->l_ex_holders--; 538 break; 539 case LKM_PRMODE: 540 BUG_ON(!lockres->l_ro_holders); 541 lockres->l_ro_holders--; 542 break; 543 default: 544 BUG(); 545 } 546 mlog_exit_void(); 547} 548 549/* WARNING: This function lives in a world where the only three lock 550 * 
levels are EX, PR, and NL. It *will* have to be adjusted when more 551 * lock types are added. */ 552static inline int ocfs2_highest_compat_lock_level(int level) 553{ 554 int new_level = LKM_EXMODE; 555 556 if (level == LKM_EXMODE) 557 new_level = LKM_NLMODE; 558 else if (level == LKM_PRMODE) 559 new_level = LKM_PRMODE; 560 return new_level; 561} 562 563static void lockres_set_flags(struct ocfs2_lock_res *lockres, 564 unsigned long newflags) 565{ 566 struct list_head *pos, *tmp; 567 struct ocfs2_mask_waiter *mw; 568 569 assert_spin_locked(&lockres->l_lock); 570 571 lockres->l_flags = newflags; 572 573 list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) { 574 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item); 575 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 576 continue; 577 578 list_del_init(&mw->mw_item); 579 mw->mw_status = 0; 580 complete(&mw->mw_complete); 581 } 582} 583static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 584{ 585 lockres_set_flags(lockres, lockres->l_flags | or); 586} 587static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 588 unsigned long clear) 589{ 590 lockres_set_flags(lockres, lockres->l_flags & ~clear); 591} 592 593static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 594{ 595 mlog_entry_void(); 596 597 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 598 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 599 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 600 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 601 602 lockres->l_level = lockres->l_requested; 603 if (lockres->l_level <= 604 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 605 lockres->l_blocking = LKM_NLMODE; 606 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 607 } 608 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 609 610 mlog_exit_void(); 611} 612 613static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 614{ 615 mlog_entry_void(); 616 617 
BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 618 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 619 620 /* Convert from RO to EX doesn't really need anything as our 621 * information is already up to data. Convert from NL to 622 * *anything* however should mark ourselves as needing an 623 * update */ 624 if (lockres->l_level == LKM_NLMODE && 625 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 626 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 627 628 lockres->l_level = lockres->l_requested; 629 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 630 631 mlog_exit_void(); 632} 633 634static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 635{ 636 mlog_entry_void(); 637 638 BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY)); 639 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 640 641 if (lockres->l_requested > LKM_NLMODE && 642 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 643 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 644 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 645 646 lockres->l_level = lockres->l_requested; 647 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 648 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 649 650 mlog_exit_void(); 651} 652 653static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 654 int level) 655{ 656 int needs_downconvert = 0; 657 mlog_entry_void(); 658 659 assert_spin_locked(&lockres->l_lock); 660 661 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 662 663 if (level > lockres->l_blocking) { 664 /* only schedule a downconvert if we haven't already scheduled 665 * one that goes low enough to satisfy the level we're 666 * blocking. 
this also catches the case where we get 667 * duplicate BASTs */ 668 if (ocfs2_highest_compat_lock_level(level) < 669 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 670 needs_downconvert = 1; 671 672 lockres->l_blocking = level; 673 } 674 675 mlog_exit(needs_downconvert); 676 return needs_downconvert; 677} 678 679static void ocfs2_blocking_ast(void *opaque, int level) 680{ 681 struct ocfs2_lock_res *lockres = opaque; 682 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 683 int needs_downconvert; 684 unsigned long flags; 685 686 BUG_ON(level <= LKM_NLMODE); 687 688 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 689 lockres->l_name, level, lockres->l_level, 690 ocfs2_lock_type_string(lockres->l_type)); 691 692 spin_lock_irqsave(&lockres->l_lock, flags); 693 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 694 if (needs_downconvert) 695 ocfs2_schedule_blocked_lock(osb, lockres); 696 spin_unlock_irqrestore(&lockres->l_lock, flags); 697 698 wake_up(&lockres->l_event); 699 700 ocfs2_kick_vote_thread(osb); 701} 702 703static void ocfs2_locking_ast(void *opaque) 704{ 705 struct ocfs2_lock_res *lockres = opaque; 706 struct dlm_lockstatus *lksb = &lockres->l_lksb; 707 unsigned long flags; 708 709 spin_lock_irqsave(&lockres->l_lock, flags); 710 711 if (lksb->status != DLM_NORMAL) { 712 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", 713 lockres->l_name, lksb->status); 714 spin_unlock_irqrestore(&lockres->l_lock, flags); 715 return; 716 } 717 718 switch(lockres->l_action) { 719 case OCFS2_AST_ATTACH: 720 ocfs2_generic_handle_attach_action(lockres); 721 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 722 break; 723 case OCFS2_AST_CONVERT: 724 ocfs2_generic_handle_convert_action(lockres); 725 break; 726 case OCFS2_AST_DOWNCONVERT: 727 ocfs2_generic_handle_downconvert_action(lockres); 728 break; 729 default: 730 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 731 "lockres flags = 0x%lx, unlock action: 
%u\n", 732 lockres->l_name, lockres->l_action, lockres->l_flags, 733 lockres->l_unlock_action); 734 BUG(); 735 } 736 737 /* set it to something invalid so if we get called again we 738 * can catch it. */ 739 lockres->l_action = OCFS2_AST_INVALID; 740 741 wake_up(&lockres->l_event); 742 spin_unlock_irqrestore(&lockres->l_lock, flags); 743} 744 745static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 746 int convert) 747{ 748 unsigned long flags; 749 750 mlog_entry_void(); 751 spin_lock_irqsave(&lockres->l_lock, flags); 752 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 753 if (convert) 754 lockres->l_action = OCFS2_AST_INVALID; 755 else 756 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 757 spin_unlock_irqrestore(&lockres->l_lock, flags); 758 759 wake_up(&lockres->l_event); 760 mlog_exit_void(); 761} 762 763/* Note: If we detect another process working on the lock (i.e., 764 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 765 * to do the right thing in that case. 
766 */ 767static int ocfs2_lock_create(struct ocfs2_super *osb, 768 struct ocfs2_lock_res *lockres, 769 int level, 770 int dlm_flags) 771{ 772 int ret = 0; 773 enum dlm_status status = DLM_NORMAL; 774 unsigned long flags; 775 776 mlog_entry_void(); 777 778 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 779 dlm_flags); 780 781 spin_lock_irqsave(&lockres->l_lock, flags); 782 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 783 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 784 spin_unlock_irqrestore(&lockres->l_lock, flags); 785 goto bail; 786 } 787 788 lockres->l_action = OCFS2_AST_ATTACH; 789 lockres->l_requested = level; 790 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 791 spin_unlock_irqrestore(&lockres->l_lock, flags); 792 793 status = dlmlock(osb->dlm, 794 level, 795 &lockres->l_lksb, 796 dlm_flags, 797 lockres->l_name, 798 OCFS2_LOCK_ID_MAX_LEN - 1, 799 ocfs2_locking_ast, 800 lockres, 801 ocfs2_blocking_ast); 802 if (status != DLM_NORMAL) { 803 ocfs2_log_dlm_error("dlmlock", status, lockres); 804 ret = -EINVAL; 805 ocfs2_recover_from_dlm_error(lockres, 1); 806 } 807 808 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 809 810bail: 811 mlog_exit(ret); 812 return ret; 813} 814 815static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 816 int flag) 817{ 818 unsigned long flags; 819 int ret; 820 821 spin_lock_irqsave(&lockres->l_lock, flags); 822 ret = lockres->l_flags & flag; 823 spin_unlock_irqrestore(&lockres->l_lock, flags); 824 825 return ret; 826} 827 828static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 829 830{ 831 wait_event(lockres->l_event, 832 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 833} 834 835static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 836 837{ 838 wait_event(lockres->l_event, 839 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 840} 841 842/* predict what lock level we'll be dropping down to on behalf 843 * of another 
node, and return true if the currently wanted 844 * level will be compatible with it. */ 845static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 846 int wanted) 847{ 848 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 849 850 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 851} 852 853static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 854{ 855 INIT_LIST_HEAD(&mw->mw_item); 856 init_completion(&mw->mw_complete); 857} 858 859static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 860{ 861 wait_for_completion(&mw->mw_complete); 862 /* Re-arm the completion in case we want to wait on it again */ 863 INIT_COMPLETION(mw->mw_complete); 864 return mw->mw_status; 865} 866 867static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 868 struct ocfs2_mask_waiter *mw, 869 unsigned long mask, 870 unsigned long goal) 871{ 872 BUG_ON(!list_empty(&mw->mw_item)); 873 874 assert_spin_locked(&lockres->l_lock); 875 876 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 877 mw->mw_mask = mask; 878 mw->mw_goal = goal; 879} 880 881/* returns 0 if the mw that was removed was already satisfied, -EBUSY 882 * if the mask still hadn't reached its goal */ 883static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 884 struct ocfs2_mask_waiter *mw) 885{ 886 unsigned long flags; 887 int ret = 0; 888 889 spin_lock_irqsave(&lockres->l_lock, flags); 890 if (!list_empty(&mw->mw_item)) { 891 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 892 ret = -EBUSY; 893 894 list_del_init(&mw->mw_item); 895 init_completion(&mw->mw_complete); 896 } 897 spin_unlock_irqrestore(&lockres->l_lock, flags); 898 899 return ret; 900 901} 902 903static int ocfs2_cluster_lock(struct ocfs2_super *osb, 904 struct ocfs2_lock_res *lockres, 905 int level, 906 int lkm_flags, 907 int arg_flags) 908{ 909 struct ocfs2_mask_waiter mw; 910 enum dlm_status status; 911 int wait, catch_signals = !(osb->s_mount_opt & 
OCFS2_MOUNT_NOINTR); 912 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 913 unsigned long flags; 914 915 mlog_entry_void(); 916 917 ocfs2_init_mask_waiter(&mw); 918 919 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 920 lkm_flags |= LKM_VALBLK; 921 922again: 923 wait = 0; 924 925 if (catch_signals && signal_pending(current)) { 926 ret = -ERESTARTSYS; 927 goto out; 928 } 929 930 spin_lock_irqsave(&lockres->l_lock, flags); 931 932 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 933 "Cluster lock called on freeing lockres %s! flags " 934 "0x%lx\n", lockres->l_name, lockres->l_flags); 935 936 /* We only compare against the currently granted level 937 * here. If the lock is blocked waiting on a downconvert, 938 * we'll get caught below. */ 939 if (lockres->l_flags & OCFS2_LOCK_BUSY && 940 level > lockres->l_level) { 941 /* is someone sitting in dlm_lock? If so, wait on 942 * them. */ 943 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 944 wait = 1; 945 goto unlock; 946 } 947 948 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 949 /* lock has not been created yet. 
 */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* Attach the lockres at NL mode first, then retry the
		 * requested conversion from the top. */
		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks. One path holds the page lock while calling aops
	 * which block acquiring dlm locks. The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}

/* Drop one holder reference at the given level. May kick the vote
 * thread if another node is blocked waiting on this lock (see
 * ocfs2_vote_on_unlock()). The actual dlm-level downconvert happens
 * asynchronously. */
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}

/* Create a lock at PR or EX directly, optionally passing LKM_LOCAL to
 * skip the network lookup for resources no other node can know about
 * yet. Only valid on a lockres that has never been attached. */
static int ocfs2_create_new_lock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int ex,
				 int local)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	unsigned long flags;
	int lkm_flags = local ? LKM_LOCAL : 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
}

/* Grants us an EX lock on the data and metadata resources, skipping
 * the normal cluster directory lookup. Use this ONLY on newly created
 * inodes which other nodes can't possibly see, and which haven't been
 * hashed in the inode hash yet. This can give us a good performance
 * increase as it'll skip the network broadcast normally associated
 * with creating a new lock resource. */
int ocfs2_create_new_inode_locks(struct inode *inode)
{
	int ret;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);
	BUG_ON(!ocfs2_inode_is_new(inode));

	mlog_entry_void();

	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);

	/* NOTE: That we don't increment any of the holder counts, nor
	 * do we add anything to a journal handle. Since this is
	 * supposed to be a new inode which the cluster doesn't know
	 * about yet, there is no need to. As far as the LVB handling
	 * is concerned, this is basically like acquiring an EX lock
	 * on a resource which has an invalid one -- we'll set it
	 * valid when we release the EX. */

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	/*
	 * We don't want to use LKM_LOCAL on a meta data lock as they
	 * don't use a generation in their lock names.
 */
	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
	if (ret) {
		mlog_errno(ret);
		goto bail;
	}

bail:
	mlog_exit(ret);
	return ret;
}

/* Take the per-inode RW lock: EX for writers, PR for readers. Local
 * (non-clustered) mounts have no other nodes to contend with, so they
 * skip the DLM entirely. */
int ocfs2_rw_lock(struct inode *inode, int write)
{
	int status, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (ocfs2_mount_local(osb))
		return 0;

	lockres = &OCFS2_I(inode)->ip_rw_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
				    0);
	if (status < 0)
		mlog_errno(status);

	mlog_exit(status);
	return status;
}

/* Drop a holder reference taken by ocfs2_rw_lock(). 'write' must match
 * the value passed when the lock was taken. */
void ocfs2_rw_unlock(struct inode *inode, int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s RW lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/* Take the per-inode DATA lock. arg_flags may contain
 * OCFS2_LOCK_NONBLOCK, in which case -EAGAIN is returned rather than
 * blocking (see ocfs2_data_lock_with_page()). Returns -EROFS for a
 * write lock on a hard-readonly device. */
int ocfs2_data_lock_full(struct inode *inode,
			 int write,
			 int arg_flags)
{
	int status = 0, level;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu take %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	/* We'll allow faking a readonly data lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
		if (write) {
			status = -EROFS;
			mlog_errno(status);
		}
		goto out;
	}

	if (ocfs2_mount_local(osb))
		goto out;

	lockres = &OCFS2_I(inode)->ip_data_lockres;

	level = write ? LKM_EXMODE : LKM_PRMODE;

	status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
				    0, arg_flags);
	if (status < 0 && status != -EAGAIN)
		mlog_errno(status);

out:
	mlog_exit(status);
	return status;
}

/* see ocfs2_meta_lock_with_page() */
int ocfs2_data_lock_with_page(struct inode *inode,
			      int write,
			      struct page *page)
{
	int ret;

	ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		/* Blocking lock + immediate unlock so the lock is
		 * likely cached here when the VFS retries the aop. */
		if (ocfs2_data_lock(inode, write) == 0)
			ocfs2_data_unlock(inode, write);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres)
{
	int kick = 0;

	mlog_entry_void();

	/* If we know that another node is waiting on our lock, kick
	 * the vote thread pre-emptively when we reach a release
	 * condition. */
	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
		switch(lockres->l_blocking) {
		case LKM_EXMODE:
			/* A blocked EX request can only proceed once
			 * every local holder is gone. */
			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
				kick = 1;
			break;
		case LKM_PRMODE:
			/* A blocked PR request is only incompatible
			 * with local EX holders. */
			if (!lockres->l_ex_holders)
				kick = 1;
			break;
		default:
			BUG();
		}
	}

	if (kick)
		ocfs2_kick_vote_thread(osb);

	mlog_exit_void();
}

/* Drop a holder reference taken by ocfs2_data_lock*(). */
void ocfs2_data_unlock(struct inode *inode,
		       int write)
{
	int level = write ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s DATA lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     write ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
	    !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

#define OCFS2_SEC_BITS   34
#define OCFS2_SEC_SHIFT  (64 - 34)
#define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)

/* LVB only has room for 64 bits of time here so we pack it for
 * now. */
static u64 ocfs2_pack_timespec(struct timespec *spec)
{
	u64 res;
	u64 sec = spec->tv_sec;
	u32 nsec = spec->tv_nsec;

	/* seconds in the high 34 bits, nanoseconds in the low 30 */
	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);

	return res;
}

/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now.
 */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* All multi-byte LVB fields are stored big-endian. */
	lvb->lvb_version = OCFS2_LVB_VERSION;
	lvb->lvb_isize = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}

/* Inverse of ocfs2_pack_timespec(): seconds from the high 34 bits,
 * nanoseconds from the low 30. */
static void ocfs2_unpack_timespec(struct timespec *spec,
				  u64 packed_time)
{
	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
}

/* Populate the in-memory inode from a trusted LVB, avoiding a disk
 * read (see ocfs2_meta_lock_update()). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}

/* An LVB is only usable if its version matches and it describes the
 * same inode generation (guards against slot reuse of the lockres). */
static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
					      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	if (lvb->lvb_version == OCFS2_LVB_VERSION
	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
		return 1;
	return 0;
}

/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 * 0 means no refresh needed.
 *
 * > 0 means you need to refresh this and you MUST call
 * ocfs2_complete_lock_res_refresh afterwards.
 */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* Someone else is refreshing; wait for them to finish
		 * and then re-check from the top -- they may fail and
		 * leave NEEDS_REFRESH set. */
		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}

/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Wake anyone spinning in ocfs2_should_refresh_lock_res(). */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/* may or may not return a bh if it went to disk. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = NULL;
	struct ocfs2_dinode *fe;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	if (!ocfs2_mount_local(osb)) {
		lockres = &oi->ip_meta_lockres;

		if (!ocfs2_should_refresh_lock_res(lockres))
			goto bail;
	}

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (lockres && ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	if (lockres)
		ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}

/* Hand the caller a bh: reuse the one the refresh path read, if any,
 * otherwise read the inode block ourselves. The caller owns the
 * returned reference. */
static int ocfs2_assign_bh(struct inode *inode,
			   struct buffer_head **ret_bh,
			   struct buffer_head *passed_bh)
{
	int status;

	if (passed_bh) {
		/* Ok, the update went to disk for us, use the
		 * returned bh. */
		*ret_bh = passed_bh;
		get_bh(*ret_bh);

		return 0;
	}

	status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
				  OCFS2_I(inode)->ip_blkno,
				  ret_bh,
				  OCFS2_BH_CACHED,
				  inode);
	if (status < 0)
		mlog_errno(status);

	return status;
}

/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres = NULL;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	if (ocfs2_mount_local(osb))
		goto local;

	/* Don't take a meta lock while recovery is pending, unless we
	 * ARE the recovery path (OCFS2_META_LOCK_RECOVERY). */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	acquired = 0;
	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		/* -EAGAIN (NOQUEUE/NONBLOCK) and -EIOCBRETRY are
		 * expected outcomes, not errors worth logging. */
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

local:
	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		if (lockres)
			ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

bail:
	if (status < 0) {
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}

/*
 * This is working around a lock inversion between tasks acquiring DLM locks
 * while holding a page lock and the vote thread which blocks dlm lock acquiry
 * while acquiring page locks.
 *
 * ** These _with_page variants are only intended to be called from aop
 * methods that hold page locks and return a very specific *positive* error
 * code that aop methods pass up to the VFS -- test for errors with != 0. **
 *
 * The DLM is called such that it returns -EAGAIN if it would have blocked
 * waiting for the vote thread. In that case we unlock our page so the vote
 * thread can make progress.
 Once we've done this we have to return
 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
 * into the VFS who will then immediately retry the aop call.
 *
 * We do a blocking lock and immediate unlock before returning, though, so that
 * the lock has a great chance of being cached on this node by the time the VFS
 * calls back to retry the aop. This has a potential to livelock as nodes
 * ping locks back and forth, but that's a risk we're willing to take to avoid
 * the lock inversion simply.
 */
int ocfs2_meta_lock_with_page(struct inode *inode,
			      struct buffer_head **ret_bh,
			      int ex,
			      struct page *page)
{
	int ret;

	ret = ocfs2_meta_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
	if (ret == -EAGAIN) {
		unlock_page(page);
		/* Blocking lock + immediate unlock to warm the lock
		 * before the VFS retries the aop (see comment above). */
		if (ocfs2_meta_lock(inode, ret_bh, ex) == 0)
			ocfs2_meta_unlock(inode, ex);
		ret = AOP_TRUNCATED_PAGE;
	}

	return ret;
}

/* Take a PR meta lock, upgrading to EX only when atime needs
 * updating. On success *level records the level actually held
 * (1 == EX, 0 == PR) so the caller can unlock correctly. */
int ocfs2_meta_lock_atime(struct inode *inode,
			  struct vfsmount *vfsmnt,
			  int *level)
{
	int ret;

	mlog_entry_void();
	ret = ocfs2_meta_lock(inode, NULL, 0);
	if (ret < 0) {
		mlog_errno(ret);
		return ret;
	}

	/*
	 * If we should update atime, we will get EX lock,
	 * otherwise we just get PR lock.
	 */
	if (ocfs2_should_update_atime(inode, vfsmnt)) {
		struct buffer_head *bh = NULL;

		ocfs2_meta_unlock(inode, 0);
		ret = ocfs2_meta_lock(inode, &bh, 1);
		if (ret < 0) {
			mlog_errno(ret);
			return ret;
		}
		*level = 1;
		/* Re-check under the EX lock: another node may have
		 * updated atime while we briefly dropped the lock. */
		if (ocfs2_should_update_atime(inode, vfsmnt))
			ocfs2_update_inode_atime(inode, bh);
		if (bh)
			brelse(bh);
	} else
		*level = 0;

	mlog_exit(ret);
	return ret;
}

/* Drop a holder reference taken by ocfs2_meta_lock*(). */
void ocfs2_meta_unlock(struct inode *inode,
		       int ex)
{
	int level = ex ?
LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

	mlog_entry_void();

	mlog(0, "inode %llu drop %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
	    !ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);

	mlog_exit_void();
}

/* Take the global super block lock. The refresh path also rereads
 * the slot map, since holding this lock is the arbitration point for
 * that resource. */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status = 0;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		goto bail;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* Reread the slot map block into the existing cached
		 * bh held by the slot info. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}

/* Drop a holder reference taken by ocfs2_super_lock(). */
void ocfs2_super_unlock(struct ocfs2_super *osb,
			int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, level);
}

/* The rename lock is a global EX lock serializing cross-directory
 * renames across the cluster. */
int ocfs2_rename_lock(struct ocfs2_super *osb)
{
	int status;
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
	if (status < 0)
		mlog_errno(status);

	return status;
}

void ocfs2_rename_unlock(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
}

/* Take the dentry lock attached via dentry->d_fsdata (see dcache.c). */
int ocfs2_dentry_lock(struct dentry *dentry, int ex)
{
	int ret;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

	BUG_ON(!dl);

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	if (ocfs2_mount_local(osb))
		return 0;

	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
	if (ret < 0)
		mlog_errno(ret);

	return ret;
}

void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

	if (!ocfs2_mount_local(osb))
		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
}

/* Reference counting of the dlm debug structure. We want this because
 * open references on the debug inodes can live on after a mount, so
 * we can't rely on the ocfs2_super to always exist.
*/ 1926static void ocfs2_dlm_debug_free(struct kref *kref) 1927{ 1928 struct ocfs2_dlm_debug *dlm_debug; 1929 1930 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 1931 1932 kfree(dlm_debug); 1933} 1934 1935void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 1936{ 1937 if (dlm_debug) 1938 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 1939} 1940 1941static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 1942{ 1943 kref_get(&debug->d_refcnt); 1944} 1945 1946struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 1947{ 1948 struct ocfs2_dlm_debug *dlm_debug; 1949 1950 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 1951 if (!dlm_debug) { 1952 mlog_errno(-ENOMEM); 1953 goto out; 1954 } 1955 1956 kref_init(&dlm_debug->d_refcnt); 1957 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 1958 dlm_debug->d_locking_state = NULL; 1959out: 1960 return dlm_debug; 1961} 1962 1963/* Access to this is arbitrated for us via seq_file->sem. */ 1964struct ocfs2_dlm_seq_priv { 1965 struct ocfs2_dlm_debug *p_dlm_debug; 1966 struct ocfs2_lock_res p_iter_res; 1967 struct ocfs2_lock_res p_tmp_res; 1968}; 1969 1970static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 1971 struct ocfs2_dlm_seq_priv *priv) 1972{ 1973 struct ocfs2_lock_res *iter, *ret = NULL; 1974 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 1975 1976 assert_spin_locked(&ocfs2_dlm_tracking_lock); 1977 1978 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 1979 /* discover the head of the list */ 1980 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 1981 mlog(0, "End of list found, %p\n", ret); 1982 break; 1983 } 1984 1985 /* We track our "dummy" iteration lockres' by a NULL 1986 * l_ops field. 
*/ 1987 if (iter->l_ops != NULL) { 1988 ret = iter; 1989 break; 1990 } 1991 } 1992 1993 return ret; 1994} 1995 1996static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 1997{ 1998 struct ocfs2_dlm_seq_priv *priv = m->private; 1999 struct ocfs2_lock_res *iter; 2000 2001 spin_lock(&ocfs2_dlm_tracking_lock); 2002 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2003 if (iter) { 2004 /* Since lockres' have the lifetime of their container 2005 * (which can be inodes, ocfs2_supers, etc) we want to 2006 * copy this out to a temporary lockres while still 2007 * under the spinlock. Obviously after this we can't 2008 * trust any pointers on the copy returned, but that's 2009 * ok as the information we want isn't typically held 2010 * in them. */ 2011 priv->p_tmp_res = *iter; 2012 iter = &priv->p_tmp_res; 2013 } 2014 spin_unlock(&ocfs2_dlm_tracking_lock); 2015 2016 return iter; 2017} 2018 2019static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2020{ 2021} 2022 2023static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2024{ 2025 struct ocfs2_dlm_seq_priv *priv = m->private; 2026 struct ocfs2_lock_res *iter = v; 2027 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2028 2029 spin_lock(&ocfs2_dlm_tracking_lock); 2030 iter = ocfs2_dlm_next_res(iter, priv); 2031 list_del_init(&dummy->l_debug_list); 2032 if (iter) { 2033 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2034 priv->p_tmp_res = *iter; 2035 iter = &priv->p_tmp_res; 2036 } 2037 spin_unlock(&ocfs2_dlm_tracking_lock); 2038 2039 return iter; 2040} 2041 2042/* So that debugfs.ocfs2 can determine which format is being used */ 2043#define OCFS2_DLM_DEBUG_STR_VERSION 1 2044static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2045{ 2046 int i; 2047 char *lvb; 2048 struct ocfs2_lock_res *lockres = v; 2049 2050 if (!lockres) 2051 return -EINVAL; 2052 2053 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2054 2055 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2056 
seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2057 lockres->l_name, 2058 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2059 else 2060 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2061 2062 seq_printf(m, "%d\t" 2063 "0x%lx\t" 2064 "0x%x\t" 2065 "0x%x\t" 2066 "%u\t" 2067 "%u\t" 2068 "%d\t" 2069 "%d\t", 2070 lockres->l_level, 2071 lockres->l_flags, 2072 lockres->l_action, 2073 lockres->l_unlock_action, 2074 lockres->l_ro_holders, 2075 lockres->l_ex_holders, 2076 lockres->l_requested, 2077 lockres->l_blocking); 2078 2079 /* Dump the raw LVB */ 2080 lvb = lockres->l_lksb.lvb; 2081 for(i = 0; i < DLM_LVB_LEN; i++) 2082 seq_printf(m, "0x%x\t", lvb[i]); 2083 2084 /* End the line */ 2085 seq_printf(m, "\n"); 2086 return 0; 2087} 2088 2089static struct seq_operations ocfs2_dlm_seq_ops = { 2090 .start = ocfs2_dlm_seq_start, 2091 .stop = ocfs2_dlm_seq_stop, 2092 .next = ocfs2_dlm_seq_next, 2093 .show = ocfs2_dlm_seq_show, 2094}; 2095 2096static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2097{ 2098 struct seq_file *seq = (struct seq_file *) file->private_data; 2099 struct ocfs2_dlm_seq_priv *priv = seq->private; 2100 struct ocfs2_lock_res *res = &priv->p_iter_res; 2101 2102 ocfs2_remove_lockres_tracking(res); 2103 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2104 return seq_release_private(inode, file); 2105} 2106 2107static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2108{ 2109 int ret; 2110 struct ocfs2_dlm_seq_priv *priv; 2111 struct seq_file *seq; 2112 struct ocfs2_super *osb; 2113 2114 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2115 if (!priv) { 2116 ret = -ENOMEM; 2117 mlog_errno(ret); 2118 goto out; 2119 } 2120 osb = inode->i_private; 2121 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2122 priv->p_dlm_debug = osb->osb_dlm_debug; 2123 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2124 2125 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2126 if (ret) { 2127 kfree(priv); 2128 
mlog_errno(ret); 2129 goto out; 2130 } 2131 2132 seq = (struct seq_file *) file->private_data; 2133 seq->private = priv; 2134 2135 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2136 priv->p_dlm_debug); 2137 2138out: 2139 return ret; 2140} 2141 2142static const struct file_operations ocfs2_dlm_debug_fops = { 2143 .open = ocfs2_dlm_debug_open, 2144 .release = ocfs2_dlm_debug_release, 2145 .read = seq_read, 2146 .llseek = seq_lseek, 2147}; 2148 2149static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2150{ 2151 int ret = 0; 2152 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2153 2154 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2155 S_IFREG|S_IRUSR, 2156 osb->osb_debug_root, 2157 osb, 2158 &ocfs2_dlm_debug_fops); 2159 if (!dlm_debug->d_locking_state) { 2160 ret = -EINVAL; 2161 mlog(ML_ERROR, 2162 "Unable to create locking state debugfs file.\n"); 2163 goto out; 2164 } 2165 2166 ocfs2_get_dlm_debug(dlm_debug); 2167out: 2168 return ret; 2169} 2170 2171static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2172{ 2173 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2174 2175 if (dlm_debug) { 2176 debugfs_remove(dlm_debug->d_locking_state); 2177 ocfs2_put_dlm_debug(dlm_debug); 2178 } 2179} 2180 2181int ocfs2_dlm_init(struct ocfs2_super *osb) 2182{ 2183 int status = 0; 2184 u32 dlm_key; 2185 struct dlm_ctxt *dlm = NULL; 2186 2187 mlog_entry_void(); 2188 2189 if (ocfs2_mount_local(osb)) 2190 goto local; 2191 2192 status = ocfs2_dlm_init_debug(osb); 2193 if (status < 0) { 2194 mlog_errno(status); 2195 goto bail; 2196 } 2197 2198 /* launch vote thread */ 2199 osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote"); 2200 if (IS_ERR(osb->vote_task)) { 2201 status = PTR_ERR(osb->vote_task); 2202 osb->vote_task = NULL; 2203 mlog_errno(status); 2204 goto bail; 2205 } 2206 2207 /* used by the dlm code to make message headers unique, each 2208 * node in this domain must agree on this. 
 */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

local:
	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		/* Unwind whatever got set up before the failure; the
		 * vote task is only stopped if it was started. */
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}

/* Tear down what ocfs2_dlm_init() set up, roughly in reverse order. */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}

/*
 * AST called by the dlm when an unlock request (a full drop, or the
 * cancel of an in-progress convert) completes.  Updates the lockres
 * state machine under l_lock and wakes any waiters.
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state.
The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* The lock is fully gone from the dlm's view. */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Wake anyone sleeping in ocfs2_wait_on_busy_lock() et al. */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/*
 * Drop a lockres back to the dlm for good.  Waits out any in-flight
 * dlm operation, pushes our LVB out if we hold EX and the lock has
 * been refreshed, then issues the dlmunlock and waits for
 * ocfs2_unlock_ast() to clear the busy flag.  The lockres must have
 * been marked OCFS2_LOCK_FREEING first.
 */
static int ocfs2_drop_lock(struct ocfs2_super *osb,
			   struct ocfs2_lock_res *lockres)
{
	enum dlm_status status;
	unsigned long flags;
	int lkm_flags = 0;

	/* We didn't get anywhere near actually using this lockres.
*/ 2327 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2328 goto out; 2329 2330 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2331 lkm_flags |= LKM_VALBLK; 2332 2333 spin_lock_irqsave(&lockres->l_lock, flags); 2334 2335 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2336 "lockres %s, flags 0x%lx\n", 2337 lockres->l_name, lockres->l_flags); 2338 2339 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2340 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2341 "%u, unlock_action = %u\n", 2342 lockres->l_name, lockres->l_flags, lockres->l_action, 2343 lockres->l_unlock_action); 2344 2345 spin_unlock_irqrestore(&lockres->l_lock, flags); 2346 2347 /* XXX: Today we just wait on any busy 2348 * locks... Perhaps we need to cancel converts in the 2349 * future? */ 2350 ocfs2_wait_on_busy_lock(lockres); 2351 2352 spin_lock_irqsave(&lockres->l_lock, flags); 2353 } 2354 2355 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2356 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2357 lockres->l_level == LKM_EXMODE && 2358 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2359 lockres->l_ops->set_lvb(lockres); 2360 } 2361 2362 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2363 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2364 lockres->l_name); 2365 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2366 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2367 2368 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2369 spin_unlock_irqrestore(&lockres->l_lock, flags); 2370 goto out; 2371 } 2372 2373 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2374 2375 /* make sure we never get here while waiting for an ast to 2376 * fire. */ 2377 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2378 2379 /* is this necessary? 
*/ 2380 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2381 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2382 spin_unlock_irqrestore(&lockres->l_lock, flags); 2383 2384 mlog(0, "lock %s\n", lockres->l_name); 2385 2386 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2387 ocfs2_unlock_ast, lockres); 2388 if (status != DLM_NORMAL) { 2389 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2390 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2391 dlm_print_one_lock(lockres->l_lksb.lockid); 2392 BUG(); 2393 } 2394 mlog(0, "lock %s, successfull return from dlmunlock\n", 2395 lockres->l_name); 2396 2397 ocfs2_wait_on_busy_lock(lockres); 2398out: 2399 mlog_exit(0); 2400 return 0; 2401} 2402 2403/* Mark the lockres as being dropped. It will no longer be 2404 * queued if blocking, but we still may have to wait on it 2405 * being dequeued from the vote thread before we can consider 2406 * it safe to drop. 2407 * 2408 * You can *not* attempt to call cluster_lock on this lockres anymore. 
 */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Wait for the vote thread to dequeue us (clear QUEUED)
	 * before the caller may safely free the lockres. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/* Convenience wrapper: mark the lockres freeing, then drop it. */
void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
			       struct ocfs2_lock_res *lockres)
{
	int ret;

	ocfs2_mark_lockres_freeing(lockres);
	ret = ocfs2_drop_lock(osb, lockres);
	if (ret)
		mlog_errno(ret);
}

/* Drop the two osb-global lock resources (super and rename). */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}

/*
 * Drop all three per-inode lock resources (data, meta, rw).  Always
 * attempts all three; returns the first error encountered.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_data_lockres);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_meta_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}

/*
 * Set up lockres state for a downconvert to new_level.  Caller holds
 * l_lock; new_level must be strictly lower than the level currently
 * held or we BUG().
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}

/*
 * Issue the actual LKM_CONVERT request to the dlm.  The lvb flag
 * selects whether our lock value block is sent with the convert.
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}

/* returns 1 when the
caller should unlock and call dlmunlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}

/*
 * Ask the dlm to cancel the in-progress convert on this lockres
 * (LKM_CANCEL).  Completion is reported through ocfs2_unlock_ast().
 */
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int ret;
	enum dlm_status status;

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	ret = 0;
	status = dlmunlock(osb->dlm,
			   &lockres->l_lksb,
			   LKM_CANCEL,
			   ocfs2_unlock_ast,
			   lockres);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmunlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 0);
	}

	mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);

	mlog_exit(ret);
	return ret;
}

/*
 * Core unblock state machine, run from the vote thread for a lockres
 * that received a blocking AST.  Decides whether to cancel a pending
 * convert, requeue for later (holders or refresh in progress), or
 * downconvert now - optionally running the lock type's worker first.
 * Results are reported through *ctl (requeue / unblock_action).
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		/* A dlm operation is in flight - try to cancel the
		 * convert and requeue regardless. */
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a veto (e.g. meta locks wait for
	 * checkpoint - see ocfs2_check_meta_downconvert()). */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}

/*
 * Downconvert worker for inode data locks: write back dirty pages,
 * and if we are losing EX entirely, drop and unmap the page cache so
 * other nodes see consistent data.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == LKM_EXMODE) {
		truncate_inode_pages(mapping, 0);
		unmap_mapping_range(mapping, 0, 0, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

	return UNBLOCK_CONTINUE;
}

/*
 * Downconvert check for inode meta locks: only permit the
 * downconvert once the inode's journaled metadata is fully
 * checkpointed; otherwise start a checkpoint and report "not yet"
 * (caller requeues).
 */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);
	int checkpointed = ocfs2_inode_fully_checkpointed(inode);

	BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
	BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);

	if (checkpointed)
		return 1;

	ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
	return 0;
}

/* set_lvb hook for meta locks: pack current inode state into the LVB. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	struct inode *inode = ocfs2_lock_res_inode(lockres);

	__ocfs2_stuff_meta_lvb(inode);
}

/*
 * Does the final reference drop on our dentry lock. Right now this
 * happens in the vote thread, but we could choose to simplify the
 * dlmglue API and push these off to the ocfs2_wq in the future.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	ocfs2_dentry_lock_put(osb, dl);
}

/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	/* d_delete() every alias of this inode under our parent. */
	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}

/*
 * Vote-thread entry point for a lockres that received a blocking AST:
 * run the unblock machinery, then requeue or dequeue it.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance.
 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	/* l_lock is held here on both paths into the label. */
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Give the lock type a chance at post-processing (e.g. the
	 * dentry lock's deferred reference drop). */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}

/*
 * Put a blocked lockres on the vote thread's work list.  Caller must
 * hold l_lock.  No-op when the lockres is on its way to destruction.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	spin_lock(&osb->vote_task_lock);
	/* Only enqueue once - list_empty() tells us whether the
	 * lockres is already on the blocked list. */
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}

/* This aids in debugging situations where a bad LVB might be involved.
 */
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	/* lvb_version is printed raw, without byte swapping -
	 * presumably a single-byte field; TODO confirm against the
	 * ocfs2_meta_lvb definition. */
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}