1#!/usr/bin/env python
2# Copyright 2014 the V8 project authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from multiprocessing import Event, Process, Queue
7
class NormalResult():
  """Carries the return value of a task that completed in a worker.

  Both status flags are off, which tells the consumer of the done queue that
  "result" holds a regular value to hand on."""
  def __init__(self, result):
    # Neither an exception nor an interrupt occurred for this task.
    self.exception, self.break_now = False, False
    self.result = result
13
14
class ExceptionResult():
  """Marks a task whose function raised an exception in a worker.

  Only the status flags are set; there is no "result" attribute."""
  def __init__(self):
    # The task failed, but it was not interrupted from the keyboard.
    self.break_now = False
    self.exception = True
19
20
class BreakResult():
  """Signals that a worker was stopped by a keyboard interrupt.

  Only the status flags are set; there is no "result" attribute."""
  def __init__(self):
    # An interrupt is reported separately from an ordinary task exception.
    self.break_now = True
    self.exception = False
25
26
def Worker(fn, work_queue, done_queue, done):
  """Worker to be run in a child process.
  The worker stops on two conditions. 1. When the poison pill "STOP" is
  reached or 2. when the event "done" is set.

  Args:
    fn: Callable applied to every work item as fn(*args).
    work_queue: Queue the argument lists (and the poison pill) are read from.
    done_queue: Queue that receives one result object per processed item.
    done: Event that, once set, makes the worker stop after its current get().

  An exception raised by "fn" is printed and reported as an ExceptionResult;
  a keyboard interrupt is reported as a BreakResult and ends the worker."""
  try:
    # iter() with a sentinel keeps calling get() until "STOP" is received.
    for args in iter(work_queue.get, "STOP"):
      if done.is_set():
        # Tear down was requested; leave the remaining items unprocessed.
        break
      try:
        done_queue.put(NormalResult(fn(*args)))
      except Exception as e:
        # "as" works on Python 2.6+ and 3; the comma form is Python 2 only.
        print(">>> EXCEPTION: %s" % e)
        done_queue.put(ExceptionResult())
  except KeyboardInterrupt:
    done_queue.put(BreakResult())
42
43
class Pool():
  """Distributes tasks to a number of worker processes.
  New tasks can be added dynamically even after the workers have been started.
  Requirement: Tasks can only be added from the parent process, e.g. while
  consuming the results generator."""

  # Factor to calculate the maximum number of items in the work/done queue.
  # Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
  BUFFER_FACTOR = 4

  def __init__(self, num_workers):
    """Creates the queues and bookkeeping. No process is started here; the
    workers are launched by imap_unordered.

    Args:
      num_workers: Number of worker processes to start."""
    self.num_workers = num_workers
    self.processes = []
    self.terminated = False

    # Invariant: count >= #work_queue + #done_queue. It is greater when a
    # worker takes an item from the work_queue and before the result is
    # submitted to the done_queue. It is equal when no worker is working,
    # e.g. when all workers have finished, and when no results are processed.
    # Count is only accessed by the parent process. Only the parent process is
    # allowed to remove items from the done_queue and to add items to the
    # work_queue.
    self.count = 0
    self.work_queue = Queue()
    self.done_queue = Queue()
    self.done = Event()

  def imap_unordered(self, fn, gen):
    """Maps function "fn" to items in generator "gen" on the worker processes
    in an arbitrary order. The items are expected to be lists of arguments to
    the function. Returns a results iterator.

    Items whose function call raised an exception are skipped. A keyboard
    interrupt reported by a worker is re-raised as KeyboardInterrupt in the
    parent. The pool is terminated when the iterator finishes or is aborted."""
    try:
      gen = iter(gen)
      self.advance = self._advance_more

      for _ in range(self.num_workers):
        p = Process(target=Worker, args=(fn,
                                         self.work_queue,
                                         self.done_queue,
                                         self.done))
        self.processes.append(p)
        p.start()

      self.advance(gen)
      while self.count > 0:
        result = self.done_queue.get()
        self.count -= 1
        if result.exception:
          # Ignore items with unexpected exceptions, but still refill the work
          # queue. Otherwise a run of failing items could bring the count down
          # to zero and end the loop while "gen" still holds unprocessed items.
          self.advance(gen)
          continue
        if result.break_now:
          # A keyboard interrupt happened in one of the worker processes.
          raise KeyboardInterrupt
        yield result.result
        self.advance(gen)
    finally:
      self.terminate()

  def _advance_more(self, gen):
    """Moves items from "gen" to the work queue until the buffer limit
    (num_workers * BUFFER_FACTOR) is reached or "gen" is exhausted."""
    while self.count < self.num_workers * self.BUFFER_FACTOR:
      try:
        # The next() builtin works with Python 2.6+ and Python 3 iterators.
        self.work_queue.put(next(gen))
        self.count += 1
      except StopIteration:
        # Nothing left to feed; turn all further advance calls into no-ops.
        self.advance = self._advance_empty
        break

  def _advance_empty(self, gen):
    """Replacement for _advance_more once the generator is exhausted."""
    pass

  def add(self, args):
    """Adds an item to the work queue. Can be called dynamically while
    processing the results from imap_unordered."""
    self.work_queue.put(args)
    self.count += 1

  def _drain_queue(self, queue):
    """Removes, without blocking, all items currently available in "queue"."""
    try:
      while True:
        queue.get(False)
    except Exception:
      # A non-blocking get() raises once nothing is available. Draining is
      # best effort, so other errors are ignored as well; KeyboardInterrupt
      # and SystemExit are deliberately not swallowed.
      pass

  def terminate(self):
    """Stops all workers and empties both queues. Only the first call has an
    effect; later calls return immediately."""
    if self.terminated:
      return
    self.terminated = True

    # For exceptional tear down set the "done" event to stop the workers before
    # they empty the queue buffer.
    self.done.set()

    for p in self.processes:
      # During normal tear down the workers block on get(). Feed a poison pill
      # per worker to make them stop.
      self.work_queue.put("STOP")

    for p in self.processes:
      p.join()

    # Drain the queues to prevent failures when queues are garbage collected.
    self._drain_queue(self.work_queue)
    self._drain_queue(self.done_queue)
147