pipeline_process.py revision 1dd70cd5565f1800b5a2133922d84022cf619ddd
"""Pipeline process that encapsulates the actual content.

The actual stages include the builder and the executor.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import multiprocessing

# An arbitrary integer used as the shutdown sentinel.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates the actual content pipeline stage.

  The actual pipeline stage can be the builder or the tester. This process
  continuously pulls tasks from the queue until a poison pill is received.
  Once a job is received, it will hand it to the actual stage for processing.

  Each pipeline stage contains three modules.
  The first module continuously pulls tasks from the input queue. It searches
  the cache to check whether the task has been encountered before. If so,
  duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work, e.g.,
  the worker will compile the source code and get the image in the builder
  pipeline stage.
  The third module is a helper that puts the result cost in the cost field of
  the duplicate tasks. For example, if two tasks are equivalent, only one task,
  say t1, will be executed and the other task, say t2, will not be executed.
  The third module gets the result from t1 when it is available and sets the
  cost of t2 to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      num_processes: Number of worker subprocesses in this stage's pool.
      name: The name of this stage.
      cache: The computed tasks encountered before.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value will be used to retrieve
        the keys in different stages, i.e., the flag set is the key in the
        build stage and the checksum is the key in the test stage. The key is
        used to detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill in the cost of the
        duplicate tasks.
      worker: The method hosted by the worker pool to do the actual work, e.g.,
        compile the image.
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)

    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules for communication.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Keep pulling the next task from the queue for execution.

    Once a job is pulled, this stage invokes the actual stage method and submits
    the result to the next pipeline stage.

    The process will terminate on receiving the poison pill from the previous
    stage.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()
    # Copy the keys into a list: in Python 3, keys() returns a view that has
    # no append().
    mycache = list(self._cache.keys())

    while True:
      task = self._task_queue.get()
      if task == POISONPILL:
        # Poison pill means shutdown.
        self._result_queue.put(POISONPILL)
        break

      task_key = task.get_key(self._stage)
      if task_key in mycache:
        # The task has been encountered before. It will be sent to the helper
        # module for further processing.
        self._helper_queue.put(task)
      else:
        # Let the workers do the actual work.
        work_pool.apply_async(self._worker, args=(task, self._work_queue,
                                                  self._result_queue))
        mycache.append(task_key)

    # Shut down the worker pool and the helper process.
    work_pool.close()
    work_pool.join()

    self._helper_queue.put(POISONPILL)
    helper_process.join()
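
The routing rule in run() can be tried in isolation. The following is a minimal single-process sketch, not the module's own code: it swaps multiprocessing for a thread and plain queue.Queue objects so it runs anywhere, and FakeTask, dispatch, and demo are hypothetical names introduced only for illustration. New keys go to the work queue, previously seen keys go to the helper queue, and the poison pill is forwarded to the result queue.

```python
import queue
import threading

POISONPILL = 975  # Same sentinel value as in the module above.


class FakeTask(object):
  """Hypothetical stand-in for a task; only get_key() is assumed."""

  def __init__(self, key):
    self._key = key

  def get_key(self, stage):
    # The real key depends on the stage; this sketch ignores it.
    return self._key


def dispatch(task_queue, seen_keys, helper_queue, work_queue, result_queue):
  """Route tasks the way run() does, without a worker pool."""
  while True:
    task = task_queue.get()
    if task == POISONPILL:
      # Forward the shutdown signal to the next stage and stop.
      result_queue.put(POISONPILL)
      break

    key = task.get_key(0)
    if key in seen_keys:
      # Duplicate: the helper would copy the cost from the first task.
      helper_queue.put(task)
    else:
      # First occurrence: a worker would do the actual work.
      work_queue.put(task)
      seen_keys.append(key)


def demo():
  """Feed three tasks plus a poison pill; 'b' is already in the cache."""
  task_queue = queue.Queue()
  helper_queue = queue.Queue()
  work_queue = queue.Queue()
  result_queue = queue.Queue()

  for key in ['a', 'b', 'a']:
    task_queue.put(FakeTask(key))
  task_queue.put(POISONPILL)

  stage = threading.Thread(target=dispatch,
                           args=(task_queue, ['b'], helper_queue, work_queue,
                                 result_queue))
  stage.start()
  stage.join()
  return work_queue.qsize(), helper_queue.qsize(), result_queue.get()
```

Calling demo() reports how many tasks reached the work queue and the helper queue, followed by the value forwarded on the result queue.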