1b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch#!/usr/bin/env python
2b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch# Copyright 2014 the V8 project authors. All rights reserved.
3b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch# Use of this source code is governed by a BSD-style license that can be
4b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch# found in the LICENSE file.
5b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
6014dc512cdd3e367bee49a713fdc5ed92584a3e5Ben Murdochfrom Queue import Empty
7b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdochfrom multiprocessing import Event, Process, Queue
8014dc512cdd3e367bee49a713fdc5ed92584a3e5Ben Murdochimport traceback
9014dc512cdd3e367bee49a713fdc5ed92584a3e5Ben Murdoch
10b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
class NormalResult:
  """Message a worker sends back after fn returned without raising."""

  def __init__(self, result):
    # Neither flag is set: the pool simply forwards `result` to its caller.
    self.break_now = False
    self.exception = False
    self.result = result
16b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
17b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
class ExceptionResult:
  """Message a worker sends back when fn raised an exception.

  It carries no result value; the pool only records that an error happened.
  """

  def __init__(self):
    self.exception, self.break_now = True, False
22b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
23b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
class BreakResult:
  """Message a worker sends back after it caught a KeyboardInterrupt.

  The pool reacts by re-raising KeyboardInterrupt in the parent process.
  """

  def __init__(self):
    self.exception, self.break_now = False, True
28b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
29b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
class MaybeResult:
  """Item yielded by Pool.imap_unordered: either a heartbeat or a real result.

  Attributes:
    heartbeat: True if this object only signals that the pool is still
        waiting for a result; False if it wraps one.
    value: The wrapped result; None for heartbeats.
  """

  def __init__(self, heartbeat, value):
    self.heartbeat, self.value = heartbeat, value

  @staticmethod
  def create_heartbeat():
    """Returns a value-less MaybeResult signalling "still waiting"."""
    return MaybeResult(heartbeat=True, value=None)

  @staticmethod
  def create_result(value):
    """Returns a MaybeResult wrapping the computed `value`."""
    return MaybeResult(heartbeat=False, value=value)
42014dc512cdd3e367bee49a713fdc5ed92584a3e5Ben Murdoch
43014dc512cdd3e367bee49a713fdc5ed92584a3e5Ben Murdoch
def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.

  Takes argument lists from work_queue and calls fn(*args, **kwargs) on each.
  For each item it puts a NormalResult on done_queue, or an ExceptionResult
  if fn raised. If process_context_fn is given together with
  process_context_args, it is called once up front. Its return value is
  passed to every fn call as the `process_context` keyword argument.

  The worker stops on three conditions:
  1. When the poison pill "STOP" is read from work_queue.
  2. When the event "done" is set (checked after each item is taken).
  3. On KeyboardInterrupt, which is reported to the parent as a BreakResult.
  """
  try:
    kwargs = {}
    context_failed = False
    if process_context_fn and process_context_args is not None:
      try:
        kwargs.update(
            process_context=process_context_fn(*process_context_args))
      except Exception as e:
        # Don't let the worker die silently here: the parent counts submitted
        # items and waits for one result per item, so it would wait forever
        # if every worker failed during setup. Keep consuming work and report
        # each item as failed instead.
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        context_failed = True
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      if context_failed:
        done_queue.put(ExceptionResult())
        continue
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())
64b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
65b8a8cc1952d61a2f3a2568848933943a543b5d3eBen Murdoch
class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator.

  An instance supports a single imap_unordered run. terminate() sets the
  shared "done" event and the "terminated" flag, and neither is ever reset."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    """
    Args:
      num_workers: Number of worker processes started by imap_unordered.
      heartbeat_timeout: Seconds to wait on the done_queue before yielding a
          heartbeat instead of a result.
    """
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. A result value of type
    MaybeResult either indicates a heartbeat of the runner, i.e. indicating
    that the runner is still waiting for the result to be computed, or it wraps
    the real result.

    Args:
      fn: Function executed in the workers as fn(*item).
      gen: Iterable of argument lists for fn.
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent beyond the
          process boundary.

    Raises:
      KeyboardInterrupt: A worker process reported a keyboard interrupt.
      Exception: "Internal error in a worker process." This is raised after
          all results have been processed, if any worker reported an
          exception result.
    """
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      for _ in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        # Refill after every result, including failed ones. Skipping the
        # refill lets a run of failures drain `count` to zero while `gen`
        # still has items, and those items would never be submitted.
        self.advance(gen)
    finally:
      self.terminate()
    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    """Submits items from gen until BUFFER_FACTOR items per worker are
    outstanding. Once gen is exhausted, self.advance becomes a no-op."""
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        item = next(gen)
      except StopIteration:
        self.advance = self._advance_empty
        break
      self.work_queue.put(item)
      self.count += 1

  def _advance_empty(self, gen):
    """No-op used as self.advance after gen has been exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    """Stops and joins all workers, then drains both queues.

    Calling it more than once is harmless; later calls return immediately.
    """
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    # During normal tear down the workers block on get(). Feed a poison pill
    # per worker to make them stop.
    for _ in self.processes:
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when queues are garbage collected.
    self._drain(self.work_queue)
    self._drain(self.done_queue)

  @staticmethod
  def _drain(q):
    """Best-effort removal of every item still buffered in queue q."""
    try:
      while True:
        q.get(False)
    except Empty:
      pass  # Normal exit: the queue is empty.
    except Exception:
      # Teardown runs from a finally block. It must not mask an exception
      # that is already propagating, so give up draining instead of raising.
      pass
197