10cf9ed3a719c0782695154d5a0bca260001cec15A. Unique TensorFlower# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 29c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# 39c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# Licensed under the Apache License, Version 2.0 (the "License"); 49c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# you may not use this file except in compliance with the License. 59c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# You may obtain a copy of the License at 69c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# 79c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# http://www.apache.org/licenses/LICENSE-2.0 89c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# 99c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# Unless required by applicable law or agreed to in writing, software 109c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# distributed under the License is distributed on an "AS IS" BASIS, 119c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 129c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# See the License for the specific language governing permissions and 139c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# limitations under the License. 149c3043ff3bf31a6a81810b4ce9e87ef936f1f529Manjunath Kudlur# ============================================================================== 15f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur"""Tests for QueueRunner.""" 1658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney 17f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import absolute_import 18f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import division 19f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import print_function 20f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevan 21112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlowerimport collections 22f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlurimport time 23f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 2458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.client import session 2558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import constant_op 2658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import dtypes 2758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import errors_impl 2858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import ops 2958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import control_flow_ops 3058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import data_flow_ops 3158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import variables 3258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.platform import test 3358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.training import coordinator 347ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispirfrom tensorflow.python.training import monitored_session 3558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.training import queue_runner_impl 36f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 37f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 38112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower_MockOp = collections.namedtuple("MockOp", ["name"]) 39112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower 40112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower 4158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyclass QueueRunnerTest(test.TestCase): 42f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 43f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testBasic(self): 44f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 45f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 4658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 4758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 48f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to = var.count_up_to(3) 4958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 5058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 5158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 52f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess) 53112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower self.assertEqual(sorted(t.name for t in threads), 54112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower ["QueueRunnerThread-fifo_queue-CountUpTo:0"]) 55f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 56f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 57f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 58f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 59f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(0, len(qr.exceptions_raised)) 60f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # The variable should be 3. 61f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(3, var.eval()) 62f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 63f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testTwoOps(self): 64f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 65f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 6658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 6758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var0 = variables.Variable(zero64) 68f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to_3 = var0.count_up_to(3) 6958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var1 = variables.Variable(zero64) 70f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to_30 = var1.count_up_to(30) 7158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 7258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to_3, count_up_to_30]) 73f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess) 74112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower self.assertEqual(sorted(t.name for t in threads), 75112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower ["QueueRunnerThread-fifo_queue-CountUpTo:0", 76112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower "QueueRunnerThread-fifo_queue-CountUpTo_1:0"]) 7758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 78f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 79f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 80f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 81f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 82f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(0, len(qr.exceptions_raised)) 83f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(3, var0.eval()) 84f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(30, var1.eval()) 85f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 86f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testExceptionsCaptured(self): 87f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 8858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 89112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower qr = queue_runner_impl.QueueRunner(queue, [_MockOp("i fail"), 90112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower _MockOp("so fail")]) 91f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess) 9258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 93f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 94f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 95f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 96f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 97f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur exceptions = qr.exceptions_raised 98f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(2, len(exceptions)) 99f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertTrue("Operation not in the graph" in str(exceptions[0])) 100f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertTrue("Operation not in the graph" in str(exceptions[1])) 101f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 102f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testRealDequeueEnqueue(self): 103f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 10458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney q0 = data_flow_ops.FIFOQueue(3, dtypes.float32) 105f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur enqueue0 = q0.enqueue((10.0,)) 106f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur close0 = q0.close() 10758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney q1 = data_flow_ops.FIFOQueue(30, dtypes.float32) 108f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur enqueue1 = q1.enqueue((q0.dequeue(),)) 109f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur dequeue1 = q1.dequeue() 11058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(q1, [enqueue1]) 111f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess) 112f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 113f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 114f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # Enqueue 2 values, then close queue0. 115f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur enqueue0.run() 116f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur enqueue0.run() 117f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur close0.run() 118f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # Wait for the queue runner to terminate. 119f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 120f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 121f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # It should have terminated cleanly. 122f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(0, len(qr.exceptions_raised)) 123f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # The 2 values should be in queue1. 124f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(10.0, dequeue1.eval()) 125f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(10.0, dequeue1.eval()) 126f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # And queue1 should now be closed. 12758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed"): 128f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur dequeue1.eval() 129f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 130f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testRespectCoordShouldStop(self): 131f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 132f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 13358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 13458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 135f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to = var.count_up_to(3) 13658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 13758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 13858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 139f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # As the coordinator to stop. The queue runner should 140f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # finish immediately. 14158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney coord = coordinator.Coordinator() 142f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur coord.request_stop() 143f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess, coord) 144112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower self.assertEqual(sorted(t.name for t in threads), 145112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower ["QueueRunnerThread-fifo_queue-CountUpTo:0", 146112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower "QueueRunnerThread-fifo_queue-close_on_stop"]) 147f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 148f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 149a0812ee71da3a86fa1bd0c6d7102a32de0b9730eA. Unique TensorFlower coord.join() 150f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(0, len(qr.exceptions_raised)) 151f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # The variable should be 0. 152f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(0, var.eval()) 153f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 154f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testRequestStopOnException(self): 155f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 15658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 157112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower qr = queue_runner_impl.QueueRunner(queue, [_MockOp("not an op")]) 15858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney coord = coordinator.Coordinator() 159f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess, coord) 160f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 161f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.start() 162f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # The exception should be re-raised when joining. 163f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.assertRaisesRegexp(ValueError, "Operation not in the graph"): 164a0812ee71da3a86fa1bd0c6d7102a32de0b9730eA. Unique TensorFlower coord.join() 165f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 166f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testGracePeriod(self): 167f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 168f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # The enqueue will quickly block. 16958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(2, dtypes.float32) 170f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur enqueue = queue.enqueue((10.0,)) 171f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur dequeue = queue.dequeue() 17258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [enqueue]) 17358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney coord = coordinator.Coordinator() 17454760a10b6a0a0dedf196337eebf51c543b53181Tim Harley qr.create_threads(sess, coord, start=True) 175f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # Dequeue one element and then request stop. 176f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur dequeue.op.run() 177f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur time.sleep(0.02) 178f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur coord.request_stop() 179f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # We should be able to join because the RequestStop() will cause 180f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # the queue to be closed and the enqueue to terminate. 181639b4e71f532761a4840b1cdbaea55ad0917c75bBenoit Steiner coord.join(stop_grace_period_secs=1.0) 182f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 1835ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower def testMultipleSessions(self): 1845ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower with self.test_session() as sess: 18558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney with session.Session() as other_sess: 18658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 18758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 1885ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower count_up_to = var.count_up_to(3) 18958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 19058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 19158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney coord = coordinator.Coordinator() 19258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 1935ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower # NOTE that this test does not actually start the threads. 1945ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower threads = qr.create_threads(sess, coord=coord) 1955ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower other_threads = qr.create_threads(other_sess, coord=coord) 1965ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower self.assertEqual(len(threads), len(other_threads)) 1975ad6738c5117ebc2b9384a379a38fa0fccd587a0A. Unique TensorFlower 198ea6b59a5c4aa0147cc7e333da95234464b21cf6fA. Unique TensorFlower def testIgnoreMultiStarts(self): 199f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 200f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 20158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 20258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 203f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to = var.count_up_to(3) 20458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 20558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 20658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney coord = coordinator.Coordinator() 20758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 208f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = [] 209bdda9fab81e63a73716bdc7f490d54f31600bd28A. Unique TensorFlower # NOTE that this test does not actually start the threads. 210f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads.extend(qr.create_threads(sess, coord=coord)) 211ea6b59a5c4aa0147cc7e333da95234464b21cf6fA. Unique TensorFlower new_threads = qr.create_threads(sess, coord=coord) 212ea6b59a5c4aa0147cc7e333da95234464b21cf6fA. Unique TensorFlower self.assertEqual([], new_threads) 213f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 214f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur def testThreads(self): 215f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur with self.test_session() as sess: 216f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 21758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 21858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 219f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur count_up_to = var.count_up_to(3) 22058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 22158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney variables.global_variables_initializer().run() 222112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower qr = queue_runner_impl.QueueRunner(queue, [count_up_to, 223112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower _MockOp("bad_op")]) 224f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess, start=True) 225112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower self.assertEqual(sorted(t.name for t in threads), 226112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower ["QueueRunnerThread-fifo_queue-CountUpTo:0", 227112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower "QueueRunnerThread-fifo_queue-bad_op"]) 228f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 229f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 230f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur exceptions = qr.exceptions_raised 231f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(1, len(exceptions)) 232f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertTrue("Operation not in the graph" in str(exceptions[0])) 233f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 234f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur threads = qr.create_threads(sess, start=True) 235f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur for t in threads: 236f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur t.join() 237f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur exceptions = qr.exceptions_raised 238f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertEqual(1, len(exceptions)) 239f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur self.assertTrue("Operation not in the graph" in str(exceptions[0])) 240f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 241cd55d4606186357affea2f892f740de93c1bd0f7A. Unique TensorFlower def testName(self): 24258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney with ops.name_scope("scope"): 24358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32, name="queue") 24458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [control_flow_ops.no_op()]) 245cd55d4606186357affea2f892f740de93c1bd0f7A. Unique TensorFlower self.assertEqual("scope/queue", qr.name) 24658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue_runner_impl.add_queue_runner(qr) 24758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney self.assertEqual( 24858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney 1, len(ops.get_collection(ops.GraphKeys.QUEUE_RUNNERS, "scope"))) 249f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur 2505d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower def testStartQueueRunners(self): 2515d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 25258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 25358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 2545d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower count_up_to = var.count_up_to(3) 25558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 25658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney init_op = variables.global_variables_initializer() 25758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 25858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue_runner_impl.add_queue_runner(qr) 2595d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower with self.test_session() as sess: 2605d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower init_op.run() 26158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney threads = queue_runner_impl.start_queue_runners(sess) 2625d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower for t in threads: 2635d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower t.join() 2645d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower self.assertEqual(0, len(qr.exceptions_raised)) 2655d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower # The variable should be 3. 2665d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower self.assertEqual(3, var.eval()) 2675d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower 2687ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir def testStartQueueRunnersRaisesIfNotASession(self): 2697ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir zero64 = constant_op.constant(0, dtype=dtypes.int64) 2707ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir var = variables.Variable(zero64) 2717ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir count_up_to = var.count_up_to(3) 2727ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 2737ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir init_op = variables.global_variables_initializer() 2747ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 2757ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir queue_runner_impl.add_queue_runner(qr) 2767ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir with self.test_session(): 2777ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir init_op.run() 2787ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir with self.assertRaisesRegexp(TypeError, "tf.Session"): 2797ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir queue_runner_impl.start_queue_runners("NotASession") 2807ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir 2817ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir def testStartQueueRunnersIgnoresMonitoredSession(self): 2827ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir zero64 = constant_op.constant(0, dtype=dtypes.int64) 2837ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir var = variables.Variable(zero64) 2847ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir count_up_to = var.count_up_to(3) 2857ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 2867ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir init_op = variables.global_variables_initializer() 2877ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 2887ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir queue_runner_impl.add_queue_runner(qr) 2897ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir with self.test_session(): 2907ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir init_op.run() 2917ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir threads = queue_runner_impl.start_queue_runners( 2927ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir monitored_session.MonitoredSession()) 2937ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir self.assertFalse(threads) 2947ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispir 2955d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower def testStartQueueRunnersNonDefaultGraph(self): 2965d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower # CountUpTo will raise OUT_OF_RANGE when it reaches the count. 29758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney graph = ops.Graph() 2985d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower with graph.as_default(): 29958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney zero64 = constant_op.constant(0, dtype=dtypes.int64) 30058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney var = variables.Variable(zero64) 3015d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower count_up_to = var.count_up_to(3) 30258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32) 30358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney init_op = variables.global_variables_initializer() 30458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr = queue_runner_impl.QueueRunner(queue, [count_up_to]) 30558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue_runner_impl.add_queue_runner(qr) 3065d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower with self.test_session(graph=graph) as sess: 3075d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower init_op.run() 30858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney threads = queue_runner_impl.start_queue_runners(sess) 3095d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower for t in threads: 3105d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower t.join() 3115d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower self.assertEqual(0, len(qr.exceptions_raised)) 3125d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower # The variable should be 3. 3135d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower self.assertEqual(3, var.eval()) 3145d2075f12b0b43a09a432e077334a87394933cd2A. Unique TensorFlower 3157e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo def testQueueRunnerSerializationRoundTrip(self): 31658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney graph = ops.Graph() 3177e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo with graph.as_default(): 31858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue = data_flow_ops.FIFOQueue(10, dtypes.float32, name="queue") 31958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney enqueue_op = control_flow_ops.no_op(name="enqueue") 32058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney close_op = control_flow_ops.no_op(name="close") 32158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney cancel_op = control_flow_ops.no_op(name="cancel") 32258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr0 = queue_runner_impl.QueueRunner( 32358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue, [enqueue_op], 32458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney close_op, 32558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney cancel_op, 32658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney queue_closed_exception_types=(errors_impl.OutOfRangeError, 32758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney errors_impl.CancelledError)) 32858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr0_proto = queue_runner_impl.QueueRunner.to_proto(qr0) 32958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr0_recon = queue_runner_impl.QueueRunner.from_proto(qr0_proto) 3307e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual("queue", qr0_recon.queue.name) 3317e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(1, len(qr0_recon.enqueue_ops)) 3327e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(enqueue_op, qr0_recon.enqueue_ops[0]) 3337e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(close_op, qr0_recon.close_op) 3347e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(cancel_op, qr0_recon.cancel_op) 3357e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual( 33658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney (errors_impl.OutOfRangeError, errors_impl.CancelledError), 3377e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo qr0_recon.queue_closed_exception_types) 3387e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo 3397e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo # Assert we reconstruct an OutOfRangeError for QueueRunners 3407e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo # created before QueueRunnerDef had a queue_closed_exception_types field. 3417e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo del qr0_proto.queue_closed_exception_types[:] 34258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr0_legacy_recon = queue_runner_impl.QueueRunner.from_proto(qr0_proto) 3437e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual("queue", qr0_legacy_recon.queue.name) 3447e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(1, len(qr0_legacy_recon.enqueue_ops)) 3457e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(enqueue_op, qr0_legacy_recon.enqueue_ops[0]) 3467e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(close_op, qr0_legacy_recon.close_op) 3477e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo self.assertEqual(cancel_op, qr0_legacy_recon.cancel_op) 34858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney self.assertEqual((errors_impl.OutOfRangeError,), 34958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney qr0_legacy_recon.queue_closed_exception_types) 3507e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo 3517e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo 352f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlurif __name__ == "__main__": 35358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney test.main() 354