116d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
216d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# Use of this source code is governed by a BSD-style license that can be
316d7a5204e347855b1c3a68c982c22f931a12866Yuheng Long# found in the LICENSE file.
4f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long"""Pipeline process that encapsulates the actual content.
5f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long
649358b75c25a44760e884245440dc96e55812d04Yuheng LongPart of the Chrome build flags optimization.
749358b75c25a44760e884245440dc96e55812d04Yuheng Long
8e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng LongThe actual stages include the builder and the executor.
9f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long"""
10f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long
11f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long__author__ = 'yuhenglong@google.com (Yuheng Long)'
12f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long
13f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Longimport multiprocessing
14f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long
# Sentinel value passed through the pipeline queues to request shutdown.  A
# stage that reads it from its input queue forwards it to its output queue and
# stops pulling tasks.  The integer itself is arbitrary (picked at random).
POISONPILL = 975
17e610c1904b8fbdb4c14c67dede25aafc02167259Yuheng Long
18f20cffac082e3d920818f230ffc80ae6976267c0Yuheng Long
class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates one stage of the content pipeline.

  The actual pipeline stage can be the builder or the tester.  This process
  continuously pulls tasks from its input queue until a poison pill is
  received.  Once a task is received, it is handed to the actual stage for
  processing.

  Each pipeline stage contains three modules.
  The first module (this process) continuously pulls tasks from the input
  queue.  It checks whether an equivalent task has been encountered before.  If
  so, duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work, e.g.,
  the worker will compile the source code and get the image in the builder
  pipeline stage.
  The third module is a helper that puts the resulting cost into the cost field
  of the duplicate tasks.  For example, if two tasks are equivalent, only one
  task, say t1, will be executed and the other task, say t2, will not be
  executed.  The third module gets the result from t1 when it is available and
  sets the cost of t2 to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      num_processes: Number of processes in the worker pool of this stage.
      name: The name of this stage.
      cache: The computed tasks encountered before.  Only its keys() are read
        by this process; the object itself is handed to the helper.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value will be used to retrieve
        the keys in different stage. I.e., the flags set is the key in build
        stage and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill up the cost of the
        duplicate tasks.  It is called as
        helper(stage, cache, helper_queue, work_queue, result_queue).
      worker: The method hosted by the worker pools to do the actual work, e.g.,
        compile the image.  It is called as
        worker(stage, task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    super(PipelineProcess, self).__init__()

    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules for communication.  They are created
    # through a manager so that they can be passed to the pool workers.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Busy pulling the next task from the queue for execution.

    Once a task is pulled, it is either dispatched to the worker pool (first
    time its identifier is seen) or forwarded to the helper (duplicate).

    The process will terminate on receiving the poison pill from previous
    stage; the pill is forwarded to the next stage before shutting down.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(
        target=self._helper,
        args=(self._stage, self._cache, self._helper_queue, self._work_queue,
              self._result_queue))
    helper_process.start()

    try:
      # Snapshot the identifiers that are already known.  This must be a real
      # list: on Python 3, dict.keys() returns a view that has no append().
      seen_keys = list(self._cache.keys())

      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown; propagate it to the next stage.
          self._result_queue.put(POISONPILL)
          break

        task_key = task.GetIdentifier(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It will be sent to the helper
          # module for further processing.
          self._helper_queue.put(task)
          continue

        # First occurrence: remember it and let the workers do the actual work.
        seen_keys.append(task_key)
        work_pool.apply_async(
            self._worker,
            args=(self._stage, task, self._work_queue, self._result_queue))
    finally:
      # Shutdown the workers pool and the helper process.  Done in a finally
      # clause so that the helper is not left blocked on its queue if the
      # dispatch loop fails.
      work_pool.close()
      work_pool.join()

      self._helper_queue.put(POISONPILL)
      helper_process.join()
124