dlmglue.c revision 8f2c9c1b16bf6ed0903b29c49d56fa0109a390e4
/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/crc32.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>

#include <cluster/heartbeat.h>
#include <cluster/nodemanager.h>
#include <cluster/tcp.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"

#include "buffer_head_io.h"

/*
 * A waiter for a particular state of a lockres' l_flags.
 *
 * lockres_add_mask_waiter() queues one of these on the lockres'
 * l_mask_waiters list; lockres_set_flags() dequeues and completes it
 * once (l_flags & mw_mask) == mw_goal.
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on lockres->l_mask_waiters */
	int			mw_status;	/* result returned by ocfs2_wait_for_mask() */
	struct completion	mw_complete;	/* completed when the goal is reached */
	unsigned long		mw_mask;	/* l_flags bits being watched */
	unsigned long		mw_goal;	/* value the masked bits must take */
};

static struct ocfs2_super
*ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
/* The remaining ->get_osb callbacks used by the lock type tables below. */
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/*
 * Pairs a requeue request with an unblock action.
 *
 * NOTE(review): no user of this structure is visible in this part of
 * the file; presumably it carries the downconvert decision back to the
 * downconvert thread - confirm against the callers.
 */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};

/* Lock-type specific callbacks referenced by the ops tables below. */
static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
					int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres);


/* Dump the meta LVB of __lockres, tagging the output with the call site. */
#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved.
*/ 110static void ocfs2_dump_meta_lvb_info(u64 level, 111 const char *function, 112 unsigned int line, 113 struct ocfs2_lock_res *lockres) 114{ 115 struct ocfs2_meta_lvb *lvb = 116 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 117 118 mlog(level, "LVB information for %s (called from %s:%u):\n", 119 lockres->l_name, function, line); 120 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 121 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 122 be32_to_cpu(lvb->lvb_igeneration)); 123 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 124 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 125 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 126 be16_to_cpu(lvb->lvb_imode)); 127 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 128 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 129 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 130 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 131 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 132 be32_to_cpu(lvb->lvb_iattr)); 133} 134 135 136/* 137 * OCFS2 Lock Resource Operations 138 * 139 * These fine tune the behavior of the generic dlmglue locking infrastructure. 140 * 141 * The most basic of lock types can point ->l_priv to their respective 142 * struct ocfs2_super and allow the default actions to manage things. 143 * 144 * Right now, each lock type also needs to implement an init function, 145 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 146 * should be called when the lock is no longer needed (i.e., object 147 * destruction time). 148 */ 149struct ocfs2_lock_res_ops { 150 /* 151 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 152 * this callback if ->l_priv is not an ocfs2_super pointer 153 */ 154 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 155 156 /* 157 * Optionally called in the downconvert thread after a 158 * successful downconvert. 
The lockres will not be referenced 159 * after this callback is called, so it is safe to free 160 * memory, etc. 161 * 162 * The exact semantics of when this is called are controlled 163 * by ->downconvert_worker() 164 */ 165 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 166 167 /* 168 * Allow a lock type to add checks to determine whether it is 169 * safe to downconvert a lock. Return 0 to re-queue the 170 * downconvert at a later time, nonzero to continue. 171 * 172 * For most locks, the default checks that there are no 173 * incompatible holders are sufficient. 174 * 175 * Called with the lockres spinlock held. 176 */ 177 int (*check_downconvert)(struct ocfs2_lock_res *, int); 178 179 /* 180 * Allows a lock type to populate the lock value block. This 181 * is called on downconvert, and when we drop a lock. 182 * 183 * Locks that want to use this should set LOCK_TYPE_USES_LVB 184 * in the flags field. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 void (*set_lvb)(struct ocfs2_lock_res *); 189 190 /* 191 * Called from the downconvert thread when it is determined 192 * that a lock will be downconverted. This is called without 193 * any locks held so the function can do work that might 194 * schedule (syncing out data, etc). 195 * 196 * This should return any one of the ocfs2_unblock_action 197 * values, depending on what it wants the thread to do. 198 */ 199 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 200 201 /* 202 * LOCK_TYPE_* flags which describe the specific requirements 203 * of a lock type. Descriptions of each individual flag follow. 204 */ 205 int flags; 206}; 207 208/* 209 * Some locks want to "refresh" potentially stale data when a 210 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 211 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 212 * individual lockres l_flags member from the ast function. 
It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB		0x2

/* Ops for OCFS2_LOCK_TYPE_RW inode locks (see ocfs2_inode_lock_res_init()). */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/*
 * Ops for OCFS2_LOCK_TYPE_META inode locks. This is the only table
 * here that uses the LVB and defines ->check_downconvert / ->set_lvb.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

/* Ops for the superblock lock; ->l_priv is the osb, so no ->get_osb. */
static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

/* Ops for the rename lock; ->l_priv is the osb, so no ->get_osb. */
static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

/* Ops for dentry locks; ->l_priv is a struct ocfs2_dentry_lock. */
static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};

/* Ops for OCFS2_LOCK_TYPE_OPEN inode locks. */
static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

/* Ops for flock locks; ->l_priv is a struct ocfs2_file_private. */
static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
	.get_osb	= ocfs2_get_file_osb,
	.flags		= 0,
};

/*
 * This is the filesystem locking protocol version.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.
When its minor version is greater than 271 * the other nodes, it will run at the minor version specified by the 272 * other nodes. 273 * 274 * If a locking change is made that will not be compatible with older 275 * versions, the major number must be increased and the minor version set 276 * to zero. If a change merely adds a behavior that can be disabled when 277 * speaking to older versions, the minor version must be increased. If a 278 * change adds a fully backwards compatible change (eg, LVB changes that 279 * are just ignored by older versions), the version does not need to be 280 * updated. 281 */ 282const struct dlm_protocol_version ocfs2_locking_protocol = { 283 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 284 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 285}; 286 287static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 288{ 289 return lockres->l_type == OCFS2_LOCK_TYPE_META || 290 lockres->l_type == OCFS2_LOCK_TYPE_RW || 291 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 292} 293 294static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 295{ 296 BUG_ON(!ocfs2_is_inode_lock(lockres)); 297 298 return (struct inode *) lockres->l_priv; 299} 300 301static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 302{ 303 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 304 305 return (struct ocfs2_dentry_lock *)lockres->l_priv; 306} 307 308static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 309{ 310 if (lockres->l_ops->get_osb) 311 return lockres->l_ops->get_osb(lockres); 312 313 return (struct ocfs2_super *)lockres->l_priv; 314} 315 316static int ocfs2_lock_create(struct ocfs2_super *osb, 317 struct ocfs2_lock_res *lockres, 318 int level, 319 u32 dlm_flags); 320static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 321 int wanted); 322static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 323 struct ocfs2_lock_res *lockres, 324 int 
level); 325static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 326static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 327static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 328static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 329static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 330 struct ocfs2_lock_res *lockres); 331static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 332 int convert); 333#define ocfs2_log_dlm_error(_func, _err, _lockres) do { \ 334 mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n", \ 335 _err, _func, _lockres->l_name); \ 336} while (0) 337static int ocfs2_downconvert_thread(void *arg); 338static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 339 struct ocfs2_lock_res *lockres); 340static int ocfs2_inode_lock_update(struct inode *inode, 341 struct buffer_head **bh); 342static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 343static inline int ocfs2_highest_compat_lock_level(int level); 344static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 345 int new_level); 346static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 347 struct ocfs2_lock_res *lockres, 348 int new_level, 349 int lvb); 350static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 351 struct ocfs2_lock_res *lockres); 352static int ocfs2_cancel_convert(struct ocfs2_super *osb, 353 struct ocfs2_lock_res *lockres); 354 355 356static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 357 u64 blkno, 358 u32 generation, 359 char *name) 360{ 361 int len; 362 363 mlog_entry_void(); 364 365 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 366 367 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 368 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 369 (long long)blkno, generation); 370 371 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 372 373 mlog(0, 
"built lock resource with name: %s\n", name); 374 375 mlog_exit_void(); 376} 377 378static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 379 380static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 381 struct ocfs2_dlm_debug *dlm_debug) 382{ 383 mlog(0, "Add tracking for lockres %s\n", res->l_name); 384 385 spin_lock(&ocfs2_dlm_tracking_lock); 386 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 387 spin_unlock(&ocfs2_dlm_tracking_lock); 388} 389 390static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 391{ 392 spin_lock(&ocfs2_dlm_tracking_lock); 393 if (!list_empty(&res->l_debug_list)) 394 list_del_init(&res->l_debug_list); 395 spin_unlock(&ocfs2_dlm_tracking_lock); 396} 397 398static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 399 struct ocfs2_lock_res *res, 400 enum ocfs2_lock_type type, 401 struct ocfs2_lock_res_ops *ops, 402 void *priv) 403{ 404 res->l_type = type; 405 res->l_ops = ops; 406 res->l_priv = priv; 407 408 res->l_level = DLM_LOCK_IV; 409 res->l_requested = DLM_LOCK_IV; 410 res->l_blocking = DLM_LOCK_IV; 411 res->l_action = OCFS2_AST_INVALID; 412 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 413 414 res->l_flags = OCFS2_LOCK_INITIALIZED; 415 416 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 417} 418 419void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 420{ 421 /* This also clears out the lock status block */ 422 memset(res, 0, sizeof(struct ocfs2_lock_res)); 423 spin_lock_init(&res->l_lock); 424 init_waitqueue_head(&res->l_event); 425 INIT_LIST_HEAD(&res->l_blocked_list); 426 INIT_LIST_HEAD(&res->l_mask_waiters); 427} 428 429void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 430 enum ocfs2_lock_type type, 431 unsigned int generation, 432 struct inode *inode) 433{ 434 struct ocfs2_lock_res_ops *ops; 435 436 switch(type) { 437 case OCFS2_LOCK_TYPE_RW: 438 ops = &ocfs2_inode_rw_lops; 439 break; 440 case OCFS2_LOCK_TYPE_META: 441 ops = &ocfs2_inode_inode_lops; 442 break; 443 case 
OCFS2_LOCK_TYPE_OPEN: 444 ops = &ocfs2_inode_open_lops; 445 break; 446 default: 447 mlog_bug_on_msg(1, "type: %d\n", type); 448 ops = NULL; /* thanks, gcc */ 449 break; 450 }; 451 452 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 453 generation, res->l_name); 454 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 455} 456 457static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 458{ 459 struct inode *inode = ocfs2_lock_res_inode(lockres); 460 461 return OCFS2_SB(inode->i_sb); 462} 463 464static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 465{ 466 struct ocfs2_file_private *fp = lockres->l_priv; 467 468 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 469} 470 471static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 472{ 473 __be64 inode_blkno_be; 474 475 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 476 sizeof(__be64)); 477 478 return be64_to_cpu(inode_blkno_be); 479} 480 481static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 482{ 483 struct ocfs2_dentry_lock *dl = lockres->l_priv; 484 485 return OCFS2_SB(dl->dl_inode->i_sb); 486} 487 488void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 489 u64 parent, struct inode *inode) 490{ 491 int len; 492 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 493 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 494 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 495 496 ocfs2_lock_res_init_once(lockres); 497 498 /* 499 * Unfortunately, the standard lock naming scheme won't work 500 * here because we have two 16 byte values to use. Instead, 501 * we'll stuff the inode number as a binary value. We still 502 * want error prints to show something without garbling the 503 * display, so drop a null byte in there before the inode 504 * number. A future version of OCFS2 will likely use all 505 * binary lock names. 
The stringified names have been a 506 * tremendous aid in debugging, but now that the debugfs 507 * interface exists, we can mangle things there if need be. 508 * 509 * NOTE: We also drop the standard "pad" value (the total lock 510 * name size stays the same though - the last part is all 511 * zeros due to the memset in ocfs2_lock_res_init_once() 512 */ 513 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 514 "%c%016llx", 515 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 516 (long long)parent); 517 518 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 519 520 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 521 sizeof(__be64)); 522 523 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 524 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 525 dl); 526} 527 528static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 529 struct ocfs2_super *osb) 530{ 531 /* Superblock lockres doesn't come from a slab so we call init 532 * once on it manually. */ 533 ocfs2_lock_res_init_once(res); 534 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 535 0, res->l_name); 536 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 537 &ocfs2_super_lops, osb); 538} 539 540static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 541 struct ocfs2_super *osb) 542{ 543 /* Rename lockres doesn't come from a slab so we call init 544 * once on it manually. 
*/ 545 ocfs2_lock_res_init_once(res); 546 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 547 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 548 &ocfs2_rename_lops, osb); 549} 550 551void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 552 struct ocfs2_file_private *fp) 553{ 554 struct inode *inode = fp->fp_file->f_mapping->host; 555 struct ocfs2_inode_info *oi = OCFS2_I(inode); 556 557 ocfs2_lock_res_init_once(lockres); 558 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 559 inode->i_generation, lockres->l_name); 560 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 561 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 562 fp); 563 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 564} 565 566void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 567{ 568 mlog_entry_void(); 569 570 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 571 return; 572 573 ocfs2_remove_lockres_tracking(res); 574 575 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 576 "Lockres %s is on the blocked list\n", 577 res->l_name); 578 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 579 "Lockres %s has mask waiters pending\n", 580 res->l_name); 581 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 582 "Lockres %s is locked\n", 583 res->l_name); 584 mlog_bug_on_msg(res->l_ro_holders, 585 "Lockres %s has %u ro holders\n", 586 res->l_name, res->l_ro_holders); 587 mlog_bug_on_msg(res->l_ex_holders, 588 "Lockres %s has %u ex holders\n", 589 res->l_name, res->l_ex_holders); 590 591 /* Need to clear out the lock status block for the dlm */ 592 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 593 594 res->l_flags = 0UL; 595 mlog_exit_void(); 596} 597 598static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 599 int level) 600{ 601 mlog_entry_void(); 602 603 BUG_ON(!lockres); 604 605 switch(level) { 606 case DLM_LOCK_EX: 607 lockres->l_ex_holders++; 608 break; 609 case DLM_LOCK_PR: 610 lockres->l_ro_holders++; 611 break; 612 default: 613 BUG(); 614 } 615 
616 mlog_exit_void(); 617} 618 619static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 620 int level) 621{ 622 mlog_entry_void(); 623 624 BUG_ON(!lockres); 625 626 switch(level) { 627 case DLM_LOCK_EX: 628 BUG_ON(!lockres->l_ex_holders); 629 lockres->l_ex_holders--; 630 break; 631 case DLM_LOCK_PR: 632 BUG_ON(!lockres->l_ro_holders); 633 lockres->l_ro_holders--; 634 break; 635 default: 636 BUG(); 637 } 638 mlog_exit_void(); 639} 640 641/* WARNING: This function lives in a world where the only three lock 642 * levels are EX, PR, and NL. It *will* have to be adjusted when more 643 * lock types are added. */ 644static inline int ocfs2_highest_compat_lock_level(int level) 645{ 646 int new_level = DLM_LOCK_EX; 647 648 if (level == DLM_LOCK_EX) 649 new_level = DLM_LOCK_NL; 650 else if (level == DLM_LOCK_PR) 651 new_level = DLM_LOCK_PR; 652 return new_level; 653} 654 655static void lockres_set_flags(struct ocfs2_lock_res *lockres, 656 unsigned long newflags) 657{ 658 struct ocfs2_mask_waiter *mw, *tmp; 659 660 assert_spin_locked(&lockres->l_lock); 661 662 lockres->l_flags = newflags; 663 664 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 665 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 666 continue; 667 668 list_del_init(&mw->mw_item); 669 mw->mw_status = 0; 670 complete(&mw->mw_complete); 671 } 672} 673static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 674{ 675 lockres_set_flags(lockres, lockres->l_flags | or); 676} 677static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 678 unsigned long clear) 679{ 680 lockres_set_flags(lockres, lockres->l_flags & ~clear); 681} 682 683static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 684{ 685 mlog_entry_void(); 686 687 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 688 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 689 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 690 BUG_ON(lockres->l_blocking <= 
DLM_LOCK_NL); 691 692 lockres->l_level = lockres->l_requested; 693 if (lockres->l_level <= 694 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 695 lockres->l_blocking = DLM_LOCK_NL; 696 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 697 } 698 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 699 700 mlog_exit_void(); 701} 702 703static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 704{ 705 mlog_entry_void(); 706 707 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 708 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 709 710 /* Convert from RO to EX doesn't really need anything as our 711 * information is already up to data. Convert from NL to 712 * *anything* however should mark ourselves as needing an 713 * update */ 714 if (lockres->l_level == DLM_LOCK_NL && 715 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 716 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 717 718 lockres->l_level = lockres->l_requested; 719 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 720 721 mlog_exit_void(); 722} 723 724static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 725{ 726 mlog_entry_void(); 727 728 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 729 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 730 731 if (lockres->l_requested > DLM_LOCK_NL && 732 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 733 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 734 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 735 736 lockres->l_level = lockres->l_requested; 737 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 738 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 739 740 mlog_exit_void(); 741} 742 743static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 744 int level) 745{ 746 int needs_downconvert = 0; 747 mlog_entry_void(); 748 749 assert_spin_locked(&lockres->l_lock); 750 751 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 752 753 if (level > lockres->l_blocking) { 754 /* only schedule a 
downconvert if we haven't already scheduled 755 * one that goes low enough to satisfy the level we're 756 * blocking. this also catches the case where we get 757 * duplicate BASTs */ 758 if (ocfs2_highest_compat_lock_level(level) < 759 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 760 needs_downconvert = 1; 761 762 lockres->l_blocking = level; 763 } 764 765 mlog_exit(needs_downconvert); 766 return needs_downconvert; 767} 768 769static void ocfs2_blocking_ast(void *opaque, int level) 770{ 771 struct ocfs2_lock_res *lockres = opaque; 772 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 773 int needs_downconvert; 774 unsigned long flags; 775 776 BUG_ON(level <= DLM_LOCK_NL); 777 778 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 779 lockres->l_name, level, lockres->l_level, 780 ocfs2_lock_type_string(lockres->l_type)); 781 782 /* 783 * We can skip the bast for locks which don't enable caching - 784 * they'll be dropped at the earliest possible time anyway. 
785 */ 786 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 787 return; 788 789 spin_lock_irqsave(&lockres->l_lock, flags); 790 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 791 if (needs_downconvert) 792 ocfs2_schedule_blocked_lock(osb, lockres); 793 spin_unlock_irqrestore(&lockres->l_lock, flags); 794 795 wake_up(&lockres->l_event); 796 797 ocfs2_wake_downconvert_thread(osb); 798} 799 800static void ocfs2_locking_ast(void *opaque) 801{ 802 struct ocfs2_lock_res *lockres = opaque; 803 unsigned long flags; 804 805 spin_lock_irqsave(&lockres->l_lock, flags); 806 807 if (ocfs2_dlm_lock_status(&lockres->l_lksb)) { 808 mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n", 809 lockres->l_name, 810 ocfs2_dlm_lock_status(&lockres->l_lksb)); 811 spin_unlock_irqrestore(&lockres->l_lock, flags); 812 return; 813 } 814 815 switch(lockres->l_action) { 816 case OCFS2_AST_ATTACH: 817 ocfs2_generic_handle_attach_action(lockres); 818 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 819 break; 820 case OCFS2_AST_CONVERT: 821 ocfs2_generic_handle_convert_action(lockres); 822 break; 823 case OCFS2_AST_DOWNCONVERT: 824 ocfs2_generic_handle_downconvert_action(lockres); 825 break; 826 default: 827 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 828 "lockres flags = 0x%lx, unlock action: %u\n", 829 lockres->l_name, lockres->l_action, lockres->l_flags, 830 lockres->l_unlock_action); 831 BUG(); 832 } 833 834 /* set it to something invalid so if we get called again we 835 * can catch it. 
*/ 836 lockres->l_action = OCFS2_AST_INVALID; 837 838 wake_up(&lockres->l_event); 839 spin_unlock_irqrestore(&lockres->l_lock, flags); 840} 841 842static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 843 int convert) 844{ 845 unsigned long flags; 846 847 mlog_entry_void(); 848 spin_lock_irqsave(&lockres->l_lock, flags); 849 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 850 if (convert) 851 lockres->l_action = OCFS2_AST_INVALID; 852 else 853 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 854 spin_unlock_irqrestore(&lockres->l_lock, flags); 855 856 wake_up(&lockres->l_event); 857 mlog_exit_void(); 858} 859 860/* Note: If we detect another process working on the lock (i.e., 861 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 862 * to do the right thing in that case. 863 */ 864static int ocfs2_lock_create(struct ocfs2_super *osb, 865 struct ocfs2_lock_res *lockres, 866 int level, 867 u32 dlm_flags) 868{ 869 int ret = 0; 870 unsigned long flags; 871 872 mlog_entry_void(); 873 874 mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level, 875 dlm_flags); 876 877 spin_lock_irqsave(&lockres->l_lock, flags); 878 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 879 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 880 spin_unlock_irqrestore(&lockres->l_lock, flags); 881 goto bail; 882 } 883 884 lockres->l_action = OCFS2_AST_ATTACH; 885 lockres->l_requested = level; 886 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 887 spin_unlock_irqrestore(&lockres->l_lock, flags); 888 889 ret = ocfs2_dlm_lock(osb->dlm, 890 level, 891 &lockres->l_lksb, 892 dlm_flags, 893 lockres->l_name, 894 OCFS2_LOCK_ID_MAX_LEN - 1, 895 lockres); 896 if (ret) { 897 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 898 ocfs2_recover_from_dlm_error(lockres, 1); 899 } 900 901 mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name); 902 903bail: 904 mlog_exit(ret); 905 return ret; 906} 907 908static inline int ocfs2_check_wait_flag(struct 
ocfs2_lock_res *lockres, 909 int flag) 910{ 911 unsigned long flags; 912 int ret; 913 914 spin_lock_irqsave(&lockres->l_lock, flags); 915 ret = lockres->l_flags & flag; 916 spin_unlock_irqrestore(&lockres->l_lock, flags); 917 918 return ret; 919} 920 921static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 922 923{ 924 wait_event(lockres->l_event, 925 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 926} 927 928static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 929 930{ 931 wait_event(lockres->l_event, 932 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 933} 934 935/* predict what lock level we'll be dropping down to on behalf 936 * of another node, and return true if the currently wanted 937 * level will be compatible with it. */ 938static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 939 int wanted) 940{ 941 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 942 943 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 944} 945 946static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 947{ 948 INIT_LIST_HEAD(&mw->mw_item); 949 init_completion(&mw->mw_complete); 950} 951 952static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 953{ 954 wait_for_completion(&mw->mw_complete); 955 /* Re-arm the completion in case we want to wait on it again */ 956 INIT_COMPLETION(mw->mw_complete); 957 return mw->mw_status; 958} 959 960static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 961 struct ocfs2_mask_waiter *mw, 962 unsigned long mask, 963 unsigned long goal) 964{ 965 BUG_ON(!list_empty(&mw->mw_item)); 966 967 assert_spin_locked(&lockres->l_lock); 968 969 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 970 mw->mw_mask = mask; 971 mw->mw_goal = goal; 972} 973 974/* returns 0 if the mw that was removed was already satisfied, -EBUSY 975 * if the mask still hadn't reached its goal */ 976static int lockres_remove_mask_waiter(struct 
ocfs2_lock_res *lockres, 977 struct ocfs2_mask_waiter *mw) 978{ 979 unsigned long flags; 980 int ret = 0; 981 982 spin_lock_irqsave(&lockres->l_lock, flags); 983 if (!list_empty(&mw->mw_item)) { 984 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 985 ret = -EBUSY; 986 987 list_del_init(&mw->mw_item); 988 init_completion(&mw->mw_complete); 989 } 990 spin_unlock_irqrestore(&lockres->l_lock, flags); 991 992 return ret; 993 994} 995 996static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 997 struct ocfs2_lock_res *lockres) 998{ 999 int ret; 1000 1001 ret = wait_for_completion_interruptible(&mw->mw_complete); 1002 if (ret) 1003 lockres_remove_mask_waiter(lockres, mw); 1004 else 1005 ret = mw->mw_status; 1006 /* Re-arm the completion in case we want to wait on it again */ 1007 INIT_COMPLETION(mw->mw_complete); 1008 return ret; 1009} 1010 1011static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1012 struct ocfs2_lock_res *lockres, 1013 int level, 1014 u32 lkm_flags, 1015 int arg_flags) 1016{ 1017 struct ocfs2_mask_waiter mw; 1018 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1019 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1020 unsigned long flags; 1021 1022 mlog_entry_void(); 1023 1024 ocfs2_init_mask_waiter(&mw); 1025 1026 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1027 lkm_flags |= DLM_LKF_VALBLK; 1028 1029again: 1030 wait = 0; 1031 1032 if (catch_signals && signal_pending(current)) { 1033 ret = -ERESTARTSYS; 1034 goto out; 1035 } 1036 1037 spin_lock_irqsave(&lockres->l_lock, flags); 1038 1039 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1040 "Cluster lock called on freeing lockres %s! flags " 1041 "0x%lx\n", lockres->l_name, lockres->l_flags); 1042 1043 /* We only compare against the currently granted level 1044 * here. If the lock is blocked waiting on a downconvert, 1045 * we'll get caught below. 
*/ 1046 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1047 level > lockres->l_level) { 1048 /* is someone sitting in dlm_lock? If so, wait on 1049 * them. */ 1050 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1051 wait = 1; 1052 goto unlock; 1053 } 1054 1055 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1056 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1057 /* is the lock is currently blocked on behalf of 1058 * another node */ 1059 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1060 wait = 1; 1061 goto unlock; 1062 } 1063 1064 if (level > lockres->l_level) { 1065 if (lockres->l_action != OCFS2_AST_INVALID) 1066 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1067 lockres->l_name, lockres->l_action); 1068 1069 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1070 lockres->l_action = OCFS2_AST_ATTACH; 1071 lkm_flags &= ~DLM_LKF_CONVERT; 1072 } else { 1073 lockres->l_action = OCFS2_AST_CONVERT; 1074 lkm_flags |= DLM_LKF_CONVERT; 1075 } 1076 1077 lockres->l_requested = level; 1078 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1079 spin_unlock_irqrestore(&lockres->l_lock, flags); 1080 1081 BUG_ON(level == DLM_LOCK_IV); 1082 BUG_ON(level == DLM_LOCK_NL); 1083 1084 mlog(0, "lock %s, convert from %d to level = %d\n", 1085 lockres->l_name, lockres->l_level, level); 1086 1087 /* call dlm_lock to upgrade lock now */ 1088 ret = ocfs2_dlm_lock(osb->dlm, 1089 level, 1090 &lockres->l_lksb, 1091 lkm_flags, 1092 lockres->l_name, 1093 OCFS2_LOCK_ID_MAX_LEN - 1, 1094 lockres); 1095 if (ret) { 1096 if (!(lkm_flags & DLM_LKF_NOQUEUE) || 1097 (ret != -EAGAIN)) { 1098 ocfs2_log_dlm_error("ocfs2_dlm_lock", 1099 ret, lockres); 1100 } 1101 ocfs2_recover_from_dlm_error(lockres, 1); 1102 goto out; 1103 } 1104 1105 mlog(0, "lock %s, successfull return from ocfs2_dlm_lock\n", 1106 lockres->l_name); 1107 1108 /* At this point we've gone inside the dlm and need to 1109 * complete our work regardless. 
*/ 1110 catch_signals = 0; 1111 1112 /* wait for busy to clear and carry on */ 1113 goto again; 1114 } 1115 1116 /* Ok, if we get here then we're good to go. */ 1117 ocfs2_inc_holders(lockres, level); 1118 1119 ret = 0; 1120unlock: 1121 spin_unlock_irqrestore(&lockres->l_lock, flags); 1122out: 1123 /* 1124 * This is helping work around a lock inversion between the page lock 1125 * and dlm locks. One path holds the page lock while calling aops 1126 * which block acquiring dlm locks. The voting thread holds dlm 1127 * locks while acquiring page locks while down converting data locks. 1128 * This block is helping an aop path notice the inversion and back 1129 * off to unlock its page lock before trying the dlm lock again. 1130 */ 1131 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1132 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1133 wait = 0; 1134 if (lockres_remove_mask_waiter(lockres, &mw)) 1135 ret = -EAGAIN; 1136 else 1137 goto again; 1138 } 1139 if (wait) { 1140 ret = ocfs2_wait_for_mask(&mw); 1141 if (ret == 0) 1142 goto again; 1143 mlog_errno(ret); 1144 } 1145 1146 mlog_exit(ret); 1147 return ret; 1148} 1149 1150static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1151 struct ocfs2_lock_res *lockres, 1152 int level) 1153{ 1154 unsigned long flags; 1155 1156 mlog_entry_void(); 1157 spin_lock_irqsave(&lockres->l_lock, flags); 1158 ocfs2_dec_holders(lockres, level); 1159 ocfs2_downconvert_on_unlock(osb, lockres); 1160 spin_unlock_irqrestore(&lockres->l_lock, flags); 1161 mlog_exit_void(); 1162} 1163 1164static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1165 struct ocfs2_lock_res *lockres, 1166 int ex, 1167 int local) 1168{ 1169 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1170 unsigned long flags; 1171 u32 lkm_flags = local ? 
DLM_LKF_LOCAL : 0; 1172 1173 spin_lock_irqsave(&lockres->l_lock, flags); 1174 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1175 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1176 spin_unlock_irqrestore(&lockres->l_lock, flags); 1177 1178 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1179} 1180 1181/* Grants us an EX lock on the data and metadata resources, skipping 1182 * the normal cluster directory lookup. Use this ONLY on newly created 1183 * inodes which other nodes can't possibly see, and which haven't been 1184 * hashed in the inode hash yet. This can give us a good performance 1185 * increase as it'll skip the network broadcast normally associated 1186 * with creating a new lock resource. */ 1187int ocfs2_create_new_inode_locks(struct inode *inode) 1188{ 1189 int ret; 1190 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1191 1192 BUG_ON(!inode); 1193 BUG_ON(!ocfs2_inode_is_new(inode)); 1194 1195 mlog_entry_void(); 1196 1197 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1198 1199 /* NOTE: That we don't increment any of the holder counts, nor 1200 * do we add anything to a journal handle. Since this is 1201 * supposed to be a new inode which the cluster doesn't know 1202 * about yet, there is no need to. As far as the LVB handling 1203 * is concerned, this is basically like acquiring an EX lock 1204 * on a resource which has an invalid one -- we'll set it 1205 * valid when we release the EX. */ 1206 1207 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1208 if (ret) { 1209 mlog_errno(ret); 1210 goto bail; 1211 } 1212 1213 /* 1214 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they 1215 * don't use a generation in their lock names. 
1216 */ 1217 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1218 if (ret) { 1219 mlog_errno(ret); 1220 goto bail; 1221 } 1222 1223 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1224 if (ret) { 1225 mlog_errno(ret); 1226 goto bail; 1227 } 1228 1229bail: 1230 mlog_exit(ret); 1231 return ret; 1232} 1233 1234int ocfs2_rw_lock(struct inode *inode, int write) 1235{ 1236 int status, level; 1237 struct ocfs2_lock_res *lockres; 1238 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1239 1240 BUG_ON(!inode); 1241 1242 mlog_entry_void(); 1243 1244 mlog(0, "inode %llu take %s RW lock\n", 1245 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1246 write ? "EXMODE" : "PRMODE"); 1247 1248 if (ocfs2_mount_local(osb)) 1249 return 0; 1250 1251 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1252 1253 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1254 1255 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1256 0); 1257 if (status < 0) 1258 mlog_errno(status); 1259 1260 mlog_exit(status); 1261 return status; 1262} 1263 1264void ocfs2_rw_unlock(struct inode *inode, int write) 1265{ 1266 int level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1267 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1268 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1269 1270 mlog_entry_void(); 1271 1272 mlog(0, "inode %llu drop %s RW lock\n", 1273 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1274 write ? "EXMODE" : "PRMODE"); 1275 1276 if (!ocfs2_mount_local(osb)) 1277 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1278 1279 mlog_exit_void(); 1280} 1281 1282/* 1283 * ocfs2_open_lock always get PR mode lock. 
1284 */ 1285int ocfs2_open_lock(struct inode *inode) 1286{ 1287 int status = 0; 1288 struct ocfs2_lock_res *lockres; 1289 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1290 1291 BUG_ON(!inode); 1292 1293 mlog_entry_void(); 1294 1295 mlog(0, "inode %llu take PRMODE open lock\n", 1296 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1297 1298 if (ocfs2_mount_local(osb)) 1299 goto out; 1300 1301 lockres = &OCFS2_I(inode)->ip_open_lockres; 1302 1303 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1304 DLM_LOCK_PR, 0, 0); 1305 if (status < 0) 1306 mlog_errno(status); 1307 1308out: 1309 mlog_exit(status); 1310 return status; 1311} 1312 1313int ocfs2_try_open_lock(struct inode *inode, int write) 1314{ 1315 int status = 0, level; 1316 struct ocfs2_lock_res *lockres; 1317 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1318 1319 BUG_ON(!inode); 1320 1321 mlog_entry_void(); 1322 1323 mlog(0, "inode %llu try to take %s open lock\n", 1324 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1325 write ? "EXMODE" : "PRMODE"); 1326 1327 if (ocfs2_mount_local(osb)) 1328 goto out; 1329 1330 lockres = &OCFS2_I(inode)->ip_open_lockres; 1331 1332 level = write ? DLM_LOCK_EX : DLM_LOCK_PR; 1333 1334 /* 1335 * The file system may already holding a PRMODE/EXMODE open lock. 1336 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on 1337 * other nodes and the -EAGAIN will indicate to the caller that 1338 * this inode is still in use. 1339 */ 1340 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1341 level, DLM_LKF_NOQUEUE, 0); 1342 1343out: 1344 mlog_exit(status); 1345 return status; 1346} 1347 1348/* 1349 * ocfs2_open_unlock unlock PR and EX mode open locks. 
1350 */ 1351void ocfs2_open_unlock(struct inode *inode) 1352{ 1353 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1354 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1355 1356 mlog_entry_void(); 1357 1358 mlog(0, "inode %llu drop open lock\n", 1359 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1360 1361 if (ocfs2_mount_local(osb)) 1362 goto out; 1363 1364 if(lockres->l_ro_holders) 1365 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1366 DLM_LOCK_PR); 1367 if(lockres->l_ex_holders) 1368 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1369 DLM_LOCK_EX); 1370 1371out: 1372 mlog_exit_void(); 1373} 1374 1375static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1376 int level) 1377{ 1378 int ret; 1379 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1380 unsigned long flags; 1381 struct ocfs2_mask_waiter mw; 1382 1383 ocfs2_init_mask_waiter(&mw); 1384 1385retry_cancel: 1386 spin_lock_irqsave(&lockres->l_lock, flags); 1387 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1388 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1389 if (ret) { 1390 spin_unlock_irqrestore(&lockres->l_lock, flags); 1391 ret = ocfs2_cancel_convert(osb, lockres); 1392 if (ret < 0) { 1393 mlog_errno(ret); 1394 goto out; 1395 } 1396 goto retry_cancel; 1397 } 1398 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1399 spin_unlock_irqrestore(&lockres->l_lock, flags); 1400 1401 ocfs2_wait_for_mask(&mw); 1402 goto retry_cancel; 1403 } 1404 1405 ret = -ERESTARTSYS; 1406 /* 1407 * We may still have gotten the lock, in which case there's no 1408 * point to restarting the syscall. 1409 */ 1410 if (lockres->l_level == level) 1411 ret = 0; 1412 1413 mlog(0, "Cancel returning %d. 
flags: 0x%lx, level: %d, act: %d\n", ret, 1414 lockres->l_flags, lockres->l_level, lockres->l_action); 1415 1416 spin_unlock_irqrestore(&lockres->l_lock, flags); 1417 1418out: 1419 return ret; 1420} 1421 1422/* 1423 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1424 * flock() calls. The locking approach this requires is sufficiently 1425 * different from all other cluster lock types that we implement a 1426 * seperate path to the "low-level" dlm calls. In particular: 1427 * 1428 * - No optimization of lock levels is done - we take at exactly 1429 * what's been requested. 1430 * 1431 * - No lock caching is employed. We immediately downconvert to 1432 * no-lock at unlock time. This also means flock locks never go on 1433 * the blocking list). 1434 * 1435 * - Since userspace can trivially deadlock itself with flock, we make 1436 * sure to allow cancellation of a misbehaving applications flock() 1437 * request. 1438 * 1439 * - Access to any flock lockres doesn't require concurrency, so we 1440 * can simplify the code by requiring the caller to guarantee 1441 * serialization of dlmglue flock calls. 1442 */ 1443int ocfs2_file_lock(struct file *file, int ex, int trylock) 1444{ 1445 int ret, level = ex ? LKM_EXMODE : LKM_PRMODE; 1446 unsigned int lkm_flags = trylock ? 
LKM_NOQUEUE : 0; 1447 unsigned long flags; 1448 struct ocfs2_file_private *fp = file->private_data; 1449 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1450 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1451 struct ocfs2_mask_waiter mw; 1452 1453 ocfs2_init_mask_waiter(&mw); 1454 1455 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1456 (lockres->l_level > DLM_LOCK_NL)) { 1457 mlog(ML_ERROR, 1458 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1459 "level: %u\n", lockres->l_name, lockres->l_flags, 1460 lockres->l_level); 1461 return -EINVAL; 1462 } 1463 1464 spin_lock_irqsave(&lockres->l_lock, flags); 1465 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1466 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1467 spin_unlock_irqrestore(&lockres->l_lock, flags); 1468 1469 /* 1470 * Get the lock at NLMODE to start - that way we 1471 * can cancel the upconvert request if need be. 1472 */ 1473 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); 1474 if (ret < 0) { 1475 mlog_errno(ret); 1476 goto out; 1477 } 1478 1479 ret = ocfs2_wait_for_mask(&mw); 1480 if (ret) { 1481 mlog_errno(ret); 1482 goto out; 1483 } 1484 spin_lock_irqsave(&lockres->l_lock, flags); 1485 } 1486 1487 lockres->l_action = OCFS2_AST_CONVERT; 1488 lkm_flags |= LKM_CONVERT; 1489 lockres->l_requested = level; 1490 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1491 1492 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1493 spin_unlock_irqrestore(&lockres->l_lock, flags); 1494 1495 ret = ocfs2_dlm_lock(osb->dlm, level, &lockres->l_lksb, lkm_flags, 1496 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1497 lockres); 1498 if (ret) { 1499 if (!trylock || (ret != -EAGAIN)) { 1500 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 1501 ret = -EINVAL; 1502 } 1503 1504 ocfs2_recover_from_dlm_error(lockres, 1); 1505 lockres_remove_mask_waiter(lockres, &mw); 1506 goto out; 1507 } 1508 1509 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1510 if (ret == 
-ERESTARTSYS) { 1511 /* 1512 * Userspace can cause deadlock itself with 1513 * flock(). Current behavior locally is to allow the 1514 * deadlock, but abort the system call if a signal is 1515 * received. We follow this example, otherwise a 1516 * poorly written program could sit in kernel until 1517 * reboot. 1518 * 1519 * Handling this is a bit more complicated for Ocfs2 1520 * though. We can't exit this function with an 1521 * outstanding lock request, so a cancel convert is 1522 * required. We intentionally overwrite 'ret' - if the 1523 * cancel fails and the lock was granted, it's easier 1524 * to just bubble sucess back up to the user. 1525 */ 1526 ret = ocfs2_flock_handle_signal(lockres, level); 1527 } 1528 1529out: 1530 1531 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1532 lockres->l_name, ex, trylock, ret); 1533 return ret; 1534} 1535 1536void ocfs2_file_unlock(struct file *file) 1537{ 1538 int ret; 1539 unsigned long flags; 1540 struct ocfs2_file_private *fp = file->private_data; 1541 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1542 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1543 struct ocfs2_mask_waiter mw; 1544 1545 ocfs2_init_mask_waiter(&mw); 1546 1547 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1548 return; 1549 1550 if (lockres->l_level == LKM_NLMODE) 1551 return; 1552 1553 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1554 lockres->l_name, lockres->l_flags, lockres->l_level, 1555 lockres->l_action); 1556 1557 spin_lock_irqsave(&lockres->l_lock, flags); 1558 /* 1559 * Fake a blocking ast for the downconvert code. 
1560 */ 1561 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1562 lockres->l_blocking = DLM_LOCK_EX; 1563 1564 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1565 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1566 spin_unlock_irqrestore(&lockres->l_lock, flags); 1567 1568 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1569 if (ret) { 1570 mlog_errno(ret); 1571 return; 1572 } 1573 1574 ret = ocfs2_wait_for_mask(&mw); 1575 if (ret) 1576 mlog_errno(ret); 1577} 1578 1579static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1580 struct ocfs2_lock_res *lockres) 1581{ 1582 int kick = 0; 1583 1584 mlog_entry_void(); 1585 1586 /* If we know that another node is waiting on our lock, kick 1587 * the downconvert thread * pre-emptively when we reach a release 1588 * condition. */ 1589 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1590 switch(lockres->l_blocking) { 1591 case DLM_LOCK_EX: 1592 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1593 kick = 1; 1594 break; 1595 case DLM_LOCK_PR: 1596 if (!lockres->l_ex_holders) 1597 kick = 1; 1598 break; 1599 default: 1600 BUG(); 1601 } 1602 } 1603 1604 if (kick) 1605 ocfs2_wake_downconvert_thread(osb); 1606 1607 mlog_exit_void(); 1608} 1609 1610#define OCFS2_SEC_BITS 34 1611#define OCFS2_SEC_SHIFT (64 - 34) 1612#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1613 1614/* LVB only has room for 64 bits of time here so we pack it for 1615 * now. */ 1616static u64 ocfs2_pack_timespec(struct timespec *spec) 1617{ 1618 u64 res; 1619 u64 sec = spec->tv_sec; 1620 u32 nsec = spec->tv_nsec; 1621 1622 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1623 1624 return res; 1625} 1626 1627/* Call this with the lockres locked. I am reasonably sure we don't 1628 * need ip_lock in this function as anyone who would be changing those 1629 * values is supposed to be blocked in ocfs2_inode_lock right now. 
*/ 1630static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1631{ 1632 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1633 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1634 struct ocfs2_meta_lvb *lvb; 1635 1636 mlog_entry_void(); 1637 1638 lvb = (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1639 1640 /* 1641 * Invalidate the LVB of a deleted inode - this way other 1642 * nodes are forced to go to disk and discover the new inode 1643 * status. 1644 */ 1645 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1646 lvb->lvb_version = 0; 1647 goto out; 1648 } 1649 1650 lvb->lvb_version = OCFS2_LVB_VERSION; 1651 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1652 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1653 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1654 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1655 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1656 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1657 lvb->lvb_iatime_packed = 1658 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1659 lvb->lvb_ictime_packed = 1660 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1661 lvb->lvb_imtime_packed = 1662 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1663 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1664 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1665 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1666 1667out: 1668 mlog_meta_lvb(0, lockres); 1669 1670 mlog_exit_void(); 1671} 1672 1673static void ocfs2_unpack_timespec(struct timespec *spec, 1674 u64 packed_time) 1675{ 1676 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1677 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1678} 1679 1680static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1681{ 1682 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1683 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1684 struct ocfs2_meta_lvb *lvb; 1685 1686 mlog_entry_void(); 1687 1688 mlog_meta_lvb(0, lockres); 1689 1690 lvb = (struct ocfs2_meta_lvb 
*)ocfs2_dlm_lvb(&lockres->l_lksb); 1691 1692 /* We're safe here without the lockres lock... */ 1693 spin_lock(&oi->ip_lock); 1694 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1695 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1696 1697 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1698 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1699 ocfs2_set_inode_flags(inode); 1700 1701 /* fast-symlinks are a special case */ 1702 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1703 inode->i_blocks = 0; 1704 else 1705 inode->i_blocks = ocfs2_inode_sector_count(inode); 1706 1707 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1708 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1709 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1710 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1711 ocfs2_unpack_timespec(&inode->i_atime, 1712 be64_to_cpu(lvb->lvb_iatime_packed)); 1713 ocfs2_unpack_timespec(&inode->i_mtime, 1714 be64_to_cpu(lvb->lvb_imtime_packed)); 1715 ocfs2_unpack_timespec(&inode->i_ctime, 1716 be64_to_cpu(lvb->lvb_ictime_packed)); 1717 spin_unlock(&oi->ip_lock); 1718 1719 mlog_exit_void(); 1720} 1721 1722static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1723 struct ocfs2_lock_res *lockres) 1724{ 1725 struct ocfs2_meta_lvb *lvb = 1726 (struct ocfs2_meta_lvb *)ocfs2_dlm_lvb(&lockres->l_lksb); 1727 1728 if (lvb->lvb_version == OCFS2_LVB_VERSION 1729 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1730 return 1; 1731 return 0; 1732} 1733 1734/* Determine whether a lock resource needs to be refreshed, and 1735 * arbitrate who gets to refresh it. 1736 * 1737 * 0 means no refresh needed. 1738 * 1739 * > 0 means you need to refresh this and you MUST call 1740 * ocfs2_complete_lock_res_refresh afterwards. 
*/ 1741static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1742{ 1743 unsigned long flags; 1744 int status = 0; 1745 1746 mlog_entry_void(); 1747 1748refresh_check: 1749 spin_lock_irqsave(&lockres->l_lock, flags); 1750 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1751 spin_unlock_irqrestore(&lockres->l_lock, flags); 1752 goto bail; 1753 } 1754 1755 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1756 spin_unlock_irqrestore(&lockres->l_lock, flags); 1757 1758 ocfs2_wait_on_refreshing_lock(lockres); 1759 goto refresh_check; 1760 } 1761 1762 /* Ok, I'll be the one to refresh this lock. */ 1763 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1764 spin_unlock_irqrestore(&lockres->l_lock, flags); 1765 1766 status = 1; 1767bail: 1768 mlog_exit(status); 1769 return status; 1770} 1771 1772/* If status is non zero, I'll mark it as not being in refresh 1773 * anymroe, but i won't clear the needs refresh flag. */ 1774static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1775 int status) 1776{ 1777 unsigned long flags; 1778 mlog_entry_void(); 1779 1780 spin_lock_irqsave(&lockres->l_lock, flags); 1781 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1782 if (!status) 1783 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1784 spin_unlock_irqrestore(&lockres->l_lock, flags); 1785 1786 wake_up(&lockres->l_event); 1787 1788 mlog_exit_void(); 1789} 1790 1791/* may or may not return a bh if it went to disk. 
*/ 1792static int ocfs2_inode_lock_update(struct inode *inode, 1793 struct buffer_head **bh) 1794{ 1795 int status = 0; 1796 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1797 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1798 struct ocfs2_dinode *fe; 1799 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1800 1801 mlog_entry_void(); 1802 1803 if (ocfs2_mount_local(osb)) 1804 goto bail; 1805 1806 spin_lock(&oi->ip_lock); 1807 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1808 mlog(0, "Orphaned inode %llu was deleted while we " 1809 "were waiting on a lock. ip_flags = 0x%x\n", 1810 (unsigned long long)oi->ip_blkno, oi->ip_flags); 1811 spin_unlock(&oi->ip_lock); 1812 status = -ENOENT; 1813 goto bail; 1814 } 1815 spin_unlock(&oi->ip_lock); 1816 1817 if (!ocfs2_should_refresh_lock_res(lockres)) 1818 goto bail; 1819 1820 /* This will discard any caching information we might have had 1821 * for the inode metadata. */ 1822 ocfs2_metadata_cache_purge(inode); 1823 1824 ocfs2_extent_map_trunc(inode, 0); 1825 1826 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 1827 mlog(0, "Trusting LVB on inode %llu\n", 1828 (unsigned long long)oi->ip_blkno); 1829 ocfs2_refresh_inode_from_lvb(inode); 1830 } else { 1831 /* Boo, we have to go to disk. */ 1832 /* read bh, cast, ocfs2_refresh_inode */ 1833 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno, 1834 bh, OCFS2_BH_CACHED, inode); 1835 if (status < 0) { 1836 mlog_errno(status); 1837 goto bail_refresh; 1838 } 1839 fe = (struct ocfs2_dinode *) (*bh)->b_data; 1840 1841 /* This is a good chance to make sure we're not 1842 * locking an invalid object. 1843 * 1844 * We bug on a stale inode here because we checked 1845 * above whether it was wiped from disk. The wiping 1846 * node provides a guarantee that we receive that 1847 * message and can mark the inode before dropping any 1848 * locks associated with it. 
*/ 1849 if (!OCFS2_IS_VALID_DINODE(fe)) { 1850 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 1851 status = -EIO; 1852 goto bail_refresh; 1853 } 1854 mlog_bug_on_msg(inode->i_generation != 1855 le32_to_cpu(fe->i_generation), 1856 "Invalid dinode %llu disk generation: %u " 1857 "inode->i_generation: %u\n", 1858 (unsigned long long)oi->ip_blkno, 1859 le32_to_cpu(fe->i_generation), 1860 inode->i_generation); 1861 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 1862 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 1863 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 1864 (unsigned long long)oi->ip_blkno, 1865 (unsigned long long)le64_to_cpu(fe->i_dtime), 1866 le32_to_cpu(fe->i_flags)); 1867 1868 ocfs2_refresh_inode(inode, fe); 1869 } 1870 1871 status = 0; 1872bail_refresh: 1873 ocfs2_complete_lock_res_refresh(lockres, status); 1874bail: 1875 mlog_exit(status); 1876 return status; 1877} 1878 1879static int ocfs2_assign_bh(struct inode *inode, 1880 struct buffer_head **ret_bh, 1881 struct buffer_head *passed_bh) 1882{ 1883 int status; 1884 1885 if (passed_bh) { 1886 /* Ok, the update went to disk for us, use the 1887 * returned bh. */ 1888 *ret_bh = passed_bh; 1889 get_bh(*ret_bh); 1890 1891 return 0; 1892 } 1893 1894 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 1895 OCFS2_I(inode)->ip_blkno, 1896 ret_bh, 1897 OCFS2_BH_CACHED, 1898 inode); 1899 if (status < 0) 1900 mlog_errno(status); 1901 1902 return status; 1903} 1904 1905/* 1906 * returns < 0 error if the callback will never be called, otherwise 1907 * the result of the lock will be communicated via the callback. 
1908 */ 1909int ocfs2_inode_lock_full(struct inode *inode, 1910 struct buffer_head **ret_bh, 1911 int ex, 1912 int arg_flags) 1913{ 1914 int status, level, acquired; 1915 u32 dlm_flags; 1916 struct ocfs2_lock_res *lockres = NULL; 1917 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1918 struct buffer_head *local_bh = NULL; 1919 1920 BUG_ON(!inode); 1921 1922 mlog_entry_void(); 1923 1924 mlog(0, "inode %llu, take %s META lock\n", 1925 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1926 ex ? "EXMODE" : "PRMODE"); 1927 1928 status = 0; 1929 acquired = 0; 1930 /* We'll allow faking a readonly metadata lock for 1931 * rodevices. */ 1932 if (ocfs2_is_hard_readonly(osb)) { 1933 if (ex) 1934 status = -EROFS; 1935 goto bail; 1936 } 1937 1938 if (ocfs2_mount_local(osb)) 1939 goto local; 1940 1941 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1942 ocfs2_wait_for_recovery(osb); 1943 1944 lockres = &OCFS2_I(inode)->ip_inode_lockres; 1945 level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 1946 dlm_flags = 0; 1947 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 1948 dlm_flags |= DLM_LKF_NOQUEUE; 1949 1950 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 1951 if (status < 0) { 1952 if (status != -EAGAIN && status != -EIOCBRETRY) 1953 mlog_errno(status); 1954 goto bail; 1955 } 1956 1957 /* Notify the error cleanup path to drop the cluster lock. */ 1958 acquired = 1; 1959 1960 /* We wait twice because a node may have died while we were in 1961 * the lower dlm layers. The second time though, we've 1962 * committed to owning this lock so we don't allow signals to 1963 * abort the operation. */ 1964 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1965 ocfs2_wait_for_recovery(osb); 1966 1967local: 1968 /* 1969 * We only see this flag if we're being called from 1970 * ocfs2_read_locked_inode(). It means we're locking an inode 1971 * which hasn't been populated yet, so clear the refresh flag 1972 * and let the caller handle it. 
1973 */ 1974 if (inode->i_state & I_NEW) { 1975 status = 0; 1976 if (lockres) 1977 ocfs2_complete_lock_res_refresh(lockres, 0); 1978 goto bail; 1979 } 1980 1981 /* This is fun. The caller may want a bh back, or it may 1982 * not. ocfs2_inode_lock_update definitely wants one in, but 1983 * may or may not read one, depending on what's in the 1984 * LVB. The result of all of this is that we've *only* gone to 1985 * disk if we have to, so the complexity is worthwhile. */ 1986 status = ocfs2_inode_lock_update(inode, &local_bh); 1987 if (status < 0) { 1988 if (status != -ENOENT) 1989 mlog_errno(status); 1990 goto bail; 1991 } 1992 1993 if (ret_bh) { 1994 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 1995 if (status < 0) { 1996 mlog_errno(status); 1997 goto bail; 1998 } 1999 } 2000 2001bail: 2002 if (status < 0) { 2003 if (ret_bh && (*ret_bh)) { 2004 brelse(*ret_bh); 2005 *ret_bh = NULL; 2006 } 2007 if (acquired) 2008 ocfs2_inode_unlock(inode, ex); 2009 } 2010 2011 if (local_bh) 2012 brelse(local_bh); 2013 2014 mlog_exit(status); 2015 return status; 2016} 2017 2018/* 2019 * This is working around a lock inversion between tasks acquiring DLM 2020 * locks while holding a page lock and the downconvert thread which 2021 * blocks dlm lock acquiry while acquiring page locks. 2022 * 2023 * ** These _with_page variantes are only intended to be called from aop 2024 * methods that hold page locks and return a very specific *positive* error 2025 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2026 * 2027 * The DLM is called such that it returns -EAGAIN if it would have 2028 * blocked waiting for the downconvert thread. In that case we unlock 2029 * our page so the downconvert thread can make progress. Once we've 2030 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2031 * that called us can bubble that back up into the VFS who will then 2032 * immediately retry the aop call. 
2033 * 2034 * We do a blocking lock and immediate unlock before returning, though, so that 2035 * the lock has a great chance of being cached on this node by the time the VFS 2036 * calls back to retry the aop. This has a potential to livelock as nodes 2037 * ping locks back and forth, but that's a risk we're willing to take to avoid 2038 * the lock inversion simply. 2039 */ 2040int ocfs2_inode_lock_with_page(struct inode *inode, 2041 struct buffer_head **ret_bh, 2042 int ex, 2043 struct page *page) 2044{ 2045 int ret; 2046 2047 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2048 if (ret == -EAGAIN) { 2049 unlock_page(page); 2050 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2051 ocfs2_inode_unlock(inode, ex); 2052 ret = AOP_TRUNCATED_PAGE; 2053 } 2054 2055 return ret; 2056} 2057 2058int ocfs2_inode_lock_atime(struct inode *inode, 2059 struct vfsmount *vfsmnt, 2060 int *level) 2061{ 2062 int ret; 2063 2064 mlog_entry_void(); 2065 ret = ocfs2_inode_lock(inode, NULL, 0); 2066 if (ret < 0) { 2067 mlog_errno(ret); 2068 return ret; 2069 } 2070 2071 /* 2072 * If we should update atime, we will get EX lock, 2073 * otherwise we just get PR lock. 2074 */ 2075 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2076 struct buffer_head *bh = NULL; 2077 2078 ocfs2_inode_unlock(inode, 0); 2079 ret = ocfs2_inode_lock(inode, &bh, 1); 2080 if (ret < 0) { 2081 mlog_errno(ret); 2082 return ret; 2083 } 2084 *level = 1; 2085 if (ocfs2_should_update_atime(inode, vfsmnt)) 2086 ocfs2_update_inode_atime(inode, bh); 2087 if (bh) 2088 brelse(bh); 2089 } else 2090 *level = 0; 2091 2092 mlog_exit(ret); 2093 return ret; 2094} 2095 2096void ocfs2_inode_unlock(struct inode *inode, 2097 int ex) 2098{ 2099 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2100 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2101 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2102 2103 mlog_entry_void(); 2104 2105 mlog(0, "inode %llu drop %s META lock\n", 2106 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2107 ex ? "EXMODE" : "PRMODE"); 2108 2109 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2110 !ocfs2_mount_local(osb)) 2111 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2112 2113 mlog_exit_void(); 2114} 2115 2116int ocfs2_super_lock(struct ocfs2_super *osb, 2117 int ex) 2118{ 2119 int status = 0; 2120 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2121 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2122 2123 mlog_entry_void(); 2124 2125 if (ocfs2_is_hard_readonly(osb)) 2126 return -EROFS; 2127 2128 if (ocfs2_mount_local(osb)) 2129 goto bail; 2130 2131 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2132 if (status < 0) { 2133 mlog_errno(status); 2134 goto bail; 2135 } 2136 2137 /* The super block lock path is really in the best position to 2138 * know when resources covered by the lock need to be 2139 * refreshed, so we do it here. Of course, making sense of 2140 * everything is up to the caller :) */ 2141 status = ocfs2_should_refresh_lock_res(lockres); 2142 if (status < 0) { 2143 mlog_errno(status); 2144 goto bail; 2145 } 2146 if (status) { 2147 status = ocfs2_refresh_slot_info(osb); 2148 2149 ocfs2_complete_lock_res_refresh(lockres, status); 2150 2151 if (status < 0) 2152 mlog_errno(status); 2153 } 2154bail: 2155 mlog_exit(status); 2156 return status; 2157} 2158 2159void ocfs2_super_unlock(struct ocfs2_super *osb, 2160 int ex) 2161{ 2162 int level = ex ? 
DLM_LOCK_EX : DLM_LOCK_PR; 2163 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2164 2165 if (!ocfs2_mount_local(osb)) 2166 ocfs2_cluster_unlock(osb, lockres, level); 2167} 2168 2169int ocfs2_rename_lock(struct ocfs2_super *osb) 2170{ 2171 int status; 2172 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2173 2174 if (ocfs2_is_hard_readonly(osb)) 2175 return -EROFS; 2176 2177 if (ocfs2_mount_local(osb)) 2178 return 0; 2179 2180 status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0); 2181 if (status < 0) 2182 mlog_errno(status); 2183 2184 return status; 2185} 2186 2187void ocfs2_rename_unlock(struct ocfs2_super *osb) 2188{ 2189 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2190 2191 if (!ocfs2_mount_local(osb)) 2192 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX); 2193} 2194 2195int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2196{ 2197 int ret; 2198 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2199 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2200 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2201 2202 BUG_ON(!dl); 2203 2204 if (ocfs2_is_hard_readonly(osb)) 2205 return -EROFS; 2206 2207 if (ocfs2_mount_local(osb)) 2208 return 0; 2209 2210 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2211 if (ret < 0) 2212 mlog_errno(ret); 2213 2214 return ret; 2215} 2216 2217void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2218{ 2219 int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR; 2220 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2221 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2222 2223 if (!ocfs2_mount_local(osb)) 2224 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2225} 2226 2227/* Reference counting of the dlm debug structure. We want this because 2228 * open references on the debug inodes can live on after a mount, so 2229 * we can't rely on the ocfs2_super to always exist. 
*/ 2230static void ocfs2_dlm_debug_free(struct kref *kref) 2231{ 2232 struct ocfs2_dlm_debug *dlm_debug; 2233 2234 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2235 2236 kfree(dlm_debug); 2237} 2238 2239void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2240{ 2241 if (dlm_debug) 2242 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2243} 2244 2245static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2246{ 2247 kref_get(&debug->d_refcnt); 2248} 2249 2250struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2251{ 2252 struct ocfs2_dlm_debug *dlm_debug; 2253 2254 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2255 if (!dlm_debug) { 2256 mlog_errno(-ENOMEM); 2257 goto out; 2258 } 2259 2260 kref_init(&dlm_debug->d_refcnt); 2261 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2262 dlm_debug->d_locking_state = NULL; 2263out: 2264 return dlm_debug; 2265} 2266 2267/* Access to this is arbitrated for us via seq_file->sem. */ 2268struct ocfs2_dlm_seq_priv { 2269 struct ocfs2_dlm_debug *p_dlm_debug; 2270 struct ocfs2_lock_res p_iter_res; 2271 struct ocfs2_lock_res p_tmp_res; 2272}; 2273 2274static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2275 struct ocfs2_dlm_seq_priv *priv) 2276{ 2277 struct ocfs2_lock_res *iter, *ret = NULL; 2278 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2279 2280 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2281 2282 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2283 /* discover the head of the list */ 2284 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2285 mlog(0, "End of list found, %p\n", ret); 2286 break; 2287 } 2288 2289 /* We track our "dummy" iteration lockres' by a NULL 2290 * l_ops field. 
*/ 2291 if (iter->l_ops != NULL) { 2292 ret = iter; 2293 break; 2294 } 2295 } 2296 2297 return ret; 2298} 2299 2300static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2301{ 2302 struct ocfs2_dlm_seq_priv *priv = m->private; 2303 struct ocfs2_lock_res *iter; 2304 2305 spin_lock(&ocfs2_dlm_tracking_lock); 2306 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2307 if (iter) { 2308 /* Since lockres' have the lifetime of their container 2309 * (which can be inodes, ocfs2_supers, etc) we want to 2310 * copy this out to a temporary lockres while still 2311 * under the spinlock. Obviously after this we can't 2312 * trust any pointers on the copy returned, but that's 2313 * ok as the information we want isn't typically held 2314 * in them. */ 2315 priv->p_tmp_res = *iter; 2316 iter = &priv->p_tmp_res; 2317 } 2318 spin_unlock(&ocfs2_dlm_tracking_lock); 2319 2320 return iter; 2321} 2322 2323static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2324{ 2325} 2326 2327static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2328{ 2329 struct ocfs2_dlm_seq_priv *priv = m->private; 2330 struct ocfs2_lock_res *iter = v; 2331 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2332 2333 spin_lock(&ocfs2_dlm_tracking_lock); 2334 iter = ocfs2_dlm_next_res(iter, priv); 2335 list_del_init(&dummy->l_debug_list); 2336 if (iter) { 2337 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2338 priv->p_tmp_res = *iter; 2339 iter = &priv->p_tmp_res; 2340 } 2341 spin_unlock(&ocfs2_dlm_tracking_lock); 2342 2343 return iter; 2344} 2345 2346/* So that debugfs.ocfs2 can determine which format is being used */ 2347#define OCFS2_DLM_DEBUG_STR_VERSION 1 2348static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2349{ 2350 int i; 2351 char *lvb; 2352 struct ocfs2_lock_res *lockres = v; 2353 2354 if (!lockres) 2355 return -EINVAL; 2356 2357 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2358 2359 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2360 
seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2361 lockres->l_name, 2362 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2363 else 2364 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2365 2366 seq_printf(m, "%d\t" 2367 "0x%lx\t" 2368 "0x%x\t" 2369 "0x%x\t" 2370 "%u\t" 2371 "%u\t" 2372 "%d\t" 2373 "%d\t", 2374 lockres->l_level, 2375 lockres->l_flags, 2376 lockres->l_action, 2377 lockres->l_unlock_action, 2378 lockres->l_ro_holders, 2379 lockres->l_ex_holders, 2380 lockres->l_requested, 2381 lockres->l_blocking); 2382 2383 /* Dump the raw LVB */ 2384 lvb = ocfs2_dlm_lvb(&lockres->l_lksb); 2385 for(i = 0; i < DLM_LVB_LEN; i++) 2386 seq_printf(m, "0x%x\t", lvb[i]); 2387 2388 /* End the line */ 2389 seq_printf(m, "\n"); 2390 return 0; 2391} 2392 2393static const struct seq_operations ocfs2_dlm_seq_ops = { 2394 .start = ocfs2_dlm_seq_start, 2395 .stop = ocfs2_dlm_seq_stop, 2396 .next = ocfs2_dlm_seq_next, 2397 .show = ocfs2_dlm_seq_show, 2398}; 2399 2400static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2401{ 2402 struct seq_file *seq = (struct seq_file *) file->private_data; 2403 struct ocfs2_dlm_seq_priv *priv = seq->private; 2404 struct ocfs2_lock_res *res = &priv->p_iter_res; 2405 2406 ocfs2_remove_lockres_tracking(res); 2407 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2408 return seq_release_private(inode, file); 2409} 2410 2411static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2412{ 2413 int ret; 2414 struct ocfs2_dlm_seq_priv *priv; 2415 struct seq_file *seq; 2416 struct ocfs2_super *osb; 2417 2418 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2419 if (!priv) { 2420 ret = -ENOMEM; 2421 mlog_errno(ret); 2422 goto out; 2423 } 2424 osb = inode->i_private; 2425 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2426 priv->p_dlm_debug = osb->osb_dlm_debug; 2427 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2428 2429 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2430 if (ret) { 2431 
kfree(priv); 2432 mlog_errno(ret); 2433 goto out; 2434 } 2435 2436 seq = (struct seq_file *) file->private_data; 2437 seq->private = priv; 2438 2439 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2440 priv->p_dlm_debug); 2441 2442out: 2443 return ret; 2444} 2445 2446static const struct file_operations ocfs2_dlm_debug_fops = { 2447 .open = ocfs2_dlm_debug_open, 2448 .release = ocfs2_dlm_debug_release, 2449 .read = seq_read, 2450 .llseek = seq_lseek, 2451}; 2452 2453static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2454{ 2455 int ret = 0; 2456 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2457 2458 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2459 S_IFREG|S_IRUSR, 2460 osb->osb_debug_root, 2461 osb, 2462 &ocfs2_dlm_debug_fops); 2463 if (!dlm_debug->d_locking_state) { 2464 ret = -EINVAL; 2465 mlog(ML_ERROR, 2466 "Unable to create locking state debugfs file.\n"); 2467 goto out; 2468 } 2469 2470 ocfs2_get_dlm_debug(dlm_debug); 2471out: 2472 return ret; 2473} 2474 2475static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2476{ 2477 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2478 2479 if (dlm_debug) { 2480 debugfs_remove(dlm_debug->d_locking_state); 2481 ocfs2_put_dlm_debug(dlm_debug); 2482 } 2483} 2484 2485int ocfs2_dlm_init(struct ocfs2_super *osb) 2486{ 2487 int status = 0; 2488 u32 dlm_key; 2489 struct dlm_ctxt *dlm = NULL; 2490 2491 mlog_entry_void(); 2492 2493 if (ocfs2_mount_local(osb)) 2494 goto local; 2495 2496 status = ocfs2_dlm_init_debug(osb); 2497 if (status < 0) { 2498 mlog_errno(status); 2499 goto bail; 2500 } 2501 2502 /* launch downconvert thread */ 2503 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2504 if (IS_ERR(osb->dc_task)) { 2505 status = PTR_ERR(osb->dc_task); 2506 osb->dc_task = NULL; 2507 mlog_errno(status); 2508 goto bail; 2509 } 2510 2511 /* used by the dlm code to make message headers unique, each 2512 * node in this domain must agree on this. 
*/ 2513 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); 2514 2515 /* for now, uuid == domain */ 2516 dlm = dlm_register_domain(osb->uuid_str, dlm_key, 2517 &osb->osb_locking_proto); 2518 if (IS_ERR(dlm)) { 2519 status = PTR_ERR(dlm); 2520 mlog_errno(status); 2521 goto bail; 2522 } 2523 2524 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2525 2526local: 2527 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2528 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2529 2530 osb->dlm = dlm; 2531 2532 status = 0; 2533bail: 2534 if (status < 0) { 2535 ocfs2_dlm_shutdown_debug(osb); 2536 if (osb->dc_task) 2537 kthread_stop(osb->dc_task); 2538 } 2539 2540 mlog_exit(status); 2541 return status; 2542} 2543 2544void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2545{ 2546 mlog_entry_void(); 2547 2548 dlm_unregister_eviction_cb(&osb->osb_eviction_cb); 2549 2550 ocfs2_drop_osb_locks(osb); 2551 2552 if (osb->dc_task) { 2553 kthread_stop(osb->dc_task); 2554 osb->dc_task = NULL; 2555 } 2556 2557 ocfs2_lock_res_free(&osb->osb_super_lockres); 2558 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2559 2560 dlm_unregister_domain(osb->dlm); 2561 osb->dlm = NULL; 2562 2563 ocfs2_dlm_shutdown_debug(osb); 2564 2565 mlog_exit_void(); 2566} 2567 2568static void ocfs2_unlock_ast(void *opaque, int error) 2569{ 2570 struct ocfs2_lock_res *lockres = opaque; 2571 unsigned long flags; 2572 2573 mlog_entry_void(); 2574 2575 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2576 lockres->l_unlock_action); 2577 2578 spin_lock_irqsave(&lockres->l_lock, flags); 2579 /* We tried to cancel a convert request, but it was already 2580 * granted. All we want to do here is clear our unlock 2581 * state. 
The wake_up call done at the bottom is redundant 2582 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't 2583 * hurt anything anyway */ 2584 if (error == -DLM_ECANCEL && 2585 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2586 mlog(0, "Got cancelgrant for %s\n", lockres->l_name); 2587 2588 /* We don't clear the busy flag in this case as it 2589 * should have been cleared by the ast which the dlm 2590 * has called. */ 2591 goto complete_unlock; 2592 } 2593 2594 /* DLM_EUNLOCK is the success code for unlock */ 2595 if (error != -DLM_EUNLOCK) { 2596 mlog(ML_ERROR, "Dlm passes error %d for lock %s, " 2597 "unlock_action %d\n", error, lockres->l_name, 2598 lockres->l_unlock_action); 2599 spin_unlock_irqrestore(&lockres->l_lock, flags); 2600 return; 2601 } 2602 2603 switch(lockres->l_unlock_action) { 2604 case OCFS2_UNLOCK_CANCEL_CONVERT: 2605 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2606 lockres->l_action = OCFS2_AST_INVALID; 2607 break; 2608 case OCFS2_UNLOCK_DROP_LOCK: 2609 lockres->l_level = DLM_LOCK_IV; 2610 break; 2611 default: 2612 BUG(); 2613 } 2614 2615 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2616complete_unlock: 2617 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2618 spin_unlock_irqrestore(&lockres->l_lock, flags); 2619 2620 wake_up(&lockres->l_event); 2621 2622 mlog_exit_void(); 2623} 2624 2625static int ocfs2_drop_lock(struct ocfs2_super *osb, 2626 struct ocfs2_lock_res *lockres) 2627{ 2628 int ret; 2629 unsigned long flags; 2630 u32 lkm_flags = 0; 2631 2632 /* We didn't get anywhere near actually using this lockres. 
*/ 2633 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2634 goto out; 2635 2636 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2637 lkm_flags |= DLM_LKF_VALBLK; 2638 2639 spin_lock_irqsave(&lockres->l_lock, flags); 2640 2641 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2642 "lockres %s, flags 0x%lx\n", 2643 lockres->l_name, lockres->l_flags); 2644 2645 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2646 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2647 "%u, unlock_action = %u\n", 2648 lockres->l_name, lockres->l_flags, lockres->l_action, 2649 lockres->l_unlock_action); 2650 2651 spin_unlock_irqrestore(&lockres->l_lock, flags); 2652 2653 /* XXX: Today we just wait on any busy 2654 * locks... Perhaps we need to cancel converts in the 2655 * future? */ 2656 ocfs2_wait_on_busy_lock(lockres); 2657 2658 spin_lock_irqsave(&lockres->l_lock, flags); 2659 } 2660 2661 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2662 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2663 lockres->l_level == DLM_LOCK_EX && 2664 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2665 lockres->l_ops->set_lvb(lockres); 2666 } 2667 2668 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2669 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2670 lockres->l_name); 2671 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2672 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2673 2674 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2675 spin_unlock_irqrestore(&lockres->l_lock, flags); 2676 goto out; 2677 } 2678 2679 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2680 2681 /* make sure we never get here while waiting for an ast to 2682 * fire. */ 2683 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2684 2685 /* is this necessary? 
*/ 2686 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2687 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2688 spin_unlock_irqrestore(&lockres->l_lock, flags); 2689 2690 mlog(0, "lock %s\n", lockres->l_name); 2691 2692 ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2693 lockres); 2694 if (ret) { 2695 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2696 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2697 /* XXX Need to abstract this */ 2698 dlm_print_one_lock(lockres->l_lksb.lksb_o2dlm.lockid); 2699 BUG(); 2700 } 2701 mlog(0, "lock %s, successfull return from ocfs2_dlm_unlock\n", 2702 lockres->l_name); 2703 2704 ocfs2_wait_on_busy_lock(lockres); 2705out: 2706 mlog_exit(0); 2707 return 0; 2708} 2709 2710/* Mark the lockres as being dropped. It will no longer be 2711 * queued if blocking, but we still may have to wait on it 2712 * being dequeued from the downconvert thread before we can consider 2713 * it safe to drop. 2714 * 2715 * You can *not* attempt to call cluster_lock on this lockres anymore. 
*/ 2716void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2717{ 2718 int status; 2719 struct ocfs2_mask_waiter mw; 2720 unsigned long flags; 2721 2722 ocfs2_init_mask_waiter(&mw); 2723 2724 spin_lock_irqsave(&lockres->l_lock, flags); 2725 lockres->l_flags |= OCFS2_LOCK_FREEING; 2726 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2727 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2728 spin_unlock_irqrestore(&lockres->l_lock, flags); 2729 2730 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2731 2732 status = ocfs2_wait_for_mask(&mw); 2733 if (status) 2734 mlog_errno(status); 2735 2736 spin_lock_irqsave(&lockres->l_lock, flags); 2737 } 2738 spin_unlock_irqrestore(&lockres->l_lock, flags); 2739} 2740 2741void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2742 struct ocfs2_lock_res *lockres) 2743{ 2744 int ret; 2745 2746 ocfs2_mark_lockres_freeing(lockres); 2747 ret = ocfs2_drop_lock(osb, lockres); 2748 if (ret) 2749 mlog_errno(ret); 2750} 2751 2752static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2753{ 2754 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2755 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2756} 2757 2758int ocfs2_drop_inode_locks(struct inode *inode) 2759{ 2760 int status, err; 2761 2762 mlog_entry_void(); 2763 2764 /* No need to call ocfs2_mark_lockres_freeing here - 2765 * ocfs2_clear_inode has done it for us. 
*/ 2766 2767 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2768 &OCFS2_I(inode)->ip_open_lockres); 2769 if (err < 0) 2770 mlog_errno(err); 2771 2772 status = err; 2773 2774 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2775 &OCFS2_I(inode)->ip_inode_lockres); 2776 if (err < 0) 2777 mlog_errno(err); 2778 if (err < 0 && !status) 2779 status = err; 2780 2781 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2782 &OCFS2_I(inode)->ip_rw_lockres); 2783 if (err < 0) 2784 mlog_errno(err); 2785 if (err < 0 && !status) 2786 status = err; 2787 2788 mlog_exit(status); 2789 return status; 2790} 2791 2792static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2793 int new_level) 2794{ 2795 assert_spin_locked(&lockres->l_lock); 2796 2797 BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); 2798 2799 if (lockres->l_level <= new_level) { 2800 mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", 2801 lockres->l_level, new_level); 2802 BUG(); 2803 } 2804 2805 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 2806 lockres->l_name, new_level, lockres->l_blocking); 2807 2808 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2809 lockres->l_requested = new_level; 2810 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2811} 2812 2813static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2814 struct ocfs2_lock_res *lockres, 2815 int new_level, 2816 int lvb) 2817{ 2818 int ret; 2819 u32 dlm_flags = DLM_LKF_CONVERT; 2820 2821 mlog_entry_void(); 2822 2823 if (lvb) 2824 dlm_flags |= DLM_LKF_VALBLK; 2825 2826 ret = ocfs2_dlm_lock(osb->dlm, 2827 new_level, 2828 &lockres->l_lksb, 2829 dlm_flags, 2830 lockres->l_name, 2831 OCFS2_LOCK_ID_MAX_LEN - 1, 2832 lockres); 2833 if (ret) { 2834 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); 2835 ocfs2_recover_from_dlm_error(lockres, 1); 2836 goto bail; 2837 } 2838 2839 ret = 0; 2840bail: 2841 mlog_exit(ret); 2842 return ret; 2843} 2844 2845/* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */ 2846static int 
ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2847 struct ocfs2_lock_res *lockres) 2848{ 2849 assert_spin_locked(&lockres->l_lock); 2850 2851 mlog_entry_void(); 2852 mlog(0, "lock %s\n", lockres->l_name); 2853 2854 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2855 /* If we're already trying to cancel a lock conversion 2856 * then just drop the spinlock and allow the caller to 2857 * requeue this lock. */ 2858 2859 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2860 return 0; 2861 } 2862 2863 /* were we in a convert when we got the bast fire? */ 2864 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2865 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2866 /* set things up for the unlockast to know to just 2867 * clear out the ast_action and unset busy, etc. */ 2868 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2869 2870 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2871 "lock %s, invalid flags: 0x%lx\n", 2872 lockres->l_name, lockres->l_flags); 2873 2874 return 1; 2875} 2876 2877static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2878 struct ocfs2_lock_res *lockres) 2879{ 2880 int ret; 2881 2882 mlog_entry_void(); 2883 mlog(0, "lock %s\n", lockres->l_name); 2884 2885 ret = ocfs2_dlm_unlock(osb->dlm, &lockres->l_lksb, 2886 DLM_LKF_CANCEL, lockres); 2887 if (ret) { 2888 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); 2889 ocfs2_recover_from_dlm_error(lockres, 0); 2890 } 2891 2892 mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); 2893 2894 mlog_exit(ret); 2895 return ret; 2896} 2897 2898static int ocfs2_unblock_lock(struct ocfs2_super *osb, 2899 struct ocfs2_lock_res *lockres, 2900 struct ocfs2_unblock_ctl *ctl) 2901{ 2902 unsigned long flags; 2903 int blocking; 2904 int new_level; 2905 int ret = 0; 2906 int set_lvb = 0; 2907 2908 mlog_entry_void(); 2909 2910 spin_lock_irqsave(&lockres->l_lock, flags); 2911 2912 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2913 2914recheck: 2915 if 
(lockres->l_flags & OCFS2_LOCK_BUSY) { 2916 ctl->requeue = 1; 2917 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2918 spin_unlock_irqrestore(&lockres->l_lock, flags); 2919 if (ret) { 2920 ret = ocfs2_cancel_convert(osb, lockres); 2921 if (ret < 0) 2922 mlog_errno(ret); 2923 } 2924 goto leave; 2925 } 2926 2927 /* if we're blocking an exclusive and we have *any* holders, 2928 * then requeue. */ 2929 if ((lockres->l_blocking == DLM_LOCK_EX) 2930 && (lockres->l_ex_holders || lockres->l_ro_holders)) 2931 goto leave_requeue; 2932 2933 /* If it's a PR we're blocking, then only 2934 * requeue if we've got any EX holders */ 2935 if (lockres->l_blocking == DLM_LOCK_PR && 2936 lockres->l_ex_holders) 2937 goto leave_requeue; 2938 2939 /* 2940 * Can we get a lock in this state if the holder counts are 2941 * zero? The meta data unblock code used to check this. 2942 */ 2943 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 2944 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 2945 goto leave_requeue; 2946 2947 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2948 2949 if (lockres->l_ops->check_downconvert 2950 && !lockres->l_ops->check_downconvert(lockres, new_level)) 2951 goto leave_requeue; 2952 2953 /* If we get here, then we know that there are no more 2954 * incompatible holders (and anyone asking for an incompatible 2955 * lock is blocked). We can now downconvert the lock */ 2956 if (!lockres->l_ops->downconvert_worker) 2957 goto downconvert; 2958 2959 /* Some lockres types want to do a bit of work before 2960 * downconverting a lock. Allow that here. The worker function 2961 * may sleep, so we save off a copy of what we're blocking as 2962 * it may change while we're not holding the spin lock. 
*/ 2963 blocking = lockres->l_blocking; 2964 spin_unlock_irqrestore(&lockres->l_lock, flags); 2965 2966 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 2967 2968 if (ctl->unblock_action == UNBLOCK_STOP_POST) 2969 goto leave; 2970 2971 spin_lock_irqsave(&lockres->l_lock, flags); 2972 if (blocking != lockres->l_blocking) { 2973 /* If this changed underneath us, then we can't drop 2974 * it just yet. */ 2975 goto recheck; 2976 } 2977 2978downconvert: 2979 ctl->requeue = 0; 2980 2981 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2982 if (lockres->l_level == DLM_LOCK_EX) 2983 set_lvb = 1; 2984 2985 /* 2986 * We only set the lvb if the lock has been fully 2987 * refreshed - otherwise we risk setting stale 2988 * data. Otherwise, there's no need to actually clear 2989 * out the lvb here as it's value is still valid. 2990 */ 2991 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2992 lockres->l_ops->set_lvb(lockres); 2993 } 2994 2995 ocfs2_prepare_downconvert(lockres, new_level); 2996 spin_unlock_irqrestore(&lockres->l_lock, flags); 2997 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 2998leave: 2999 mlog_exit(ret); 3000 return ret; 3001 3002leave_requeue: 3003 spin_unlock_irqrestore(&lockres->l_lock, flags); 3004 ctl->requeue = 1; 3005 3006 mlog_exit(0); 3007 return 0; 3008} 3009 3010static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3011 int blocking) 3012{ 3013 struct inode *inode; 3014 struct address_space *mapping; 3015 3016 inode = ocfs2_lock_res_inode(lockres); 3017 mapping = inode->i_mapping; 3018 3019 if (!S_ISREG(inode->i_mode)) 3020 goto out; 3021 3022 /* 3023 * We need this before the filemap_fdatawrite() so that it can 3024 * transfer the dirty bit from the PTE to the 3025 * page. Unfortunately this means that even for EX->PR 3026 * downconverts, we'll lose our mappings and have to build 3027 * them up again. 
3028 */ 3029 unmap_mapping_range(mapping, 0, 0, 0); 3030 3031 if (filemap_fdatawrite(mapping)) { 3032 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3033 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3034 } 3035 sync_mapping_buffers(mapping); 3036 if (blocking == DLM_LOCK_EX) { 3037 truncate_inode_pages(mapping, 0); 3038 } else { 3039 /* We only need to wait on the I/O if we're not also 3040 * truncating pages because truncate_inode_pages waits 3041 * for us above. We don't truncate pages if we're 3042 * blocking anything < EXMODE because we want to keep 3043 * them around in that case. */ 3044 filemap_fdatawait(mapping); 3045 } 3046 3047out: 3048 return UNBLOCK_CONTINUE; 3049} 3050 3051static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3052 int new_level) 3053{ 3054 struct inode *inode = ocfs2_lock_res_inode(lockres); 3055 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3056 3057 BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR); 3058 BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed); 3059 3060 if (checkpointed) 3061 return 1; 3062 3063 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3064 return 0; 3065} 3066 3067static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3068{ 3069 struct inode *inode = ocfs2_lock_res_inode(lockres); 3070 3071 __ocfs2_stuff_meta_lvb(inode); 3072} 3073 3074/* 3075 * Does the final reference drop on our dentry lock. Right now this 3076 * happens in the downconvert thread, but we could choose to simplify the 3077 * dlmglue API and push these off to the ocfs2_wq in the future. 3078 */ 3079static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3080 struct ocfs2_lock_res *lockres) 3081{ 3082 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3083 ocfs2_dentry_lock_put(osb, dl); 3084} 3085 3086/* 3087 * d_delete() matching dentries before the lock downconvert. 
3088 * 3089 * At this point, any process waiting to destroy the 3090 * dentry_lock due to last ref count is stopped by the 3091 * OCFS2_LOCK_QUEUED flag. 3092 * 3093 * We have two potential problems 3094 * 3095 * 1) If we do the last reference drop on our dentry_lock (via dput) 3096 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3097 * the downconvert to finish. Instead we take an elevated 3098 * reference and push the drop until after we've completed our 3099 * unblock processing. 3100 * 3101 * 2) There might be another process with a final reference, 3102 * waiting on us to finish processing. If this is the case, we 3103 * detect it and exit out - there's no more dentries anyway. 3104 */ 3105static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3106 int blocking) 3107{ 3108 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3109 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3110 struct dentry *dentry; 3111 unsigned long flags; 3112 int extra_ref = 0; 3113 3114 /* 3115 * This node is blocking another node from getting a read 3116 * lock. This happens when we've renamed within a 3117 * directory. We've forced the other nodes to d_delete(), but 3118 * we never actually dropped our lock because it's still 3119 * valid. The downconvert code will retain a PR for this node, 3120 * so there's no further work to do. 3121 */ 3122 if (blocking == DLM_LOCK_PR) 3123 return UNBLOCK_CONTINUE; 3124 3125 /* 3126 * Mark this inode as potentially orphaned. The code in 3127 * ocfs2_delete_inode() will figure out whether it actually 3128 * needs to be freed or not. 3129 */ 3130 spin_lock(&oi->ip_lock); 3131 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3132 spin_unlock(&oi->ip_lock); 3133 3134 /* 3135 * Yuck. We need to make sure however that the check of 3136 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3137 * respect to a reference decrement or the setting of that 3138 * flag. 
3139 */ 3140 spin_lock_irqsave(&lockres->l_lock, flags); 3141 spin_lock(&dentry_attach_lock); 3142 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3143 && dl->dl_count) { 3144 dl->dl_count++; 3145 extra_ref = 1; 3146 } 3147 spin_unlock(&dentry_attach_lock); 3148 spin_unlock_irqrestore(&lockres->l_lock, flags); 3149 3150 mlog(0, "extra_ref = %d\n", extra_ref); 3151 3152 /* 3153 * We have a process waiting on us in ocfs2_dentry_iput(), 3154 * which means we can't have any more outstanding 3155 * aliases. There's no need to do any more work. 3156 */ 3157 if (!extra_ref) 3158 return UNBLOCK_CONTINUE; 3159 3160 spin_lock(&dentry_attach_lock); 3161 while (1) { 3162 dentry = ocfs2_find_local_alias(dl->dl_inode, 3163 dl->dl_parent_blkno, 1); 3164 if (!dentry) 3165 break; 3166 spin_unlock(&dentry_attach_lock); 3167 3168 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3169 dentry->d_name.name); 3170 3171 /* 3172 * The following dcache calls may do an 3173 * iput(). Normally we don't want that from the 3174 * downconverting thread, but in this case it's ok 3175 * because the requesting node already has an 3176 * exclusive lock on the inode, so it can't be queued 3177 * for a downconvert. 3178 */ 3179 d_delete(dentry); 3180 dput(dentry); 3181 3182 spin_lock(&dentry_attach_lock); 3183 } 3184 spin_unlock(&dentry_attach_lock); 3185 3186 /* 3187 * If we are the last holder of this dentry lock, there is no 3188 * reason to downconvert so skip straight to the unlock. 
3189 */ 3190 if (dl->dl_count == 1) 3191 return UNBLOCK_STOP_POST; 3192 3193 return UNBLOCK_CONTINUE_POST; 3194} 3195 3196static struct ocfs2_locking_protocol lproto = { 3197 .lp_lock_ast = ocfs2_locking_ast, 3198 .lp_blocking_ast = ocfs2_blocking_ast, 3199 .lp_unlock_ast = ocfs2_unlock_ast, 3200}; 3201 3202/* This interface isn't the final one, hence the less-than-perfect names */ 3203void dlmglue_init_stack(void) 3204{ 3205 o2cb_get_stack(&lproto); 3206} 3207 3208void dlmglue_exit_stack(void) 3209{ 3210 o2cb_put_stack(); 3211} 3212 3213static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3214 struct ocfs2_lock_res *lockres) 3215{ 3216 int status; 3217 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3218 unsigned long flags; 3219 3220 /* Our reference to the lockres in this function can be 3221 * considered valid until we remove the OCFS2_LOCK_QUEUED 3222 * flag. */ 3223 3224 mlog_entry_void(); 3225 3226 BUG_ON(!lockres); 3227 BUG_ON(!lockres->l_ops); 3228 3229 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3230 3231 /* Detect whether a lock has been marked as going away while 3232 * the downconvert thread was processing other things. A lock can 3233 * still be marked with OCFS2_LOCK_FREEING after this check, 3234 * but short circuiting here will still save us some 3235 * performance. */ 3236 spin_lock_irqsave(&lockres->l_lock, flags); 3237 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3238 goto unqueue; 3239 spin_unlock_irqrestore(&lockres->l_lock, flags); 3240 3241 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3242 if (status < 0) 3243 mlog_errno(status); 3244 3245 spin_lock_irqsave(&lockres->l_lock, flags); 3246unqueue: 3247 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3248 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3249 } else 3250 ocfs2_schedule_blocked_lock(osb, lockres); 3251 3252 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3253 ctl.requeue ? 
"yes" : "no"); 3254 spin_unlock_irqrestore(&lockres->l_lock, flags); 3255 3256 if (ctl.unblock_action != UNBLOCK_CONTINUE 3257 && lockres->l_ops->post_unlock) 3258 lockres->l_ops->post_unlock(osb, lockres); 3259 3260 mlog_exit_void(); 3261} 3262 3263static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3264 struct ocfs2_lock_res *lockres) 3265{ 3266 mlog_entry_void(); 3267 3268 assert_spin_locked(&lockres->l_lock); 3269 3270 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3271 /* Do not schedule a lock for downconvert when it's on 3272 * the way to destruction - any nodes wanting access 3273 * to the resource will get it soon. */ 3274 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3275 lockres->l_name, lockres->l_flags); 3276 return; 3277 } 3278 3279 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3280 3281 spin_lock(&osb->dc_task_lock); 3282 if (list_empty(&lockres->l_blocked_list)) { 3283 list_add_tail(&lockres->l_blocked_list, 3284 &osb->blocked_lock_list); 3285 osb->blocked_lock_count++; 3286 } 3287 spin_unlock(&osb->dc_task_lock); 3288 3289 mlog_exit_void(); 3290} 3291 3292static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3293{ 3294 unsigned long processed; 3295 struct ocfs2_lock_res *lockres; 3296 3297 mlog_entry_void(); 3298 3299 spin_lock(&osb->dc_task_lock); 3300 /* grab this early so we know to try again if a state change and 3301 * wake happens part-way through our work */ 3302 osb->dc_work_sequence = osb->dc_wake_sequence; 3303 3304 processed = osb->blocked_lock_count; 3305 while (processed) { 3306 BUG_ON(list_empty(&osb->blocked_lock_list)); 3307 3308 lockres = list_entry(osb->blocked_lock_list.next, 3309 struct ocfs2_lock_res, l_blocked_list); 3310 list_del_init(&lockres->l_blocked_list); 3311 osb->blocked_lock_count--; 3312 spin_unlock(&osb->dc_task_lock); 3313 3314 BUG_ON(!processed); 3315 processed--; 3316 3317 ocfs2_process_blocked_lock(osb, lockres); 3318 3319 spin_lock(&osb->dc_task_lock); 3320 } 3321 
spin_unlock(&osb->dc_task_lock); 3322 3323 mlog_exit_void(); 3324} 3325 3326static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3327{ 3328 int empty = 0; 3329 3330 spin_lock(&osb->dc_task_lock); 3331 if (list_empty(&osb->blocked_lock_list)) 3332 empty = 1; 3333 3334 spin_unlock(&osb->dc_task_lock); 3335 return empty; 3336} 3337 3338static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3339{ 3340 int should_wake = 0; 3341 3342 spin_lock(&osb->dc_task_lock); 3343 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3344 should_wake = 1; 3345 spin_unlock(&osb->dc_task_lock); 3346 3347 return should_wake; 3348} 3349 3350static int ocfs2_downconvert_thread(void *arg) 3351{ 3352 int status = 0; 3353 struct ocfs2_super *osb = arg; 3354 3355 /* only quit once we've been asked to stop and there is no more 3356 * work available */ 3357 while (!(kthread_should_stop() && 3358 ocfs2_downconvert_thread_lists_empty(osb))) { 3359 3360 wait_event_interruptible(osb->dc_event, 3361 ocfs2_downconvert_thread_should_wake(osb) || 3362 kthread_should_stop()); 3363 3364 mlog(0, "downconvert_thread: awoken\n"); 3365 3366 ocfs2_downconvert_thread_do_work(osb); 3367 } 3368 3369 osb->dc_task = NULL; 3370 return status; 3371} 3372 3373void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3374{ 3375 spin_lock(&osb->dc_task_lock); 3376 /* make sure the voting thread gets a swipe at whatever changes 3377 * the caller may have made to the voting state */ 3378 osb->dc_wake_sequence++; 3379 spin_unlock(&osb->dc_task_lock); 3380 wake_up(&osb->dc_event); 3381} 3382