# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur"""Tests for QueueRunner."""
1658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunney
17f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import absolute_import
18f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import division
19f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevanfrom __future__ import print_function
20f2102f4e2c1c87f1d1bf9ab856a2849c54478760Vijay Vasudevan
21112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlowerimport collections
22f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlurimport time
23f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur
2458201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.client import session
2558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import constant_op
2658201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import dtypes
2758201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import errors_impl
2858201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.framework import ops
2958201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import control_flow_ops
3058201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import data_flow_ops
3158201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.ops import variables
3258201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.platform import test
3358201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.training import coordinator
347ad0d0698ab443324bbe68dd5d6476111c6b229aMustafa Ispirfrom tensorflow.python.training import monitored_session
3558201a058853de647b37ddb0ccf63d89b2357f03Justine Tunneyfrom tensorflow.python.training import queue_runner_impl
36f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur
37f41959ccb2d9d4c722fe8fc3351401d53bcf490Manjunath Kudlur
# Minimal stand-in for an enqueue op: it only carries a `name`. The tests in
# this file expect running one to fail with "Operation not in the graph".
_MockOp = collections.namedtuple(typename="MockOp", field_names=("name",))
39112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower
40112191d51f5ca4acba234d56a83691bc27091f43A. Unique TensorFlower
class QueueRunnerTest(test.TestCase):
  """Exercises QueueRunner threads, error capture and serialization."""

  def testBasic(self):
    """A single enqueue op runs to completion without recorded errors."""
    with self.test_session() as sess:
      # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var = variables.Variable(zero64)
      count_up_to = var.count_up_to(3)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      variables.global_variables_initializer().run()
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
      threads = qr.create_threads(sess)
      self.assertEqual(sorted(t.name for t in threads),
                       ["QueueRunnerThread-fifo_queue-CountUpTo:0"])
      for t in threads:
        t.start()
      for t in threads:
        t.join()
      self.assertEqual(0, len(qr.exceptions_raised))
      # The variable should be 3.
      self.assertEqual(3, var.eval())

  def testTwoOps(self):
    """Two enqueue ops get one thread each and both run to completion."""
    with self.test_session() as sess:
      # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var0 = variables.Variable(zero64)
      count_up_to_3 = var0.count_up_to(3)
      var1 = variables.Variable(zero64)
      count_up_to_30 = var1.count_up_to(30)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to_3, count_up_to_30])
      threads = qr.create_threads(sess)
      self.assertEqual(sorted(t.name for t in threads),
                       ["QueueRunnerThread-fifo_queue-CountUpTo:0",
                        "QueueRunnerThread-fifo_queue-CountUpTo_1:0"])
      variables.global_variables_initializer().run()
      for t in threads:
        t.start()
      for t in threads:
        t.join()
      self.assertEqual(0, len(qr.exceptions_raised))
      self.assertEqual(3, var0.eval())
      self.assertEqual(30, var1.eval())

  def testExceptionsCaptured(self):
    """Failures of the enqueue ops are collected in exceptions_raised."""
    with self.test_session() as sess:
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      qr = queue_runner_impl.QueueRunner(queue, [_MockOp("i fail"),
                                                 _MockOp("so fail")])
      threads = qr.create_threads(sess)
      variables.global_variables_initializer().run()
      for t in threads:
        t.start()
      for t in threads:
        t.join()
      exceptions = qr.exceptions_raised
      self.assertEqual(2, len(exceptions))
      # assertIn reports both operands on failure, unlike assertTrue(a in b).
      self.assertIn("Operation not in the graph", str(exceptions[0]))
      self.assertIn("Operation not in the graph", str(exceptions[1]))

  def testRealDequeueEnqueue(self):
    """Closing the upstream queue ends the runner and closes its queue."""
    with self.test_session() as sess:
      q0 = data_flow_ops.FIFOQueue(3, dtypes.float32)
      enqueue0 = q0.enqueue((10.0,))
      close0 = q0.close()
      q1 = data_flow_ops.FIFOQueue(30, dtypes.float32)
      enqueue1 = q1.enqueue((q0.dequeue(),))
      dequeue1 = q1.dequeue()
      qr = queue_runner_impl.QueueRunner(q1, [enqueue1])
      threads = qr.create_threads(sess)
      for t in threads:
        t.start()
      # Enqueue 2 values, then close queue0.
      enqueue0.run()
      enqueue0.run()
      close0.run()
      # Wait for the queue runner to terminate.
      for t in threads:
        t.join()
      # It should have terminated cleanly.
      self.assertEqual(0, len(qr.exceptions_raised))
      # The 2 values should be in queue1.
      self.assertEqual(10.0, dequeue1.eval())
      self.assertEqual(10.0, dequeue1.eval())
      # And queue1 should now be closed.
      with self.assertRaisesRegexp(errors_impl.OutOfRangeError, "is closed"):
        dequeue1.eval()

  def testRespectCoordShouldStop(self):
    """With stop already requested, the enqueue op never runs."""
    with self.test_session() as sess:
      # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var = variables.Variable(zero64)
      count_up_to = var.count_up_to(3)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      variables.global_variables_initializer().run()
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
      # Ask the coordinator to stop.  The queue runner should
      # finish immediately.
      coord = coordinator.Coordinator()
      coord.request_stop()
      threads = qr.create_threads(sess, coord)
      self.assertEqual(sorted(t.name for t in threads),
                       ["QueueRunnerThread-fifo_queue-CountUpTo:0",
                        "QueueRunnerThread-fifo_queue-close_on_stop"])
      for t in threads:
        t.start()
      coord.join()
      self.assertEqual(0, len(qr.exceptions_raised))
      # The variable should be 0.
      self.assertEqual(0, var.eval())

  def testRequestStopOnException(self):
    """An enqueue failure surfaces from coord.join()."""
    with self.test_session() as sess:
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      qr = queue_runner_impl.QueueRunner(queue, [_MockOp("not an op")])
      coord = coordinator.Coordinator()
      threads = qr.create_threads(sess, coord)
      for t in threads:
        t.start()
      # The exception should be re-raised when joining.
      with self.assertRaisesRegexp(ValueError, "Operation not in the graph"):
        coord.join()

  def testGracePeriod(self):
    """request_stop() lets join() return within the grace period."""
    with self.test_session() as sess:
      # The enqueue will quickly block.
      queue = data_flow_ops.FIFOQueue(2, dtypes.float32)
      enqueue = queue.enqueue((10.0,))
      dequeue = queue.dequeue()
      qr = queue_runner_impl.QueueRunner(queue, [enqueue])
      coord = coordinator.Coordinator()
      qr.create_threads(sess, coord, start=True)
      # Dequeue one element and then request stop.
      dequeue.op.run()
      time.sleep(0.02)
      coord.request_stop()
      # We should be able to join because the RequestStop() will cause
      # the queue to be closed and the enqueue to terminate.
      coord.join(stop_grace_period_secs=1.0)

  def testMultipleSessions(self):
    """One runner yields equally many threads for two different sessions."""
    with self.test_session() as sess:
      with session.Session() as other_sess:
        zero64 = constant_op.constant(0, dtype=dtypes.int64)
        var = variables.Variable(zero64)
        count_up_to = var.count_up_to(3)
        queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
        variables.global_variables_initializer().run()
        coord = coordinator.Coordinator()
        qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
        # NOTE that this test does not actually start the threads.
        threads = qr.create_threads(sess, coord=coord)
        other_threads = qr.create_threads(other_sess, coord=coord)
        self.assertEqual(len(threads), len(other_threads))

  def testIgnoreMultiStarts(self):
    """A repeated create_threads() for the same session returns nothing."""
    with self.test_session() as sess:
      # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var = variables.Variable(zero64)
      count_up_to = var.count_up_to(3)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      variables.global_variables_initializer().run()
      coord = coordinator.Coordinator()
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
      # NOTE that this test does not actually start the threads.
      first_threads = qr.create_threads(sess, coord=coord)
      # The first call must hand back threads, otherwise the empty result
      # of the second call below would not prove anything.
      self.assertTrue(first_threads)
      new_threads = qr.create_threads(sess, coord=coord)
      self.assertEqual([], new_threads)

  def testThreads(self):
    """create_threads(start=True) starts threads and can be repeated."""
    with self.test_session() as sess:
      # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var = variables.Variable(zero64)
      count_up_to = var.count_up_to(3)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      variables.global_variables_initializer().run()
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to,
                                                 _MockOp("bad_op")])
      threads = qr.create_threads(sess, start=True)
      self.assertEqual(sorted(t.name for t in threads),
                       ["QueueRunnerThread-fifo_queue-CountUpTo:0",
                        "QueueRunnerThread-fifo_queue-bad_op"])
      for t in threads:
        t.join()
      exceptions = qr.exceptions_raised
      self.assertEqual(1, len(exceptions))
      self.assertIn("Operation not in the graph", str(exceptions[0]))

      # Creating threads again after the first batch has finished must still
      # leave exactly one recorded exception.
      threads = qr.create_threads(sess, start=True)
      for t in threads:
        t.join()
      exceptions = qr.exceptions_raised
      self.assertEqual(1, len(exceptions))
      self.assertIn("Operation not in the graph", str(exceptions[0]))

  def testName(self):
    """The runner is named after its queue and collected under that scope."""
    with ops.name_scope("scope"):
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32, name="queue")
    qr = queue_runner_impl.QueueRunner(queue, [control_flow_ops.no_op()])
    self.assertEqual("scope/queue", qr.name)
    queue_runner_impl.add_queue_runner(qr)
    self.assertEqual(
        1, len(ops.get_collection(ops.GraphKeys.QUEUE_RUNNERS, "scope")))

  def testStartQueueRunners(self):
    """start_queue_runners() runs a runner added via add_queue_runner()."""
    # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    var = variables.Variable(zero64)
    count_up_to = var.count_up_to(3)
    queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
    init_op = variables.global_variables_initializer()
    qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
    queue_runner_impl.add_queue_runner(qr)
    with self.test_session() as sess:
      init_op.run()
      threads = queue_runner_impl.start_queue_runners(sess)
      for t in threads:
        t.join()
      self.assertEqual(0, len(qr.exceptions_raised))
      # The variable should be 3.
      self.assertEqual(3, var.eval())

  def testStartQueueRunnersRaisesIfNotASession(self):
    """start_queue_runners() raises TypeError for a non-session argument."""
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    var = variables.Variable(zero64)
    count_up_to = var.count_up_to(3)
    queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
    init_op = variables.global_variables_initializer()
    qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
    queue_runner_impl.add_queue_runner(qr)
    with self.test_session():
      init_op.run()
      with self.assertRaisesRegexp(TypeError, "tf.Session"):
        queue_runner_impl.start_queue_runners("NotASession")

  def testStartQueueRunnersIgnoresMonitoredSession(self):
    """start_queue_runners() returns no threads for a MonitoredSession."""
    zero64 = constant_op.constant(0, dtype=dtypes.int64)
    var = variables.Variable(zero64)
    count_up_to = var.count_up_to(3)
    queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
    init_op = variables.global_variables_initializer()
    qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
    queue_runner_impl.add_queue_runner(qr)
    with self.test_session():
      init_op.run()
      # NOTE(review): the MonitoredSession created here is never closed;
      # confirm whether it should be used as a context manager.
      threads = queue_runner_impl.start_queue_runners(
          monitored_session.MonitoredSession())
      self.assertFalse(threads)

  def testStartQueueRunnersNonDefaultGraph(self):
    """start_queue_runners() works for a runner built in another graph."""
    # CountUpTo will raise OUT_OF_RANGE when it reaches the count.
    graph = ops.Graph()
    with graph.as_default():
      zero64 = constant_op.constant(0, dtype=dtypes.int64)
      var = variables.Variable(zero64)
      count_up_to = var.count_up_to(3)
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32)
      init_op = variables.global_variables_initializer()
      qr = queue_runner_impl.QueueRunner(queue, [count_up_to])
      queue_runner_impl.add_queue_runner(qr)
    with self.test_session(graph=graph) as sess:
      init_op.run()
      threads = queue_runner_impl.start_queue_runners(sess)
      for t in threads:
        t.join()
      self.assertEqual(0, len(qr.exceptions_raised))
      # The variable should be 3.
      self.assertEqual(3, var.eval())

  def testQueueRunnerSerializationRoundTrip(self):
    """to_proto()/from_proto() round-trip a runner, including legacy protos."""
    graph = ops.Graph()
    with graph.as_default():
      queue = data_flow_ops.FIFOQueue(10, dtypes.float32, name="queue")
      enqueue_op = control_flow_ops.no_op(name="enqueue")
      close_op = control_flow_ops.no_op(name="close")
      cancel_op = control_flow_ops.no_op(name="cancel")
      qr0 = queue_runner_impl.QueueRunner(
          queue, [enqueue_op],
          close_op,
          cancel_op,
          queue_closed_exception_types=(errors_impl.OutOfRangeError,
                                        errors_impl.CancelledError))
      qr0_proto = queue_runner_impl.QueueRunner.to_proto(qr0)
      qr0_recon = queue_runner_impl.QueueRunner.from_proto(qr0_proto)
      self.assertEqual("queue", qr0_recon.queue.name)
      self.assertEqual(1, len(qr0_recon.enqueue_ops))
      self.assertEqual(enqueue_op, qr0_recon.enqueue_ops[0])
      self.assertEqual(close_op, qr0_recon.close_op)
      self.assertEqual(cancel_op, qr0_recon.cancel_op)
      self.assertEqual(
          (errors_impl.OutOfRangeError, errors_impl.CancelledError),
          qr0_recon.queue_closed_exception_types)

      # Assert we reconstruct an OutOfRangeError for QueueRunners
      # created before QueueRunnerDef had a queue_closed_exception_types field.
      del qr0_proto.queue_closed_exception_types[:]
      qr0_legacy_recon = queue_runner_impl.QueueRunner.from_proto(qr0_proto)
      self.assertEqual("queue", qr0_legacy_recon.queue.name)
      self.assertEqual(1, len(qr0_legacy_recon.enqueue_ops))
      self.assertEqual(enqueue_op, qr0_legacy_recon.enqueue_ops[0])
      self.assertEqual(close_op, qr0_legacy_recon.close_op)
      self.assertEqual(cancel_op, qr0_legacy_recon.cancel_op)
      self.assertEqual((errors_impl.OutOfRangeError,),
                       qr0_legacy_recon.queue_closed_exception_types)
3507e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo
3517e7e0d68ceca42900ddf378a07fff2bec6dec88dEugene Brevdo
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
  test.main()
354