1__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""
2
3import sys, os, signal, time, cPickle, logging
4
5from autotest_lib.client.common_lib import error, utils
6from autotest_lib.client.common_lib.cros import retry
7
8
# Entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands. While it is left as None,
# subcommand.redirect_output() does nothing.
11logging_manager_object = None
12
13
def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is started with fork_start() first; the tasks are then waited
    on one at a time, in list order, and the pickled result of each one is
    read from its result_pickle file object, which is closed afterwards.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
            The deadline is shared by all tasks, but each wait is given at
            least one second.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns A list with one result or exception per task when
            return_results is True, None otherwise.
    @raises error.AutoservError: if return_results is False and at least one
            task failed.
    """
    for task in tasklist:
        task.fork_start()

    endtime = None
    if timeout:
        endtime = time.time() + timeout

    run_error = False
    results = []
    for task in tasklist:
        remaining_timeout = None
        if endtime is not None:
            # Never hand out less than one second, even past the deadline.
            remaining_timeout = max(endtime - time.time(), 1)

        failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            failure = e
        else:
            if status != 0:
                run_error = True

        # NOTE(review): the result is only read after the child has been
        # reaped; a child blocked on writing a result larger than the pipe
        # buffer would presumably never exit -- confirm before relying on
        # large return values.
        try:
            result = cPickle.load(task.result_pickle)
        except EOFError as e:
            # The child went away without writing anything (e.g. it was
            # killed).  Record the failure as this task's result rather than
            # aborting and leaving the remaining tasks unreaped.
            run_error = True
            if failure is not None:
                result = failure
            else:
                result = e
        finally:
            task.result_pickle.close()
        results.append(result)

    if return_results:
        return results

    if run_error:
        details = ['task: %s returned/raised: %r\n' % (task, result)
                   for task, result in zip(tasklist, results)]
        raise error.AutoservError('One or more subcommands failed:\n' +
                                  ''.join(details))
54
55
def parallel_simple(function, arglist, log=True, timeout=None,
                    return_results=False):
    """
    Each element in the arglist is used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used;
    the function is called directly in the current process.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of single arguments to be used one per subcommand;
            typically a list of machine names.  An empty list or None only
            logs a warning.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
            Not applied when the function is called directly.
    @param return_results: If True instead of an AutoServError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')
        # Nothing to run.  This is what running zero subcommands produces,
        # and it also keeps arglist=None from failing on len() below.
        if return_results:
            return []
        return None

    # Bypass forking a subprocess if there is only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if not return_results:
            function(arg)
            return None
        try:
            return [function(arg)]
        except Exception as e:
            return [e]

    # One subcommand per argument; the argument doubles as the log subdir.
    subcommands = [subcommand(function, [arg], str(arg) if log else None)
                   for arg in arglist]
    return parallel(subcommands, timeout, return_results=return_results)
105
106
class subcommand(object):
    """
    A function call that is executed in a forked child process.

    fork_start() forks the child, which calls func(*args) and writes the
    pickled return value (or the pickled exception) to a pipe.  The parent
    reaps the child with fork_waitfor(), wait() or poll() and reads the
    pickle from self.result_pickle.
    """
    # The hook lists live on the class, so registered hooks apply to every
    # subcommand instance.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir=None):
        """
        @param func: The callable to run in the child process.
        @param args: Sequence of positional arguments; the child calls
                func(*args).
        @param subdir: Optional directory the child changes into and logs
                to.  It and its 'debug' subdirectory are created here if
                they do not exist yet.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.returncode = None
        # Read end of the result pipe; set in the parent by fork_start().
        self.result_pickle = None


    def __str__(self):
        return ('subcommand(func=%s,  args=%s, subdir=%s)' %
                (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. The hook is called with the subcommand as its argument. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent).
        The hook is called with the subcommand as its argument. """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect logging into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork a child process that runs the function.

        In the parent this sets self.pid and self.result_pickle (the read
        end of the result pipe) and returns.  The child never returns from
        this method: it always ends in os._exit(), with exit code 0 if the
        function returned and its result was written, 1 otherwise.
        """
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        try:
            self.pid = os.fork()
        except OSError:
            # No child was created; do not leak the pipe.
            os.close(r)
            os.close(w)
            raise

        if self.pid:                            # I am the parent
            os.close(w)
            # The pickle stream is binary data, so read it in binary mode.
            self.result_pickle = os.fdopen(r, 'rb')
            return

        # We are the child from this point on. Never return.
        # Everything runs inside try/finally so that no exception (a failing
        # chdir or hook, SystemExit raised by the function, an exception
        # that cannot be pickled, ...) can propagate back into the caller's
        # code, which must only ever run in the parent.
        exit_code = 1
        try:
            os.close(r)
            signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
            if self.subdir:
                os.chdir(self.subdir)
            self.redirect_output()

            try:
                for hook in self.fork_hooks:
                    hook(self)
                result = self.lambda_function()
                os.write(w, cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL))
                exit_code = 0
            except Exception as e:
                logging.exception('function failed')
                exit_code = 1
                os.write(w, cPickle.dumps(e, cPickle.HIGHEST_PROTOCOL))

            os.close(w)

            # A failing join hook is logged below but does not change the
            # exit code determined by the function call.
            for hook in self.join_hooks:
                hook(self)
        except BaseException:
            logging.exception('subcommand child aborted')
        finally:
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a waitpid() status into self.returncode: the exit code, or
        the negated signal number if the child was killed by a signal.

        @param sts: The status value returned by os.waitpid().

        @raises error.AutoservSubcommandError: if the return code is not 0;
                a failure report, including the child's autoserv.stderr log
                when there is one, is printed first.
        @raises RuntimeError: if the status is neither an exit nor a signal.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            # Single-argument print(...) produces the same output whether
            # print is a statement or a function.
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Echo the child's stderr log; the file is closed again
                    # before the exception is raised.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has exited.

        @returns self.returncode, which is still None while the child is
                running.
        @raises error.AutoservSubcommandError: if the child has exited with
                a non-zero return code.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberately best effort: a failing waitpid() leaves
                # returncode as None.
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child has exited.

        @returns The return code, which is 0 because any other value raises.
        @raises error.AutoservSubcommandError: if the child exited with a
                non-zero return code or was killed by a signal.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, optionally giving up after a timeout.

        @param timeout: Number of seconds to wait; None or 0 waits forever.

        @returns The return code of the child.
        @raises error.AutoservSubcommandError: if the child failed, which
                includes the case where it had to be killed on timeout.
        """
        if not timeout:
            return self.wait()

        # NOTE(review): retry.timeout is assumed to return a
        # (timed_out, result) pair whose result is None when the call timed
        # out -- confirm against the retry module.
        _, result = retry.timeout(self.wait, timeout_sec=timeout)

        if result is None:
            # Presumably kills the child process -- see utils.nuke_pid.
            utils.nuke_pid(self.pid)
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("timeout after %ds" % timeout)
            print("")
            # Reap the child; this raises if it did not exit with 0.
            result = self.wait()

        return result
263