# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The pipeline_worker functions of the build and test stage of the framework.

Part of the Chrome build flags optimization.

This module defines the helper and the worker. If duplicate tasks, for example
t1 and t2, need to be built/tested, only one of them, for example t1, is
built/tested. The helper waits for the result of t1 and sets the result of the
other task, t2 here, to be the same as that of t1. Setting the result of t2 to
be the same as that of t1 is referred to as resolving the result of t2.
The worker invokes the work method of the tasks that are not duplicates.
"""

__author__ = 'yuhenglong@google.com (Yuheng Long)'

import pipeline_process


def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
  """Helper that filters duplicate tasks.

  This method continuously pulls duplicate tasks from the helper_queue. The
  duplicate tasks need not be compiled/tested. This method also pulls completed
  tasks from the completed_queue, which the workers fill, and lets the results
  of the duplicate tasks be the same as those of their corresponding finished
  tasks.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the result of
      performing the corresponding task.
    helper_queue: A queue of duplicate tasks whose results need to be resolved.
      This is the communication channel between the pipeline_process and this
      helper process.
    completed_queue: A queue of tasks that have been built/tested. The results
      of these tasks are needed to resolve the results of the duplicate tasks.
      This is the communication channel between the workers and this helper
      process.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """

  # The list of duplicate tasks, the results of which need to be resolved.
  waiting_list = []

  while True:
    # Pull a duplicate task from the helper queue.
    if not helper_queue.empty():
      task = helper_queue.get()

      if task == pipeline_process.POISONPILL:
        # Poison pill means no more duplicate tasks from the helper queue.
        break

      # The task has not been performed before.
      assert not task.Done(stage)

      # The identifier of this task.
      identifier = task.GetIdentifier(stage)

      # If a duplicate task arrives before the corresponding result from the
      # completed_queue, it is put in the waiting list. If the result arrives
      # before the duplicate task, the duplicate task is resolved right away.
      if identifier in done_dict:
        # This task has been encountered before and the result is available.
        # The result can be resolved right away.
        task.SetResult(stage, done_dict[identifier])
        result_queue.put(task)
      else:
        waiting_list.append(task)

    # Check and get completed tasks from completed_queue.
    GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue)

  # Wait to resolve the results of the remaining duplicate tasks.
  while waiting_list:
    GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue)


def GetResultFromCompletedQueue(stage, completed_queue, done_dict, waiting_list,
                                result_queue):
  """Pulls results from the completed queue and resolves duplicate tasks.

  This helper method tries to pull a completed task from the completed queue.
  If it gets a task from the queue, it resolves the results of all the relevant
  duplicate tasks in the waiting list. Relevant tasks are the tasks that have
  the same identifier as the result just received from the completed_queue.
  If the queue is empty, the method returns without doing anything.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    completed_queue: A queue of tasks that have been performed. The results of
      these tasks are needed to resolve the results of the duplicate tasks. This
      is the communication channel between the workers and this method.
    done_dict: A dictionary of tasks that are done. The key of the dictionary is
      the identifier of the task. The value of the dictionary is the result of
      performing the corresponding task.
    waiting_list: The list of duplicate tasks, the results of which need to be
      resolved.
    result_queue: After the results of the duplicate tasks have been resolved,
      the duplicate tasks will be sent to the next stage via this queue.
  """
  # Pull a completed (identifier, result) pair from the completed queue.
  if not completed_queue.empty():
    (identifier, result) = completed_queue.get()
    done_dict[identifier] = result

    # Collect the matches first so that waiting_list is not mutated while it is
    # being iterated over.
    tasks = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
    for duplicate_task in tasks:
      duplicate_task.SetResult(stage, result)
      result_queue.put(duplicate_task)
      waiting_list.remove(duplicate_task)


def Worker(stage, task, helper_queue, result_queue):
  """Worker that performs the task.

  This method calls the work method of the input task and distributes the
  result to the helper and the next stage.

  Args:
    stage: The current stage of the pipeline, for example, build stage or test
      stage.
    task: Input task that needs to be performed.
    helper_queue: Queue that receives the (identifier, result) pair of the
      completed task. This is the communication channel between the worker and
      the helper.
    result_queue: Queue that holds the completed tasks and the results. This is
      the communication channel between the worker and the next stage.
  """

  # The task has not been completed before.
  assert not task.Done(stage)

  task.Work(stage)
  helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
  result_queue.put(task)
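

# Illustrative sketch, not part of the original framework. Helper and Worker
# above call only five methods on a task: Done, GetIdentifier, GetResult,
# SetResult and Work. ExampleTask is a hypothetical stand-in that implements
# them with made-up behavior (its "result" is simply the number of flags), and
# ExampleResolveDuplicate mimics, without any queues or processes, how the
# result of a duplicate task might be resolved through done_dict. The real
# task class of the framework may look quite different.
class ExampleTask(object):
  """Hypothetical task exposing the interface used by Helper and Worker."""

  def __init__(self, flags):
    # Assumption: a task is fully described by a tuple of flag strings.
    self._flags = tuple(flags)
    # Maps a stage name to the result of that stage.
    self._results = {}

  def GetIdentifier(self, stage):
    # Two tasks with the same flags share an identifier within a stage.
    return (stage, self._flags)

  def Done(self, stage):
    return stage in self._results

  def GetResult(self, stage):
    return self._results[stage]

  def SetResult(self, stage, result):
    self._results[stage] = result

  def Work(self, stage):
    # Placeholder for a real build/test; the value is purely illustrative.
    self._results[stage] = len(self._flags)


def ExampleResolveDuplicate(stage='build'):
  """Performs one task and copies its result to a duplicate of it."""
  first = ExampleTask(['-O2', '-funroll-loops'])
  duplicate = ExampleTask(['-O2', '-funroll-loops'])

  # What Worker does for the non-duplicate task, minus the queues.
  first.Work(stage)
  done_dict = {first.GetIdentifier(stage): first.GetResult(stage)}

  # What Helper does once the result is available in done_dict.
  duplicate.SetResult(stage, done_dict[duplicate.GetIdentifier(stage)])
  return first, duplicate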