pipeline_process.py revision f20cffac082e3d920818f230ffc80ae6976267c0
1"""Pipeline process that encapsulates the actual content.
2
3The actual stages include the Steering algorithm, the builder and the executor.
4"""
5
6__author__ = 'yuhenglong@google.com (Yuheng Long)'
7
8import multiprocessing
9
10
class PipelineProcess(multiprocessing.Process):
  """A process that wraps a single pipeline stage.

  It continuously pulls tasks from the input queue until a poison pill is
  received. Each task is handed to the stage method for processing and is then
  forwarded to the output queue for the next stage.
  """

  # Poison pill means shutdown. It is forwarded downstream so that the next
  # stage shuts down as well.
  POISON_PILL = None

  def __init__(self, method, task_queue, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      method: The actual pipeline stage to be invoked. It is called with one
        argument, the task pulled from task_queue.
      task_queue: The input task queue for this pipeline stage.
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)
    self._method = method
    self._task_queue = task_queue
    self._result_queue = result_queue

  def run(self):
    """Pull tasks from the input queue and process them until shutdown.

    Once a task is pulled, this stage invokes the actual stage method on it and
    submits the task to the next pipeline stage. The task object itself is
    forwarded; the return value of the stage method is ignored.

    The process will terminate on receiving the poison pill from the previous
    stage, after passing the poison pill on to the next stage.
    """

    while True:
      # Blocks until the previous stage provides a task.
      next_task = self._task_queue.get()
      if next_task is self.POISON_PILL:
        # Propagate the shutdown signal before exiting the loop.
        self._result_queue.put(self.POISON_PILL)
        break
      self._method(next_task)
      self._result_queue.put(next_task)
52