1""" Parallel execution management """
2
3__author__ = """Copyright Andy Whitcroft 2006"""
4
5import sys, logging, os, pickle, traceback, gc, time
6from autotest_lib.client.common_lib import error, utils
7
def fork_start(tmp, l):
    """
    Fork a child process that runs the callable l.

    In the parent this returns immediately with the child's pid. The child
    never returns from this function: it calls l() and then terminates with
    os._exit(0) on success or os._exit(1) if l() raised an Exception.

    On failure the child logs the traceback (at DEBUG level) and pickles the
    exception to <tmp>/debug/error-<child pid>. Exceptions that are not
    error.AutotestError are wrapped in error.UnhandledTestError first.

    @param tmp: directory under which the 'debug' error directory is created.
    @param l: zero-argument callable to execute in the child.
    @return: pid of the child process (parent only).
    """
    # Flush before forking so buffered output is not duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent
        return pid

    try:
        try:
            l()
        except error.AutotestError:
            raise
        except Exception as e:
            raise error.UnhandledTestError(e)
    except Exception as detail:
        try:
            try:
                logging.error('child process failed')
                # logging.exception() uses ERROR level, but we want DEBUG for
                # the traceback
                for line in traceback.format_exc().splitlines():
                    logging.debug(line)
            finally:
                # note that exceptions originating in this block won't make it
                # to the logs
                output_dir = os.path.join(tmp, 'debug')
                try:
                    os.makedirs(output_dir)
                except OSError:
                    # Several children may fail at the same time; losing the
                    # race to create the directory is not an error.
                    if not os.path.isdir(output_dir):
                        raise
                ename = os.path.join(output_dir, "error-%d" % os.getpid())
                # Close the file explicitly: os._exit() below skips normal
                # interpreter cleanup, so the data must be on disk first.
                # Binary mode is what pickle expects; on POSIX the bytes
                # written are the same as in text mode.
                with open(ename, "wb") as error_file:
                    pickle.dump(detail, error_file)

                sys.stdout.flush()
                sys.stderr.flush()
        finally:
            try:
                # clear exception information to allow garbage collection of
                # objects referenced by the exception's traceback
                sys.exc_clear()
                gc.collect()
            finally:
                # The child must never return into the parent's code path,
                # even if the cleanup above fails.
                os._exit(1)
    else:
        try:
            sys.stdout.flush()
            sys.stderr.flush()
        finally:
            os._exit(0)
54
55
56def _check_for_subprocess_exception(temp_dir, pid):
57    ename = temp_dir + "/debug/error-%d" % pid
58    if os.path.exists(ename):
59        try:
60            e = pickle.load(file(ename, 'r'))
61        except ImportError:
62            with open(ename, 'r') as fp:
63                file_text = fp.read()
64            raise error.TestError(
65                    'Subprocess raised an exception that could not be '
66                    'identified. The root cause exception is in the text '
67                    'that follows: ' + file_text)
68        finally:
69            # Rename the error-pid file so that they do not affect later child
70            # processes that use recycled pids.
71            i = 0
72            while True:
73                pename = ename + ('-%d' % i)
74                i += 1
75                if not os.path.exists(pename):
76                    break
77            os.rename(ename, pename)
78        raise e
79
80
def fork_waitfor(tmp, pid):
    """
    Block until the child pid terminates and report any failure.

    An exception pickled by the child under tmp is re-raised first;
    otherwise a non-zero wait status raises error.TestError. Note that the
    value reported as "rc" is the raw status returned by os.waitpid().
    """
    reaped_pid, wait_status = os.waitpid(pid, 0)

    # Prefer the child's own exception over the generic status failure.
    _check_for_subprocess_exception(tmp, reaped_pid)

    if not wait_status:
        return
    raise error.TestError("Test subprocess failed rc=%d" % wait_status)
88
def fork_waitfor_timed(tmp, pid, timeout):
    """
    Waits for pid until it terminates or timeout expires.
    If timeout expires, test subprocess is killed.

    The child is polled every 2 seconds, sleeping before each poll. On
    timeout the child is nuked, reaped, and error.TestError is raised. If
    the child finished in time, an exception it pickled under tmp is
    re-raised, and a non-zero wait status raises error.TestError.
    """
    poll_interval = 2
    waited = 0
    while waited < timeout:
        time.sleep(poll_interval)
        reaped, status = os.waitpid(pid, os.WNOHANG)
        if (reaped, status) != (0, 0):
            # Child has changed state; stop polling.
            break
        waited += poll_interval
    else:
        # Loop ran out without a break: the child is still running.
        logging.info('Timer expired (%d sec.), nuking pid %d', timeout, pid)
        utils.nuke_pid(pid)
        reaped, status = os.waitpid(pid, 0)
        raise error.TestError("Test timeout expired, rc=%d" % status)

    _check_for_subprocess_exception(tmp, pid)

    if status:
        raise error.TestError("Test subprocess failed rc=%d" % status)
116
def fork_nuke_subprocess(tmp, pid):
    """
    Nuke the child pid, then re-raise any exception it left under tmp.

    Calls utils.nuke_pid(pid) (presumably kills the process -- confirm
    against utils) and then checks for an error file the child may have
    written for this pid.
    """
    utils.nuke_pid(pid)
    _check_for_subprocess_exception(tmp, pid)
120