""" Parallel execution management """

__author__ = """Copyright Andy Whitcroft 2006"""

import gc
import logging
import os
import pickle
import sys
import time
import traceback

from autotest_lib.client.common_lib import error, utils


def fork_start(tmp, l):
    """Fork a child process that runs the callable `l`.

    In the parent, returns the child's pid immediately. The child never
    returns from this function: it calls `l()` and then terminates with
    os._exit(0) on success or os._exit(1) on failure.

    On failure the child logs the traceback at DEBUG level and pickles the
    exception into <tmp>/debug/error-<child pid>, where
    _check_for_subprocess_exception() picks it up. Exceptions that are not
    error.AutotestError are wrapped in error.UnhandledTestError first.

    @param tmp: directory under which the 'debug' directory holding the
            pickled exception is created.
    @param l: zero-argument callable to run in the child.
    @return: pid of the child (parent only).
    """
    # Flush before forking so buffered output is not emitted twice.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception as e:
            raise error.UnhandledTestError(e)
    except Exception as detail:
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                for line in traceback.format_exc().splitlines():
                    logging.debug(line)
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                if not os.path.isdir(output_dir):
                    try:
                        os.makedirs(output_dir)
                    except OSError:
                        # Another failing child may have created the
                        # directory between the check and the makedirs call.
                        if not os.path.isdir(output_dir):
                            raise
                ename = os.path.join(output_dir, "error-%d" % os.getpid())
                # Close the file explicitly: os._exit() below skips normal
                # interpreter cleanup, so nothing else would flush it.
                with open(ename, "wb") as error_file:
                    pickle.dump(detail, error_file)

                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            # clear exception information to allow garbage collection of
            # objects referenced by the exception's traceback
            # (sys.exc_clear only exists on Python 2; a failure here would
            # skip os._exit and let the child run on as if it were the parent)
            if hasattr(sys, 'exc_clear'):
                sys.exc_clear()
            gc.collect()
            os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)


def _check_for_subprocess_exception(temp_dir, pid):
    """Re-raise the exception pickled by child `pid`, if there is one.

    Looks for <temp_dir>/debug/error-<pid> as written by fork_start(). If the
    file is absent this is a no-op. Otherwise the file is renamed to
    error-<pid>-<n> (first unused n) and the unpickled exception is raised.

    @param temp_dir: the `tmp` directory that was passed to fork_start().
    @param pid: pid of the child process to check.
    @raise error.TestError: if unpickling fails with ImportError; the message
            then carries the raw content of the error file.
    """
    # Build the path the same way fork_start() does when writing the file.
    ename = os.path.join(temp_dir, 'debug', 'error-%d' % pid)
    if not os.path.exists(ename):
        return

    try:
        # NOTE: unpickling executes code chosen by whoever wrote the file;
        # this is only as trustworthy as the contents of temp_dir.
        with open(ename, 'rb') as fp:
            e = pickle.load(fp)
    except ImportError:
        with open(ename, 'r') as fp:
            file_text = fp.read()
        raise error.TestError(
                'Subprocess raised an exception that could not be '
                'identified. The root cause exception is in the text '
                'that follows: ' + file_text)
    finally:
        # Rename the error-pid file so that they do not affect later child
        # processes that use recycled pids.
        i = 0
        while True:
            pename = ename + ('-%d' % i)
            i += 1
            if not os.path.exists(pename):
                break
        os.rename(ename, pename)
    raise e


def fork_waitfor(tmp, pid):
    """Wait for child `pid` to terminate and report its failure, if any.

    @param tmp: the `tmp` directory that was passed to fork_start().
    @param pid: pid of the child process to wait for.
    @raise error.TestError: if the wait status is non-zero and the child
            left no pickled exception; a pickled exception is raised as is.
    """
    (pid, status) = os.waitpid(pid, 0)

    _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))


def fork_waitfor_timed(tmp, pid, timeout):
    """
    Waits for pid until it terminates or timeout expires.
    If timeout expires, test subprocess is killed.

    The child is polled every 2 seconds, sleeping before each poll, so the
    elapsed time is counted in whole polling intervals.

    @param tmp: the `tmp` directory that was passed to fork_start().
    @param pid: pid of the child process to wait for.
    @param timeout: number of seconds to wait before killing the child.
    @raise error.TestError: if the timeout expired, or if the wait status is
            non-zero and the child left no pickled exception; a pickled
            exception is raised as is.
    """
    timer_expired = True
    poll_time = 2
    time_passed = 0
    while time_passed < timeout:
        time.sleep(poll_time)
        (child_pid, status) = os.waitpid(pid, os.WNOHANG)
        if (child_pid, status) == (0, 0):
            # Child is still running.
            time_passed += poll_time
        else:
            timer_expired = False
            break

    if timer_expired:
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        # Collect the killed child's wait status for the error message.
        (child_pid, status) = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % (status))
    else:
        _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % (status))


def fork_nuke_subprocess(tmp, pid):
    """Kill child `pid` via utils.nuke_pid() and report its failure, if any.

    @param tmp: the `tmp` directory that was passed to fork_start().
    @param pid: pid of the child process to kill.
    """
    utils.nuke_pid(pid)
    _check_for_subprocess_exception(tmp, pid)