dlmglue.c revision d24fbcda0c4988322949df3d759f1cfb32b32953
1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * dlmglue.c 5 * 6 * Code which implements an OCFS2 specific interface to our DLM. 7 * 8 * Copyright (C) 2003, 2004 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation; either 13 * version 2 of the License, or (at your option) any later version. 14 * 15 * This program is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 18 * General Public License for more details. 19 * 20 * You should have received a copy of the GNU General Public 21 * License along with this program; if not, write to the 22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 23 * Boston, MA 021110-1307, USA. 24 */ 25 26#include <linux/types.h> 27#include <linux/slab.h> 28#include <linux/highmem.h> 29#include <linux/mm.h> 30#include <linux/crc32.h> 31#include <linux/kthread.h> 32#include <linux/pagemap.h> 33#include <linux/debugfs.h> 34#include <linux/seq_file.h> 35 36#include <cluster/heartbeat.h> 37#include <cluster/nodemanager.h> 38#include <cluster/tcp.h> 39 40#include <dlm/dlmapi.h> 41 42#define MLOG_MASK_PREFIX ML_DLM_GLUE 43#include <cluster/masklog.h> 44 45#include "ocfs2.h" 46#include "ocfs2_lockingver.h" 47 48#include "alloc.h" 49#include "dcache.h" 50#include "dlmglue.h" 51#include "extent_map.h" 52#include "file.h" 53#include "heartbeat.h" 54#include "inode.h" 55#include "journal.h" 56#include "slot_map.h" 57#include "super.h" 58#include "uptodate.h" 59 60#include "buffer_head_io.h" 61 62struct ocfs2_mask_waiter { 63 struct list_head mw_item; 64 int mw_status; 65 struct completion mw_complete; 66 unsigned long mw_mask; 67 unsigned long mw_goal; 68}; 69 70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres); 71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres); 72static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres); 73 74/* 75 * Return value from ->downconvert_worker functions. 76 * 77 * These control the precise actions of ocfs2_unblock_lock() 78 * and ocfs2_process_blocked_lock() 79 * 80 */ 81enum ocfs2_unblock_action { 82 UNBLOCK_CONTINUE = 0, /* Continue downconvert */ 83 UNBLOCK_CONTINUE_POST = 1, /* Continue downconvert, fire 84 * ->post_unlock callback */ 85 UNBLOCK_STOP_POST = 2, /* Do not downconvert, fire 86 * ->post_unlock() callback. */ 87}; 88 89struct ocfs2_unblock_ctl { 90 int requeue; 91 enum ocfs2_unblock_action unblock_action; 92}; 93 94static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 95 int new_level); 96static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres); 97 98static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 99 int blocking); 100 101static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 102 int blocking); 103 104static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 105 struct ocfs2_lock_res *lockres); 106 107 108#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres) 109 110/* This aids in debugging situations where a bad LVB might be involved. */ 111static void ocfs2_dump_meta_lvb_info(u64 level, 112 const char *function, 113 unsigned int line, 114 struct ocfs2_lock_res *lockres) 115{ 116 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 117 118 mlog(level, "LVB information for %s (called from %s:%u):\n", 119 lockres->l_name, function, line); 120 mlog(level, "version: %u, clusters: %u, generation: 0x%x\n", 121 lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters), 122 be32_to_cpu(lvb->lvb_igeneration)); 123 mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n", 124 (unsigned long long)be64_to_cpu(lvb->lvb_isize), 125 be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid), 126 be16_to_cpu(lvb->lvb_imode)); 127 mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, " 128 "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink), 129 (long long)be64_to_cpu(lvb->lvb_iatime_packed), 130 (long long)be64_to_cpu(lvb->lvb_ictime_packed), 131 (long long)be64_to_cpu(lvb->lvb_imtime_packed), 132 be32_to_cpu(lvb->lvb_iattr)); 133} 134 135 136/* 137 * OCFS2 Lock Resource Operations 138 * 139 * These fine tune the behavior of the generic dlmglue locking infrastructure. 140 * 141 * The most basic of lock types can point ->l_priv to their respective 142 * struct ocfs2_super and allow the default actions to manage things. 143 * 144 * Right now, each lock type also needs to implement an init function, 145 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres() 146 * should be called when the lock is no longer needed (i.e., object 147 * destruction time). 148 */ 149struct ocfs2_lock_res_ops { 150 /* 151 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define 152 * this callback if ->l_priv is not an ocfs2_super pointer 153 */ 154 struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *); 155 156 /* 157 * Optionally called in the downconvert thread after a 158 * successful downconvert. The lockres will not be referenced 159 * after this callback is called, so it is safe to free 160 * memory, etc. 161 * 162 * The exact semantics of when this is called are controlled 163 * by ->downconvert_worker() 164 */ 165 void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *); 166 167 /* 168 * Allow a lock type to add checks to determine whether it is 169 * safe to downconvert a lock. Return 0 to re-queue the 170 * downconvert at a later time, nonzero to continue. 171 * 172 * For most locks, the default checks that there are no 173 * incompatible holders are sufficient. 174 * 175 * Called with the lockres spinlock held. 176 */ 177 int (*check_downconvert)(struct ocfs2_lock_res *, int); 178 179 /* 180 * Allows a lock type to populate the lock value block. This 181 * is called on downconvert, and when we drop a lock. 182 * 183 * Locks that want to use this should set LOCK_TYPE_USES_LVB 184 * in the flags field. 185 * 186 * Called with the lockres spinlock held. 187 */ 188 void (*set_lvb)(struct ocfs2_lock_res *); 189 190 /* 191 * Called from the downconvert thread when it is determined 192 * that a lock will be downconverted. This is called without 193 * any locks held so the function can do work that might 194 * schedule (syncing out data, etc). 195 * 196 * This should return any one of the ocfs2_unblock_action 197 * values, depending on what it wants the thread to do. 198 */ 199 int (*downconvert_worker)(struct ocfs2_lock_res *, int); 200 201 /* 202 * LOCK_TYPE_* flags which describe the specific requirements 203 * of a lock type. Descriptions of each individual flag follow. 204 */ 205 int flags; 206}; 207 208/* 209 * Some locks want to "refresh" potentially stale data when a 210 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this 211 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the 212 * individual lockres l_flags member from the ast function. It is 213 * expected that the locking wrapper will clear the 214 * OCFS2_LOCK_NEEDS_REFRESH flag when done. 215 */ 216#define LOCK_TYPE_REQUIRES_REFRESH 0x1 217 218/* 219 * Indicate that a lock type makes use of the lock value block. The 220 * ->set_lvb lock type callback must be defined. 221 */ 222#define LOCK_TYPE_USES_LVB 0x2 223 224static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = { 225 .get_osb = ocfs2_get_inode_osb, 226 .flags = 0, 227}; 228 229static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = { 230 .get_osb = ocfs2_get_inode_osb, 231 .check_downconvert = ocfs2_check_meta_downconvert, 232 .set_lvb = ocfs2_set_meta_lvb, 233 .downconvert_worker = ocfs2_data_convert_worker, 234 .flags = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB, 235}; 236 237static struct ocfs2_lock_res_ops ocfs2_super_lops = { 238 .flags = LOCK_TYPE_REQUIRES_REFRESH, 239}; 240 241static struct ocfs2_lock_res_ops ocfs2_rename_lops = { 242 .flags = 0, 243}; 244 245static struct ocfs2_lock_res_ops ocfs2_dentry_lops = { 246 .get_osb = ocfs2_get_dentry_osb, 247 .post_unlock = ocfs2_dentry_post_unlock, 248 .downconvert_worker = ocfs2_dentry_convert_worker, 249 .flags = 0, 250}; 251 252static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = { 253 .get_osb = ocfs2_get_inode_osb, 254 .flags = 0, 255}; 256 257static struct ocfs2_lock_res_ops ocfs2_flock_lops = { 258 .get_osb = ocfs2_get_file_osb, 259 .flags = 0, 260}; 261 262/* 263 * This is the filesystem locking protocol version. 264 * 265 * Whenever the filesystem does new things with locks (adds or removes a 266 * lock, orders them differently, does different things underneath a lock), 267 * the version must be changed. The protocol is negotiated when joining 268 * the dlm domain. A node may join the domain if its major version is 269 * identical to all other nodes and its minor version is greater than 270 * or equal to all other nodes. When its minor version is greater than 271 * the other nodes, it will run at the minor version specified by the 272 * other nodes. 273 * 274 * If a locking change is made that will not be compatible with older 275 * versions, the major number must be increased and the minor version set 276 * to zero. If a change merely adds a behavior that can be disabled when 277 * speaking to older versions, the minor version must be increased. If a 278 * change adds a fully backwards compatible change (eg, LVB changes that 279 * are just ignored by older versions), the version does not need to be 280 * updated. 281 */ 282const struct dlm_protocol_version ocfs2_locking_protocol = { 283 .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, 284 .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, 285}; 286 287static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) 288{ 289 return lockres->l_type == OCFS2_LOCK_TYPE_META || 290 lockres->l_type == OCFS2_LOCK_TYPE_RW || 291 lockres->l_type == OCFS2_LOCK_TYPE_OPEN; 292} 293 294static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) 295{ 296 BUG_ON(!ocfs2_is_inode_lock(lockres)); 297 298 return (struct inode *) lockres->l_priv; 299} 300 301static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres) 302{ 303 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY); 304 305 return (struct ocfs2_dentry_lock *)lockres->l_priv; 306} 307 308static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres) 309{ 310 if (lockres->l_ops->get_osb) 311 return lockres->l_ops->get_osb(lockres); 312 313 return (struct ocfs2_super *)lockres->l_priv; 314} 315 316static int ocfs2_lock_create(struct ocfs2_super *osb, 317 struct ocfs2_lock_res *lockres, 318 int level, 319 int dlm_flags); 320static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 321 int wanted); 322static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 323 struct ocfs2_lock_res *lockres, 324 int level); 325static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres); 326static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres); 327static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres); 328static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level); 329static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 330 struct ocfs2_lock_res *lockres); 331static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 332 int convert); 333#define ocfs2_log_dlm_error(_func, _stat, _lockres) do { \ 334 mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ 335 "resource %s: %s\n", dlm_errname(_stat), _func, \ 336 _lockres->l_name, dlm_errmsg(_stat)); \ 337} while (0) 338static int ocfs2_downconvert_thread(void *arg); 339static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 340 struct ocfs2_lock_res *lockres); 341static int ocfs2_inode_lock_update(struct inode *inode, 342 struct buffer_head **bh); 343static void ocfs2_drop_osb_locks(struct ocfs2_super *osb); 344static inline int ocfs2_highest_compat_lock_level(int level); 345static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 346 int new_level); 347static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 348 struct ocfs2_lock_res *lockres, 349 int new_level, 350 int lvb); 351static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 352 struct ocfs2_lock_res *lockres); 353static int ocfs2_cancel_convert(struct ocfs2_super *osb, 354 struct ocfs2_lock_res *lockres); 355 356 357static void ocfs2_build_lock_name(enum ocfs2_lock_type type, 358 u64 blkno, 359 u32 generation, 360 char *name) 361{ 362 int len; 363 364 mlog_entry_void(); 365 366 BUG_ON(type >= OCFS2_NUM_LOCK_TYPES); 367 368 len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x", 369 ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD, 370 (long long)blkno, generation); 371 372 BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1)); 373 374 mlog(0, "built lock resource with name: %s\n", name); 375 376 mlog_exit_void(); 377} 378 379static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock); 380 381static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res, 382 struct ocfs2_dlm_debug *dlm_debug) 383{ 384 mlog(0, "Add tracking for lockres %s\n", res->l_name); 385 386 spin_lock(&ocfs2_dlm_tracking_lock); 387 list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking); 388 spin_unlock(&ocfs2_dlm_tracking_lock); 389} 390 391static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res) 392{ 393 spin_lock(&ocfs2_dlm_tracking_lock); 394 if (!list_empty(&res->l_debug_list)) 395 list_del_init(&res->l_debug_list); 396 spin_unlock(&ocfs2_dlm_tracking_lock); 397} 398 399static void ocfs2_lock_res_init_common(struct ocfs2_super *osb, 400 struct ocfs2_lock_res *res, 401 enum ocfs2_lock_type type, 402 struct ocfs2_lock_res_ops *ops, 403 void *priv) 404{ 405 res->l_type = type; 406 res->l_ops = ops; 407 res->l_priv = priv; 408 409 res->l_level = LKM_IVMODE; 410 res->l_requested = LKM_IVMODE; 411 res->l_blocking = LKM_IVMODE; 412 res->l_action = OCFS2_AST_INVALID; 413 res->l_unlock_action = OCFS2_UNLOCK_INVALID; 414 415 res->l_flags = OCFS2_LOCK_INITIALIZED; 416 417 ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug); 418} 419 420void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res) 421{ 422 /* This also clears out the lock status block */ 423 memset(res, 0, sizeof(struct ocfs2_lock_res)); 424 spin_lock_init(&res->l_lock); 425 init_waitqueue_head(&res->l_event); 426 INIT_LIST_HEAD(&res->l_blocked_list); 427 INIT_LIST_HEAD(&res->l_mask_waiters); 428} 429 430void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res, 431 enum ocfs2_lock_type type, 432 unsigned int generation, 433 struct inode *inode) 434{ 435 struct ocfs2_lock_res_ops *ops; 436 437 switch(type) { 438 case OCFS2_LOCK_TYPE_RW: 439 ops = &ocfs2_inode_rw_lops; 440 break; 441 case OCFS2_LOCK_TYPE_META: 442 ops = &ocfs2_inode_inode_lops; 443 break; 444 case OCFS2_LOCK_TYPE_OPEN: 445 ops = &ocfs2_inode_open_lops; 446 break; 447 default: 448 mlog_bug_on_msg(1, "type: %d\n", type); 449 ops = NULL; /* thanks, gcc */ 450 break; 451 }; 452 453 ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno, 454 generation, res->l_name); 455 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode); 456} 457 458static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres) 459{ 460 struct inode *inode = ocfs2_lock_res_inode(lockres); 461 462 return OCFS2_SB(inode->i_sb); 463} 464 465static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres) 466{ 467 struct ocfs2_file_private *fp = lockres->l_priv; 468 469 return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb); 470} 471 472static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres) 473{ 474 __be64 inode_blkno_be; 475 476 memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], 477 sizeof(__be64)); 478 479 return be64_to_cpu(inode_blkno_be); 480} 481 482static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres) 483{ 484 struct ocfs2_dentry_lock *dl = lockres->l_priv; 485 486 return OCFS2_SB(dl->dl_inode->i_sb); 487} 488 489void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl, 490 u64 parent, struct inode *inode) 491{ 492 int len; 493 u64 inode_blkno = OCFS2_I(inode)->ip_blkno; 494 __be64 inode_blkno_be = cpu_to_be64(inode_blkno); 495 struct ocfs2_lock_res *lockres = &dl->dl_lockres; 496 497 ocfs2_lock_res_init_once(lockres); 498 499 /* 500 * Unfortunately, the standard lock naming scheme won't work 501 * here because we have two 16 byte values to use. Instead, 502 * we'll stuff the inode number as a binary value. We still 503 * want error prints to show something without garbling the 504 * display, so drop a null byte in there before the inode 505 * number. A future version of OCFS2 will likely use all 506 * binary lock names. The stringified names have been a 507 * tremendous aid in debugging, but now that the debugfs 508 * interface exists, we can mangle things there if need be. 509 * 510 * NOTE: We also drop the standard "pad" value (the total lock 511 * name size stays the same though - the last part is all 512 * zeros due to the memset in ocfs2_lock_res_init_once() 513 */ 514 len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START, 515 "%c%016llx", 516 ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY), 517 (long long)parent); 518 519 BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1)); 520 521 memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be, 522 sizeof(__be64)); 523 524 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 525 OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops, 526 dl); 527} 528 529static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res, 530 struct ocfs2_super *osb) 531{ 532 /* Superblock lockres doesn't come from a slab so we call init 533 * once on it manually. */ 534 ocfs2_lock_res_init_once(res); 535 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO, 536 0, res->l_name); 537 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER, 538 &ocfs2_super_lops, osb); 539} 540 541static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res, 542 struct ocfs2_super *osb) 543{ 544 /* Rename lockres doesn't come from a slab so we call init 545 * once on it manually. */ 546 ocfs2_lock_res_init_once(res); 547 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name); 548 ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME, 549 &ocfs2_rename_lops, osb); 550} 551 552void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres, 553 struct ocfs2_file_private *fp) 554{ 555 struct inode *inode = fp->fp_file->f_mapping->host; 556 struct ocfs2_inode_info *oi = OCFS2_I(inode); 557 558 ocfs2_lock_res_init_once(lockres); 559 ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno, 560 inode->i_generation, lockres->l_name); 561 ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres, 562 OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops, 563 fp); 564 lockres->l_flags |= OCFS2_LOCK_NOCACHE; 565} 566 567void ocfs2_lock_res_free(struct ocfs2_lock_res *res) 568{ 569 mlog_entry_void(); 570 571 if (!(res->l_flags & OCFS2_LOCK_INITIALIZED)) 572 return; 573 574 ocfs2_remove_lockres_tracking(res); 575 576 mlog_bug_on_msg(!list_empty(&res->l_blocked_list), 577 "Lockres %s is on the blocked list\n", 578 res->l_name); 579 mlog_bug_on_msg(!list_empty(&res->l_mask_waiters), 580 "Lockres %s has mask waiters pending\n", 581 res->l_name); 582 mlog_bug_on_msg(spin_is_locked(&res->l_lock), 583 "Lockres %s is locked\n", 584 res->l_name); 585 mlog_bug_on_msg(res->l_ro_holders, 586 "Lockres %s has %u ro holders\n", 587 res->l_name, res->l_ro_holders); 588 mlog_bug_on_msg(res->l_ex_holders, 589 "Lockres %s has %u ex holders\n", 590 res->l_name, res->l_ex_holders); 591 592 /* Need to clear out the lock status block for the dlm */ 593 memset(&res->l_lksb, 0, sizeof(res->l_lksb)); 594 595 res->l_flags = 0UL; 596 mlog_exit_void(); 597} 598 599static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres, 600 int level) 601{ 602 mlog_entry_void(); 603 604 BUG_ON(!lockres); 605 606 switch(level) { 607 case LKM_EXMODE: 608 lockres->l_ex_holders++; 609 break; 610 case LKM_PRMODE: 611 lockres->l_ro_holders++; 612 break; 613 default: 614 BUG(); 615 } 616 617 mlog_exit_void(); 618} 619 620static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres, 621 int level) 622{ 623 mlog_entry_void(); 624 625 BUG_ON(!lockres); 626 627 switch(level) { 628 case LKM_EXMODE: 629 BUG_ON(!lockres->l_ex_holders); 630 lockres->l_ex_holders--; 631 break; 632 case LKM_PRMODE: 633 BUG_ON(!lockres->l_ro_holders); 634 lockres->l_ro_holders--; 635 break; 636 default: 637 BUG(); 638 } 639 mlog_exit_void(); 640} 641 642/* WARNING: This function lives in a world where the only three lock 643 * levels are EX, PR, and NL. It *will* have to be adjusted when more 644 * lock types are added. */ 645static inline int ocfs2_highest_compat_lock_level(int level) 646{ 647 int new_level = LKM_EXMODE; 648 649 if (level == LKM_EXMODE) 650 new_level = LKM_NLMODE; 651 else if (level == LKM_PRMODE) 652 new_level = LKM_PRMODE; 653 return new_level; 654} 655 656static void lockres_set_flags(struct ocfs2_lock_res *lockres, 657 unsigned long newflags) 658{ 659 struct ocfs2_mask_waiter *mw, *tmp; 660 661 assert_spin_locked(&lockres->l_lock); 662 663 lockres->l_flags = newflags; 664 665 list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) { 666 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 667 continue; 668 669 list_del_init(&mw->mw_item); 670 mw->mw_status = 0; 671 complete(&mw->mw_complete); 672 } 673} 674static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or) 675{ 676 lockres_set_flags(lockres, lockres->l_flags | or); 677} 678static void lockres_clear_flags(struct ocfs2_lock_res *lockres, 679 unsigned long clear) 680{ 681 lockres_set_flags(lockres, lockres->l_flags & ~clear); 682} 683 684static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres) 685{ 686 mlog_entry_void(); 687 688 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 689 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 690 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 691 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 692 693 lockres->l_level = lockres->l_requested; 694 if (lockres->l_level <= 695 ocfs2_highest_compat_lock_level(lockres->l_blocking)) { 696 lockres->l_blocking = LKM_NLMODE; 697 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); 698 } 699 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 700 701 mlog_exit_void(); 702} 703 704static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres) 705{ 706 mlog_entry_void(); 707 708 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY)); 709 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED)); 710 711 /* Convert from RO to EX doesn't really need anything as our 712 * information is already up to data. Convert from NL to 713 * *anything* however should mark ourselves as needing an 714 * update */ 715 if (lockres->l_level == LKM_NLMODE && 716 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 717 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 718 719 lockres->l_level = lockres->l_requested; 720 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 721 722 mlog_exit_void(); 723} 724 725static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres) 726{ 727 mlog_entry_void(); 728 729 BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY))); 730 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 731 732 if (lockres->l_requested > LKM_NLMODE && 733 !(lockres->l_flags & OCFS2_LOCK_LOCAL) && 734 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 735 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 736 737 lockres->l_level = lockres->l_requested; 738 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED); 739 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 740 741 mlog_exit_void(); 742} 743 744static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, 745 int level) 746{ 747 int needs_downconvert = 0; 748 mlog_entry_void(); 749 750 assert_spin_locked(&lockres->l_lock); 751 752 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 753 754 if (level > lockres->l_blocking) { 755 /* only schedule a downconvert if we haven't already scheduled 756 * one that goes low enough to satisfy the level we're 757 * blocking. this also catches the case where we get 758 * duplicate BASTs */ 759 if (ocfs2_highest_compat_lock_level(level) < 760 ocfs2_highest_compat_lock_level(lockres->l_blocking)) 761 needs_downconvert = 1; 762 763 lockres->l_blocking = level; 764 } 765 766 mlog_exit(needs_downconvert); 767 return needs_downconvert; 768} 769 770static void ocfs2_blocking_ast(void *opaque, int level) 771{ 772 struct ocfs2_lock_res *lockres = opaque; 773 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 774 int needs_downconvert; 775 unsigned long flags; 776 777 BUG_ON(level <= LKM_NLMODE); 778 779 mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", 780 lockres->l_name, level, lockres->l_level, 781 ocfs2_lock_type_string(lockres->l_type)); 782 783 /* 784 * We can skip the bast for locks which don't enable caching - 785 * they'll be dropped at the earliest possible time anyway. 786 */ 787 if (lockres->l_flags & OCFS2_LOCK_NOCACHE) 788 return; 789 790 spin_lock_irqsave(&lockres->l_lock, flags); 791 needs_downconvert = ocfs2_generic_handle_bast(lockres, level); 792 if (needs_downconvert) 793 ocfs2_schedule_blocked_lock(osb, lockres); 794 spin_unlock_irqrestore(&lockres->l_lock, flags); 795 796 wake_up(&lockres->l_event); 797 798 ocfs2_wake_downconvert_thread(osb); 799} 800 801static void ocfs2_locking_ast(void *opaque) 802{ 803 struct ocfs2_lock_res *lockres = opaque; 804 struct dlm_lockstatus *lksb = &lockres->l_lksb; 805 unsigned long flags; 806 807 spin_lock_irqsave(&lockres->l_lock, flags); 808 809 if (lksb->status != DLM_NORMAL) { 810 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", 811 lockres->l_name, lksb->status); 812 spin_unlock_irqrestore(&lockres->l_lock, flags); 813 return; 814 } 815 816 switch(lockres->l_action) { 817 case OCFS2_AST_ATTACH: 818 ocfs2_generic_handle_attach_action(lockres); 819 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL); 820 break; 821 case OCFS2_AST_CONVERT: 822 ocfs2_generic_handle_convert_action(lockres); 823 break; 824 case OCFS2_AST_DOWNCONVERT: 825 ocfs2_generic_handle_downconvert_action(lockres); 826 break; 827 default: 828 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " 829 "lockres flags = 0x%lx, unlock action: %u\n", 830 lockres->l_name, lockres->l_action, lockres->l_flags, 831 lockres->l_unlock_action); 832 BUG(); 833 } 834 835 /* set it to something invalid so if we get called again we 836 * can catch it. */ 837 lockres->l_action = OCFS2_AST_INVALID; 838 839 wake_up(&lockres->l_event); 840 spin_unlock_irqrestore(&lockres->l_lock, flags); 841} 842 843static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, 844 int convert) 845{ 846 unsigned long flags; 847 848 mlog_entry_void(); 849 spin_lock_irqsave(&lockres->l_lock, flags); 850 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 851 if (convert) 852 lockres->l_action = OCFS2_AST_INVALID; 853 else 854 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 855 spin_unlock_irqrestore(&lockres->l_lock, flags); 856 857 wake_up(&lockres->l_event); 858 mlog_exit_void(); 859} 860 861/* Note: If we detect another process working on the lock (i.e., 862 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller 863 * to do the right thing in that case. 864 */ 865static int ocfs2_lock_create(struct ocfs2_super *osb, 866 struct ocfs2_lock_res *lockres, 867 int level, 868 int dlm_flags) 869{ 870 int ret = 0; 871 enum dlm_status status = DLM_NORMAL; 872 unsigned long flags; 873 874 mlog_entry_void(); 875 876 mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, 877 dlm_flags); 878 879 spin_lock_irqsave(&lockres->l_lock, flags); 880 if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || 881 (lockres->l_flags & OCFS2_LOCK_BUSY)) { 882 spin_unlock_irqrestore(&lockres->l_lock, flags); 883 goto bail; 884 } 885 886 lockres->l_action = OCFS2_AST_ATTACH; 887 lockres->l_requested = level; 888 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 889 spin_unlock_irqrestore(&lockres->l_lock, flags); 890 891 status = dlmlock(osb->dlm, 892 level, 893 &lockres->l_lksb, 894 dlm_flags, 895 lockres->l_name, 896 OCFS2_LOCK_ID_MAX_LEN - 1, 897 ocfs2_locking_ast, 898 lockres, 899 ocfs2_blocking_ast); 900 if (status != DLM_NORMAL) { 901 ocfs2_log_dlm_error("dlmlock", status, lockres); 902 ret = -EINVAL; 903 ocfs2_recover_from_dlm_error(lockres, 1); 904 } 905 906 mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name); 907 908bail: 909 mlog_exit(ret); 910 return ret; 911} 912 913static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, 914 int flag) 915{ 916 unsigned long flags; 917 int ret; 918 919 spin_lock_irqsave(&lockres->l_lock, flags); 920 ret = lockres->l_flags & flag; 921 spin_unlock_irqrestore(&lockres->l_lock, flags); 922 923 return ret; 924} 925 926static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres) 927 928{ 929 wait_event(lockres->l_event, 930 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY)); 931} 932 933static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres) 934 935{ 936 wait_event(lockres->l_event, 937 !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING)); 938} 939 940/* predict what lock level we'll be dropping down to on behalf 941 * of another node, and return true if the currently wanted 942 * level will be compatible with it. */ 943static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres, 944 int wanted) 945{ 946 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 947 948 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking); 949} 950 951static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw) 952{ 953 INIT_LIST_HEAD(&mw->mw_item); 954 init_completion(&mw->mw_complete); 955} 956 957static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw) 958{ 959 wait_for_completion(&mw->mw_complete); 960 /* Re-arm the completion in case we want to wait on it again */ 961 INIT_COMPLETION(mw->mw_complete); 962 return mw->mw_status; 963} 964 965static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres, 966 struct ocfs2_mask_waiter *mw, 967 unsigned long mask, 968 unsigned long goal) 969{ 970 BUG_ON(!list_empty(&mw->mw_item)); 971 972 assert_spin_locked(&lockres->l_lock); 973 974 list_add_tail(&mw->mw_item, &lockres->l_mask_waiters); 975 mw->mw_mask = mask; 976 mw->mw_goal = goal; 977} 978 979/* returns 0 if the mw that was removed was already satisfied, -EBUSY 980 * if the mask still hadn't reached its goal */ 981static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres, 982 struct ocfs2_mask_waiter *mw) 983{ 984 unsigned long flags; 985 int ret = 0; 986 987 spin_lock_irqsave(&lockres->l_lock, flags); 988 if (!list_empty(&mw->mw_item)) { 989 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal) 990 ret = -EBUSY; 991 992 list_del_init(&mw->mw_item); 993 init_completion(&mw->mw_complete); 994 } 995 spin_unlock_irqrestore(&lockres->l_lock, flags); 996 997 return ret; 998 999} 1000 1001static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw, 1002 struct ocfs2_lock_res *lockres) 1003{ 1004 int ret; 1005 1006 ret = wait_for_completion_interruptible(&mw->mw_complete); 1007 if (ret) 1008 lockres_remove_mask_waiter(lockres, mw); 1009 else 1010 ret = mw->mw_status; 1011 /* Re-arm the completion in case we want to wait on it again */ 1012 INIT_COMPLETION(mw->mw_complete); 1013 return ret; 1014} 1015 1016static int ocfs2_cluster_lock(struct ocfs2_super *osb, 1017 struct ocfs2_lock_res *lockres, 1018 int level, 1019 int lkm_flags, 1020 int arg_flags) 1021{ 1022 struct ocfs2_mask_waiter mw; 1023 enum dlm_status status; 1024 int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR); 1025 int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */ 1026 unsigned long flags; 1027 1028 mlog_entry_void(); 1029 1030 ocfs2_init_mask_waiter(&mw); 1031 1032 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 1033 lkm_flags |= LKM_VALBLK; 1034 1035again: 1036 wait = 0; 1037 1038 if (catch_signals && signal_pending(current)) { 1039 ret = -ERESTARTSYS; 1040 goto out; 1041 } 1042 1043 spin_lock_irqsave(&lockres->l_lock, flags); 1044 1045 mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING, 1046 "Cluster lock called on freeing lockres %s! flags " 1047 "0x%lx\n", lockres->l_name, lockres->l_flags); 1048 1049 /* We only compare against the currently granted level 1050 * here. If the lock is blocked waiting on a downconvert, 1051 * we'll get caught below. */ 1052 if (lockres->l_flags & OCFS2_LOCK_BUSY && 1053 level > lockres->l_level) { 1054 /* is someone sitting in dlm_lock? If so, wait on 1055 * them. */ 1056 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1057 wait = 1; 1058 goto unlock; 1059 } 1060 1061 if (lockres->l_flags & OCFS2_LOCK_BLOCKED && 1062 !ocfs2_may_continue_on_blocked_lock(lockres, level)) { 1063 /* is the lock is currently blocked on behalf of 1064 * another node */ 1065 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0); 1066 wait = 1; 1067 goto unlock; 1068 } 1069 1070 if (level > lockres->l_level) { 1071 if (lockres->l_action != OCFS2_AST_INVALID) 1072 mlog(ML_ERROR, "lockres %s has action %u pending\n", 1073 lockres->l_name, lockres->l_action); 1074 1075 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1076 lockres->l_action = OCFS2_AST_ATTACH; 1077 lkm_flags &= ~LKM_CONVERT; 1078 } else { 1079 lockres->l_action = OCFS2_AST_CONVERT; 1080 lkm_flags |= LKM_CONVERT; 1081 } 1082 1083 lockres->l_requested = level; 1084 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1085 spin_unlock_irqrestore(&lockres->l_lock, flags); 1086 1087 BUG_ON(level == LKM_IVMODE); 1088 BUG_ON(level == LKM_NLMODE); 1089 1090 mlog(0, "lock %s, convert from %d to level = %d\n", 1091 lockres->l_name, lockres->l_level, level); 1092 1093 /* call dlm_lock to upgrade lock now */ 1094 status = dlmlock(osb->dlm, 1095 level, 1096 &lockres->l_lksb, 1097 lkm_flags, 1098 lockres->l_name, 1099 OCFS2_LOCK_ID_MAX_LEN - 1, 1100 ocfs2_locking_ast, 1101 lockres, 1102 ocfs2_blocking_ast); 1103 if (status != DLM_NORMAL) { 1104 if ((lkm_flags & LKM_NOQUEUE) && 1105 (status == DLM_NOTQUEUED)) 1106 ret = -EAGAIN; 1107 else { 1108 ocfs2_log_dlm_error("dlmlock", status, 1109 lockres); 1110 ret = -EINVAL; 1111 } 1112 ocfs2_recover_from_dlm_error(lockres, 1); 1113 goto out; 1114 } 1115 1116 mlog(0, "lock %s, successfull return from dlmlock\n", 1117 lockres->l_name); 1118 1119 /* At this point we've gone inside the dlm and need to 1120 * complete our work regardless. */ 1121 catch_signals = 0; 1122 1123 /* wait for busy to clear and carry on */ 1124 goto again; 1125 } 1126 1127 /* Ok, if we get here then we're good to go. */ 1128 ocfs2_inc_holders(lockres, level); 1129 1130 ret = 0; 1131unlock: 1132 spin_unlock_irqrestore(&lockres->l_lock, flags); 1133out: 1134 /* 1135 * This is helping work around a lock inversion between the page lock 1136 * and dlm locks. One path holds the page lock while calling aops 1137 * which block acquiring dlm locks. The voting thread holds dlm 1138 * locks while acquiring page locks while down converting data locks. 1139 * This block is helping an aop path notice the inversion and back 1140 * off to unlock its page lock before trying the dlm lock again. 1141 */ 1142 if (wait && arg_flags & OCFS2_LOCK_NONBLOCK && 1143 mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) { 1144 wait = 0; 1145 if (lockres_remove_mask_waiter(lockres, &mw)) 1146 ret = -EAGAIN; 1147 else 1148 goto again; 1149 } 1150 if (wait) { 1151 ret = ocfs2_wait_for_mask(&mw); 1152 if (ret == 0) 1153 goto again; 1154 mlog_errno(ret); 1155 } 1156 1157 mlog_exit(ret); 1158 return ret; 1159} 1160 1161static void ocfs2_cluster_unlock(struct ocfs2_super *osb, 1162 struct ocfs2_lock_res *lockres, 1163 int level) 1164{ 1165 unsigned long flags; 1166 1167 mlog_entry_void(); 1168 spin_lock_irqsave(&lockres->l_lock, flags); 1169 ocfs2_dec_holders(lockres, level); 1170 ocfs2_downconvert_on_unlock(osb, lockres); 1171 spin_unlock_irqrestore(&lockres->l_lock, flags); 1172 mlog_exit_void(); 1173} 1174 1175static int ocfs2_create_new_lock(struct ocfs2_super *osb, 1176 struct ocfs2_lock_res *lockres, 1177 int ex, 1178 int local) 1179{ 1180 int level = ex ? LKM_EXMODE : LKM_PRMODE; 1181 unsigned long flags; 1182 int lkm_flags = local ? LKM_LOCAL : 0; 1183 1184 spin_lock_irqsave(&lockres->l_lock, flags); 1185 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED); 1186 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL); 1187 spin_unlock_irqrestore(&lockres->l_lock, flags); 1188 1189 return ocfs2_lock_create(osb, lockres, level, lkm_flags); 1190} 1191 1192/* Grants us an EX lock on the data and metadata resources, skipping 1193 * the normal cluster directory lookup. Use this ONLY on newly created 1194 * inodes which other nodes can't possibly see, and which haven't been 1195 * hashed in the inode hash yet. This can give us a good performance 1196 * increase as it'll skip the network broadcast normally associated 1197 * with creating a new lock resource. */ 1198int ocfs2_create_new_inode_locks(struct inode *inode) 1199{ 1200 int ret; 1201 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1202 1203 BUG_ON(!inode); 1204 BUG_ON(!ocfs2_inode_is_new(inode)); 1205 1206 mlog_entry_void(); 1207 1208 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno); 1209 1210 /* NOTE: That we don't increment any of the holder counts, nor 1211 * do we add anything to a journal handle. Since this is 1212 * supposed to be a new inode which the cluster doesn't know 1213 * about yet, there is no need to. As far as the LVB handling 1214 * is concerned, this is basically like acquiring an EX lock 1215 * on a resource which has an invalid one -- we'll set it 1216 * valid when we release the EX. */ 1217 1218 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1); 1219 if (ret) { 1220 mlog_errno(ret); 1221 goto bail; 1222 } 1223 1224 /* 1225 * We don't want to use LKM_LOCAL on a meta data lock as they 1226 * don't use a generation in their lock names. 1227 */ 1228 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0); 1229 if (ret) { 1230 mlog_errno(ret); 1231 goto bail; 1232 } 1233 1234 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0); 1235 if (ret) { 1236 mlog_errno(ret); 1237 goto bail; 1238 } 1239 1240bail: 1241 mlog_exit(ret); 1242 return ret; 1243} 1244 1245int ocfs2_rw_lock(struct inode *inode, int write) 1246{ 1247 int status, level; 1248 struct ocfs2_lock_res *lockres; 1249 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1250 1251 BUG_ON(!inode); 1252 1253 mlog_entry_void(); 1254 1255 mlog(0, "inode %llu take %s RW lock\n", 1256 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1257 write ? "EXMODE" : "PRMODE"); 1258 1259 if (ocfs2_mount_local(osb)) 1260 return 0; 1261 1262 lockres = &OCFS2_I(inode)->ip_rw_lockres; 1263 1264 level = write ? LKM_EXMODE : LKM_PRMODE; 1265 1266 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0, 1267 0); 1268 if (status < 0) 1269 mlog_errno(status); 1270 1271 mlog_exit(status); 1272 return status; 1273} 1274 1275void ocfs2_rw_unlock(struct inode *inode, int write) 1276{ 1277 int level = write ? LKM_EXMODE : LKM_PRMODE; 1278 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres; 1279 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1280 1281 mlog_entry_void(); 1282 1283 mlog(0, "inode %llu drop %s RW lock\n", 1284 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1285 write ? "EXMODE" : "PRMODE"); 1286 1287 if (!ocfs2_mount_local(osb)) 1288 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 1289 1290 mlog_exit_void(); 1291} 1292 1293/* 1294 * ocfs2_open_lock always get PR mode lock. 1295 */ 1296int ocfs2_open_lock(struct inode *inode) 1297{ 1298 int status = 0; 1299 struct ocfs2_lock_res *lockres; 1300 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1301 1302 BUG_ON(!inode); 1303 1304 mlog_entry_void(); 1305 1306 mlog(0, "inode %llu take PRMODE open lock\n", 1307 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1308 1309 if (ocfs2_mount_local(osb)) 1310 goto out; 1311 1312 lockres = &OCFS2_I(inode)->ip_open_lockres; 1313 1314 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1315 LKM_PRMODE, 0, 0); 1316 if (status < 0) 1317 mlog_errno(status); 1318 1319out: 1320 mlog_exit(status); 1321 return status; 1322} 1323 1324int ocfs2_try_open_lock(struct inode *inode, int write) 1325{ 1326 int status = 0, level; 1327 struct ocfs2_lock_res *lockres; 1328 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1329 1330 BUG_ON(!inode); 1331 1332 mlog_entry_void(); 1333 1334 mlog(0, "inode %llu try to take %s open lock\n", 1335 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1336 write ? "EXMODE" : "PRMODE"); 1337 1338 if (ocfs2_mount_local(osb)) 1339 goto out; 1340 1341 lockres = &OCFS2_I(inode)->ip_open_lockres; 1342 1343 level = write ? LKM_EXMODE : LKM_PRMODE; 1344 1345 /* 1346 * The file system may already holding a PRMODE/EXMODE open lock. 1347 * Since we pass LKM_NOQUEUE, the request won't block waiting on 1348 * other nodes and the -EAGAIN will indicate to the caller that 1349 * this inode is still in use. 1350 */ 1351 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, 1352 level, LKM_NOQUEUE, 0); 1353 1354out: 1355 mlog_exit(status); 1356 return status; 1357} 1358 1359/* 1360 * ocfs2_open_unlock unlock PR and EX mode open locks. 1361 */ 1362void ocfs2_open_unlock(struct inode *inode) 1363{ 1364 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres; 1365 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1366 1367 mlog_entry_void(); 1368 1369 mlog(0, "inode %llu drop open lock\n", 1370 (unsigned long long)OCFS2_I(inode)->ip_blkno); 1371 1372 if (ocfs2_mount_local(osb)) 1373 goto out; 1374 1375 if(lockres->l_ro_holders) 1376 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1377 LKM_PRMODE); 1378 if(lockres->l_ex_holders) 1379 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, 1380 LKM_EXMODE); 1381 1382out: 1383 mlog_exit_void(); 1384} 1385 1386static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres, 1387 int level) 1388{ 1389 int ret; 1390 struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); 1391 unsigned long flags; 1392 struct ocfs2_mask_waiter mw; 1393 1394 ocfs2_init_mask_waiter(&mw); 1395 1396retry_cancel: 1397 spin_lock_irqsave(&lockres->l_lock, flags); 1398 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 1399 ret = ocfs2_prepare_cancel_convert(osb, lockres); 1400 if (ret) { 1401 spin_unlock_irqrestore(&lockres->l_lock, flags); 1402 ret = ocfs2_cancel_convert(osb, lockres); 1403 if (ret < 0) { 1404 mlog_errno(ret); 1405 goto out; 1406 } 1407 goto retry_cancel; 1408 } 1409 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1410 spin_unlock_irqrestore(&lockres->l_lock, flags); 1411 1412 ocfs2_wait_for_mask(&mw); 1413 goto retry_cancel; 1414 } 1415 1416 ret = -ERESTARTSYS; 1417 /* 1418 * We may still have gotten the lock, in which case there's no 1419 * point to restarting the syscall. 1420 */ 1421 if (lockres->l_level == level) 1422 ret = 0; 1423 1424 mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret, 1425 lockres->l_flags, lockres->l_level, lockres->l_action); 1426 1427 spin_unlock_irqrestore(&lockres->l_lock, flags); 1428 1429out: 1430 return ret; 1431} 1432 1433/* 1434 * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of 1435 * flock() calls. The locking approach this requires is sufficiently 1436 * different from all other cluster lock types that we implement a 1437 * seperate path to the "low-level" dlm calls. In particular: 1438 * 1439 * - No optimization of lock levels is done - we take at exactly 1440 * what's been requested. 1441 * 1442 * - No lock caching is employed. We immediately downconvert to 1443 * no-lock at unlock time. This also means flock locks never go on 1444 * the blocking list). 1445 * 1446 * - Since userspace can trivially deadlock itself with flock, we make 1447 * sure to allow cancellation of a misbehaving applications flock() 1448 * request. 1449 * 1450 * - Access to any flock lockres doesn't require concurrency, so we 1451 * can simplify the code by requiring the caller to guarantee 1452 * serialization of dlmglue flock calls. 1453 */ 1454int ocfs2_file_lock(struct file *file, int ex, int trylock) 1455{ 1456 int ret, level = ex ? LKM_EXMODE : LKM_PRMODE; 1457 unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0; 1458 unsigned long flags; 1459 struct ocfs2_file_private *fp = file->private_data; 1460 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1461 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1462 struct ocfs2_mask_waiter mw; 1463 1464 ocfs2_init_mask_waiter(&mw); 1465 1466 if ((lockres->l_flags & OCFS2_LOCK_BUSY) || 1467 (lockres->l_level > LKM_NLMODE)) { 1468 mlog(ML_ERROR, 1469 "File lock \"%s\" has busy or locked state: flags: 0x%lx, " 1470 "level: %u\n", lockres->l_name, lockres->l_flags, 1471 lockres->l_level); 1472 return -EINVAL; 1473 } 1474 1475 spin_lock_irqsave(&lockres->l_lock, flags); 1476 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 1477 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1478 spin_unlock_irqrestore(&lockres->l_lock, flags); 1479 1480 /* 1481 * Get the lock at NLMODE to start - that way we 1482 * can cancel the upconvert request if need be. 1483 */ 1484 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); 1485 if (ret < 0) { 1486 mlog_errno(ret); 1487 goto out; 1488 } 1489 1490 ret = ocfs2_wait_for_mask(&mw); 1491 if (ret) { 1492 mlog_errno(ret); 1493 goto out; 1494 } 1495 spin_lock_irqsave(&lockres->l_lock, flags); 1496 } 1497 1498 lockres->l_action = OCFS2_AST_CONVERT; 1499 lkm_flags |= LKM_CONVERT; 1500 lockres->l_requested = level; 1501 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 1502 1503 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1504 spin_unlock_irqrestore(&lockres->l_lock, flags); 1505 1506 ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags, 1507 lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, 1508 ocfs2_locking_ast, lockres, ocfs2_blocking_ast); 1509 if (ret != DLM_NORMAL) { 1510 if (trylock && ret == DLM_NOTQUEUED) 1511 ret = -EAGAIN; 1512 else { 1513 ocfs2_log_dlm_error("dlmlock", ret, lockres); 1514 ret = -EINVAL; 1515 } 1516 1517 ocfs2_recover_from_dlm_error(lockres, 1); 1518 lockres_remove_mask_waiter(lockres, &mw); 1519 goto out; 1520 } 1521 1522 ret = ocfs2_wait_for_mask_interruptible(&mw, lockres); 1523 if (ret == -ERESTARTSYS) { 1524 /* 1525 * Userspace can cause deadlock itself with 1526 * flock(). Current behavior locally is to allow the 1527 * deadlock, but abort the system call if a signal is 1528 * received. We follow this example, otherwise a 1529 * poorly written program could sit in kernel until 1530 * reboot. 1531 * 1532 * Handling this is a bit more complicated for Ocfs2 1533 * though. We can't exit this function with an 1534 * outstanding lock request, so a cancel convert is 1535 * required. We intentionally overwrite 'ret' - if the 1536 * cancel fails and the lock was granted, it's easier 1537 * to just bubble sucess back up to the user. 1538 */ 1539 ret = ocfs2_flock_handle_signal(lockres, level); 1540 } 1541 1542out: 1543 1544 mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n", 1545 lockres->l_name, ex, trylock, ret); 1546 return ret; 1547} 1548 1549void ocfs2_file_unlock(struct file *file) 1550{ 1551 int ret; 1552 unsigned long flags; 1553 struct ocfs2_file_private *fp = file->private_data; 1554 struct ocfs2_lock_res *lockres = &fp->fp_flock; 1555 struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb); 1556 struct ocfs2_mask_waiter mw; 1557 1558 ocfs2_init_mask_waiter(&mw); 1559 1560 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) 1561 return; 1562 1563 if (lockres->l_level == LKM_NLMODE) 1564 return; 1565 1566 mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n", 1567 lockres->l_name, lockres->l_flags, lockres->l_level, 1568 lockres->l_action); 1569 1570 spin_lock_irqsave(&lockres->l_lock, flags); 1571 /* 1572 * Fake a blocking ast for the downconvert code. 1573 */ 1574 lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); 1575 lockres->l_blocking = LKM_EXMODE; 1576 1577 ocfs2_prepare_downconvert(lockres, LKM_NLMODE); 1578 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0); 1579 spin_unlock_irqrestore(&lockres->l_lock, flags); 1580 1581 ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0); 1582 if (ret) { 1583 mlog_errno(ret); 1584 return; 1585 } 1586 1587 ret = ocfs2_wait_for_mask(&mw); 1588 if (ret) 1589 mlog_errno(ret); 1590} 1591 1592static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb, 1593 struct ocfs2_lock_res *lockres) 1594{ 1595 int kick = 0; 1596 1597 mlog_entry_void(); 1598 1599 /* If we know that another node is waiting on our lock, kick 1600 * the downconvert thread * pre-emptively when we reach a release 1601 * condition. */ 1602 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) { 1603 switch(lockres->l_blocking) { 1604 case LKM_EXMODE: 1605 if (!lockres->l_ex_holders && !lockres->l_ro_holders) 1606 kick = 1; 1607 break; 1608 case LKM_PRMODE: 1609 if (!lockres->l_ex_holders) 1610 kick = 1; 1611 break; 1612 default: 1613 BUG(); 1614 } 1615 } 1616 1617 if (kick) 1618 ocfs2_wake_downconvert_thread(osb); 1619 1620 mlog_exit_void(); 1621} 1622 1623#define OCFS2_SEC_BITS 34 1624#define OCFS2_SEC_SHIFT (64 - 34) 1625#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1) 1626 1627/* LVB only has room for 64 bits of time here so we pack it for 1628 * now. */ 1629static u64 ocfs2_pack_timespec(struct timespec *spec) 1630{ 1631 u64 res; 1632 u64 sec = spec->tv_sec; 1633 u32 nsec = spec->tv_nsec; 1634 1635 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK); 1636 1637 return res; 1638} 1639 1640/* Call this with the lockres locked. I am reasonably sure we don't 1641 * need ip_lock in this function as anyone who would be changing those 1642 * values is supposed to be blocked in ocfs2_inode_lock right now. */ 1643static void __ocfs2_stuff_meta_lvb(struct inode *inode) 1644{ 1645 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1646 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1647 struct ocfs2_meta_lvb *lvb; 1648 1649 mlog_entry_void(); 1650 1651 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1652 1653 /* 1654 * Invalidate the LVB of a deleted inode - this way other 1655 * nodes are forced to go to disk and discover the new inode 1656 * status. 1657 */ 1658 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1659 lvb->lvb_version = 0; 1660 goto out; 1661 } 1662 1663 lvb->lvb_version = OCFS2_LVB_VERSION; 1664 lvb->lvb_isize = cpu_to_be64(i_size_read(inode)); 1665 lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters); 1666 lvb->lvb_iuid = cpu_to_be32(inode->i_uid); 1667 lvb->lvb_igid = cpu_to_be32(inode->i_gid); 1668 lvb->lvb_imode = cpu_to_be16(inode->i_mode); 1669 lvb->lvb_inlink = cpu_to_be16(inode->i_nlink); 1670 lvb->lvb_iatime_packed = 1671 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime)); 1672 lvb->lvb_ictime_packed = 1673 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime)); 1674 lvb->lvb_imtime_packed = 1675 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime)); 1676 lvb->lvb_iattr = cpu_to_be32(oi->ip_attr); 1677 lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features); 1678 lvb->lvb_igeneration = cpu_to_be32(inode->i_generation); 1679 1680out: 1681 mlog_meta_lvb(0, lockres); 1682 1683 mlog_exit_void(); 1684} 1685 1686static void ocfs2_unpack_timespec(struct timespec *spec, 1687 u64 packed_time) 1688{ 1689 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT; 1690 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK; 1691} 1692 1693static void ocfs2_refresh_inode_from_lvb(struct inode *inode) 1694{ 1695 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1696 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1697 struct ocfs2_meta_lvb *lvb; 1698 1699 mlog_entry_void(); 1700 1701 mlog_meta_lvb(0, lockres); 1702 1703 lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1704 1705 /* We're safe here without the lockres lock... */ 1706 spin_lock(&oi->ip_lock); 1707 oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters); 1708 i_size_write(inode, be64_to_cpu(lvb->lvb_isize)); 1709 1710 oi->ip_attr = be32_to_cpu(lvb->lvb_iattr); 1711 oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures); 1712 ocfs2_set_inode_flags(inode); 1713 1714 /* fast-symlinks are a special case */ 1715 if (S_ISLNK(inode->i_mode) && !oi->ip_clusters) 1716 inode->i_blocks = 0; 1717 else 1718 inode->i_blocks = ocfs2_inode_sector_count(inode); 1719 1720 inode->i_uid = be32_to_cpu(lvb->lvb_iuid); 1721 inode->i_gid = be32_to_cpu(lvb->lvb_igid); 1722 inode->i_mode = be16_to_cpu(lvb->lvb_imode); 1723 inode->i_nlink = be16_to_cpu(lvb->lvb_inlink); 1724 ocfs2_unpack_timespec(&inode->i_atime, 1725 be64_to_cpu(lvb->lvb_iatime_packed)); 1726 ocfs2_unpack_timespec(&inode->i_mtime, 1727 be64_to_cpu(lvb->lvb_imtime_packed)); 1728 ocfs2_unpack_timespec(&inode->i_ctime, 1729 be64_to_cpu(lvb->lvb_ictime_packed)); 1730 spin_unlock(&oi->ip_lock); 1731 1732 mlog_exit_void(); 1733} 1734 1735static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode, 1736 struct ocfs2_lock_res *lockres) 1737{ 1738 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb; 1739 1740 if (lvb->lvb_version == OCFS2_LVB_VERSION 1741 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation) 1742 return 1; 1743 return 0; 1744} 1745 1746/* Determine whether a lock resource needs to be refreshed, and 1747 * arbitrate who gets to refresh it. 1748 * 1749 * 0 means no refresh needed. 1750 * 1751 * > 0 means you need to refresh this and you MUST call 1752 * ocfs2_complete_lock_res_refresh afterwards. */ 1753static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres) 1754{ 1755 unsigned long flags; 1756 int status = 0; 1757 1758 mlog_entry_void(); 1759 1760refresh_check: 1761 spin_lock_irqsave(&lockres->l_lock, flags); 1762 if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) { 1763 spin_unlock_irqrestore(&lockres->l_lock, flags); 1764 goto bail; 1765 } 1766 1767 if (lockres->l_flags & OCFS2_LOCK_REFRESHING) { 1768 spin_unlock_irqrestore(&lockres->l_lock, flags); 1769 1770 ocfs2_wait_on_refreshing_lock(lockres); 1771 goto refresh_check; 1772 } 1773 1774 /* Ok, I'll be the one to refresh this lock. */ 1775 lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING); 1776 spin_unlock_irqrestore(&lockres->l_lock, flags); 1777 1778 status = 1; 1779bail: 1780 mlog_exit(status); 1781 return status; 1782} 1783 1784/* If status is non zero, I'll mark it as not being in refresh 1785 * anymroe, but i won't clear the needs refresh flag. */ 1786static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres, 1787 int status) 1788{ 1789 unsigned long flags; 1790 mlog_entry_void(); 1791 1792 spin_lock_irqsave(&lockres->l_lock, flags); 1793 lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING); 1794 if (!status) 1795 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); 1796 spin_unlock_irqrestore(&lockres->l_lock, flags); 1797 1798 wake_up(&lockres->l_event); 1799 1800 mlog_exit_void(); 1801} 1802 1803/* may or may not return a bh if it went to disk. */ 1804static int ocfs2_inode_lock_update(struct inode *inode, 1805 struct buffer_head **bh) 1806{ 1807 int status = 0; 1808 struct ocfs2_inode_info *oi = OCFS2_I(inode); 1809 struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres; 1810 struct ocfs2_dinode *fe; 1811 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1812 1813 mlog_entry_void(); 1814 1815 if (ocfs2_mount_local(osb)) 1816 goto bail; 1817 1818 spin_lock(&oi->ip_lock); 1819 if (oi->ip_flags & OCFS2_INODE_DELETED) { 1820 mlog(0, "Orphaned inode %llu was deleted while we " 1821 "were waiting on a lock. ip_flags = 0x%x\n", 1822 (unsigned long long)oi->ip_blkno, oi->ip_flags); 1823 spin_unlock(&oi->ip_lock); 1824 status = -ENOENT; 1825 goto bail; 1826 } 1827 spin_unlock(&oi->ip_lock); 1828 1829 if (!ocfs2_should_refresh_lock_res(lockres)) 1830 goto bail; 1831 1832 /* This will discard any caching information we might have had 1833 * for the inode metadata. */ 1834 ocfs2_metadata_cache_purge(inode); 1835 1836 ocfs2_extent_map_trunc(inode, 0); 1837 1838 if (ocfs2_meta_lvb_is_trustable(inode, lockres)) { 1839 mlog(0, "Trusting LVB on inode %llu\n", 1840 (unsigned long long)oi->ip_blkno); 1841 ocfs2_refresh_inode_from_lvb(inode); 1842 } else { 1843 /* Boo, we have to go to disk. */ 1844 /* read bh, cast, ocfs2_refresh_inode */ 1845 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno, 1846 bh, OCFS2_BH_CACHED, inode); 1847 if (status < 0) { 1848 mlog_errno(status); 1849 goto bail_refresh; 1850 } 1851 fe = (struct ocfs2_dinode *) (*bh)->b_data; 1852 1853 /* This is a good chance to make sure we're not 1854 * locking an invalid object. 1855 * 1856 * We bug on a stale inode here because we checked 1857 * above whether it was wiped from disk. The wiping 1858 * node provides a guarantee that we receive that 1859 * message and can mark the inode before dropping any 1860 * locks associated with it. */ 1861 if (!OCFS2_IS_VALID_DINODE(fe)) { 1862 OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe); 1863 status = -EIO; 1864 goto bail_refresh; 1865 } 1866 mlog_bug_on_msg(inode->i_generation != 1867 le32_to_cpu(fe->i_generation), 1868 "Invalid dinode %llu disk generation: %u " 1869 "inode->i_generation: %u\n", 1870 (unsigned long long)oi->ip_blkno, 1871 le32_to_cpu(fe->i_generation), 1872 inode->i_generation); 1873 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) || 1874 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)), 1875 "Stale dinode %llu dtime: %llu flags: 0x%x\n", 1876 (unsigned long long)oi->ip_blkno, 1877 (unsigned long long)le64_to_cpu(fe->i_dtime), 1878 le32_to_cpu(fe->i_flags)); 1879 1880 ocfs2_refresh_inode(inode, fe); 1881 } 1882 1883 status = 0; 1884bail_refresh: 1885 ocfs2_complete_lock_res_refresh(lockres, status); 1886bail: 1887 mlog_exit(status); 1888 return status; 1889} 1890 1891static int ocfs2_assign_bh(struct inode *inode, 1892 struct buffer_head **ret_bh, 1893 struct buffer_head *passed_bh) 1894{ 1895 int status; 1896 1897 if (passed_bh) { 1898 /* Ok, the update went to disk for us, use the 1899 * returned bh. */ 1900 *ret_bh = passed_bh; 1901 get_bh(*ret_bh); 1902 1903 return 0; 1904 } 1905 1906 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), 1907 OCFS2_I(inode)->ip_blkno, 1908 ret_bh, 1909 OCFS2_BH_CACHED, 1910 inode); 1911 if (status < 0) 1912 mlog_errno(status); 1913 1914 return status; 1915} 1916 1917/* 1918 * returns < 0 error if the callback will never be called, otherwise 1919 * the result of the lock will be communicated via the callback. 1920 */ 1921int ocfs2_inode_lock_full(struct inode *inode, 1922 struct buffer_head **ret_bh, 1923 int ex, 1924 int arg_flags) 1925{ 1926 int status, level, dlm_flags, acquired; 1927 struct ocfs2_lock_res *lockres = NULL; 1928 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 1929 struct buffer_head *local_bh = NULL; 1930 1931 BUG_ON(!inode); 1932 1933 mlog_entry_void(); 1934 1935 mlog(0, "inode %llu, take %s META lock\n", 1936 (unsigned long long)OCFS2_I(inode)->ip_blkno, 1937 ex ? "EXMODE" : "PRMODE"); 1938 1939 status = 0; 1940 acquired = 0; 1941 /* We'll allow faking a readonly metadata lock for 1942 * rodevices. */ 1943 if (ocfs2_is_hard_readonly(osb)) { 1944 if (ex) 1945 status = -EROFS; 1946 goto bail; 1947 } 1948 1949 if (ocfs2_mount_local(osb)) 1950 goto local; 1951 1952 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1953 wait_event(osb->recovery_event, 1954 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1955 1956 lockres = &OCFS2_I(inode)->ip_inode_lockres; 1957 level = ex ? LKM_EXMODE : LKM_PRMODE; 1958 dlm_flags = 0; 1959 if (arg_flags & OCFS2_META_LOCK_NOQUEUE) 1960 dlm_flags |= LKM_NOQUEUE; 1961 1962 status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags); 1963 if (status < 0) { 1964 if (status != -EAGAIN && status != -EIOCBRETRY) 1965 mlog_errno(status); 1966 goto bail; 1967 } 1968 1969 /* Notify the error cleanup path to drop the cluster lock. */ 1970 acquired = 1; 1971 1972 /* We wait twice because a node may have died while we were in 1973 * the lower dlm layers. The second time though, we've 1974 * committed to owning this lock so we don't allow signals to 1975 * abort the operation. */ 1976 if (!(arg_flags & OCFS2_META_LOCK_RECOVERY)) 1977 wait_event(osb->recovery_event, 1978 ocfs2_node_map_is_empty(osb, &osb->recovery_map)); 1979 1980local: 1981 /* 1982 * We only see this flag if we're being called from 1983 * ocfs2_read_locked_inode(). It means we're locking an inode 1984 * which hasn't been populated yet, so clear the refresh flag 1985 * and let the caller handle it. 1986 */ 1987 if (inode->i_state & I_NEW) { 1988 status = 0; 1989 if (lockres) 1990 ocfs2_complete_lock_res_refresh(lockres, 0); 1991 goto bail; 1992 } 1993 1994 /* This is fun. The caller may want a bh back, or it may 1995 * not. ocfs2_inode_lock_update definitely wants one in, but 1996 * may or may not read one, depending on what's in the 1997 * LVB. The result of all of this is that we've *only* gone to 1998 * disk if we have to, so the complexity is worthwhile. */ 1999 status = ocfs2_inode_lock_update(inode, &local_bh); 2000 if (status < 0) { 2001 if (status != -ENOENT) 2002 mlog_errno(status); 2003 goto bail; 2004 } 2005 2006 if (ret_bh) { 2007 status = ocfs2_assign_bh(inode, ret_bh, local_bh); 2008 if (status < 0) { 2009 mlog_errno(status); 2010 goto bail; 2011 } 2012 } 2013 2014bail: 2015 if (status < 0) { 2016 if (ret_bh && (*ret_bh)) { 2017 brelse(*ret_bh); 2018 *ret_bh = NULL; 2019 } 2020 if (acquired) 2021 ocfs2_inode_unlock(inode, ex); 2022 } 2023 2024 if (local_bh) 2025 brelse(local_bh); 2026 2027 mlog_exit(status); 2028 return status; 2029} 2030 2031/* 2032 * This is working around a lock inversion between tasks acquiring DLM 2033 * locks while holding a page lock and the downconvert thread which 2034 * blocks dlm lock acquiry while acquiring page locks. 2035 * 2036 * ** These _with_page variantes are only intended to be called from aop 2037 * methods that hold page locks and return a very specific *positive* error 2038 * code that aop methods pass up to the VFS -- test for errors with != 0. ** 2039 * 2040 * The DLM is called such that it returns -EAGAIN if it would have 2041 * blocked waiting for the downconvert thread. In that case we unlock 2042 * our page so the downconvert thread can make progress. Once we've 2043 * done this we have to return AOP_TRUNCATED_PAGE so the aop method 2044 * that called us can bubble that back up into the VFS who will then 2045 * immediately retry the aop call. 2046 * 2047 * We do a blocking lock and immediate unlock before returning, though, so that 2048 * the lock has a great chance of being cached on this node by the time the VFS 2049 * calls back to retry the aop. This has a potential to livelock as nodes 2050 * ping locks back and forth, but that's a risk we're willing to take to avoid 2051 * the lock inversion simply. 2052 */ 2053int ocfs2_inode_lock_with_page(struct inode *inode, 2054 struct buffer_head **ret_bh, 2055 int ex, 2056 struct page *page) 2057{ 2058 int ret; 2059 2060 ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK); 2061 if (ret == -EAGAIN) { 2062 unlock_page(page); 2063 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0) 2064 ocfs2_inode_unlock(inode, ex); 2065 ret = AOP_TRUNCATED_PAGE; 2066 } 2067 2068 return ret; 2069} 2070 2071int ocfs2_inode_lock_atime(struct inode *inode, 2072 struct vfsmount *vfsmnt, 2073 int *level) 2074{ 2075 int ret; 2076 2077 mlog_entry_void(); 2078 ret = ocfs2_inode_lock(inode, NULL, 0); 2079 if (ret < 0) { 2080 mlog_errno(ret); 2081 return ret; 2082 } 2083 2084 /* 2085 * If we should update atime, we will get EX lock, 2086 * otherwise we just get PR lock. 2087 */ 2088 if (ocfs2_should_update_atime(inode, vfsmnt)) { 2089 struct buffer_head *bh = NULL; 2090 2091 ocfs2_inode_unlock(inode, 0); 2092 ret = ocfs2_inode_lock(inode, &bh, 1); 2093 if (ret < 0) { 2094 mlog_errno(ret); 2095 return ret; 2096 } 2097 *level = 1; 2098 if (ocfs2_should_update_atime(inode, vfsmnt)) 2099 ocfs2_update_inode_atime(inode, bh); 2100 if (bh) 2101 brelse(bh); 2102 } else 2103 *level = 0; 2104 2105 mlog_exit(ret); 2106 return ret; 2107} 2108 2109void ocfs2_inode_unlock(struct inode *inode, 2110 int ex) 2111{ 2112 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2113 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres; 2114 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); 2115 2116 mlog_entry_void(); 2117 2118 mlog(0, "inode %llu drop %s META lock\n", 2119 (unsigned long long)OCFS2_I(inode)->ip_blkno, 2120 ex ? "EXMODE" : "PRMODE"); 2121 2122 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) && 2123 !ocfs2_mount_local(osb)) 2124 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level); 2125 2126 mlog_exit_void(); 2127} 2128 2129int ocfs2_super_lock(struct ocfs2_super *osb, 2130 int ex) 2131{ 2132 int status = 0; 2133 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2134 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2135 struct buffer_head *bh; 2136 struct ocfs2_slot_info *si = osb->slot_info; 2137 2138 mlog_entry_void(); 2139 2140 if (ocfs2_is_hard_readonly(osb)) 2141 return -EROFS; 2142 2143 if (ocfs2_mount_local(osb)) 2144 goto bail; 2145 2146 status = ocfs2_cluster_lock(osb, lockres, level, 0, 0); 2147 if (status < 0) { 2148 mlog_errno(status); 2149 goto bail; 2150 } 2151 2152 /* The super block lock path is really in the best position to 2153 * know when resources covered by the lock need to be 2154 * refreshed, so we do it here. Of course, making sense of 2155 * everything is up to the caller :) */ 2156 status = ocfs2_should_refresh_lock_res(lockres); 2157 if (status < 0) { 2158 mlog_errno(status); 2159 goto bail; 2160 } 2161 if (status) { 2162 bh = si->si_bh; 2163 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0, 2164 si->si_inode); 2165 if (status == 0) 2166 ocfs2_update_slot_info(si); 2167 2168 ocfs2_complete_lock_res_refresh(lockres, status); 2169 2170 if (status < 0) 2171 mlog_errno(status); 2172 } 2173bail: 2174 mlog_exit(status); 2175 return status; 2176} 2177 2178void ocfs2_super_unlock(struct ocfs2_super *osb, 2179 int ex) 2180{ 2181 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2182 struct ocfs2_lock_res *lockres = &osb->osb_super_lockres; 2183 2184 if (!ocfs2_mount_local(osb)) 2185 ocfs2_cluster_unlock(osb, lockres, level); 2186} 2187 2188int ocfs2_rename_lock(struct ocfs2_super *osb) 2189{ 2190 int status; 2191 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2192 2193 if (ocfs2_is_hard_readonly(osb)) 2194 return -EROFS; 2195 2196 if (ocfs2_mount_local(osb)) 2197 return 0; 2198 2199 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0); 2200 if (status < 0) 2201 mlog_errno(status); 2202 2203 return status; 2204} 2205 2206void ocfs2_rename_unlock(struct ocfs2_super *osb) 2207{ 2208 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres; 2209 2210 if (!ocfs2_mount_local(osb)) 2211 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE); 2212} 2213 2214int ocfs2_dentry_lock(struct dentry *dentry, int ex) 2215{ 2216 int ret; 2217 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2218 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2219 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2220 2221 BUG_ON(!dl); 2222 2223 if (ocfs2_is_hard_readonly(osb)) 2224 return -EROFS; 2225 2226 if (ocfs2_mount_local(osb)) 2227 return 0; 2228 2229 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0); 2230 if (ret < 0) 2231 mlog_errno(ret); 2232 2233 return ret; 2234} 2235 2236void ocfs2_dentry_unlock(struct dentry *dentry, int ex) 2237{ 2238 int level = ex ? LKM_EXMODE : LKM_PRMODE; 2239 struct ocfs2_dentry_lock *dl = dentry->d_fsdata; 2240 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb); 2241 2242 if (!ocfs2_mount_local(osb)) 2243 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level); 2244} 2245 2246/* Reference counting of the dlm debug structure. We want this because 2247 * open references on the debug inodes can live on after a mount, so 2248 * we can't rely on the ocfs2_super to always exist. */ 2249static void ocfs2_dlm_debug_free(struct kref *kref) 2250{ 2251 struct ocfs2_dlm_debug *dlm_debug; 2252 2253 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt); 2254 2255 kfree(dlm_debug); 2256} 2257 2258void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug) 2259{ 2260 if (dlm_debug) 2261 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free); 2262} 2263 2264static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug) 2265{ 2266 kref_get(&debug->d_refcnt); 2267} 2268 2269struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void) 2270{ 2271 struct ocfs2_dlm_debug *dlm_debug; 2272 2273 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL); 2274 if (!dlm_debug) { 2275 mlog_errno(-ENOMEM); 2276 goto out; 2277 } 2278 2279 kref_init(&dlm_debug->d_refcnt); 2280 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking); 2281 dlm_debug->d_locking_state = NULL; 2282out: 2283 return dlm_debug; 2284} 2285 2286/* Access to this is arbitrated for us via seq_file->sem. */ 2287struct ocfs2_dlm_seq_priv { 2288 struct ocfs2_dlm_debug *p_dlm_debug; 2289 struct ocfs2_lock_res p_iter_res; 2290 struct ocfs2_lock_res p_tmp_res; 2291}; 2292 2293static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start, 2294 struct ocfs2_dlm_seq_priv *priv) 2295{ 2296 struct ocfs2_lock_res *iter, *ret = NULL; 2297 struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug; 2298 2299 assert_spin_locked(&ocfs2_dlm_tracking_lock); 2300 2301 list_for_each_entry(iter, &start->l_debug_list, l_debug_list) { 2302 /* discover the head of the list */ 2303 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) { 2304 mlog(0, "End of list found, %p\n", ret); 2305 break; 2306 } 2307 2308 /* We track our "dummy" iteration lockres' by a NULL 2309 * l_ops field. */ 2310 if (iter->l_ops != NULL) { 2311 ret = iter; 2312 break; 2313 } 2314 } 2315 2316 return ret; 2317} 2318 2319static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos) 2320{ 2321 struct ocfs2_dlm_seq_priv *priv = m->private; 2322 struct ocfs2_lock_res *iter; 2323 2324 spin_lock(&ocfs2_dlm_tracking_lock); 2325 iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv); 2326 if (iter) { 2327 /* Since lockres' have the lifetime of their container 2328 * (which can be inodes, ocfs2_supers, etc) we want to 2329 * copy this out to a temporary lockres while still 2330 * under the spinlock. Obviously after this we can't 2331 * trust any pointers on the copy returned, but that's 2332 * ok as the information we want isn't typically held 2333 * in them. */ 2334 priv->p_tmp_res = *iter; 2335 iter = &priv->p_tmp_res; 2336 } 2337 spin_unlock(&ocfs2_dlm_tracking_lock); 2338 2339 return iter; 2340} 2341 2342static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v) 2343{ 2344} 2345 2346static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos) 2347{ 2348 struct ocfs2_dlm_seq_priv *priv = m->private; 2349 struct ocfs2_lock_res *iter = v; 2350 struct ocfs2_lock_res *dummy = &priv->p_iter_res; 2351 2352 spin_lock(&ocfs2_dlm_tracking_lock); 2353 iter = ocfs2_dlm_next_res(iter, priv); 2354 list_del_init(&dummy->l_debug_list); 2355 if (iter) { 2356 list_add(&dummy->l_debug_list, &iter->l_debug_list); 2357 priv->p_tmp_res = *iter; 2358 iter = &priv->p_tmp_res; 2359 } 2360 spin_unlock(&ocfs2_dlm_tracking_lock); 2361 2362 return iter; 2363} 2364 2365/* So that debugfs.ocfs2 can determine which format is being used */ 2366#define OCFS2_DLM_DEBUG_STR_VERSION 1 2367static int ocfs2_dlm_seq_show(struct seq_file *m, void *v) 2368{ 2369 int i; 2370 char *lvb; 2371 struct ocfs2_lock_res *lockres = v; 2372 2373 if (!lockres) 2374 return -EINVAL; 2375 2376 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION); 2377 2378 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY) 2379 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1, 2380 lockres->l_name, 2381 (unsigned int)ocfs2_get_dentry_lock_ino(lockres)); 2382 else 2383 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name); 2384 2385 seq_printf(m, "%d\t" 2386 "0x%lx\t" 2387 "0x%x\t" 2388 "0x%x\t" 2389 "%u\t" 2390 "%u\t" 2391 "%d\t" 2392 "%d\t", 2393 lockres->l_level, 2394 lockres->l_flags, 2395 lockres->l_action, 2396 lockres->l_unlock_action, 2397 lockres->l_ro_holders, 2398 lockres->l_ex_holders, 2399 lockres->l_requested, 2400 lockres->l_blocking); 2401 2402 /* Dump the raw LVB */ 2403 lvb = lockres->l_lksb.lvb; 2404 for(i = 0; i < DLM_LVB_LEN; i++) 2405 seq_printf(m, "0x%x\t", lvb[i]); 2406 2407 /* End the line */ 2408 seq_printf(m, "\n"); 2409 return 0; 2410} 2411 2412static struct seq_operations ocfs2_dlm_seq_ops = { 2413 .start = ocfs2_dlm_seq_start, 2414 .stop = ocfs2_dlm_seq_stop, 2415 .next = ocfs2_dlm_seq_next, 2416 .show = ocfs2_dlm_seq_show, 2417}; 2418 2419static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file) 2420{ 2421 struct seq_file *seq = (struct seq_file *) file->private_data; 2422 struct ocfs2_dlm_seq_priv *priv = seq->private; 2423 struct ocfs2_lock_res *res = &priv->p_iter_res; 2424 2425 ocfs2_remove_lockres_tracking(res); 2426 ocfs2_put_dlm_debug(priv->p_dlm_debug); 2427 return seq_release_private(inode, file); 2428} 2429 2430static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file) 2431{ 2432 int ret; 2433 struct ocfs2_dlm_seq_priv *priv; 2434 struct seq_file *seq; 2435 struct ocfs2_super *osb; 2436 2437 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL); 2438 if (!priv) { 2439 ret = -ENOMEM; 2440 mlog_errno(ret); 2441 goto out; 2442 } 2443 osb = inode->i_private; 2444 ocfs2_get_dlm_debug(osb->osb_dlm_debug); 2445 priv->p_dlm_debug = osb->osb_dlm_debug; 2446 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list); 2447 2448 ret = seq_open(file, &ocfs2_dlm_seq_ops); 2449 if (ret) { 2450 kfree(priv); 2451 mlog_errno(ret); 2452 goto out; 2453 } 2454 2455 seq = (struct seq_file *) file->private_data; 2456 seq->private = priv; 2457 2458 ocfs2_add_lockres_tracking(&priv->p_iter_res, 2459 priv->p_dlm_debug); 2460 2461out: 2462 return ret; 2463} 2464 2465static const struct file_operations ocfs2_dlm_debug_fops = { 2466 .open = ocfs2_dlm_debug_open, 2467 .release = ocfs2_dlm_debug_release, 2468 .read = seq_read, 2469 .llseek = seq_lseek, 2470}; 2471 2472static int ocfs2_dlm_init_debug(struct ocfs2_super *osb) 2473{ 2474 int ret = 0; 2475 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2476 2477 dlm_debug->d_locking_state = debugfs_create_file("locking_state", 2478 S_IFREG|S_IRUSR, 2479 osb->osb_debug_root, 2480 osb, 2481 &ocfs2_dlm_debug_fops); 2482 if (!dlm_debug->d_locking_state) { 2483 ret = -EINVAL; 2484 mlog(ML_ERROR, 2485 "Unable to create locking state debugfs file.\n"); 2486 goto out; 2487 } 2488 2489 ocfs2_get_dlm_debug(dlm_debug); 2490out: 2491 return ret; 2492} 2493 2494static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb) 2495{ 2496 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug; 2497 2498 if (dlm_debug) { 2499 debugfs_remove(dlm_debug->d_locking_state); 2500 ocfs2_put_dlm_debug(dlm_debug); 2501 } 2502} 2503 2504int ocfs2_dlm_init(struct ocfs2_super *osb) 2505{ 2506 int status = 0; 2507 u32 dlm_key; 2508 struct dlm_ctxt *dlm = NULL; 2509 2510 mlog_entry_void(); 2511 2512 if (ocfs2_mount_local(osb)) 2513 goto local; 2514 2515 status = ocfs2_dlm_init_debug(osb); 2516 if (status < 0) { 2517 mlog_errno(status); 2518 goto bail; 2519 } 2520 2521 /* launch downconvert thread */ 2522 osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc"); 2523 if (IS_ERR(osb->dc_task)) { 2524 status = PTR_ERR(osb->dc_task); 2525 osb->dc_task = NULL; 2526 mlog_errno(status); 2527 goto bail; 2528 } 2529 2530 /* used by the dlm code to make message headers unique, each 2531 * node in this domain must agree on this. */ 2532 dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str)); 2533 2534 /* for now, uuid == domain */ 2535 dlm = dlm_register_domain(osb->uuid_str, dlm_key, 2536 &osb->osb_locking_proto); 2537 if (IS_ERR(dlm)) { 2538 status = PTR_ERR(dlm); 2539 mlog_errno(status); 2540 goto bail; 2541 } 2542 2543 dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb); 2544 2545local: 2546 ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb); 2547 ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb); 2548 2549 osb->dlm = dlm; 2550 2551 status = 0; 2552bail: 2553 if (status < 0) { 2554 ocfs2_dlm_shutdown_debug(osb); 2555 if (osb->dc_task) 2556 kthread_stop(osb->dc_task); 2557 } 2558 2559 mlog_exit(status); 2560 return status; 2561} 2562 2563void ocfs2_dlm_shutdown(struct ocfs2_super *osb) 2564{ 2565 mlog_entry_void(); 2566 2567 dlm_unregister_eviction_cb(&osb->osb_eviction_cb); 2568 2569 ocfs2_drop_osb_locks(osb); 2570 2571 if (osb->dc_task) { 2572 kthread_stop(osb->dc_task); 2573 osb->dc_task = NULL; 2574 } 2575 2576 ocfs2_lock_res_free(&osb->osb_super_lockres); 2577 ocfs2_lock_res_free(&osb->osb_rename_lockres); 2578 2579 dlm_unregister_domain(osb->dlm); 2580 osb->dlm = NULL; 2581 2582 ocfs2_dlm_shutdown_debug(osb); 2583 2584 mlog_exit_void(); 2585} 2586 2587static void ocfs2_unlock_ast(void *opaque, enum dlm_status status) 2588{ 2589 struct ocfs2_lock_res *lockres = opaque; 2590 unsigned long flags; 2591 2592 mlog_entry_void(); 2593 2594 mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, 2595 lockres->l_unlock_action); 2596 2597 spin_lock_irqsave(&lockres->l_lock, flags); 2598 /* We tried to cancel a convert request, but it was already 2599 * granted. All we want to do here is clear our unlock 2600 * state. The wake_up call done at the bottom is redundant 2601 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't 2602 * hurt anything anyway */ 2603 if (status == DLM_CANCELGRANT && 2604 lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2605 mlog(0, "Got cancelgrant for %s\n", lockres->l_name); 2606 2607 /* We don't clear the busy flag in this case as it 2608 * should have been cleared by the ast which the dlm 2609 * has called. */ 2610 goto complete_unlock; 2611 } 2612 2613 if (status != DLM_NORMAL) { 2614 mlog(ML_ERROR, "Dlm passes status %d for lock %s, " 2615 "unlock_action %d\n", status, lockres->l_name, 2616 lockres->l_unlock_action); 2617 spin_unlock_irqrestore(&lockres->l_lock, flags); 2618 return; 2619 } 2620 2621 switch(lockres->l_unlock_action) { 2622 case OCFS2_UNLOCK_CANCEL_CONVERT: 2623 mlog(0, "Cancel convert success for %s\n", lockres->l_name); 2624 lockres->l_action = OCFS2_AST_INVALID; 2625 break; 2626 case OCFS2_UNLOCK_DROP_LOCK: 2627 lockres->l_level = LKM_IVMODE; 2628 break; 2629 default: 2630 BUG(); 2631 } 2632 2633 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); 2634complete_unlock: 2635 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; 2636 spin_unlock_irqrestore(&lockres->l_lock, flags); 2637 2638 wake_up(&lockres->l_event); 2639 2640 mlog_exit_void(); 2641} 2642 2643static int ocfs2_drop_lock(struct ocfs2_super *osb, 2644 struct ocfs2_lock_res *lockres) 2645{ 2646 enum dlm_status status; 2647 unsigned long flags; 2648 int lkm_flags = 0; 2649 2650 /* We didn't get anywhere near actually using this lockres. */ 2651 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) 2652 goto out; 2653 2654 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) 2655 lkm_flags |= LKM_VALBLK; 2656 2657 spin_lock_irqsave(&lockres->l_lock, flags); 2658 2659 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING), 2660 "lockres %s, flags 0x%lx\n", 2661 lockres->l_name, lockres->l_flags); 2662 2663 while (lockres->l_flags & OCFS2_LOCK_BUSY) { 2664 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = " 2665 "%u, unlock_action = %u\n", 2666 lockres->l_name, lockres->l_flags, lockres->l_action, 2667 lockres->l_unlock_action); 2668 2669 spin_unlock_irqrestore(&lockres->l_lock, flags); 2670 2671 /* XXX: Today we just wait on any busy 2672 * locks... Perhaps we need to cancel converts in the 2673 * future? */ 2674 ocfs2_wait_on_busy_lock(lockres); 2675 2676 spin_lock_irqsave(&lockres->l_lock, flags); 2677 } 2678 2679 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 2680 if (lockres->l_flags & OCFS2_LOCK_ATTACHED && 2681 lockres->l_level == LKM_EXMODE && 2682 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 2683 lockres->l_ops->set_lvb(lockres); 2684 } 2685 2686 if (lockres->l_flags & OCFS2_LOCK_BUSY) 2687 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n", 2688 lockres->l_name); 2689 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) 2690 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name); 2691 2692 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) { 2693 spin_unlock_irqrestore(&lockres->l_lock, flags); 2694 goto out; 2695 } 2696 2697 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED); 2698 2699 /* make sure we never get here while waiting for an ast to 2700 * fire. */ 2701 BUG_ON(lockres->l_action != OCFS2_AST_INVALID); 2702 2703 /* is this necessary? */ 2704 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2705 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK; 2706 spin_unlock_irqrestore(&lockres->l_lock, flags); 2707 2708 mlog(0, "lock %s\n", lockres->l_name); 2709 2710 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags, 2711 ocfs2_unlock_ast, lockres); 2712 if (status != DLM_NORMAL) { 2713 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2714 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); 2715 dlm_print_one_lock(lockres->l_lksb.lockid); 2716 BUG(); 2717 } 2718 mlog(0, "lock %s, successfull return from dlmunlock\n", 2719 lockres->l_name); 2720 2721 ocfs2_wait_on_busy_lock(lockres); 2722out: 2723 mlog_exit(0); 2724 return 0; 2725} 2726 2727/* Mark the lockres as being dropped. It will no longer be 2728 * queued if blocking, but we still may have to wait on it 2729 * being dequeued from the downconvert thread before we can consider 2730 * it safe to drop. 2731 * 2732 * You can *not* attempt to call cluster_lock on this lockres anymore. */ 2733void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres) 2734{ 2735 int status; 2736 struct ocfs2_mask_waiter mw; 2737 unsigned long flags; 2738 2739 ocfs2_init_mask_waiter(&mw); 2740 2741 spin_lock_irqsave(&lockres->l_lock, flags); 2742 lockres->l_flags |= OCFS2_LOCK_FREEING; 2743 while (lockres->l_flags & OCFS2_LOCK_QUEUED) { 2744 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0); 2745 spin_unlock_irqrestore(&lockres->l_lock, flags); 2746 2747 mlog(0, "Waiting on lockres %s\n", lockres->l_name); 2748 2749 status = ocfs2_wait_for_mask(&mw); 2750 if (status) 2751 mlog_errno(status); 2752 2753 spin_lock_irqsave(&lockres->l_lock, flags); 2754 } 2755 spin_unlock_irqrestore(&lockres->l_lock, flags); 2756} 2757 2758void ocfs2_simple_drop_lockres(struct ocfs2_super *osb, 2759 struct ocfs2_lock_res *lockres) 2760{ 2761 int ret; 2762 2763 ocfs2_mark_lockres_freeing(lockres); 2764 ret = ocfs2_drop_lock(osb, lockres); 2765 if (ret) 2766 mlog_errno(ret); 2767} 2768 2769static void ocfs2_drop_osb_locks(struct ocfs2_super *osb) 2770{ 2771 ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres); 2772 ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres); 2773} 2774 2775int ocfs2_drop_inode_locks(struct inode *inode) 2776{ 2777 int status, err; 2778 2779 mlog_entry_void(); 2780 2781 /* No need to call ocfs2_mark_lockres_freeing here - 2782 * ocfs2_clear_inode has done it for us. */ 2783 2784 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2785 &OCFS2_I(inode)->ip_open_lockres); 2786 if (err < 0) 2787 mlog_errno(err); 2788 2789 status = err; 2790 2791 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2792 &OCFS2_I(inode)->ip_inode_lockres); 2793 if (err < 0) 2794 mlog_errno(err); 2795 if (err < 0 && !status) 2796 status = err; 2797 2798 err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb), 2799 &OCFS2_I(inode)->ip_rw_lockres); 2800 if (err < 0) 2801 mlog_errno(err); 2802 if (err < 0 && !status) 2803 status = err; 2804 2805 mlog_exit(status); 2806 return status; 2807} 2808 2809static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, 2810 int new_level) 2811{ 2812 assert_spin_locked(&lockres->l_lock); 2813 2814 BUG_ON(lockres->l_blocking <= LKM_NLMODE); 2815 2816 if (lockres->l_level <= new_level) { 2817 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n", 2818 lockres->l_level, new_level); 2819 BUG(); 2820 } 2821 2822 mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", 2823 lockres->l_name, new_level, lockres->l_blocking); 2824 2825 lockres->l_action = OCFS2_AST_DOWNCONVERT; 2826 lockres->l_requested = new_level; 2827 lockres_or_flags(lockres, OCFS2_LOCK_BUSY); 2828} 2829 2830static int ocfs2_downconvert_lock(struct ocfs2_super *osb, 2831 struct ocfs2_lock_res *lockres, 2832 int new_level, 2833 int lvb) 2834{ 2835 int ret, dlm_flags = LKM_CONVERT; 2836 enum dlm_status status; 2837 2838 mlog_entry_void(); 2839 2840 if (lvb) 2841 dlm_flags |= LKM_VALBLK; 2842 2843 status = dlmlock(osb->dlm, 2844 new_level, 2845 &lockres->l_lksb, 2846 dlm_flags, 2847 lockres->l_name, 2848 OCFS2_LOCK_ID_MAX_LEN - 1, 2849 ocfs2_locking_ast, 2850 lockres, 2851 ocfs2_blocking_ast); 2852 if (status != DLM_NORMAL) { 2853 ocfs2_log_dlm_error("dlmlock", status, lockres); 2854 ret = -EINVAL; 2855 ocfs2_recover_from_dlm_error(lockres, 1); 2856 goto bail; 2857 } 2858 2859 ret = 0; 2860bail: 2861 mlog_exit(ret); 2862 return ret; 2863} 2864 2865/* returns 1 when the caller should unlock and call dlmunlock */ 2866static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, 2867 struct ocfs2_lock_res *lockres) 2868{ 2869 assert_spin_locked(&lockres->l_lock); 2870 2871 mlog_entry_void(); 2872 mlog(0, "lock %s\n", lockres->l_name); 2873 2874 if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { 2875 /* If we're already trying to cancel a lock conversion 2876 * then just drop the spinlock and allow the caller to 2877 * requeue this lock. */ 2878 2879 mlog(0, "Lockres %s, skip convert\n", lockres->l_name); 2880 return 0; 2881 } 2882 2883 /* were we in a convert when we got the bast fire? */ 2884 BUG_ON(lockres->l_action != OCFS2_AST_CONVERT && 2885 lockres->l_action != OCFS2_AST_DOWNCONVERT); 2886 /* set things up for the unlockast to know to just 2887 * clear out the ast_action and unset busy, etc. */ 2888 lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT; 2889 2890 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY), 2891 "lock %s, invalid flags: 0x%lx\n", 2892 lockres->l_name, lockres->l_flags); 2893 2894 return 1; 2895} 2896 2897static int ocfs2_cancel_convert(struct ocfs2_super *osb, 2898 struct ocfs2_lock_res *lockres) 2899{ 2900 int ret; 2901 enum dlm_status status; 2902 2903 mlog_entry_void(); 2904 mlog(0, "lock %s\n", lockres->l_name); 2905 2906 ret = 0; 2907 status = dlmunlock(osb->dlm, 2908 &lockres->l_lksb, 2909 LKM_CANCEL, 2910 ocfs2_unlock_ast, 2911 lockres); 2912 if (status != DLM_NORMAL) { 2913 ocfs2_log_dlm_error("dlmunlock", status, lockres); 2914 ret = -EINVAL; 2915 ocfs2_recover_from_dlm_error(lockres, 0); 2916 } 2917 2918 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name); 2919 2920 mlog_exit(ret); 2921 return ret; 2922} 2923 2924static int ocfs2_unblock_lock(struct ocfs2_super *osb, 2925 struct ocfs2_lock_res *lockres, 2926 struct ocfs2_unblock_ctl *ctl) 2927{ 2928 unsigned long flags; 2929 int blocking; 2930 int new_level; 2931 int ret = 0; 2932 int set_lvb = 0; 2933 2934 mlog_entry_void(); 2935 2936 spin_lock_irqsave(&lockres->l_lock, flags); 2937 2938 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED)); 2939 2940recheck: 2941 if (lockres->l_flags & OCFS2_LOCK_BUSY) { 2942 ctl->requeue = 1; 2943 ret = ocfs2_prepare_cancel_convert(osb, lockres); 2944 spin_unlock_irqrestore(&lockres->l_lock, flags); 2945 if (ret) { 2946 ret = ocfs2_cancel_convert(osb, lockres); 2947 if (ret < 0) 2948 mlog_errno(ret); 2949 } 2950 goto leave; 2951 } 2952 2953 /* if we're blocking an exclusive and we have *any* holders, 2954 * then requeue. */ 2955 if ((lockres->l_blocking == LKM_EXMODE) 2956 && (lockres->l_ex_holders || lockres->l_ro_holders)) 2957 goto leave_requeue; 2958 2959 /* If it's a PR we're blocking, then only 2960 * requeue if we've got any EX holders */ 2961 if (lockres->l_blocking == LKM_PRMODE && 2962 lockres->l_ex_holders) 2963 goto leave_requeue; 2964 2965 /* 2966 * Can we get a lock in this state if the holder counts are 2967 * zero? The meta data unblock code used to check this. 2968 */ 2969 if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) 2970 && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) 2971 goto leave_requeue; 2972 2973 new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); 2974 2975 if (lockres->l_ops->check_downconvert 2976 && !lockres->l_ops->check_downconvert(lockres, new_level)) 2977 goto leave_requeue; 2978 2979 /* If we get here, then we know that there are no more 2980 * incompatible holders (and anyone asking for an incompatible 2981 * lock is blocked). We can now downconvert the lock */ 2982 if (!lockres->l_ops->downconvert_worker) 2983 goto downconvert; 2984 2985 /* Some lockres types want to do a bit of work before 2986 * downconverting a lock. Allow that here. The worker function 2987 * may sleep, so we save off a copy of what we're blocking as 2988 * it may change while we're not holding the spin lock. */ 2989 blocking = lockres->l_blocking; 2990 spin_unlock_irqrestore(&lockres->l_lock, flags); 2991 2992 ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); 2993 2994 if (ctl->unblock_action == UNBLOCK_STOP_POST) 2995 goto leave; 2996 2997 spin_lock_irqsave(&lockres->l_lock, flags); 2998 if (blocking != lockres->l_blocking) { 2999 /* If this changed underneath us, then we can't drop 3000 * it just yet. */ 3001 goto recheck; 3002 } 3003 3004downconvert: 3005 ctl->requeue = 0; 3006 3007 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) { 3008 if (lockres->l_level == LKM_EXMODE) 3009 set_lvb = 1; 3010 3011 /* 3012 * We only set the lvb if the lock has been fully 3013 * refreshed - otherwise we risk setting stale 3014 * data. Otherwise, there's no need to actually clear 3015 * out the lvb here as it's value is still valid. 3016 */ 3017 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) 3018 lockres->l_ops->set_lvb(lockres); 3019 } 3020 3021 ocfs2_prepare_downconvert(lockres, new_level); 3022 spin_unlock_irqrestore(&lockres->l_lock, flags); 3023 ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb); 3024leave: 3025 mlog_exit(ret); 3026 return ret; 3027 3028leave_requeue: 3029 spin_unlock_irqrestore(&lockres->l_lock, flags); 3030 ctl->requeue = 1; 3031 3032 mlog_exit(0); 3033 return 0; 3034} 3035 3036static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres, 3037 int blocking) 3038{ 3039 struct inode *inode; 3040 struct address_space *mapping; 3041 3042 inode = ocfs2_lock_res_inode(lockres); 3043 mapping = inode->i_mapping; 3044 3045 if (S_ISREG(inode->i_mode)) 3046 goto out; 3047 3048 /* 3049 * We need this before the filemap_fdatawrite() so that it can 3050 * transfer the dirty bit from the PTE to the 3051 * page. Unfortunately this means that even for EX->PR 3052 * downconverts, we'll lose our mappings and have to build 3053 * them up again. 3054 */ 3055 unmap_mapping_range(mapping, 0, 0, 0); 3056 3057 if (filemap_fdatawrite(mapping)) { 3058 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!", 3059 (unsigned long long)OCFS2_I(inode)->ip_blkno); 3060 } 3061 sync_mapping_buffers(mapping); 3062 if (blocking == LKM_EXMODE) { 3063 truncate_inode_pages(mapping, 0); 3064 } else { 3065 /* We only need to wait on the I/O if we're not also 3066 * truncating pages because truncate_inode_pages waits 3067 * for us above. We don't truncate pages if we're 3068 * blocking anything < EXMODE because we want to keep 3069 * them around in that case. */ 3070 filemap_fdatawait(mapping); 3071 } 3072 3073out: 3074 return UNBLOCK_CONTINUE; 3075} 3076 3077static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres, 3078 int new_level) 3079{ 3080 struct inode *inode = ocfs2_lock_res_inode(lockres); 3081 int checkpointed = ocfs2_inode_fully_checkpointed(inode); 3082 3083 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE); 3084 BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed); 3085 3086 if (checkpointed) 3087 return 1; 3088 3089 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb)); 3090 return 0; 3091} 3092 3093static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres) 3094{ 3095 struct inode *inode = ocfs2_lock_res_inode(lockres); 3096 3097 __ocfs2_stuff_meta_lvb(inode); 3098} 3099 3100/* 3101 * Does the final reference drop on our dentry lock. Right now this 3102 * happens in the downconvert thread, but we could choose to simplify the 3103 * dlmglue API and push these off to the ocfs2_wq in the future. 3104 */ 3105static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb, 3106 struct ocfs2_lock_res *lockres) 3107{ 3108 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3109 ocfs2_dentry_lock_put(osb, dl); 3110} 3111 3112/* 3113 * d_delete() matching dentries before the lock downconvert. 3114 * 3115 * At this point, any process waiting to destroy the 3116 * dentry_lock due to last ref count is stopped by the 3117 * OCFS2_LOCK_QUEUED flag. 3118 * 3119 * We have two potential problems 3120 * 3121 * 1) If we do the last reference drop on our dentry_lock (via dput) 3122 * we'll wind up in ocfs2_release_dentry_lock(), waiting on 3123 * the downconvert to finish. Instead we take an elevated 3124 * reference and push the drop until after we've completed our 3125 * unblock processing. 3126 * 3127 * 2) There might be another process with a final reference, 3128 * waiting on us to finish processing. If this is the case, we 3129 * detect it and exit out - there's no more dentries anyway. 3130 */ 3131static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres, 3132 int blocking) 3133{ 3134 struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres); 3135 struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode); 3136 struct dentry *dentry; 3137 unsigned long flags; 3138 int extra_ref = 0; 3139 3140 /* 3141 * This node is blocking another node from getting a read 3142 * lock. This happens when we've renamed within a 3143 * directory. We've forced the other nodes to d_delete(), but 3144 * we never actually dropped our lock because it's still 3145 * valid. The downconvert code will retain a PR for this node, 3146 * so there's no further work to do. 3147 */ 3148 if (blocking == LKM_PRMODE) 3149 return UNBLOCK_CONTINUE; 3150 3151 /* 3152 * Mark this inode as potentially orphaned. The code in 3153 * ocfs2_delete_inode() will figure out whether it actually 3154 * needs to be freed or not. 3155 */ 3156 spin_lock(&oi->ip_lock); 3157 oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED; 3158 spin_unlock(&oi->ip_lock); 3159 3160 /* 3161 * Yuck. We need to make sure however that the check of 3162 * OCFS2_LOCK_FREEING and the extra reference are atomic with 3163 * respect to a reference decrement or the setting of that 3164 * flag. 3165 */ 3166 spin_lock_irqsave(&lockres->l_lock, flags); 3167 spin_lock(&dentry_attach_lock); 3168 if (!(lockres->l_flags & OCFS2_LOCK_FREEING) 3169 && dl->dl_count) { 3170 dl->dl_count++; 3171 extra_ref = 1; 3172 } 3173 spin_unlock(&dentry_attach_lock); 3174 spin_unlock_irqrestore(&lockres->l_lock, flags); 3175 3176 mlog(0, "extra_ref = %d\n", extra_ref); 3177 3178 /* 3179 * We have a process waiting on us in ocfs2_dentry_iput(), 3180 * which means we can't have any more outstanding 3181 * aliases. There's no need to do any more work. 3182 */ 3183 if (!extra_ref) 3184 return UNBLOCK_CONTINUE; 3185 3186 spin_lock(&dentry_attach_lock); 3187 while (1) { 3188 dentry = ocfs2_find_local_alias(dl->dl_inode, 3189 dl->dl_parent_blkno, 1); 3190 if (!dentry) 3191 break; 3192 spin_unlock(&dentry_attach_lock); 3193 3194 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len, 3195 dentry->d_name.name); 3196 3197 /* 3198 * The following dcache calls may do an 3199 * iput(). Normally we don't want that from the 3200 * downconverting thread, but in this case it's ok 3201 * because the requesting node already has an 3202 * exclusive lock on the inode, so it can't be queued 3203 * for a downconvert. 3204 */ 3205 d_delete(dentry); 3206 dput(dentry); 3207 3208 spin_lock(&dentry_attach_lock); 3209 } 3210 spin_unlock(&dentry_attach_lock); 3211 3212 /* 3213 * If we are the last holder of this dentry lock, there is no 3214 * reason to downconvert so skip straight to the unlock. 3215 */ 3216 if (dl->dl_count == 1) 3217 return UNBLOCK_STOP_POST; 3218 3219 return UNBLOCK_CONTINUE_POST; 3220} 3221 3222void ocfs2_process_blocked_lock(struct ocfs2_super *osb, 3223 struct ocfs2_lock_res *lockres) 3224{ 3225 int status; 3226 struct ocfs2_unblock_ctl ctl = {0, 0,}; 3227 unsigned long flags; 3228 3229 /* Our reference to the lockres in this function can be 3230 * considered valid until we remove the OCFS2_LOCK_QUEUED 3231 * flag. */ 3232 3233 mlog_entry_void(); 3234 3235 BUG_ON(!lockres); 3236 BUG_ON(!lockres->l_ops); 3237 3238 mlog(0, "lockres %s blocked.\n", lockres->l_name); 3239 3240 /* Detect whether a lock has been marked as going away while 3241 * the downconvert thread was processing other things. A lock can 3242 * still be marked with OCFS2_LOCK_FREEING after this check, 3243 * but short circuiting here will still save us some 3244 * performance. */ 3245 spin_lock_irqsave(&lockres->l_lock, flags); 3246 if (lockres->l_flags & OCFS2_LOCK_FREEING) 3247 goto unqueue; 3248 spin_unlock_irqrestore(&lockres->l_lock, flags); 3249 3250 status = ocfs2_unblock_lock(osb, lockres, &ctl); 3251 if (status < 0) 3252 mlog_errno(status); 3253 3254 spin_lock_irqsave(&lockres->l_lock, flags); 3255unqueue: 3256 if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) { 3257 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED); 3258 } else 3259 ocfs2_schedule_blocked_lock(osb, lockres); 3260 3261 mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, 3262 ctl.requeue ? "yes" : "no"); 3263 spin_unlock_irqrestore(&lockres->l_lock, flags); 3264 3265 if (ctl.unblock_action != UNBLOCK_CONTINUE 3266 && lockres->l_ops->post_unlock) 3267 lockres->l_ops->post_unlock(osb, lockres); 3268 3269 mlog_exit_void(); 3270} 3271 3272static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, 3273 struct ocfs2_lock_res *lockres) 3274{ 3275 mlog_entry_void(); 3276 3277 assert_spin_locked(&lockres->l_lock); 3278 3279 if (lockres->l_flags & OCFS2_LOCK_FREEING) { 3280 /* Do not schedule a lock for downconvert when it's on 3281 * the way to destruction - any nodes wanting access 3282 * to the resource will get it soon. */ 3283 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", 3284 lockres->l_name, lockres->l_flags); 3285 return; 3286 } 3287 3288 lockres_or_flags(lockres, OCFS2_LOCK_QUEUED); 3289 3290 spin_lock(&osb->dc_task_lock); 3291 if (list_empty(&lockres->l_blocked_list)) { 3292 list_add_tail(&lockres->l_blocked_list, 3293 &osb->blocked_lock_list); 3294 osb->blocked_lock_count++; 3295 } 3296 spin_unlock(&osb->dc_task_lock); 3297 3298 mlog_exit_void(); 3299} 3300 3301static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb) 3302{ 3303 unsigned long processed; 3304 struct ocfs2_lock_res *lockres; 3305 3306 mlog_entry_void(); 3307 3308 spin_lock(&osb->dc_task_lock); 3309 /* grab this early so we know to try again if a state change and 3310 * wake happens part-way through our work */ 3311 osb->dc_work_sequence = osb->dc_wake_sequence; 3312 3313 processed = osb->blocked_lock_count; 3314 while (processed) { 3315 BUG_ON(list_empty(&osb->blocked_lock_list)); 3316 3317 lockres = list_entry(osb->blocked_lock_list.next, 3318 struct ocfs2_lock_res, l_blocked_list); 3319 list_del_init(&lockres->l_blocked_list); 3320 osb->blocked_lock_count--; 3321 spin_unlock(&osb->dc_task_lock); 3322 3323 BUG_ON(!processed); 3324 processed--; 3325 3326 ocfs2_process_blocked_lock(osb, lockres); 3327 3328 spin_lock(&osb->dc_task_lock); 3329 } 3330 spin_unlock(&osb->dc_task_lock); 3331 3332 mlog_exit_void(); 3333} 3334 3335static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb) 3336{ 3337 int empty = 0; 3338 3339 spin_lock(&osb->dc_task_lock); 3340 if (list_empty(&osb->blocked_lock_list)) 3341 empty = 1; 3342 3343 spin_unlock(&osb->dc_task_lock); 3344 return empty; 3345} 3346 3347static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb) 3348{ 3349 int should_wake = 0; 3350 3351 spin_lock(&osb->dc_task_lock); 3352 if (osb->dc_work_sequence != osb->dc_wake_sequence) 3353 should_wake = 1; 3354 spin_unlock(&osb->dc_task_lock); 3355 3356 return should_wake; 3357} 3358 3359int ocfs2_downconvert_thread(void *arg) 3360{ 3361 int status = 0; 3362 struct ocfs2_super *osb = arg; 3363 3364 /* only quit once we've been asked to stop and there is no more 3365 * work available */ 3366 while (!(kthread_should_stop() && 3367 ocfs2_downconvert_thread_lists_empty(osb))) { 3368 3369 wait_event_interruptible(osb->dc_event, 3370 ocfs2_downconvert_thread_should_wake(osb) || 3371 kthread_should_stop()); 3372 3373 mlog(0, "downconvert_thread: awoken\n"); 3374 3375 ocfs2_downconvert_thread_do_work(osb); 3376 } 3377 3378 osb->dc_task = NULL; 3379 return status; 3380} 3381 3382void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb) 3383{ 3384 spin_lock(&osb->dc_task_lock); 3385 /* make sure the voting thread gets a swipe at whatever changes 3386 * the caller may have made to the voting state */ 3387 osb->dc_wake_sequence++; 3388 spin_unlock(&osb->dc_task_lock); 3389 wake_up(&osb->dc_event); 3390} 3391