# pipeline_process.py revision 1dd70cd5565f1800b5a2133922d84022cf619ddd
"""Pipeline process that encapsulates the actual content.

The actual stages include the builder and the executor.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import multiprocessing

# Sentinel value placed on a queue to signal shutdown. Tasks pulled from a
# queue are compared against it, so the value is arbitrary but must not be
# equal to any real task.
POISONPILL = 975


class PipelineProcess(multiprocessing.Process):
  """A process that encapsulates the actual content pipeline stage.

  The actual pipeline stage can be the builder or the tester.  This process
  continuously pulls tasks from the queue until a poison pill is received.
  Once a job is received, it will hand it to the actual stage for processing.

  Each pipeline stage contains three modules.
  The first module continuously pulls tasks from the input queue. It searches
  the cache to check whether the task has been encountered before. If so,
  duplicate computation can be avoided.
  The second module consists of a pool of workers that do the actual work, e.g.,
  the worker will compile the source code and get the image in the builder
  pipeline stage.
  The third module is a helper that puts the result cost into the cost field of
  the duplicate tasks. For example, if two tasks are equivalent, only one task,
  say t1, will be executed and the other task, say t2, will not be executed. The
  third module gets the result from t1 when it is available and sets the cost
  of t2 to be the same as that of t1.
  """

  def __init__(self, num_processes, name, cache, stage, task_queue, helper,
               worker, result_queue):
    """Set up input/output queue and the actual method to be called.

    Args:
      num_processes: Number of worker subprocesses in this stage's pool.
      name: The name of this stage.
      cache: The computed tasks encountered before. Its keys() are used to seed
        the set of already-seen task keys.
      stage: An int value that specifies the stage for this pipeline stage, for
        example, build stage or test stage. This value will be used to retrieve
        the keys in different stage. I.e., the flags set is the key in build
        stage and the checksum is the key in the test stage. The key is used to
        detect duplicates.
      task_queue: The input task queue for this pipeline stage.
      helper: The method hosted by the helper module to fill up the cost of the
        duplicate tasks. It is called with (cache, helper_queue, work_queue,
        result_queue).
      worker: The method hosted by the worker pools to do the actual work, e.g.,
        compile the image. It is called with (task, work_queue, result_queue).
      result_queue: The output task queue for this pipeline stage.
    """

    multiprocessing.Process.__init__(self)

    self._name = name
    self._task_queue = task_queue
    self._result_queue = result_queue

    self._helper = helper
    self._worker = worker

    self._cache = cache
    self._stage = stage
    self._num_processes = num_processes

    # The queues used by the modules for communication.
    manager = multiprocessing.Manager()
    self._helper_queue = manager.Queue()
    self._work_queue = manager.Queue()

  def _dispatch(self, task, seen_keys, work_pool):
    """Route one task to the helper (duplicate) or to the worker pool (new).

    Args:
      task: The task to route; must provide get_key(stage).
      seen_keys: A set of the keys already handled. The key of a newly
        dispatched task is added to it.
      work_pool: The pool whose apply_async() schedules the worker.
    """
    task_key = task.get_key(self._stage)
    if task_key in seen_keys:
      # The task has been encountered before. It will be sent to the helper
      # module for further processing.
      self._helper_queue.put(task)
      return

    # Let the workers do the actual work.
    work_pool.apply_async(self._worker, args=(task, self._work_queue,
                                              self._result_queue))
    seen_keys.add(task_key)

  def run(self):
    """Busy pulling the next task from the queue for execution.

    Once a job is pulled, this stage invokes the actual stage method and submits
    the result to the next pipeline stage.

    The process will terminate on receiving the poison pill from previous stage.
    The poison pill is forwarded to the result queue only after the worker pool
    and the helper process have finished, so that it is the last item this
    stage puts on the result queue.
    """

    # The worker pool.
    work_pool = multiprocessing.Pool(self._num_processes)

    # The helper process.
    helper_process = multiprocessing.Process(target=self._helper,
                                             args=(self._cache,
                                                   self._helper_queue,
                                                   self._work_queue,
                                                   self._result_queue))
    helper_process.start()

    # A set gives constant-time membership tests and works whether keys()
    # returns a list or a view (a view has no append()).
    seen_keys = set(self._cache.keys())

    while True:
      task = self._task_queue.get()
      if task == POISONPILL:
        # Poison pill means shutdown.
        break
      self._dispatch(task, seen_keys, work_pool)

    # Shutdown the workers pool and the helper process.
    work_pool.close()
    work_pool.join()

    self._helper_queue.put(POISONPILL)
    helper_process.join()

    # Forward the poison pill last: the worker and the helper are both handed
    # the result queue, so sending the pill before they finish could place it
    # ahead of pending results.
    self._result_queue.put(POISONPILL)