116d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 216d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# Use of this source code is governed by a BSD-style license that can be 316d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# found in the LICENSE file. 4f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long"""Pipeline process that encapsulates the actual content. 5f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 649358b75c25a44760e884245440dc96e55812d04Yuheng LongPart of the Chrome build flags optimization. 749358b75c25a44760e884245440dc96e55812d04Yuheng Long 8e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng LongThe actual stages include the builder and the executor. 9f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long""" 10f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 11f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long__author__ = 'yuhenglong@google.com (Yuheng Long)' 12f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 13f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Longimport multiprocessing 14f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 15e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long# Pick an integer at random. 16e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng LongPOISONPILL = 975 17e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 18f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 19f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Longclass PipelineProcess(multiprocessing.Process): 20e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long """A process that encapsulates the actual content pipeline stage. 21f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 22e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long The actual pipeline stage can be the builder or the tester. This process 23e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long continuously pull tasks from the queue until a poison pill is received. 24f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long Once a job is received, it will hand it to the actual stage for processing. 25f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 26e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long Each pipeline stage contains three modules. 27e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long The first module continuously pulls task from the input queue. It searches the 28e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long cache to check whether the task has encountered before. If so, duplicate 29e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long computation can be avoided. 30e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long The second module consists of a pool of workers that do the actual work, e.g., 31e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long the worker will compile the source code and get the image in the builder 32e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long pipeline stage. 33e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long The third module is a helper that put the result cost to the cost field of the 34e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long duplicate tasks. For example, if two tasks are equivalent, only one task, say 35e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long t1 will be executed and the other task, say t2 will not be executed. The third 36e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long mode gets the result from t1, when it is available and set the cost of t2 to 37e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long be the same as that of t1. 38e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long """ 39f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 40e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long def __init__(self, num_processes, name, cache, stage, task_queue, helper, 41e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long worker, result_queue): 42f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long """Set up input/output queue and the actual method to be called. 43f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 44f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long Args: 45e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long num_processes: Number of helpers subprocessors this stage has. 46e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long name: The name of this stage. 47e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long cache: The computed tasks encountered before. 48e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long stage: An int value that specifies the stage for this pipeline stage, for 49e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long example, build stage or test stage. This value will be used to retrieve 50e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long the keys in different stage. I.e., the flags set is the key in build 51e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long stage and the checksum is the key in the test stage. The key is used to 52e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long detect duplicates. 53f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long task_queue: The input task queue for this pipeline stage. 54e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long helper: The method hosted by the helper module to fill up the cost of the 55e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long duplicate tasks. 56e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long worker: The method hosted by the worker pools to do the actual work, e.g., 57e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long compile the image. 58f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long result_queue: The output task queue for this pipeline stage. 59f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long """ 60f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 61f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long multiprocessing.Process.__init__(self) 62e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 63e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._name = name 64f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long self._task_queue = task_queue 65f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long self._result_queue = result_queue 66f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 67e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._helper = helper 68e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._worker = worker 69e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 70e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._cache = cache 71e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._stage = stage 72e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._num_processes = num_processes 73e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 74e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # the queues used by the modules for communication 75e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long manager = multiprocessing.Manager() 76e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._helper_queue = manager.Queue() 77e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._work_queue = manager.Queue() 78e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 79f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long def run(self): 80f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long """Busy pulling the next task from the queue for execution. 81f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 82f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long Once a job is pulled, this stage invokes the actual stage method and submits 83f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long the result to the next pipeline stage. 84f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 85f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long The process will terminate on receiving the poison pill from previous stage. 86f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long """ 87f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long 88e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # the worker pool 895fe6dc81886f49eb2089c883c6862f2e896155cfYuheng Long work_pool = multiprocessing.Pool(self._num_processes) 90e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 91e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # the helper process 92f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano helper_process = multiprocessing.Process( 93f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano target=self._helper, 94f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano args=(self._stage, self._cache, self._helper_queue, self._work_queue, 95f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano self._result_queue)) 96e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long helper_process.start() 97e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long mycache = self._cache.keys() 98e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 99f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long while True: 100e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long task = self._task_queue.get() 101e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long if task == POISONPILL: 102f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long # Poison pill means shutdown 103e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._result_queue.put(POISONPILL) 104f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long break 105e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 106a791546e80cede30d5325bec834b35b99b7e7bfeYuheng Long task_key = task.GetIdentifier(self._stage) 107e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long if task_key in mycache: 108e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # The task has been encountered before. It will be sent to the helper 109e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # module for further processing. 110e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._helper_queue.put(task) 111e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long else: 112e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # Let the workers do the actual work. 113f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano work_pool.apply_async( 114f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano self._worker, 115f2a3ef46f75d2196a93d3ed27f4d1fcf22b54fbeLuis Lozano args=(self._stage, task, self._work_queue, self._result_queue)) 116e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long mycache.append(task_key) 117e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 118e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long # Shutdown the workers pool and the helper process. 1195fe6dc81886f49eb2089c883c6862f2e896155cfYuheng Long work_pool.close() 1205fe6dc81886f49eb2089c883c6862f2e896155cfYuheng Long work_pool.join() 121e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long 122e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long self._helper_queue.put(POISONPILL) 123e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long helper_process.join() 124