pipeline_process.py revision f20cffac082e3d920818f230ffc80ae6976267c0
1"""Pipeline process that encapsulates the actual content. 2 3The actual stages include the Steering algorithm, the builder and the executor. 4""" 5 6__author__ = 'yuhenglong@google.com (Yuheng Long)' 7 8import multiprocessing 9 10 11class PipelineProcess(multiprocessing.Process): 12 """A process that encapsulates the actual content. 13 14 It continuously pull tasks from the queue until a poison pill is received. 15 Once a job is received, it will hand it to the actual stage for processing. 16 """ 17 18 # Poison pill means shutdown 19 POISON_PILL = None 20 21 def __init__(self, method, task_queue, result_queue): 22 """Set up input/output queue and the actual method to be called. 23 24 Args: 25 method: The actual pipeline stage to be invoked. 26 task_queue: The input task queue for this pipeline stage. 27 result_queue: The output task queue for this pipeline stage. 28 """ 29 30 multiprocessing.Process.__init__(self) 31 self._method = method 32 self._task_queue = task_queue 33 self._result_queue = result_queue 34 35 def run(self): 36 """Busy pulling the next task from the queue for execution. 37 38 Once a job is pulled, this stage invokes the actual stage method and submits 39 the result to the next pipeline stage. 40 41 The process will terminate on receiving the poison pill from previous stage. 42 """ 43 44 while True: 45 next_task = self.task_queue.get() 46 if next_task is None: 47 # Poison pill means shutdown 48 self.result_queue.put(None) 49 break 50 self._method(next_task) 51 self.result_queue.put(next_task) 52