lock_machine_test.py revision f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbe
# Copyright 2010 Google Inc. All Rights Reserved.
"""lock_machine.py related unit-tests.

MachineTest tests lock_machine.Machine.
"""

__author__ = 'asharif@google.com (Ahmad Sharif)'

from multiprocessing import Process
import time
import unittest

import lock_machine


def LockAndSleep(machine):
  """Take an exclusive lock on |machine|, then sleep for one second.

  Used as the target of child processes in the tests below. The lock is taken
  through a Machine created with auto=True and is never explicitly released
  here; the function simply returns after the sleep.

  Args:
    machine: Name of the machine to lock.
  """
  lock_machine.Machine(machine, auto=True).Lock(exclusive=True)
  time.sleep(1)


class MachineTest(unittest.TestCase):
  """Tests for lock_machine.Machine locking and unlocking.

  Most tests run the same scenario twice: once with a default Machine and once
  with a Machine created with auto=True.
  """

  def testRepeatedUnlock(self):
    """Unlocking a machine that this test never locked fails every time."""
    mach = lock_machine.Machine('qqqraymes.mtv')
    for _ in range(10):
      self.assertFalse(mach.Unlock())
    mach = lock_machine.Machine('qqqraymes.mtv', auto=True)
    for _ in range(10):
      self.assertFalse(mach.Unlock())

  def testLockUnlock(self):
    """An exclusive lock can be taken and released repeatedly."""
    mach = lock_machine.Machine('otter.mtv', '/tmp')
    for _ in range(10):
      # Lock and unlock alternate inside the loop: each iteration must leave
      # the machine unlocked for the next one.
      self.assertTrue(mach.Lock(exclusive=True))
      self.assertTrue(mach.Unlock(exclusive=True))

    # NOTE(review): the third positional argument is presumably `auto`;
    # confirm against the lock_machine.Machine signature.
    mach = lock_machine.Machine('otter.mtv', '/tmp', True)
    for _ in range(10):
      self.assertTrue(mach.Lock(exclusive=True))
      self.assertTrue(mach.Unlock(exclusive=True))

  def testSharedLock(self):
    """Shared locks stack; once all are released an exclusive lock works."""
    mach = lock_machine.Machine('chrotomation.mtv')
    for _ in range(10):
      self.assertTrue(mach.Lock(exclusive=False))
    for _ in range(10):
      self.assertTrue(mach.Unlock(exclusive=False))
    # Only after every shared lock is released is the exclusive lock tried.
    self.assertTrue(mach.Lock(exclusive=True))
    self.assertTrue(mach.Unlock(exclusive=True))

    mach = lock_machine.Machine('chrotomation.mtv', auto=True)
    for _ in range(10):
      self.assertTrue(mach.Lock(exclusive=False))
    for _ in range(10):
      self.assertTrue(mach.Unlock(exclusive=False))
    self.assertTrue(mach.Lock(exclusive=True))
    self.assertTrue(mach.Unlock(exclusive=True))

  def testExclusiveLock(self):
    """While exclusively locked, further lock attempts of any kind fail."""
    mach = lock_machine.Machine('atree.mtv')
    self.assertTrue(mach.Lock(exclusive=True))
    for _ in range(10):
      self.assertFalse(mach.Lock(exclusive=True))
      self.assertFalse(mach.Lock(exclusive=False))
    self.assertTrue(mach.Unlock(exclusive=True))

    mach = lock_machine.Machine('atree.mtv', auto=True)
    self.assertTrue(mach.Lock(exclusive=True))
    for _ in range(10):
      self.assertFalse(mach.Lock(exclusive=True))
      self.assertFalse(mach.Lock(exclusive=False))
    self.assertTrue(mach.Unlock(exclusive=True))

  def testExclusiveState(self):
    """While exclusively locked, shared lock attempts fail."""
    mach = lock_machine.Machine('testExclusiveState')
    self.assertTrue(mach.Lock(exclusive=True))
    for _ in range(10):
      self.assertFalse(mach.Lock(exclusive=False))
    self.assertTrue(mach.Unlock(exclusive=True))

    mach = lock_machine.Machine('testExclusiveState', auto=True)
    self.assertTrue(mach.Lock(exclusive=True))
    for _ in range(10):
      self.assertFalse(mach.Lock(exclusive=False))
    self.assertTrue(mach.Unlock(exclusive=True))

  def testAutoLockGone(self):
    """A lock taken by a child process is obtainable once the child exits."""
    mach = lock_machine.Machine('lockgone', auto=True)
    p = Process(target=LockAndSleep, args=('lockgone',))
    p.start()
    # The child sleeps for 1 second after locking; wait slightly longer and
    # then join so the child has definitely exited before we try to lock.
    time.sleep(1.1)
    p.join()
    self.assertTrue(mach.Lock(exclusive=True))

  def testAutoLockFromOther(self):
    """Locking fails while a child holds the lock and works afterwards."""
    mach = lock_machine.Machine('other_lock', auto=True)
    p = Process(target=LockAndSleep, args=('other_lock',))
    p.start()
    try:
      # Assumes the child has acquired its lock within 0.5s and is still in
      # its 1 second sleep at this point.
      time.sleep(0.5)
      self.assertFalse(mach.Lock(exclusive=True))
    finally:
      # Always reap the child, even if the assertion above fails.
      p.join()
    time.sleep(0.6)
    self.assertTrue(mach.Lock(exclusive=True))

  def testUnlockByOthers(self):
    """This process can unlock, then lock, a machine locked by a child."""
    mach = lock_machine.Machine('other_unlock', auto=True)
    p = Process(target=LockAndSleep, args=('other_unlock',))
    p.start()
    try:
      # Assumes the child has acquired its lock within 0.5s and is still in
      # its 1 second sleep at this point.
      time.sleep(0.5)
      self.assertTrue(mach.Unlock(exclusive=True))
      self.assertTrue(mach.Lock(exclusive=True))
    finally:
      # The child was previously never joined; reap it so it does not outlive
      # the test.
      p.join()


if __name__ == '__main__':
  unittest.main()