pipeline_process_test.py revision e610c1904b8fbdb4c14c67dede25aafc02167259
1"""Pipeline Process unittest.""" 2 3__author__ = 'yuhenglong@google.com (Yuheng Long)' 4 5import multiprocessing 6import unittest 7 8import pipeline_process 9 10# Pick an integer at random. 11ERROR = -334 12 13 14def MockHelper(done_dict, helper_queue, work_queue, result_queue): 15 """This method echos input to the output.""" 16 while True: 17 if not helper_queue.empty(): 18 task = helper_queue.get() 19 if task == pipeline_process.POISONPILL: 20 # Poison pill means shutdown 21 break 22 23 if task in done_dict: 24 # verify that it does not get duplicate "1"s in the test. 25 result_queue.put(ERROR) 26 else: 27 result_queue.put(('helper', task.get_key(0))) 28 29 30def MockWorker(task, buffer_queue, result_queue): 31 result_queue.put(('worker', task.get_key(0))) 32 33 34class MockTask(object): 35 def __init__(self, key): 36 self._key = key 37 38 def get_key(self, stage): 39 return self._key 40 41 42class PipelineProcessTest(unittest.TestCase): 43 """This class test the PipelineProcess. 44 45 All the task inserted into the input queue should be taken out and hand to the 46 actual pipeline handler, except for the POISON_PILL. All these task should 47 also be passed to the next pipeline stage via the output queue. 48 """ 49 50 def setUp(self): 51 pass 52 53 def testRun(self): 54 """Test the run method. 55 56 Ensure that all the tasks inserted into the queue are properly handled. 57 """ 58 59 manager = multiprocessing.Manager() 60 inp = manager.Queue() 61 output = manager.Queue() 62 63 process = pipeline_process.PipelineProcess(2, 'testing', {}, 'test', inp, 64 MockHelper, MockWorker, output) 65 66 process.start() 67 inp.put(MockTask(1)) 68 inp.put(MockTask(1)) 69 inp.put(MockTask(2)) 70 inp.put(pipeline_process.POISONPILL) 71 process.join() 72 73 # All tasks are processed once and only once. 74 result = [('worker', 1), ('helper', 1), ('worker', 2), 75 pipeline_process.POISONPILL] 76 while result: 77 task = output.get() 78 79 # One "1"s is passed to the worker and one to the helper. 80 self.assertNotEqual(task, ERROR) 81 82 # The messages received should be exactly the same as the result. 83 self.assertTrue(task in result) 84 result.remove(task) 85 86 87if __name__ == '__main__': 88 unittest.main() 89