# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The pipeline_worker functions of the build and test stage of the framework.

Part of the Chrome build flags optimization.

This module defines the helper and the worker. If duplicate tasks, for example
t1 and t2, need to be built/tested, only one of them, for example t1, will be
built/tested; the helper waits for the result of t1 and sets the result of the
other task, t2 here, to be the same as that of t1. Setting the result of t2 to
be the same as that of t1 is referred to as resolving the result of t2.
The worker invokes the work method of the tasks that are not duplicates.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

# Provides the POISONPILL sentinel that Helper compares incoming tasks against.
import pipeline_process


def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
  """Helper that filters duplicate tasks.

  This method continuously pulls duplicate tasks from the helper_queue. The
  duplicate tasks need not be compiled/tested. This method also pulls completed
  tasks from the completed_queue and lets the results of the duplicate tasks be
  the same as those of their corresponding finished task.

  The loop ends when the poison pill (pipeline_process.POISONPILL) is read from
  the helper_queue. After that, the method keeps pulling from the
  completed_queue until every duplicate task still waiting has been resolved.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the results of
      performing the corresponding task.
    helper_queue: A queue of duplicate tasks whose results need to be resolved.
      This is a communication channel between the pipeline_process and this
      helper process.
    completed_queue: A queue of tasks that have been built/tested. The results
      of these tasks are needed to resolve the results of the duplicate tasks.
      This is the communication channel between the workers and this helper
      process.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """

  # The list of duplicate tasks, the results of which need to be resolved.
  waiting_list = []

  while True:
    # Pull at most one duplicate task per iteration. The queue is polled with
    # empty() instead of a blocking get() so that the completed_queue below is
    # serviced on every iteration as well.
    # NOTE(review): this makes the loop spin while both queues are idle; if
    # these are multiprocessing queues, empty() is documented as not reliable.
    if not helper_queue.empty():
      task = helper_queue.get()

      if task == pipeline_process.POISONPILL:
        # Poison pill means no more duplicate task from the helper queue.
        break

      # The task has not been performed before.
      assert not task.Done(stage)

      # The identifier of this task.
      identifier = task.GetIdentifier(stage)

      # If a duplicate task comes before the corresponding resolved results from
      # the completed_queue, it will be put in the waiting list. If the result
      # arrives before the duplicate task, the duplicate task will be resolved
      # right away.
      if identifier in done_dict:
        # This task has been encountered before and the result is available. The
        # result can be resolved right away.
        task.SetResult(stage, done_dict[identifier])
        result_queue.put(task)
      else:
        waiting_list.append(task)

    # Check and get completed tasks from completed_queue.
    GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue)

  # Wait to resolve the results of the remaining duplicate tasks.
  while waiting_list:
    GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue)


def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue):
  """Pull one result from the completed queue and resolve duplicate tasks.

  This helper method tries to pull a completed task from the completed queue
  without blocking. If it gets one, it records the result in done_dict and
  resolves the results of all the relevant duplicate tasks in the waiting list.
  Relevant tasks are the tasks whose identifier for this stage equals the
  identifier just received from the completed_queue. If the queue is empty, the
  method returns without changing anything.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    completed_queue: A queue of (identifier, result) pairs for tasks that have
      been performed. The results of these tasks are needed to resolve the
      results of the duplicate tasks. This is the communication channel between
      the workers and this method.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the result of
      performing the corresponding task. Updated in place.
    waiting_list: The list of duplicate tasks, the results of which need to be
      resolved. Resolved tasks are removed from this list in place.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """
  # Nothing has been reported by the workers yet; leave all state untouched.
  if completed_queue.empty():
    return

  (identifier, result) = completed_queue.get()
  done_dict[identifier] = result

  # Partition the waiting list in a single pass instead of calling
  # waiting_list.remove() once per match, which rescans the list every time and
  # removes by equality rather than by the exact task that matched.
  still_waiting = []
  for duplicate_task in waiting_list:
    if duplicate_task.GetIdentifier(stage) == identifier:
      duplicate_task.SetResult(stage, result)
      result_queue.put(duplicate_task)
    else:
      still_waiting.append(duplicate_task)

  # Slice-assign so the caller's list object itself reflects the removals.
  waiting_list[:] = still_waiting


def Worker(stage, task, helper_queue, result_queue):
  """Worker that performs the task.

  This method calls the work method of the input task and distributes the
  result to the helper and the next stage.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    task: Input task that needs to be performed. It must not already be done
      for this stage.
    helper_queue: Queue that receives an (identifier, result) tuple for the
      completed task. This is the communication channel between the worker and
      the helper.
    result_queue: Queue that receives the completed task itself. This is the
      communication channel between the worker and the next stage.
  """

  # The task has not been completed before.
  assert not task.Done(stage)

  task.Work(stage)

  # Report the outcome to the helper first so that duplicates of this task can
  # be resolved, then hand the task itself on to the next stage.
  helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
  result_queue.put(task)
143