1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Steering stage unittest.
5
6Part of the Chrome build flags optimization.
7"""
8
9__author__ = 'yuhenglong@google.com (Yuheng Long)'
10
11import multiprocessing
12import unittest
13
14from generation import Generation
15from mock_task import IdentifierMockTask
16import pipeline_process
17import steering
18
19# Pick an integer at random.
20STEERING_TEST_STAGE = -8
21
22# The number of generations to be used to do the testing.
23NUMBER_OF_GENERATIONS = 20
24
25# The number of tasks to be put in each generation to be tested.
26NUMBER_OF_TASKS = 20
27
28# The stride of permutation used to shuffle the input list of tasks. Should be
29# relatively prime with NUMBER_OF_TASKS.
30STRIDE = 7
31
32
33class MockGeneration(Generation):
34  """This class emulates an actual generation.
35
36  It will output the next_generations when the method Next is called. The
37  next_generations is initiated when the MockGeneration instance is constructed.
38  """
39
40  def __init__(self, tasks, next_generations):
41    """Set up the next generations for this task.
42
43    Args:
44      tasks: A set of tasks to be run.
45      next_generations: A list of generations as the next generation of the
46        current generation.
47    """
48    Generation.__init__(self, tasks, None)
49    self._next_generations = next_generations
50
51  def Next(self, _):
52    return self._next_generations
53
54  def IsImproved(self):
55    if self._next_generations:
56      return True
57    return False
58
59
60class SteeringTest(unittest.TestCase):
61  """This class test the steering method.
62
63  The steering algorithm should return if there is no new task in the initial
64  generation. The steering algorithm should send all the tasks to the next stage
65  and should terminate once there is no pending generation. A generation is
66  pending if it contains pending task. A task is pending if its (test) result
67  is not ready.
68  """
69
70  def testSteering(self):
71    """Test that the steering algorithm processes all the tasks properly.
72
73    Test that the steering algorithm sends all the tasks to the next stage. Test
74    that the steering algorithm terminates once all the tasks have been
75    processed, i.e., the results for the tasks are all ready.
76    """
77
78    # A list of generations used to test the steering stage.
79    generations = []
80
81    task_index = 0
82    previous_generations = None
83
84    # Generate a sequence of generations to be tested. Each generation will
85    # output the next generation in reverse order of the list when the "Next"
86    # method is called.
87    for _ in range(NUMBER_OF_GENERATIONS):
88      # Use a consecutive sequence of numbers as identifiers for the set of
89      # tasks put into a generation.
90      test_ranges = range(task_index, task_index + NUMBER_OF_TASKS)
91      tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
92      steering_tasks = set(tasks)
93
94      # Let the previous generation as the offspring generation of the current
95      # generation.
96      current_generation = MockGeneration(steering_tasks, previous_generations)
97      generations.insert(0, current_generation)
98      previous_generations = [current_generation]
99
100      task_index += NUMBER_OF_TASKS
101
102    # If there is no generation at all, the unittest returns right away.
103    if not current_generation:
104      return
105
106    # Set up the input and result queue for the steering method.
107    manager = multiprocessing.Manager()
108    input_queue = manager.Queue()
109    result_queue = manager.Queue()
110
111    steering_process = multiprocessing.Process(
112        target=steering.Steering,
113        args=(set(), [current_generation], input_queue, result_queue))
114    steering_process.start()
115
116    # Test that each generation is processed properly. I.e., the generations are
117    # processed in order.
118    while generations:
119      generation = generations.pop(0)
120      tasks = [task for task in generation.Pool()]
121
122      # Test that all the tasks are processed once and only once.
123      while tasks:
124        task = result_queue.get()
125
126        assert task in tasks
127        tasks.remove(task)
128
129        input_queue.put(task)
130
131    task = result_queue.get()
132
133    # Test that the steering algorithm returns properly after processing all
134    # the generations.
135    assert task == pipeline_process.POISONPILL
136
137    steering_process.join()
138
139  def testCache(self):
140    """The steering algorithm returns immediately if there is no new tasks.
141
142    If all the new tasks have been cached before, the steering algorithm does
143    not have to execute these tasks again and thus can terminate right away.
144    """
145
146    # Put a set of tasks in the cache and add this set to initial generation.
147    test_ranges = range(NUMBER_OF_TASKS)
148    tasks = [IdentifierMockTask(STEERING_TEST_STAGE, t) for t in test_ranges]
149    steering_tasks = set(tasks)
150
151    current_generation = MockGeneration(steering_tasks, None)
152
153    # Set up the input and result queue for the steering method.
154    manager = multiprocessing.Manager()
155    input_queue = manager.Queue()
156    result_queue = manager.Queue()
157
158    steering_process = multiprocessing.Process(
159        target=steering.Steering,
160        args=(steering_tasks, [current_generation], input_queue, result_queue))
161
162    steering_process.start()
163
164    # Test that the steering method returns right away.
165    assert result_queue.get() == pipeline_process.POISONPILL
166    steering_process.join()
167
168
169if __name__ == '__main__':
170  unittest.main()
171