__author__ = """Copyright Andy Whitcroft, Martin J. Bligh - 2006, 2007"""

import sys, os, signal, time, cPickle, logging

from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import retry


# entry points that use subcommand must set this to their logging manager
# to get log redirection for subcommands
logging_manager_object = None


def parallel(tasklist, timeout=None, return_results=False):
    """
    Run a set of predefined subcommands in parallel.

    Every task is forked first; the tasks are then waited for in list order
    and each task's pickled result is read back from its result pipe.

    @param tasklist: A list of subcommand instances to execute.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoservError being raised
            on any error a list of the results|exceptions from the tasks is
            returned.  [default: False]

    @returns None, or the list of results/exceptions (one per task, in
            tasklist order) when return_results is True.
    @raises error.AutoservError: if any task failed and return_results is
            False.
    """
    run_error = False
    for task in tasklist:
        task.fork_start()

    remaining_timeout = None
    if timeout:
        endtime = time.time() + timeout

    results = []
    for task in tasklist:
        if timeout:
            # The deadline is shared by all tasks, but each wait is given at
            # least one second.
            remaining_timeout = max(endtime - time.time(), 1)
        failure = None
        try:
            status = task.fork_waitfor(timeout=remaining_timeout)
        except error.AutoservSubcommandError as e:
            run_error = True
            failure = e
        else:
            if status != 0:
                run_error = True

        try:
            results.append(cPickle.load(task.result_pickle))
        except (EOFError, cPickle.UnpicklingError):
            # The child ended without writing a complete result (for example
            # it was killed after a timeout).  Report that as this task's
            # result instead of letting the unpickling error escape.
            run_error = True
            if failure is None:
                failure = error.AutoservError(
                        'subcommand exited without reporting a result: %s'
                        % task)
            results.append(failure)
        finally:
            task.result_pickle.close()

    if return_results:
        return results
    elif run_error:
        lines = ['task: %s returned/raised: %r\n' % (task, result)
                 for task, result in zip(tasklist, results)]
        message = 'One or more subcommands failed:\n' + ''.join(lines)
        raise error.AutoservError(message)


def parallel_simple(function, arglist, log=True, timeout=None,
                    return_results=False):
    """
    Each element in the arglist used to create a subcommand object,
    where that arg is used both as a subdir name, and a single argument
    to pass to "function".

    We create a subcommand object for each element in the list,
    then execute those subcommand objects in parallel.

    NOTE: As an optimization, if len(arglist) == 1 a subcommand is not used;
    the function is called directly in this process and the timeout is not
    applied.

    @param function: A callable to run in parallel once per arg in arglist.
    @param arglist: A list of single arguments to be used one per subcommand;
            typically a list of machine names.
    @param log: If True, output will be written to output in a subdirectory
            named after each subcommand's arg.
    @param timeout: Number of seconds after which the commands should timeout.
    @param return_results: If True instead of an AutoservError being raised
            on any error a list of the results|exceptions from the function
            called on each arg is returned.  [default: False]

    @returns None or a list of results/exceptions.
    """
    if not arglist:
        logging.warning('parallel_simple was called with an empty arglist, '
                        'did you forget to pass in a list of machines?')
    # Bypass the multithreading if only one machine.
    if len(arglist) == 1:
        arg = arglist[0]
        if return_results:
            try:
                result = function(arg)
            except Exception as e:
                return [e]
            return [result]
        else:
            function(arg)
            return

    subcommands = []
    for arg in arglist:
        args = [arg]
        if log:
            subdir = str(arg)
        else:
            subdir = None
        subcommands.append(subcommand(function, args, subdir))
    return parallel(subcommands, timeout, return_results=return_results)


class subcommand(object):
    """
    A function call that is executed in a forked child process.

    The child pickles the function's return value (or the exception it
    raised) into a pipe; the parent reads it through self.result_pickle.
    """
    # Hooks are stored on the class, so they are shared by all subcommands.
    fork_hooks, join_hooks = [], []

    def __init__(self, func, args, subdir = None):
        """
        @param func: The callable to run in the child, called as func(*args).
        @param args: Sequence of positional arguments for func.
        @param subdir: The subdirectory to log results in.  When given it is
                created if missing, together with a 'debug' directory
                inside it.
        """
        if subdir:
            self.subdir = os.path.abspath(subdir)
            if not os.path.exists(self.subdir):
                os.mkdir(self.subdir)
            self.debug = os.path.join(self.subdir, 'debug')
            if not os.path.exists(self.debug):
                os.mkdir(self.debug)
        else:
            self.subdir = None
            self.debug = None

        self.func = func
        self.args = args
        self.lambda_function = lambda: func(*args)
        self.pid = None
        self.returncode = None


    def __str__(self):
        return str('subcommand(func=%s, args=%s, subdir=%s)' %
                   (self.func, self.args, self.subdir))


    @classmethod
    def register_fork_hook(cls, hook):
        """ Register a function to be called from the child process after
        forking. """
        cls.fork_hooks.append(hook)


    @classmethod
    def register_join_hook(cls, hook):
        """ Register a function to be called from the child process
        just before the child process terminates (joins to the parent). """
        cls.join_hooks.append(hook)


    def redirect_output(self):
        """
        Redirect logging into this subcommand's debug directory.

        Does nothing unless a subdir was given and the module-level
        logging_manager_object has been set.
        """
        if self.subdir and logging_manager_object:
            tag = os.path.basename(self.subdir)
            logging_manager_object.tee_redirect_debug_dir(self.debug, tag=tag)


    def fork_start(self):
        """
        Fork a child process that runs the function.

        In the parent this stores the child's pid in self.pid, opens the read
        end of the result pipe as self.result_pickle and returns.  The child
        runs the fork hooks and the function, writes the pickled result (or
        the pickled exception) to the pipe, runs the join hooks and exits
        with status 0 on success or 1 on failure.
        """
        # Flush before forking so buffered output is not emitted twice.
        sys.stdout.flush()
        sys.stderr.flush()
        r, w = os.pipe()
        self.returncode = None
        self.pid = os.fork()

        if self.pid:                            # I am the parent
            os.close(w)
            self.result_pickle = os.fdopen(r, 'r')
            return
        else:
            os.close(r)

        # We are the child from this point on. Never return.
        signal.signal(signal.SIGTERM, signal.SIG_DFL) # clear handler
        if self.subdir:
            os.chdir(self.subdir)
        self.redirect_output()

        # NOTE(review): only Exception subclasses are handled below; anything
        # else (e.g. SystemExit), or a failure while pickling the exception,
        # propagates out of this method inside the child.
        # NOTE(review): the whole result is written before the child exits,
        # while parallel() only reads the pipe after waiting for the child;
        # presumably a result larger than the pipe buffer would block here -
        # confirm.
        try:
            for hook in self.fork_hooks:
                hook(self)
            result = self.lambda_function()
            os.write(w, cPickle.dumps(result, cPickle.HIGHEST_PROTOCOL))
            exit_code = 0
        except Exception as e:
            logging.exception('function failed')
            exit_code = 1
            os.write(w, cPickle.dumps(e, cPickle.HIGHEST_PROTOCOL))

        os.close(w)

        try:
            for hook in self.join_hooks:
                hook(self)
        finally:
            sys.stdout.flush()
            sys.stderr.flush()
            # os._exit leaves the child immediately, without unwinding into
            # code inherited from the parent.
            os._exit(exit_code)


    def _handle_exitstatus(self, sts):
        """
        This is partially borrowed from subprocess.Popen.

        Decode a waitpid() status into self.returncode: the negated signal
        number if the child was killed by a signal, its exit status otherwise.

        @param sts: The status value returned by os.waitpid().

        @raises RuntimeError: if the status is neither a signal nor an exit.
        @raises error.AutoservSubcommandError: if the return code is non-zero;
                details (and the child's autoserv.stderr, if present) are
                printed first.
        """
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

        if self.returncode != 0:
            print("subcommand failed pid %d" % self.pid)
            print("%s" % (self.func,))
            print("rc=%d" % self.returncode)
            print("")
            if self.debug:
                stderr_file = os.path.join(self.debug, 'autoserv.stderr')
                if os.path.exists(stderr_file):
                    # Close the file deterministically instead of leaving it
                    # to the garbage collector.
                    with open(stderr_file) as stderr_log:
                        for line in stderr_log:
                            sys.stdout.write(line)
            print("\n--------------------------------------------\n")
            raise error.AutoservSubcommandError(self.func, self.returncode)


    def poll(self):
        """
        This is borrowed from subprocess.Popen.

        Check, without blocking, whether the child has exited.

        @returns The return code, or None if the child is still running.
        """
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                # Deliberately ignored, as in subprocess.Popen.
                pass
        return self.returncode


    def wait(self):
        """
        This is borrowed from subprocess.Popen.

        Block until the child exits.

        @returns The return code (0 here, since a non-zero code raises).
        @raises error.AutoservSubcommandError: if the return code is non-zero.
        """
        if self.returncode is None:
            pid, sts = os.waitpid(self.pid, 0)
            self._handle_exitstatus(sts)
        return self.returncode


    def fork_waitfor(self, timeout=None):
        """
        Wait for the child, optionally giving up after a timeout.

        @param timeout: Number of seconds to wait; a false value (None or 0)
                waits without a time limit.

        @returns The child's return code.
        @raises error.AutoservSubcommandError: propagated from wait() when
                the return code is non-zero.
        """
        if not timeout:
            return self.wait()
        else:
            # NOTE(review): retry.timeout is assumed to return an
            # (is_timeout, result) pair whose result is None when the call
            # timed out - confirm against its definition.
            _, result = retry.timeout(self.wait, timeout_sec=timeout)

            if result is None:
                utils.nuke_pid(self.pid)
                print("subcommand failed pid %d" % self.pid)
                print("%s" % (self.func,))
                print("timeout after %ds" % timeout)
                print("")
                # Collect the exit status of the child we just nuked.
                result = self.wait()

            return result