gpu_device.cc revision bfa539c03cd1555024fc04f4974e531c46b24e07
1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16// TODO(opensource): Use a more generic sounding preprocessor name than
17// GOOGLE_CUDA
18#if GOOGLE_CUDA
19
20#define EIGEN_USE_GPU
21
22#include "tensorflow/core/common_runtime/gpu/gpu_device.h"
23
24#include <stdlib.h>
25#include <string.h>
26#include <algorithm>
27#include <list>
28#include <map>
29#include <tuple>
30#include <vector>
31
32#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
33#include "tensorflow/core/common_runtime/device_factory.h"
34#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
35#include "tensorflow/core/common_runtime/gpu/gpu_init.h"
36#include "tensorflow/core/common_runtime/gpu/gpu_stream_util.h"
37#include "tensorflow/core/common_runtime/gpu/gpu_util.h"
38#include "tensorflow/core/common_runtime/gpu/process_state.h"
39#include "tensorflow/core/common_runtime/gpu_device_context.h"
40#include "tensorflow/core/common_runtime/local_device.h"
41#include "tensorflow/core/framework/allocator.h"
42#include "tensorflow/core/framework/device_base.h"
43#include "tensorflow/core/framework/op_kernel.h"
44#include "tensorflow/core/framework/tensor.h"
45#include "tensorflow/core/framework/tensor.pb.h"
46#include "tensorflow/core/framework/types.h"
47#include "tensorflow/core/framework/variant_op_registry.h"
48#include "tensorflow/core/graph/types.h"
49#include "tensorflow/core/lib/core/errors.h"
50#include "tensorflow/core/lib/core/status.h"
51#include "tensorflow/core/lib/gtl/stl_util.h"
52#include "tensorflow/core/lib/strings/numbers.h"
53#include "tensorflow/core/lib/strings/str_util.h"
54#include "tensorflow/core/lib/strings/strcat.h"
55#include "tensorflow/core/platform/cuda.h"
56#include "tensorflow/core/platform/logging.h"
57#include "tensorflow/core/platform/macros.h"
58#include "tensorflow/core/platform/stream_executor.h"
59#include "tensorflow/core/platform/tracing.h"
60#include "tensorflow/core/platform/types.h"
61#include "tensorflow/core/public/session_options.h"
62#include "tensorflow/core/util/device_name_utils.h"
63#include "tensorflow/core/util/stream_executor_util.h"
64
65namespace tensorflow {
66
67// Eigen Ops directly allocate memory only for temporary buffers used
68// during OpKernel::Compute().  The recommended way of allocating such
69// memory is via OpKernelContext::allocate_temp().  However, Eigen Ops
70// don't have access to OpKernelContext; instead they get access to
71// memory directly through the device allocator.  As an Open Source
72// project, Eigen assumes allocator semantics similar to those of the
73// CUDA memory allocator, and may not work correctly due to race
74// conditions if used with some other allocator.  For safety, we need
75// to delay deallocation calls out of Eigen until all events on the
76// corresponding stream have completed.  The following two classes
77// serve this purpose in two different compilation environments.
78
79class EigenCudaStreamDevice : public ::Eigen::StreamInterface {
80 public:
81  EigenCudaStreamDevice()
82      : scratch_(nullptr), semaphore_(nullptr), context_(nullptr) {
83    Eigen::initializeDeviceProp();
84  }
85  ~EigenCudaStreamDevice() override {}
86  void Reinitialize(OpKernelContext* context, const cudaStream_t* cuda_stream,
87                    int gpu_id, ::tensorflow::Allocator* alloc, char* scratch) {
88    if (LogMemory::IsEnabled()) {
89      operation_ = context->op_kernel().name() + "/EigenAllocator";
90      step_id_ = context->step_id();
91    }
92    context_ = context;
93    scratch_ = scratch;
94    semaphore_ =
95        reinterpret_cast<unsigned int*>(scratch + Eigen::kCudaScratchSize);
96    stream_ = cuda_stream;
97    allocator_ = alloc;
98    device_prop_ = &Eigen::m_deviceProperties[gpu_id];
99  }
100
101  const cudaStream_t& stream() const override { return *stream_; }
102  const cudaDeviceProp& deviceProperties() const override {
103    return *device_prop_;
104  }
105
106  void* allocate(size_t num_bytes) const override {
107    void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
108    if (ret == nullptr) {
109      if (context_) {
110        context_->SetStatus(errors::ResourceExhausted(
111            strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
112                            " bytes for ", operation_)));
113      } else {
114        LOG(FATAL)
115            << "EigenAllocator for GPU ran out of memory when allocating "
116            << num_bytes << ". See error logs for more detailed info.";
117      }
118    }
119    if (LogMemory::IsEnabled() && ret != nullptr) {
120      LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
121                                     allocator_);
122    }
123    return ret;
124  }
125  void deallocate(void* buffer) const override {
126    if (LogMemory::IsEnabled() && buffer != nullptr) {
127      LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
128                                       true);
129    }
130    AsyncFreeData* afData =
131        new AsyncFreeData(allocator_, buffer, operation_, step_id_);
132    cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
133    CHECK_EQ(err, cudaSuccess);
134  }
135
136  // Return a pointer to a per-stream scratchpad of 1024 bytes residing
137  // in global memory.
138  void* scratchpad() const override { return scratch_; }
139
140  // Return a semaphore. The semaphore is initialized to 0, and each
141  // kernel using it is responsible for resetting it to 0 upon completion
142  // to maintain the invariant that the semaphore is always equal to 0 at
143  // the start of each kernel.
144  unsigned int* semaphore() const override { return semaphore_; }
145
146 private:
147  struct AsyncFreeData {
148    AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
149                  const int64 s)
150        : allocator_(a), address_(p), operation_(o), step_id_(s) {}
151    ::tensorflow::Allocator* allocator_;
152    void* address_;
153    const string operation_;
154    const int64 step_id_;
155  };
156
157  static void CUDART_CB asyncFree(cudaStream_t stream, cudaError_t status,
158                                  void* userData) {
159    AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
160    if (LogMemory::IsEnabled()) {
161      LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
162                                       data->address_, data->allocator_, false);
163    }
164    data->allocator_->DeallocateRaw(data->address_);
165    delete data;
166  }
167
168  string operation_;
169  int64 step_id_;
170  const cudaStream_t* stream_;          // Not owned.
171  const cudaDeviceProp* device_prop_;   // Not owned.
172  ::tensorflow::Allocator* allocator_;  // Not owned.
173  mutable char* scratch_;
174  mutable unsigned int* semaphore_;
175  OpKernelContext* context_;
176
177  TF_DISALLOW_COPY_AND_ASSIGN(EigenCudaStreamDevice);
178};
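// The deallocate() path above is the essence of the stream-ordered free:
// the raw pointer is not returned to the allocator until every kernel
// already enqueued on the stream has finished.  A minimal sketch of the
// same pattern, independent of Eigen (FreeRequest and the callback name
// are illustrative, not part of this file):
//
//   struct FreeRequest {
//     ::tensorflow::Allocator* allocator;
//     void* ptr;
//   };
//
//   void CUDART_CB FreeWhenStreamDrained(cudaStream_t /*stream*/,
//                                        cudaError_t /*status*/,
//                                        void* user_data) {
//     FreeRequest* req = static_cast<FreeRequest*>(user_data);
//     req->allocator->DeallocateRaw(req->ptr);  // prior stream work is done
//     delete req;
//   }
//
//   // Enqueue the free behind all work already submitted to `stream`.
//   cudaError_t err = cudaStreamAddCallback(
//       stream, FreeWhenStreamDrained, new FreeRequest{allocator, ptr}, 0);
//   CHECK_EQ(err, cudaSuccess);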
179
180// This factory helps to ensure that different GPU device objects that refer to
181// the same physical device and stream group id use the same stream group
182// object (and therefore the same CUDA streams). This is necessary since there
183// is a single memory allocator per device (see ProcessState::GetGPUAllocator)
184// and allocators must not be shared across streams.
185class BaseGPUDevice::StreamGroupFactory {
186 public:
187  // Returns the unique stream group for use with the stream defined by
188  // {gpu_id, stream_group_within_gpu}, creating it if it does not yet exist.
189  // This function is thread safe.
190  BaseGPUDevice::StreamGroup* GetOrCreate(int gpu_id,
191                                          int stream_group_within_gpu,
192                                          gpu::StreamExecutor* executor) {
193    mutex_lock guard(lock_);
194    StreamGroup* group = &streams_[key_type(gpu_id, stream_group_within_gpu)];
195    if (!group->compute) {
196      group->compute = new gpu::Stream(executor);
197      group->compute->Init();
198      VLOG(2) << "Created stream[" << stream_group_within_gpu
199              << "] = " << group->compute;
200
201      group->host_to_device = new gpu::Stream(executor);
202      group->host_to_device->Init();
203      VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
204              << "] = " << group->host_to_device;
205
206      group->device_to_host = new gpu::Stream(executor);
207      group->device_to_host->Init();
208      VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
209              << "] = " << group->device_to_host;
210
211      group->device_to_device = new gpu::Stream(executor);
212      group->device_to_device->Init();
213      VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
214              << "] = " << group->device_to_device;
215    }
216    return group;
217  }
218
219  // Returns a reference to the StreamGroupFactory singleton. Note that this is
220  // never destroyed, so the objects it owns are never deleted.
221  static StreamGroupFactory& Global() {
222    static StreamGroupFactory* instance = new StreamGroupFactory();
223    return *instance;
224  }
225
226 private:
227  mutex lock_;
228  using key_type = std::tuple<int, int>;
229  std::map<key_type, StreamGroup> streams_;
230
231  // StreamGroupFactory cannot be created directly; Call
232  // StreamGroupFactory::Global() to get the global instance.
233  StreamGroupFactory() = default;
234  TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
235};
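// Conceptually, two BaseGPUDevice instances that name the same {gpu_id,
// stream group} end up sharing one StreamGroup and therefore the same CUDA
// streams.  A minimal sketch of that guarantee (illustrative only; in
// practice the factory is reached through BaseGPUDevice::Init() below):
//
//   gpu::StreamExecutor* se =
//       GPUMachineManager()->ExecutorForDevice(0).ValueOrDie();
//   StreamGroup* a = StreamGroupFactory::Global().GetOrCreate(0, 0, se);
//   StreamGroup* b = StreamGroupFactory::Global().GetOrCreate(0, 0, se);
//   CHECK_EQ(a, b);  // same streams, so the per-device allocator stays safe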
236
237BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
238                             Bytes memory_limit, const DeviceLocality& locality,
239                             int gpu_id, const string& physical_device_desc,
240                             Allocator* gpu_allocator, Allocator* cpu_allocator,
241                             bool sync_every_op, int32 max_streams)
242    : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
243                                                         memory_limit, locality,
244                                                         physical_device_desc)),
245      gpu_allocator_(gpu_allocator),
246      cpu_allocator_(cpu_allocator),
247      gpu_id_(gpu_id),
248      sync_every_op_(sync_every_op),
249      max_streams_(max_streams) {
250  ProcessState::singleton()->EnableGPUDevice();
251}
252
253BaseGPUDevice::~BaseGPUDevice() {
254  delete gpu_device_info_;
255  for (auto ctx : device_contexts_) ctx->Unref();
256}
257
258Status BaseGPUDevice::Init(const SessionOptions& options) {
259  auto executor_status = GPUMachineManager()->ExecutorForDevice(gpu_id_);
260  if (!executor_status.status().ok()) {
261    return errors::Internal("Failed to get StreamExecutor for device ",
262                            gpu_id_);
263  }
264
265  executor_ = executor_status.ValueOrDie();
266  em_.reset(new EventMgr(executor_, options.config.gpu_options()));
267
268  if (max_streams_ < 1) {
269    return errors::InvalidArgument("Invalid value for max_streams.");
270  }
271
272  // Create the specified number of GPU streams
273  for (int i = 0; i < max_streams_; i++) {
274    streams_.push_back(
275        StreamGroupFactory::Global().GetOrCreate(gpu_id_, i, executor_));
276
277    size_t scratch_buffer_size = Eigen::kCudaScratchSize + sizeof(unsigned int);
278    void* scratch_buffer = gpu_allocator_->AllocateRaw(
279        Allocator::kAllocatorAlignment, scratch_buffer_size);
280    if (scratch_buffer == nullptr) {
281      return errors::FailedPrecondition(
282          "Failed to allocate scratch buffer for device ", gpu_id_);
283    }
284    scratch_.push_back(static_cast<char*>(scratch_buffer));
285
286    perftools::gputools::DeviceMemory<char> mem(
287        perftools::gputools::DeviceMemoryBase(scratch_buffer,
288                                              scratch_buffer_size));
289
290    bool ok = executor_->SynchronousMemZero(
291        &mem, Eigen::kCudaScratchSize + sizeof(unsigned int));
292    if (!ok) {
293      return errors::FailedPrecondition(
294          "Failed to zero scratch buffer for device ", gpu_id_);
295    }
296
297    device_contexts_.push_back(new GPUDeviceContext(
298        i, streams_.back()->compute, streams_.back()->host_to_device,
299        streams_.back()->device_to_host, streams_.back()->device_to_device));
300  }
301  gpu_device_info_ = new GpuDeviceInfo;
302  gpu_device_info_->stream = streams_[0]->compute;
303  gpu_device_info_->default_context = device_contexts_[0];
304  gpu_device_info_->event_mgr = em_.get();
305  gpu_device_info_->gpu_id = gpu_id_;
306  set_tensorflow_gpu_device_info(gpu_device_info_);
307
308  return Status::OK();
309}
310
311bool BaseGPUDevice::RequiresRecordingAccessedTensors() const {
312  // When there is no more than one stream, we release the tensor reference
313  // at the end of the kernel launch, instead of at the end of the kernel
314  // execution.
315  return streams_.size() > 1;
316}
317
318Status BaseGPUDevice::FillContextMap(const Graph* graph,
319                                     DeviceContextMap* device_context_map) {
320  VLOG(2) << "FillContextMap";
321
322  const size_t num_streams = streams_.size();
323  // Special case for single stream.
324  if (num_streams == 1) {
325    return Status::OK();
326  }
327  const int64 before = Env::Default()->NowMicros();
328  gpu_stream_util::AssignStreamsOpts opts;
329  opts.max_streams = static_cast<int32>(num_streams);
330  std::unordered_map<int, int> node_to_stream_id;
331  TF_RETURN_IF_ERROR(
332      gpu_stream_util::AssignStreams(graph, opts, &node_to_stream_id));
333  int64 elapsed = Env::Default()->NowMicros() - before;
334  VLOG(3) << "AssignStreams took " << elapsed << "us";
335
336  // Fill in the context map.  It is OK for this map to contain
337  // duplicate DeviceContexts so long as we increment the refcount.
338  device_context_map->resize(graph->num_node_ids());
339  for (Node* n : graph->nodes()) {
340    auto mapped_stream = node_to_stream_id[n->id()];
341    CHECK_LE(mapped_stream, num_streams);
342    auto ctx = device_contexts_[mapped_stream];
343    VLOG(3) << "Assigned stream " << node_to_stream_id[n->id()]
344            << " ==> stream[" << ctx->stream_id() << "] for node id " << n->id()
345            << " " << n->type_string() << " " << n->name();
346    ctx->Ref();
347    (*device_context_map)[n->id()] = ctx;
348  }
349
350  return Status::OK();
351}
352
353void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
354  // ScopedActivity is cheap when tracing is not active, but we can still
355  // avoid computing the Hash64 of the kernel name in that case.
356  // TODO(pbar) This would no longer be needed if Ops have a unique id.
357  const uint64 id = port::Tracing::IsActive() ? Hash64(op_kernel->name()) : 0;
358  port::Tracing::ScopedActivity region(port::Tracing::EventCategory::kCompute,
359                                       id);
360
361  // NOTE(tucker): We need to discriminate between Eigen GPU
362  // operations and all others.  If an operation is Eigen
363  // implemented (or otherwise tries to launch a cuda kernel
364  // directly), we need to establish a stacked-scoped environment
365  // that directs it to execute on the proper device.  Otherwise we
366  // expect the Op to use StreamExecutor directly and correctly.  The
367  // way we make this discrimination is quite hacky: At the moment
368  // the only non-Eigen GPU Op is the recv-op, which is known to be
369  // asynchronous.
370  if (op_kernel->is_internal() && op_kernel->type_string() == "_Recv") {
371    context->SetStatus(errors::Internal(
372        "Invalid synchronous 'Compute' on GPU for '_Recv' op"));
373  } else if (port::Tracing::ScopedAnnotation::Enabled()) {
374    port::Tracing::ScopedAnnotation annotation(op_kernel->name(),
375                                               op_kernel->type_string());
376    ComputeHelper(op_kernel, context);
377  } else {
378    ComputeHelper(op_kernel, context);
379  }
380}
381
382void BaseGPUDevice::ComputeHelper(OpKernel* op_kernel,
383                                  OpKernelContext* context) {
384  GPUDeviceContext* gpu_device_context = device_contexts_[0];
385  if (context->op_device_context() != nullptr) {
386    gpu_device_context =
387        static_cast<GPUDeviceContext*>(context->op_device_context());
388  }
389  gpu::Stream* stream = gpu_device_context->stream();
390  const auto stream_id = gpu_device_context->stream_id();
391
392  const bool vlog_1 = VLOG_IS_ON(1);
393  const bool vlog_2 = vlog_1 && VLOG_IS_ON(2);
394
395  if (vlog_1) {
396    VLOG(1) << "GpuDevice::Compute " << op_kernel->name() << " op "
397            << op_kernel->type_string() << " on GPU" << gpu_id_ << " stream["
398            << stream_id << "]";
399  }
400
401  const auto num_streams = streams_.size();
402  if (num_streams > 1) {
403    // If an input was produced on a stream other than this op's compute
404    // stream, this op's stream must wait for that input's stream.
405    for (int i = 0; i < context->num_inputs(); ++i) {
406      const GPUDeviceContext* idc =
407          static_cast<GPUDeviceContext*>(context->input_device_context(i));
408      OP_REQUIRES(context, idc != nullptr,
409                  errors::Internal("Input device context ", i,
410                                   " was not set properly."));
411      if (vlog_2) {
412        const void* base;
413        size_t len;
414        if (context->has_input(i)) {
415          if (IsRefType(context->input_dtype(i))) {
416            Tensor tensor = context->mutable_input(i, false);
417            base = DMAHelper::base(&tensor);
418            len = tensor.TotalBytes();
419          } else {
420            const Tensor& tensor = context->input(i);
421            base = DMAHelper::base(&tensor);
422            len = tensor.TotalBytes();
423          }
424          LOG(INFO) << "Input " << i << " " << base << "  " << len;
425          LOG(INFO) << "  stream[" << stream_id << "].ThenWaitFor(stream["
426                    << idc->stream_id() << "])"
427                    << ((idc->stream() == stream) ? " not needed" : "");
428        }
429      }
430      if (idc->stream() != stream) stream->ThenWaitFor(idc->stream());
431    }
432  }
433  gpu::cuda::ScopedActivateExecutorContext scoped_activation{stream->parent()};
434  op_kernel->Compute(context);
435  if (context->status().ok()) {
436    if (sync_every_op_) {
437      // Note: GPUUtil::Sync() only syncs the default stream.
438      // We need to either sync the stream used by this op, or
439      // all streams.  Given that this flag is typically used for
440      // debugging it makes more sense to sync all GPU activity.
441      context->SetStatus(GPUUtil::SyncAll(this));
442    }
443  }
444}
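// The ThenWaitFor() call above inserts a device-side dependency rather than
// blocking the host: the compute stream simply will not run work enqueued
// after the wait until the producing stream has caught up.  A minimal
// sketch of the primitive (stream names are illustrative):
//
//   gpu::Stream* producer = ...;  // e.g. a host_to_device copy stream
//   gpu::Stream* consumer = ...;  // e.g. this op's compute stream
//   consumer->ThenWaitFor(producer);  // returns immediately
//   // Work enqueued on `consumer` after this point is ordered after all
//   // work previously enqueued on `producer`.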
445
446void BaseGPUDevice::ConsumeListOfAccessedTensors(
447    DeviceContext* device_context, const TensorReferenceVector& tensor_refs) {
448  GPUDeviceContext* gpu_device_context = device_contexts_[0];
449  if (device_context != nullptr) {
450    gpu_device_context = static_cast<GPUDeviceContext*>(device_context);
451  }
452  gpu::Stream* stream = gpu_device_context->stream();
453  em_->ThenDeleteTensors(stream, tensor_refs);
454}
455
456// Based on the semantics of Device::Sync this call should wait for
457// all streams not just the current one.
458Status BaseGPUDevice::Sync() { return GPUUtil::SyncAll(this); }
459
460void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
461                                 OpKernelContext* context,
462                                 AsyncOpKernel::DoneCallback done) {
463  GPUDeviceContext* gpu_device_context = device_contexts_[0];
464  if (context->op_device_context() != nullptr) {
465    gpu_device_context =
466        static_cast<GPUDeviceContext*>(context->op_device_context());
467  }
468  gpu::Stream* stream = gpu_device_context->stream();
469  const auto stream_id = gpu_device_context->stream_id();
470
471  VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
472          << op_kernel->type_string() << " on GPU" << gpu_id_ << " stream["
473          << stream_id << "]";
474
475  // When TraceMe profiling is off (which is the default), the
476  // following TraceMe constructor reduces to a test of a false value.
477  // Measurements show that its overhead is negligible.
478  port::Tracing::TraceMe activity(op_kernel->name(), op_kernel->type_string(),
479                                  op_kernel->IsExpensive());
480  gpu::cuda::ScopedActivateExecutorContext scoped_activation{stream->parent()};
481  op_kernel->ComputeAsync(context, done);
482}
483
484Status BaseGPUDevice::MaybeCopyTensorToGPU(
485    const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
486    StatusCallback done) {
487  if (alloc_attrs.on_host()) {
488    *to = from;
489    done(Status::OK());
490    return Status::OK();
491  } else {
492    if (!DMAHelper::CanUseDMA(&from)) {
493      Status err = errors::Internal("GPU copy from non-DMA ",
494                                    DataTypeString(from.dtype()), " tensor");
495      done(err);
496      return err;
497    }
498    auto* copy =
499        new Tensor(GetAllocator(alloc_attrs), from.dtype(), from.shape());
500
501    // If the tensor is not initialized, we likely ran out of memory.
502    if (!copy->IsInitialized()) {
503      delete copy;
504      Status err = errors::ResourceExhausted(
505          "OOM when allocating tensor of shape ", from.shape().DebugString(),
506          " and type ", DataTypeString(from.dtype()));
507      done(err);
508      return err;
509    }
510
511    StatusCallback wrapped_done = std::bind(
512        [to, copy](StatusCallback done_,
513                   // Begin unbound arguments.
514                   const Status& s) {
515          *to = std::move(*copy);
516          delete copy;
517          done_(s);
518        },
519        std::move(done), std::placeholders::_1);
520
521    port::Tracing::ScopedAnnotation annotation("MakeTensorFromProto");
522    device_contexts_[0]->CopyCPUTensorToDevice(&from, this, copy,
523                                               std::move(wrapped_done));
524    return Status::OK();
525  }
526}
527
528Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
529                                          const AllocatorAttributes alloc_attrs,
530                                          Tensor* tensor) {
531  AllocatorAttributes attr;
532  attr.set_on_host(true);
533  attr.set_gpu_compatible(true);
534  Allocator* host_alloc = GetAllocator(attr);
535  Tensor parsed(tensor_proto.dtype());
536  if (!parsed.FromProto(host_alloc, tensor_proto)) {
537    return errors::InvalidArgument("Cannot parse tensor from proto: ",
538                                   tensor_proto.DebugString());
539  }
540
541  if (parsed.dtype() == DT_VARIANT) {
542    if (parsed.shape().dims() != 0) {
543      // TODO(b/67311047): Expand support to non-singleton variants?
544      return errors::Unimplemented(
545          "GPUDevice::MakeTensorFromProto: Only singleton Variants are "
546          "supported. Tensor has shape: ",
547          parsed.shape().DebugString());
548    }
549    const Variant& from = parsed.scalar<Variant>()();
550    Tensor copy(cpu_allocator(), DT_VARIANT, TensorShape({}));
551    Variant* copy_variant = &(copy.scalar<Variant>()());
552
553    std::list<Notification> notifications;
554    Status copy_status;
555    auto copier = [this, &alloc_attrs, &notifications, &copy_status](
556                      const Tensor& from, Tensor* to) {
557      // Copier isn't run in a multithreaded environment, so we don't
558      // have to worry about the notifications list being modified in parallel.
559      notifications.emplace_back();
560      Notification& n = *notifications.rbegin();
561      return MaybeCopyTensorToGPU(alloc_attrs, from, to,
562                                  [&n, &copy_status](const Status& s) {
563                                    if (copy_status.ok()) {
564                                      copy_status.Update(s);
565                                    }
566                                    n.Notify();
567                                  });
568    };
569    TF_RETURN_IF_ERROR(
570        VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE, from,
571                          copy_variant, std::move(copier)));
572    for (auto& n : notifications) {
573      n.WaitForNotification();
574    }
575    *tensor = std::move(copy);
576    return copy_status;
577  } else {
578    Notification n;
579    Status status;
580    TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
581                                            [&n, &status](const Status& s) {
582                                              status = s;
583                                              n.Notify();
584                                            }));
585    n.WaitForNotification();
586    return status;
587  }
588}
589
590namespace {
591class ConcretePerOpGpuDevice : public PerOpGpuDevice {
592 public:
593  ConcretePerOpGpuDevice() : device_(&stream_device_) {}
594
595  void Reinitialize(OpKernelContext* context, const cudaStream_t* cuda_stream,
596                    int gpu_id, Allocator* base_allocator, char* scratch) {
597    stream_device_.Reinitialize(context, cuda_stream, gpu_id, base_allocator,
598                                scratch);
599  }
600
601  const Eigen::GpuDevice& device() const override { return device_; }
602
603 private:
604  EigenCudaStreamDevice stream_device_;
605  Eigen::GpuDevice device_;
606};
607}  // namespace
608
609void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
610                                       PerOpGpuDevice* device, int stream_id,
611                                       Allocator* allocator) {
612  ConcretePerOpGpuDevice* concrete_device =
613      static_cast<ConcretePerOpGpuDevice*>(device);
614  DCHECK(concrete_device);
615  const cudaStream_t* cuda_stream = reinterpret_cast<const cudaStream_t*>(
616      streams_[stream_id]->compute->implementation()->CudaStreamMemberHack());
617  concrete_device->Reinitialize(context, cuda_stream, gpu_id_, allocator,
618                                scratch_[stream_id]);
619}
620
621PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
622  return new ConcretePerOpGpuDevice();
623}
624
625void BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
626                                          PerOpGpuDevice* device,
627                                          DeviceContext* dc,
628                                          Allocator* allocator) {
629  if (dc) {
630    const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
631    const int stream_id = gpu_dc->stream_id();
632    VLOG(1) << "  eigen_gpu_device(" << dc << ") => stream[" << stream_id
633            << "]";
634    CHECK_LT(stream_id, streams_.size());
635    ReinitializeDevice(context, device, stream_id, allocator);
636  } else {
637    ReinitializeDevice(context, device, 0, allocator);
638  }
639}
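// The PerOpGpuDevice prepared above is what an op kernel sees when it asks
// the OpKernelContext for an Eigen device.  A minimal sketch of a kernel
// body (MyGpuKernel, out, in0 and in1 are hypothetical):
//
//   void MyGpuKernel::Compute(OpKernelContext* ctx) {
//     const Eigen::GpuDevice& d = ctx->eigen_device<Eigen::GpuDevice>();
//     // `out`, `in0`, `in1` would be Eigen tensor maps, e.g. obtained via
//     // Tensor::flat<float>().  Evaluating with `d` runs on this op's CUDA
//     // stream and allocates temporaries through
//     // EigenCudaStreamDevice::allocate() above.
//     out.device(d) = in0 + in1;
//   }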
640
641Status BaseGPUDeviceFactory::CreateDevices(const SessionOptions& options,
642                                           const string& name_prefix,
643                                           std::vector<Device*>* devices) {
644  size_t n = INT_MAX;
645  auto iter = options.config.device_count().find("GPU");
646  if (iter != options.config.device_count().end()) {
647    n = iter->second;
648  }
649  std::vector<int> valid_gpu_ids;
650  TF_RETURN_IF_ERROR(GetValidDeviceIds(
651      options.config.gpu_options().visible_device_list(), &valid_gpu_ids));
652  if (static_cast<size_t>(n) > valid_gpu_ids.size()) {
653    n = valid_gpu_ids.size();
654  }
655  for (int i = 0; i < n; i++) {
656    BaseGPUDevice* gpu_device;
657    TF_RETURN_IF_ERROR(CreateGPUDevice(
658        options, strings::StrCat(name_prefix, "/device:GPU:", i),
659        valid_gpu_ids[i], &gpu_device));
660    TF_RETURN_IF_ERROR(gpu_device->Init(options));
661    devices->push_back(gpu_device);
662  }
663
664  return Status::OK();
665}
666
667namespace {
668int64 MinSystemMemory(int64 available_memory) {
669  // We use the following heuristic for now:
670  //
671  // If the available_memory is < 2GiB, we allocate 225MiB to system memory.
672  // Otherwise, allocate max(300MiB, 0.05 * available_memory) to system memory.
673  //
674  // In the future we could be more sophisticated by using a table of devices.
675  int64 min_system_memory;
676  if (available_memory < (1LL << 31)) {
677    // 225MiB
678    min_system_memory = 225 * 1024 * 1024;
679  } else {
680    // max(300 MiB, 0.05 * available_memory)
681    min_system_memory =
682        std::max(314572800LL, static_cast<int64>(available_memory * 0.05));
683  }
684#if defined(__GNUC__) && defined(__OPTIMIZE__)
685// Do nothing
686#elif !defined(__GNUC__) && defined(NDEBUG)
687// Do nothing
688#else
689  // Double the reserved system memory in non-opt builds (debug builds
690  // on Windows), because non-opt builds need more system memory than
691  // opt builds.
692  min_system_memory *= 2;
693#endif
694  return min_system_memory;
695}
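// A worked example of the heuristic above for an opt build (the 12 GiB
// figure is illustrative; non-opt builds double the reservation as noted):
//
//   int64 avail = 12LL << 30;                 // 12 GiB reported available
//   int64 reserved = MinSystemMemory(avail);  // max(300 MiB, 5%) ~= 614 MiB
//   int64 usable = avail - reserved;          // ~11.4 GiB for the GPU allocator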
696
697}  // namespace
698
699static string GetShortDeviceDescription(int device_id,
700                                        const gpu::DeviceDescription& desc) {
701  int cc_major;
702  int cc_minor;
703  if (!desc.cuda_compute_capability(&cc_major, &cc_minor)) {
704    cc_major = 0;
705    cc_minor = 0;
706  }
707  // LINT.IfChange
708  return strings::StrCat("device: ", device_id, ", name: ", desc.name(),
709                         ", pci bus id: ", desc.pci_bus_id(),
710                         ", compute capability: ", cc_major, ".", cc_minor);
711  // LINT.ThenChange(//tensorflow/python/platform/test.py)
712}
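// For example, for physical GPU 0 this produces a string of the form
// (the name and bus id values are illustrative):
//
//   "device: 0, name: Tesla K80, pci bus id: 0000:04:00.0,
//    compute capability: 3.7"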
713
714Status BaseGPUDeviceFactory::CreateGPUDevice(const SessionOptions& options,
715                                             const string& name, int gpu_id,
716                                             BaseGPUDevice** out_device) {
717  CHECK_GE(gpu_id, 0);
718
719  // Look up the device, to see its attributes.
720  gpu::Platform* gpu_platform = GPUMachineManager();
721  CHECK_LT(gpu_id, gpu_platform->VisibleDeviceCount());
722  gpu::StreamExecutor* se =
723      gpu_platform->ExecutorForDevice(gpu_id).ValueOrDie();
724  const gpu::DeviceDescription& desc = se->GetDeviceDescription();
725  int numa_node = desc.numa_node();
726  if (numa_node < 0) {
727    // For some reason the StreamExecutor couldn't get the NUMA
728    // affinity of the GPU.  If this is not a multi-socket motherboard with
729    // GPUs local to different buses, it doesn't matter.  If it is, we
730    // may run into trouble later with data transfer operations.  The
731    // trouble may manifest as slower than expected performance, or
732    // outright failures.
733    LOG(INFO) << "Could not identify NUMA node of " << name
734              << ", defaulting to 0.  Your kernel may not have been built "
735              << "with NUMA support.";
736    numa_node = 0;
737  }
738
739  int64 total_memory, available_memory;
740  if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
741    return errors::Unknown(
742        strings::StrCat("Failed to query available memory for GPU ", gpu_id));
743  }
744
745  int64 allocated_memory;
746  double config_memory_fraction =
747      options.config.gpu_options().per_process_gpu_memory_fraction();
748  if (config_memory_fraction == 0) {
749    allocated_memory = available_memory;
750    const int64 min_system_memory = MinSystemMemory(available_memory);
751    if (min_system_memory < allocated_memory) {
752      allocated_memory -= min_system_memory;
753    }
754  } else {
755    allocated_memory = total_memory * config_memory_fraction;
756  }
757
758  Bytes allocated_bytes = static_cast<Bytes>(allocated_memory);
759
760  // Get GPU bus_id from its reported NUMA affinity.  Because GPUs are
761  // virtualized in some environments, we can't just use the GPU id.
762  // NUMA locales are indexed from 0, buses are indexed from 1.
763  DeviceLocality dev_locality;
764  dev_locality.set_bus_id(numa_node + 1);
765  VLOG(1) << "GPUDevice id " << gpu_id << " on bus " << dev_locality.bus_id()
766          << " numa: " << numa_node << " pci: " << desc.pci_bus_id();
767
768  ProcessState* process_state = ProcessState::singleton();
769  *out_device = CreateGPUDevice(
770      options, name, allocated_bytes, dev_locality, gpu_id,
771      GetShortDeviceDescription(gpu_id, desc),
772      process_state->GetGPUAllocator(options.config.gpu_options(), gpu_id,
773                                     allocated_memory),
774      process_state->GetCPUAllocator(numa_node));
775
776  return Status::OK();
777}
778
779static int GetDefaultMinGPUMultiprocessorCount(
780    gpu::Platform* gpu_manager, const std::vector<int>& visible_gpu_order) {
781  static const int kDefaultMinGPUMultiprocessorCount = 8;
782
783  // Find the highest multi-processor count across all visible GPUs.
784  int max_count = -1;
785  for (int i = 0; i < visible_gpu_order.size(); ++i) {
786    auto exec_status = gpu_manager->ExecutorForDevice(visible_gpu_order[i]);
787    if (!exec_status.ok()) {
788      continue;
789    }
790
791    gpu::StreamExecutor* se = exec_status.ValueOrDie();
792    const gpu::DeviceDescription& desc = se->GetDeviceDescription();
793    max_count = std::max(max_count, desc.core_count());
794  }
795
796  if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
797    return kDefaultMinGPUMultiprocessorCount;
798  } else {
799    return max_count;
800  }
801}
802
803static int GetMinGPUMultiprocessorCount(
804    gpu::Platform* gpu_manager, const std::vector<int>& visible_gpu_order) {
805  const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
806
807  if (tf_min_gpu_core_count == nullptr ||
808      strcmp(tf_min_gpu_core_count, "") == 0) {
809    return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
810  }
811
812  int min_gpu_core_count = -1;
813  if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
814    if (min_gpu_core_count >= 0) {
815      return min_gpu_core_count;
816    }
817  }
818
819  int count =
820      GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
821  LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
822             << tf_min_gpu_core_count << "]. "
823             << "Using the default value: " << count;
824  return count;
825}
826
827namespace {
828
829struct CudaVersion {
830  // Initialize from version_name in the form of "3.5"
831  explicit CudaVersion(const std::string& version_name) {
832    size_t dot_pos = version_name.find('.');
833    CHECK(dot_pos != string::npos)
834        << "Illegal version name: [" << version_name << "]";
835    string major_str = version_name.substr(0, dot_pos);
836    CHECK(strings::safe_strto32(major_str, &major_part))
837        << "Illegal version name: [" << version_name << "]";
838    string minor_str = version_name.substr(dot_pos + 1);
839    CHECK(strings::safe_strto32(minor_str, &minor_part))
840        << "Illegal version name: [" << version_name << "]";
841  }
842  CudaVersion() {}
843  bool operator<(const CudaVersion& other) const {
844    if (this->major_part != other.major_part) {
845      return this->major_part < other.major_part;
846    }
847    return this->minor_part < other.minor_part;
848  }
849  friend std::ostream& operator<<(std::ostream& os,
850                                  const CudaVersion& version) {
851    os << version.major_part << "." << version.minor_part;
852    return os;
853  }
854  int major_part = -1;
855  int minor_part = -1;
856};
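// A minimal sketch of the parsing and ordering defined above (the literal
// version strings are illustrative):
//
//   CudaVersion lo("3.5"), mid("3.7"), hi("5.2");
//   CHECK(lo < mid);  // equal major parts, so minor 5 < 7 decides
//   CHECK(mid < hi);  // major 3 < 5 decides on its own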
857
858std::vector<CudaVersion> supported_cuda_compute_capabilities = {
859    TF_CUDA_CAPABILITIES,};
860
861std::vector<CudaVersion> GetSupportedCudaComputeCapabilities() {
862  auto cuda_caps = supported_cuda_compute_capabilities;
863#ifdef TF_EXTRA_CUDA_CAPABILITIES
864// TF_EXTRA_CUDA_CAPABILITIES should be defined as a comma-separated sequence
865// of compute capabilities, for example:
866//   TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
867// Use two-level macro expansion for stringification.
868#define TF_XSTRING(...) #__VA_ARGS__
869#define TF_STRING(s) TF_XSTRING(s)
870  string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
871#undef TF_STRING
872#undef TF_XSTRING
873  auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
874  for (const auto& capability : extra_capabilities) {
875    cuda_caps.push_back(CudaVersion(capability));
876  }
877#endif
878  return cuda_caps;
879}
880
881std::unique_ptr<std::map<std::pair<int, int>, bool>> GetPeerAccessMap(
882    gpu::Platform* platform, const std::vector<int>& visible_gpu_order) {
883  std::unique_ptr<std::map<std::pair<int, int>, bool>> map(
884      new std::map<std::pair<int, int>, bool>);
885  for (int i = 0; i < visible_gpu_order.size(); ++i) {
886    const int i_gpu_id = visible_gpu_order[i];
887    for (int j = 0; j < visible_gpu_order.size(); ++j) {
888      const int j_gpu_id = visible_gpu_order[j];
889      gpu::StreamExecutor* from =
890          platform->ExecutorForDevice(i_gpu_id).ValueOrDie();
891      gpu::StreamExecutor* to =
892          platform->ExecutorForDevice(j_gpu_id).ValueOrDie();
893      (*map)[{i, j}] = from->CanEnablePeerAccessTo(to);
894    }
895  }
896
897  return map;
898}
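// Note that the map returned above is keyed by positions within
// visible_gpu_order, not by the GPU ids themselves.  A minimal sketch of
// consuming it (the two-element order is illustrative):
//
//   std::vector<int> order = {0, 1};
//   auto access_map = GetPeerAccessMap(platform, order);
//   bool first_can_reach_second = (*access_map)[{0, 1}];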
899
900Status EnablePeerAccess(gpu::Platform* platform,
901                        const std::vector<int>& visible_gpu_order) {
902  int possible_peer_count = 0;
903  int enabled_peer_count = 0;
904  for (int i = 0; i < visible_gpu_order.size(); ++i) {
905    const int i_gpu_id = visible_gpu_order[i];
906    for (int j = 0; j < visible_gpu_order.size(); ++j) {
907      const int j_gpu_id = visible_gpu_order[j];
908      // We have already validated that ExecutorForDevice() calls
909      // return OK.
910      gpu::StreamExecutor* from =
911          platform->ExecutorForDevice(i_gpu_id).ValueOrDie();
912      gpu::StreamExecutor* to =
913          platform->ExecutorForDevice(j_gpu_id).ValueOrDie();
914
915      if (from->CanEnablePeerAccessTo(to)) {
916        ++possible_peer_count;
917        auto status = from->EnablePeerAccessTo(to);
918        if (!status.ok()) {
919          LOG(WARNING)
920              << "Unable to enable peer access between device ordinals "
921              << i_gpu_id << " and " << j_gpu_id;
922        } else {
923          ++enabled_peer_count;
924        }
925      }
926    }
927  }
928
929  // Return an error in the extreme failure case where the driver
930  // reported that peering was possible but not a single peering was
931  // successful.  This is to catch possible system misconfigurations
932  // or more fundamental issues.
933  if (possible_peer_count > 0 && enabled_peer_count == 0) {
934    return errors::Internal(possible_peer_count,
935                            " potential peer access pairs were reported by the "
936                            "driver, but no peering could be enabled.");
937  }
938  return Status::OK();
939}
940
941}  // namespace
942
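// GetValidDeviceIds() below honors GPUOptions.visible_device_list, which
// remaps physical GPUs to the virtual /device:GPU:N ordering.  A minimal
// sketch of setting it (a sketch only; field names come from ConfigProto):
//
//   SessionOptions opts;
//   opts.config.mutable_gpu_options()->set_visible_device_list("1,0");
//   // Virtual /device:GPU:0 now maps to physical GPU 1, and GPU:1 to GPU 0.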
943Status BaseGPUDeviceFactory::GetValidDeviceIds(
944    const string& visible_device_list, std::vector<int>* ids) {
945  TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
946
947  gpu::Platform* gpu_manager = GPUMachineManager();
948  if (gpu_manager == nullptr) {
949    return Status::OK();
950  }
951
952  // If there are no GPUs visible, do nothing.
953  if (gpu_manager->VisibleDeviceCount() <= 0) {
954    return Status::OK();
955  }
956
957  // If the user wants to override the default visible-to-virtual GPU
958  // mapping, handle that here.
959  std::vector<int> visible_gpu_order;
960  if (visible_device_list.empty()) {
961    visible_gpu_order.resize(gpu_manager->VisibleDeviceCount());
962    // By default, the visible-to-virtual mapping is the identity.
963    int deviceNo = 0;
964    std::generate(visible_gpu_order.begin(), visible_gpu_order.end(),
965                  [&deviceNo] { return deviceNo++; });
966  } else {
967    std::vector<string> order_str = str_util::Split(visible_device_list, ',');
968    for (int i = 0; i < order_str.size(); ++i) {
969      const string& gpu_id_str = order_str[i];
970      int32 gpu_id;
971      if (!strings::safe_strto32(gpu_id_str, &gpu_id)) {
972        return errors::InvalidArgument(
973            "Could not parse entry in 'visible_device_list': '", gpu_id_str,
974            "'.  visible_device_list = ", visible_device_list);
975      }
976
977      if (gpu_id < 0 || gpu_id >= gpu_manager->VisibleDeviceCount()) {
978        return errors::InvalidArgument(
979            "'visible_device_list' listed an invalid GPU id '", gpu_id,
980            "' but visible device count is ",
981            gpu_manager->VisibleDeviceCount());
982      }
983
984      visible_gpu_order.push_back(gpu_id);
985    }
986  }
987
988  // Validate no repeats.
989  std::set<int> visible_device_set(visible_gpu_order.begin(),
990                                   visible_gpu_order.end());
991  if (visible_device_set.size() != visible_gpu_order.size()) {
992    return errors::InvalidArgument(
993        "visible_device_list contained "
994        "a duplicate entry: ",
995        visible_device_list);
996  }
997
998  bool new_gpu_found = false;
999  for (int i = 0; i < visible_gpu_order.size(); ++i) {
1000    int gpu_id = visible_gpu_order[i];
1001
1002    // Only perform this once per visible gpu id.
1003    if (visible_gpu_initialized_[gpu_id]) {
1004      continue;
1005    }
1006
1007    visible_gpu_initialized_[gpu_id] = true;
1008    new_gpu_found = true;
1009
1010    auto executor = gpu_manager->ExecutorForDevice(gpu_id);
1011    if (!executor.ok()) {
1012      return StreamExecutorUtil::ConvertStatus(executor.status());
1013    }
1014
1015    auto stream_exec = executor.ValueOrDie();
1016    int64 free_bytes;
1017    int64 total_bytes;
1018    if (!stream_exec->DeviceMemoryUsage(&free_bytes, &total_bytes)) {
1019      // Logs internally on failure.
1020      free_bytes = 0;
1021      total_bytes = 0;
1022    }
1023    const auto& description = stream_exec->GetDeviceDescription();
1024    int cc_major;
1025    int cc_minor;
1026    if (!description.cuda_compute_capability(&cc_major, &cc_minor)) {
1027      // Logs internally on failure.
1028      cc_major = 0;
1029      cc_minor = 0;
1030    }
1031    LOG(INFO) << "Found device " << i << " with properties: "
1032              << "\nname: " << description.name() << " major: " << cc_major
1033              << " minor: " << cc_minor
1034              << " memoryClockRate(GHz): " << description.clock_rate_ghz()
1035              << "\npciBusID: " << description.pci_bus_id() << "\ntotalMemory: "
1036              << strings::HumanReadableNumBytes(total_bytes)
1037              << " freeMemory: " << strings::HumanReadableNumBytes(free_bytes);
1038  }
1039  // Check peering and show the DMA matrix if more than one GPU was found.
1040  if (new_gpu_found && visible_gpu_order.size() > 1) {
1041    // Enable peer access
1042    TF_RETURN_IF_ERROR(EnablePeerAccess(gpu_manager, visible_gpu_order));
1043
1044    // Print out a matrix showing which devices can DMA to one
1045    // another.
1046    LOG(INFO) << "Device peer to peer matrix";
1047    auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1048    string line_buf = "DMA: ";
1049    for (int i = 0; i < visible_gpu_order.size(); ++i) {
1050      strings::StrAppend(&line_buf, visible_gpu_order[i], " ");
1051    }
1052    LOG(INFO) << line_buf;
1053    for (int i = 0; i < visible_gpu_order.size(); ++i) {
1054      line_buf = strings::StrCat(visible_gpu_order[i], ":   ");
1055      for (int j = 0; j < visible_gpu_order.size(); ++j) {
1056        if ((*access_map)[{i, j}]) {
1057          line_buf.append("Y ");
1058        } else {
1059          line_buf.append("N ");
1060        }
1061      }
1062      LOG(INFO) << line_buf;
1063    }
1064  }
1065
1066  auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1067  if (cuda_supported_capabilities.empty()) {
1068    return errors::FailedPrecondition(
1069        "No supported cuda capabilities in binary.");
1070  }
1071  CudaVersion min_supported_capability = *std::min_element(
1072      cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1073
1074  int min_gpu_core_count =
1075      GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1076
1077  // Filter out devices that don't have the right capability or power.
1078  for (int i = 0; i < visible_gpu_order.size(); ++i) {
1079    const int32 visible_gpu_id = visible_gpu_order[i];
1080    auto exec_status = gpu_manager->ExecutorForDevice(visible_gpu_id);
1081    if (!exec_status.ok()) {
1082      continue;
1083    }
1084    gpu::StreamExecutor* se = exec_status.ValueOrDie();
1085    const gpu::DeviceDescription& desc = se->GetDeviceDescription();
1086    CudaVersion device_capability;
1087    if (!desc.cuda_compute_capability(&device_capability.major_part,
1088                                      &device_capability.minor_part)) {
1089      continue;
1090    }
1091    // Only GPUs with at least the minimum supported compute capability are
1092    // accepted.
1093    if (device_capability < min_supported_capability) {
1094      LOG(INFO) << "Ignoring visible gpu device "
1095                << "(" << GetShortDeviceDescription(visible_gpu_id, desc)
1096                << ") "
1097                << "with Cuda compute capability " << device_capability
1098                << ". The minimum required Cuda capability is "
1099                << min_supported_capability << ".";
1100      continue;
1101    }
1102
1103    // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
1104    // count than the fastest GPU are filtered out, unless they have 8 or more
1105    // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
1106    // variable is set, its value will be used to filter out GPUs.
1107    if (desc.core_count() < min_gpu_core_count) {
1108      LOG(INFO) << "Ignoring gpu device "
1109                << "(" << GetShortDeviceDescription(visible_gpu_id, desc)
1110                << ") "
1111                << "with Cuda multiprocessor count: " << desc.core_count()
1112                << ". The minimum required count is " << min_gpu_core_count
1113                << ". You can adjust this requirement with the env var "
1114                   "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
1115      continue;
1116    }
1117
1118    size_t new_id = ids->size();
1119    ids->push_back(visible_gpu_id);
1120
1121    LOG(INFO) << "Creating TensorFlow device (/device:GPU:" << new_id << ") -> "
1122              << "(" << GetShortDeviceDescription(visible_gpu_id, desc) << ")";
1123  }
1124
1125  return Status::OK();
1126}
1127
1128}  // namespace tensorflow
1129
1130#endif  // GOOGLE_CUDA
1131