#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

try:
  from Queue import Empty  # Python 2
except ImportError:
  from queue import Empty  # Python 3
from multiprocessing import Event, Process, Queue
import traceback


class NormalResult(object):
  """Wraps the return value of a successfully executed work item."""

  def __init__(self, result):
    self.result = result
    self.exception = False
    self.break_now = False


class ExceptionResult(object):
  """Signals that a work item raised an exception in a worker."""

  def __init__(self):
    self.exception = True
    self.break_now = False


class BreakResult(object):
  """Signals that a worker was stopped by a keyboard interrupt."""

  def __init__(self):
    self.exception = False
    self.break_now = True


class MaybeResult(object):
  """Item yielded by Pool.imap_unordered.

  Either a heartbeat (heartbeat is True, value is None) or a wrapper around a
  real result (heartbeat is False, value holds the result).
  """

  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    """Returns a MaybeResult marking a heartbeat without a value."""
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    """Returns a MaybeResult wrapping the real result "value"."""
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.
  The worker stops on two conditions. 1. When the poison pill "STOP" is
  reached or 2. when the event "done" is set.

  Args:
    fn: Function called as fn(*args, **kwargs) for each item "args" taken from
        the work queue.
    work_queue: Queue to take argument lists from. The item "STOP" ends the
        loop.
    done_queue: Queue receiving one NormalResult or ExceptionResult per
        processed item, or a BreakResult on keyboard interrupt.
    done: Event that is checked after each item is fetched; if set, the worker
        stops without processing that item.
    process_context_fn: Optional function called once before the loop. Its
        return value is passed to each call of fn as keyword argument
        "process_context".
    process_context_args: Argument list for process_context_fn. The context is
        only created if this is not None.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        # Report the failure to the parent but keep processing further items.
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())


class Pool(object):
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    """Sets up the queues and bookkeeping; no process is started yet.

    Args:
      num_workers: Number of worker processes started by imap_unordered.
      heartbeat_timeout: Timeout passed to done_queue.get() while waiting for
          a result. When it expires, imap_unordered yields a heartbeat.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.

    Raises:
      KeyboardInterrupt: If a worker reported a keyboard interrupt.
      Exception: After tear down, if any work item raised in a worker.
    """
    # Initialized before the try block as it is read in the finally clause.
    internal_error = False
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      for _ in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                          self.work_queue,
                                          self.done_queue,
                                          self.done,
                                          process_context_fn,
                                          process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        self.advance(gen)
    finally:
      self.terminate()
      if internal_error:
        raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    """Refills the work queue from "gen" up to the buffer limit.

    Switches self.advance to the no-op _advance_empty once "gen" is exhausted.
    """
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """No-op replacement for _advance_more after "gen" is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    """Stops and joins all workers and drains both queues.

    Only the first call has an effect; later calls return immediately.
    """
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    for p in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when queues are garbage collected.
    # Best effort: Empty (or any other queue error) ends the drain, but
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    for queue in (self.work_queue, self.done_queue):
      try:
        while True:
          queue.get(False)
      except Exception:
        pass