pipeline_worker_test.py revision f1f606e1c5b559aa264518a89595afd787f12e8a
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Unittest for the pipeline_worker functions in the build/test stage.
6
7Part of the Chrome build flags optimization.
8
9This module tests the helper method and the worker method.
10"""
11
12__author__ = 'yuhenglong@google.com (Yuheng Long)'
13
14import multiprocessing
15import random
16import sys
17import unittest
18
19from mock_task import MockTask
20import pipeline_process
21import pipeline_worker
22
23
24# Pick an integer at random.
25TEST_STAGE = -3
26
27
28def MockTaskCostGenerator():
29  """Calls a random number generator and returns a negative number."""
30  return random.randint(-sys.maxint - 1, -1)
31
32
33class PipelineWorkerTest(unittest.TestCase):
34  """This class tests the pipeline_worker functions.
35
36  Given the same identifier, the cost should result the same from the
37  pipeline_worker functions.
38  """
39
40  def testHelper(self):
41    """"Test the helper.
42
43    Call the helper method twice, and test the results. The results should be
44    the same, i.e., the cost should be the same.
45    """
46
47    # Set up the input, helper and output queue for the helper method.
48    manager = multiprocessing.Manager()
49    helper_queue = manager.Queue()
50    result_queue = manager.Queue()
51    completed_queue = manager.Queue()
52
53    # Set up the helper process that holds the helper method.
54    helper_process = multiprocessing.Process(target=pipeline_worker.Helper,
55                                             args=(TEST_STAGE, {}, helper_queue,
56                                                   completed_queue,
57                                                   result_queue))
58    helper_process.start()
59
60    # A dictionary defines the mock result to the helper.
61    mock_result = {1: 1995, 2: 59, 9: 1027}
62
63    # Test if there is a task that is done before, whether the duplicate task
64    # will have the same result. Here, two different scenarios are tested. That
65    # is the mock results are added to the completed_queue before and after the
66    # corresponding mock tasks being added to the input queue.
67    completed_queue.put((9, mock_result[9]))
68
69    # The output of the helper should contain all the following tasks.
70    results = [1, 1, 2, 9]
71
72    # Testing the correctness of having tasks having the same identifier, here
73    # 1.
74    for result in results:
75      helper_queue.put(MockTask(TEST_STAGE, result, MockTaskCostGenerator()))
76
77    completed_queue.put((2, mock_result[2]))
78    completed_queue.put((1, mock_result[1]))
79
80    # Signal there is no more duplicate task.
81    helper_queue.put(pipeline_process.POISONPILL)
82    helper_process.join()
83
84    while results:
85      task = result_queue.get()
86      identifier = task.GetIdentifier(TEST_STAGE)
87      self.assertTrue(identifier in results)
88      if identifier in mock_result:
89        self.assertTrue(task.GetResult(TEST_STAGE), mock_result[identifier])
90      results.remove(identifier)
91
92  def testWorker(self):
93    """"Test the worker method.
94
95    The worker should process all the input tasks and output the tasks to the
96    helper and result queue.
97    """
98
99    manager = multiprocessing.Manager()
100    result_queue = manager.Queue()
101    completed_queue = manager.Queue()
102
103    # A dictionary defines the mock tasks and their corresponding results.
104    mock_work_tasks = {1: 86, 2: 788}
105
106    mock_tasks = []
107
108    for flag, cost in mock_work_tasks.iteritems():
109      mock_tasks.append(MockTask(TEST_STAGE, flag, cost))
110
111    # Submit the mock tasks to the worker.
112    for mock_task in mock_tasks:
113      pipeline_worker.Worker(TEST_STAGE, mock_task, completed_queue,
114                             result_queue)
115
116    # The tasks, from the output queue, should be the same as the input and
117    # should be performed.
118    for task in mock_tasks:
119      output = result_queue.get()
120      self.assertEqual(output, task)
121      self.assertTrue(output.Done(TEST_STAGE))
122
123    # The tasks, from the completed_queue, should be defined in the
124    # mock_work_tasks dictionary.
125    for flag, cost in mock_work_tasks.iteritems():
126      helper_input = completed_queue.get()
127      self.assertEqual(helper_input, (flag, cost))
128
129
130if __name__ == '__main__':
131  unittest.main()
132