pipeline_worker_test.py revision a791546e80cede30d5325bec834b35b99b7e7bfe
1"""Unittest for the pipeline_worker functions in the build/test stage. 2 3Part of the Chrome build flags optimization. 4 5This module tests the helper method and the worker method. 6""" 7 8__author__ = 'yuhenglong@google.com (Yuheng Long)' 9 10import multiprocessing 11import random 12import sys 13import unittest 14 15from mock_task import MockTask 16import pipeline_process 17import pipeline_worker 18 19 20# Pick an integer at random. 21TESTSTAGE = -3 22 23 24def MockTaskCostGenerator(): 25 """Calls a random number generator and returns a negative number.""" 26 return random.randint(-sys.maxint - 1, -1) 27 28 29class PipelineWorkerTest(unittest.TestCase): 30 """This class tests the pipeline_worker functions. 31 32 Given the same identifier, the cost should result the same from the 33 pipeline_worker functions. 34 """ 35 36 def testHelper(self): 37 """"Test the helper. 38 39 Call the helper method twice, and test the results. The results should be 40 the same, i.e., the cost should be the same. 41 """ 42 43 # Set up the input, helper and output queue for the helper method. 44 manager = multiprocessing.Manager() 45 helper_queue = manager.Queue() 46 result_queue = manager.Queue() 47 completed_queue = manager.Queue() 48 49 # Set up the helper process that holds the helper method. 50 helper_process = multiprocessing.Process(target=pipeline_worker.Helper, 51 args=(TESTSTAGE, {}, helper_queue, 52 completed_queue, 53 result_queue)) 54 helper_process.start() 55 56 # A dictionary defines the mock result to the helper. 57 mock_result = {1: 1995, 2: 59, 9: 1027} 58 59 # Test if there is a task that is done before, whether the duplicate task 60 # will have the same result. Here, two different scenarios are tested. That 61 # is the mock results are added to the completed_queue before and after the 62 # corresponding mock tasks being added to the input queue. 63 completed_queue.put((9, mock_result[9])) 64 65 # The output of the helper should contain all the following tasks. 66 results = [1, 1, 2, 9] 67 68 # Testing the correctness of having tasks having the same identifier, here 69 # 1. 70 for result in results: 71 helper_queue.put(MockTask(TESTSTAGE, result, MockTaskCostGenerator())) 72 73 completed_queue.put((2, mock_result[2])) 74 completed_queue.put((1, mock_result[1])) 75 76 # Signal there is no more duplicate task. 77 helper_queue.put(pipeline_process.POISONPILL) 78 helper_process.join() 79 80 while results: 81 task = result_queue.get() 82 identifier = task._identifier 83 cost = task._cost 84 self.assertTrue(identifier in results) 85 if identifier in mock_result: 86 self.assertTrue(cost, mock_result[identifier]) 87 results.remove(task._identifier) 88 89 def testWorker(self): 90 """"Test the worker method. 91 92 The worker should process all the input tasks and output the tasks to the 93 helper and result queue. 94 """ 95 96 manager = multiprocessing.Manager() 97 result_queue = manager.Queue() 98 completed_queue = manager.Queue() 99 100 # A dictionary defines the mock tasks and their corresponding results. 101 mock_work_tasks = {1: 86, 2: 788} 102 103 mock_tasks = [] 104 105 for flag, cost in mock_work_tasks.iteritems(): 106 mock_tasks.append(MockTask(TESTSTAGE, flag, cost)) 107 108 # Submit the mock tasks to the worker. 109 for mock_task in mock_tasks: 110 pipeline_worker.Worker(TESTSTAGE, mock_task, completed_queue, 111 result_queue) 112 113 # The tasks, from the output queue, should be the same as the input and 114 # should be performed. 115 for task in mock_tasks: 116 output = result_queue.get() 117 self.assertEqual(output, task) 118 self.assertTrue(output.Done(TESTSTAGE)) 119 120 # The tasks, from the completed_queue, should be defined in the 121 # mock_work_tasks dictionary. 122 for flag, cost in mock_work_tasks.iteritems(): 123 helper_input = completed_queue.get() 124 self.assertEqual(helper_input, (flag, cost)) 125 126 127if __name__ == '__main__': 128 unittest.main() 129