pipeline_process.py revision 16d7a5204e347855b1c3a68c982c22f931a12866
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
"""Pipeline stage process for the Chrome build flags optimization.

Every stage of the optimization pipeline (the builder, and the executor that
tests the built images) runs inside a PipelineProcess, which pulls tasks from
an input queue, skips recomputation of duplicates, and forwards results to the
next stage.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'
13
14import multiprocessing
15
# Sentinel put on a queue to tell the consuming stage to shut down. The
# particular integer is arbitrary, but it must never equal a real task. It is
# compared with ``==`` rather than ``is`` because it reaches other processes
# through queues, where it is re-created on the receiving side.
POISONPILL = 975
18
19
class PipelineProcess(multiprocessing.Process):
  """A process that runs one pipeline stage, e.g. the builder or the tester.

  The process keeps pulling tasks from its input queue until it receives the
  poison pill, and hands every new task to the stage-specific worker.

  Each pipeline stage consists of three modules:
    1. The dispatcher (the run loop of this process) pulls tasks from the
       input queue and checks, by the task's stage-specific identifier,
       whether an equivalent task has been seen before, so that duplicate
       computation is avoided.
    2. A pool of workers does the actual work, e.g., compiling the source code
       and producing the image in the builder stage.
    3. A helper process fills in the results of duplicate tasks. For example,
       if tasks t1 and t2 are equivalent, only t1 is executed; the helper
       takes t1's result when it becomes available and sets the cost of t2 to
       the same value.

  The poison pill is forwarded to the next stage only after the worker pool
  and the helper have finished, so that every result of this stage reaches the
  result queue before the pill does.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up the input/output queues and the stage-specific callables.

    Args:
      num_processes: Number of worker processes in this stage's pool.
      name: The name of this stage.
      cache: Mapping whose keys identify tasks that were computed before.
        A task whose identifier is already a key is not recomputed; the
        mapping is also handed to the helper process.
      stage: An int that selects which identifier a task reports through
        GetIdentifier(), e.g. the flag set in the build stage or the checksum
        in the test stage. The identifier is used to detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The function run by the helper process to fill in the cost of
        duplicate tasks. It is called as
        helper(cache, helper_queue, work_queue, result_queue).
      worker: The function run by the worker pool to do the actual work,
        e.g., compile the image. It is called as
        worker(task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)

    # In CPython, multiprocessing.Process stores its own name in self._name,
    # so this assignment also becomes the name of the process.
    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # Queues used by the modules of this stage to communicate. They are
    # Manager proxies rather than plain multiprocessing.Queue objects because
    # they are passed as arguments to pool tasks, which requires pickling.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def run(self):
    """Dispatch tasks from the input queue until the poison pill arrives.

    A task whose identifier has not been seen yet is handed to the worker
    pool. A task whose identifier is already in the cache, or was scheduled
    earlier in this run, is sent to the helper instead. When the poison pill
    is received, this stage waits for all scheduled work and for the helper to
    finish, and only then forwards the poison pill to the next stage.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()

    # Identifiers of every task already computed or scheduled. A set gives
    # O(1) membership tests and, unlike a Python 3 dict.keys() view, can be
    # extended. It is a private snapshot; self._cache is not modified here.
    seen_keys = set(self._cache.keys())

    try:
      while True:
        task = self._task_queue.get()
        if task == POISONPILL:
          # Poison pill means shutdown.
          break

        task_key = task.GetIdentifier(self._stage)
        if task_key in seen_keys:
          # The task has been encountered before. It is sent to the helper
          # module, which fills in its result.
          self._helper_queue.put(task)
        else:
          # Let the workers do the actual work.
          work_pool.apply_async(self._worker,
                                args=(task, self._work_queue,
                                      self._result_queue))
          seen_keys.add(task_key)
    finally:
      # Runs even if dispatching fails, so the non-daemon helper process is
      # never left blocked forever on its queue. The workers finish every
      # scheduled task before the helper is told to stop, because the helper
      # presumably still needs those results to fill in the duplicates.
      work_pool.close()
      work_pool.join()

      self._helper_queue.put(POISONPILL)
      helper_process.join()

    # Forward the poison pill last. Workers and the helper also write to
    # result_queue, so sending it earlier could make the next stage shut down
    # before all of this stage's results have arrived.
    self._result_queue.put(POISONPILL)
125