/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// This header declares classes for the infeed manager and the infeed
// buffer that are used by the GPU runtime to transfer buffers into an
// executing GPU computation, e.g., to feed data into a while loop.

#ifndef TENSORFLOW_COMPILER_XLA_SERVICE_GPU_INFEED_MANAGER_H_
#define TENSORFLOW_COMPILER_XLA_SERVICE_GPU_INFEED_MANAGER_H_

#include <deque>
#include <memory>
#include <vector>

#include "tensorflow/compiler/xla/types.h"
26#include "tensorflow/core/lib/gtl/flatset.h"
27#include "tensorflow/core/platform/mutex.h"
28#include "tensorflow/core/platform/stream_executor_no_cuda.h"
29
30namespace xla {
31namespace gpu {
32
33// TODO(b/30467474) Once GPU infeed implementation settles, consider
34// folding back the cpu and gpu infeed implementations into a generic
35// one if possible.
36//
// Current limitations:
// * Does not handle multiple devices/replicas.
//
// * Buffer space on the GPU is allocated on every infeed enqueue
// request, and running out of memory is not handled. A potential
// solution is to pre-allocate a fixed amount of memory and block
// when that memory is full.

// Defines an infeed buffer that is passed to the runtime by the
// client. The client allocates the buffer; it is deleted when Done()
// is called, and the device memory it wraps is allocated in the
// constructor and freed in the destructor.
class InfeedBuffer {
 public:
  InfeedBuffer(perftools::gputools::StreamExecutor* executor, int64 length)
      : executor_(executor), length_(length) {
    device_memory_ = executor_->AllocateArray<uint8>(length);
    CHECK(!device_memory_.is_null());
  }

  ~InfeedBuffer() { executor_->Deallocate(&device_memory_); }

  int64 length() const { return length_; }

  // Callback to signal that this buffer is consumed. This helps the
  // client to manage memory for the infeed buffers.
  void Done() { delete this; }

  perftools::gputools::DeviceMemoryBase* device_memory() {
    return &device_memory_;
  }

 private:
  perftools::gputools::StreamExecutor* executor_;  // Not owned.
  const int64 length_;
  perftools::gputools::DeviceMemoryBase device_memory_;
};
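// Example (illustrative sketch, not part of this header): client-side
// allocation of an InfeedBuffer. The buffer must be heap-allocated because
// Done() deletes it; once it has been handed to the InfeedManager the client
// must not delete or reuse it. `executor` is assumed to be a valid
// StreamExecutor obtained elsewhere.
//
//   auto* buffer = new InfeedBuffer(executor, /*length=*/1024);
//   void* device_ptr = buffer->device_memory()->opaque();  // raw GPU pointer
//   // ... fill the device memory, e.g. with a host-to-device copy ...
//   // Ownership then passes to the InfeedManager via EnqueueBuffers().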

// Client-side class used to enqueue infeed buffers.
class InfeedManager {
 public:
  InfeedManager();

  // Calls the completion callback for any enqueued buffers that have
  // not been dequeued by the runtime, and empties the infeed
  // queue. Reset may not be called while a runtime computation is
  // processing a dequeued buffer. The only safe way to ensure this
  // condition is to call Reset when no computation is taking place.
  void Reset();

  // Adds a set of buffers to the infeed queue atomically. buffer->Done
  // will be called when the buffer is no longer accessed by the
  // InfeedManager, either as a result of a call to Reset or because the
  // runtime has dequeued and used the buffer.
  void EnqueueBuffers(const std::vector<InfeedBuffer*>& buffers);

  // Blocks until the infeed queue is non-empty, then returns the
  // buffer at the head of the queue. Adds the returned buffer to the
  // to-be-released set.
  InfeedBuffer* BlockingDequeueBuffer();

  // Releases a set of buffers from the to-be-released set.
  void ReleaseBuffers(const std::vector<InfeedBuffer*>& buffers);

  // Returns a cached stream associated with an executor. Allocates a
  // new stream on the first invocation. On subsequent invocations, if
  // the cached executor is not the same as the requested executor,
  // returns null.
  perftools::gputools::Stream* GetStream(
      perftools::gputools::StreamExecutor* executor);

 private:
  // TODO(b/30467474): Revisit if this mutex becomes a point of
  // contention.
  tensorflow::mutex mu_;

  // Condition variable that is signaled every time a buffer is
  // enqueued to an empty queue.
  tensorflow::condition_variable cv_;

  // InfeedBuffer* queue contents are not owned, but buffer->Done must
  // be called when the buffer is no longer needed by the runtime.
  std::deque<InfeedBuffer*> enqueued_buffer_;

  // Buffers that are dequeued and currently being processed by the
  // runtime. Not owned.
  tensorflow::gtl::FlatSet<const InfeedBuffer*> dequeued_buffer_;

  // Cached host-to-device stream for queuing infeed data.
  std::unique_ptr<perftools::gputools::Stream> host_to_device_stream_;

  // Executor that the host_to_device_stream belongs to. Not owned.
  perftools::gputools::StreamExecutor* host_to_device_executor_;
};
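// Example (illustrative sketch): the runtime-side consumption pattern implied
// by the interface above. The runtime dequeues a buffer, copies it into the
// executing computation's device memory, and then releases it; per the
// comments above, the InfeedManager (not the runtime) invokes buffer->Done().
// `stream` and `dest` are assumed to come from the surrounding runtime code.
//
//   InfeedManager* infeed = GetOrCreateInfeedManager();
//   InfeedBuffer* buffer = infeed->BlockingDequeueBuffer();
//   stream->ThenMemcpy(&dest, *buffer->device_memory(), buffer->length());
//   // After the copy has completed, hand the buffer back:
//   infeed->ReleaseBuffers({buffer});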

// Singleton creator-or-accessor: Returns the GPU infeed manager.
InfeedManager* GetOrCreateInfeedManager();
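// Example (illustrative sketch): how client-side code might feed host data to
// the device using the pieces declared above. `executor` and `host_data` are
// hypothetical, and error handling is omitted.
//
//   InfeedManager* infeed = GetOrCreateInfeedManager();
//   perftools::gputools::Stream* stream = infeed->GetStream(executor);
//   auto* buffer = new InfeedBuffer(executor, /*length=*/host_data.size());
//   stream->ThenMemcpy(buffer->device_memory(), host_data.data(),
//                      host_data.size());
//   stream->BlockHostUntilDone();
//   infeed->EnqueueBuffers({buffer});  // buffer->Done() is called later.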

}  // namespace gpu
}  // namespace xla

#endif  // TENSORFLOW_COMPILER_XLA_SERVICE_GPU_INFEED_MANAGER_H_