pipeline_process.py revision 5fe6dc81886f49eb2089c883c6862f2e896155cf
"""Pipeline process that encapsulates the actual content.

The actual stages include the builder and the executor.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import multiprocessing

# Sentinel sent through the queues to request shutdown. The particular integer
# is arbitrary; it only has to compare unequal to every real task.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates an actual pipeline stage.

  The actual pipeline stage can be the builder or the tester. This process
  continuously pulls tasks from the input queue until a poison pill is
  received. Each task received is handed to the actual stage for processing.

  Each pipeline stage contains three modules.
  The first module continuously pulls tasks from the input queue. It searches
  the cache to check whether the task has been encountered before. If so,
  duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work,
  e.g., in the builder pipeline stage the worker compiles the source code and
  produces the image.
  The third module is a helper that copies the resulting cost into the cost
  field of the duplicate tasks. For example, if two tasks are equivalent, only
  one of them, say t1, is executed and the other, say t2, is not. The third
  module takes the result of t1 once it is available and sets the cost of t2
  to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up the input/output queues and the methods to be called.

    Args:
      num_processes: Number of worker processes in this stage's worker pool.
      name: The name of this stage.
      cache: The computed tasks encountered before. Its keys are compared
        against task keys to detect duplicates.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value is used to retrieve the
        keys in different stages, i.e., the flags set is the key in the build
        stage and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill in the cost of the
        duplicate tasks. It is called as
        helper(cache, helper_queue, work_queue, result_queue).
      worker: The method hosted by the worker pool to do the actual work, e.g.,
        compile the image. It is called as
        worker(task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    super(PipelineProcess, self).__init__()

    # NOTE(review): multiprocessing.Process keeps its own name in `_name`, so
    # this assignment presumably also sets the process name -- confirm that is
    # intended.
    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules to communicate with each other. They are
    # manager queues, presumably so they can be handed to the helper process
    # and to tasks submitted to the worker pool.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Pull tasks from the input queue until the poison pill arrives.

    A task whose key has not been seen before is handed to the worker pool. A
    task whose key is already in the cache, or was already dispatched earlier
    in this run, is routed to the helper module instead.

    On shutdown, the worker pool and the helper process are drained first.
    Only then is the poison pill forwarded to the next stage, so the next stage
    never receives the pill ahead of results produced by this stage. The
    shutdown sequence also runs if the loop raises, so downstream stages are
    not left waiting forever.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()

    # Local snapshot of every key seen so far. A set gives O(1) membership
    # tests and, unlike a Python 3 dict keys view, supports adding new keys.
    seen_keys = set(self._cache.keys())

    try:
      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown; it is forwarded downstream only after
          # this stage has finished producing results (see `finally`).
          break

        task_key = task.get_key(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It will be sent to the
          # helper module for further processing.
          self._helper_queue.put(task)
        else:
          # Let the workers do the actual work.
          work_pool.apply_async(self._worker,
                                args=(task, self._work_queue,
                                      self._result_queue))
          seen_keys.add(task_key)
    finally:
      # Let every outstanding worker finish and publish its result.
      work_pool.close()
      work_pool.join()

      # All worker results are now available, so the helper can resolve the
      # remaining duplicates and exit.
      self._helper_queue.put(POISONPILL)
      helper_process.join()

      # Nothing else will be written to the result queue by this stage.
      self._result_queue.put(POISONPILL)