#!/usr/bin/env python
# Copyright 2014 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from Queue import Empty
from multiprocessing import Event, Process, Queue
import traceback


class NormalResult(object):
  """Wraps the return value of a successfully executed work item."""
  def __init__(self, result):
    self.result = result
    self.exception = False
    self.break_now = False


class ExceptionResult(object):
  """Signals that executing a work item raised an exception."""
  def __init__(self):
    self.exception = True
    self.break_now = False


class BreakResult(object):
  """Signals a keyboard interrupt in a worker process."""
  def __init__(self):
    self.exception = False
    self.break_now = True


class MaybeResult(object):
  """Either a heartbeat or a wrapped real result; see Pool.imap_unordered."""
  def __init__(self, heartbeat, value):
    self.heartbeat = heartbeat
    self.value = value

  @staticmethod
  def create_heartbeat():
    return MaybeResult(True, None)

  @staticmethod
  def create_result(value):
    return MaybeResult(False, value)


def Worker(fn, work_queue, done_queue, done,
           process_context_fn=None, process_context_args=None):
  """Worker to be run in a child process.

  The worker stops on two conditions: (1) when the poison pill "STOP" is read
  from the work queue, or (2) when the "done" event is set.
  """
  try:
    kwargs = {}
    if process_context_fn and process_context_args is not None:
      kwargs.update(process_context=process_context_fn(*process_context_args))
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        break
      try:
        done_queue.put(NormalResult(fn(*args, **kwargs)))
      except Exception as e:
        traceback.print_exc()
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())


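# A minimal sketch (not part of the original module) of driving Worker
# directly: one child process, one work item, then the "STOP" poison pill.
# The double() helper is illustrative only, and defining it as a closure
# assumes fork-based process start (i.e. non-Windows platforms). Invoke
# manually, e.g. from a __main__ guard, to try it.
def _worker_demo():
  def double(x):
    return 2 * x
  work_queue, done_queue, done = Queue(), Queue(), Event()
  p = Process(target=Worker, args=(double, work_queue, done_queue, done))
  p.start()
  work_queue.put([21])    # One argument list for fn.
  work_queue.put("STOP")  # The iter() sentinel in Worker.
  print(done_queue.get().result)  # Prints 42.
  p.join()

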
class Pool(object):
  """Distributes tasks to a number of worker processes.

  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to avoid overflowing the queue's pipe if a keyboard interrupt
  # happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers, heartbeat_timeout=30):
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater while a
    # worker has taken an item from the work_queue but has not yet submitted
    # the result to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished and no results are being processed.
    # The count is only accessed by the parent process. Only the parent
    # process is allowed to remove items from the done_queue and to add items
    # to the work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()
    self.heartbeat_timeout = heartbeat_timeout

  def imap_unordered(self, fn, gen,
                     process_context_fn=None, process_context_args=None):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator. Each yielded value is of type
    MaybeResult: either a heartbeat, indicating that the runner is still
    waiting for a result to be computed, or a wrapper around a real result.

    Args:
      process_context_fn: Function executed once by each worker. Expected to
          return a process-context object. If present, this object is passed
          as an additional argument to each call to fn.
      process_context_args: List of arguments for the invocation of
          process_context_fn. All arguments will be pickled and sent across
          the process boundary.
    """
    try:
      internal_error = False
      gen = iter(gen)
      self.advance = self._advance_more

      for _ in xrange(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done,
                                         process_context_fn,
                                         process_context_args))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        while True:
          try:
            result = self.done_queue.get(timeout=self.heartbeat_timeout)
            break
          except Empty:
            # Indicate a heartbeat. The iterator will continue fetching the
            # next result.
            yield MaybeResult.create_heartbeat()
        self.count -= 1
        if result.exception:
          # TODO(machenbach): Handle a few known types of internal errors
          # gracefully, e.g. missing test files.
          internal_error = True
          continue
        elif result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        else:
          yield MaybeResult.create_result(result.result)
        self.advance(gen)
    finally:
      self.terminate()
    if internal_error:
      raise Exception("Internal error in a worker process.")

  def _advance_more(self, gen):
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def terminate(self):
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear-down, set the "done" event to stop the workers
    # before they empty the queue buffer.
    self.done.set()

    for _ in self.processes:
      # During normal tear-down the workers block on get(). Feed one poison
      # pill per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when the queues are garbage
    # collected.
    try:
      while True:
        self.work_queue.get(False)
    except Empty:
      pass
    try:
      while True:
        self.done_queue.get(False)
    except Empty:
      pass

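
# A minimal usage sketch (not part of the original module): runs a toy
# function on a small Pool, handles heartbeats, and adds one task while
# results are being consumed. square() and the parameter values below are
# illustrative only; defining square() here assumes fork-based process start.
if __name__ == "__main__":
  def square(x):
    return x * x

  pool = Pool(num_workers=2, heartbeat_timeout=1)
  results = []
  for maybe in pool.imap_unordered(square, [[1], [2], [3]]):
    if maybe.heartbeat:
      continue  # Still waiting on a worker; keep polling.
    results.append(maybe.value)
    if maybe.value == 9:
      # Tasks can be added dynamically while consuming the results
      # generator (see the Pool docstring).
      pool.add([4])
  print(sorted(results))  # [1, 4, 9, 16]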