cpu_executable.cc revision cf245240ca90e6b552415f720342ae1acd326590
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    const string& entry_function_name,
    std::unordered_map<const HloInstruction*, size_t> hlo_to_profile_idx)
    : Executable(std::move(hlo_module)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      hlo_to_profile_idx_(std::move(hlo_to_profile_idx)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the JIT, so it is also done here,
  // up front, rather than deferred to execution time.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
}

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), marks the addresses that are "live". The initial pointer
// itself is trivially live. If the shape of the buffer is a tuple, the
// analysis recurses into the tuple's elements and marks them live as well,
// since tuples keep pointers to their element buffers.
//
// address is the in-memory address of a buffer holding some runtime XLA
// object, shape is that object's shape, and marked_addresses is the set of
// live addresses to populate.
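//
// For example, if shape is a two-element tuple, the buffer at address holds
// two pointers; address itself, both pointed-to element buffers, and
// (recursively) anything those elements point to are all marked live.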
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(
          element, ShapeUtil::GetTupleElementShape(shape, i), marked_addresses);
    }
  }
}

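// Allocates a DeviceMemoryBase for each buffer allocation in the module's
// BufferAssignment, except for allocations that are entry-computation
// parameters, thread-local, or already present in the preallocated result
// buffer. buffers must contain one (possibly null) entry per allocation.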
Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}

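// Extracts the top-level buffer from each ShapedBuffer argument and forwards
// to the DeviceMemoryBase overload below.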
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  std::vector<se::DeviceMemoryBase> argument_buffers;
  argument_buffers.reserve(arguments.size());
  for (const auto* argument : arguments) {
    argument_buffers.push_back(argument->buffer(/*index=*/{}));
  }
  return ExecuteComputeFunction(run_options, argument_buffers, buffers,
                                hlo_execution_profile);
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** temps_array, uint64* profile_counters)
  //
  // result: Points at the result.
  // run_options: the ExecutableRunOptions object.
  // args_array: An array of pointers, each of which points to a parameter.
  //               The size of this array is determined by the function's arity
  //               (ProgramShape).
  // temps_array:  An array of pointers, each of which points to a temporary
  //               buffer the computation needs. The size of this array is
  //               determined by buffer analysis.
  // profile_counters: An array of cycle counters, one per profiled HLO
  //               instruction plus a final counter for the entire computation.
  //
  std::vector<const void*> args_array;
  for (se::DeviceMemoryBase arg_mem : arguments) {
    args_array.push_back(arg_mem.opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  // Allocate profiling counters for each hlo instruction that we would like to
  // profile.  Allocate an additional profile counter for the entire
  // computation.
  std::vector<uint64> profile_counters(hlo_to_profile_idx_.size() + 1);

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        "  func(void* result, void* params[%zu], void* temps[%zu], "
        "uint64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters.size());
    VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        "    params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        "    temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
                                           profile_counters.data());
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters.data());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));

    // The last profile counter is used for the computation as a whole.
    execution_profile_.set_compute_cycle_count(profile_counters.back());
  }

  if (hlo_execution_profile != nullptr) {
    hlo_execution_profile->set_total_cycles_executed(
        *module().entry_computation(), profile_counters.back());

    for (auto hlo_prof_idx : hlo_to_profile_idx_) {
      const HloInstruction* hlo = hlo_prof_idx.first;
      uint64 cycles_taken = profile_counters[hlo_prof_idx.second];
      hlo_execution_profile->SetCyclesTakenBy(hlo, cycles_taken);
    }
  }
  return Status::OK();
}

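// Logs, at VLOG level 3, the set of addresses that were marked live in the
// computation's output.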
static void LogLiveAddresses(
    const std::unordered_set<const void*>& marked_addresses) {
  VLOG(3) << "Live addresses in output marking found "
          << marked_addresses.size() << " addresses:\n"
          << tensorflow::str_util::Join(
                 marked_addresses, ", ", [](string* out, const void* address) {
                   tensorflow::strings::StrAppend(
                       out, tensorflow::strings::Printf("%p", address));
                 });
}

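// Deallocates every buffer whose address was not marked live, i.e. every
// temporary buffer that is not referenced by the computation's output.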
static Status DeallocateTempBuffers(
    DeviceMemoryAllocator* allocator, se::Stream* stream,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    const std::unordered_set<const void*>& marked_addresses) {
  // Keep the buffers that were marked live: they are referenced by the output
  // of the computation and are needed by the service, which will deallocate
  // them later. Deallocate everything else.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (marked_addresses.count(alloc.opaque()) == 0 && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(
          allocator->Deallocate(stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}

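// Synchronous execution: allocates buffers, runs the JITed computation, and
// then deallocates every buffer not reachable from the top-level output,
// which is returned to the caller.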
StatusOr<perftools::gputools::DeviceMemoryBase> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  se::DeviceMemoryBase top_level_output = buffers[result_slice.index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  LogLiveAddresses(marked_addresses);
  TF_RETURN_IF_ERROR(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                           marked_addresses));

  return top_level_output;
}

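// Synchronous execution that returns the result as a ShapedBuffer: the
// DeviceMemoryBase values making up the result are recorded in the returned
// ShapedBuffer, and all other buffers are deallocated before returning.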
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  auto result_buffer =
      MakeUnique<ShapedBuffer>(result_shape(), stream->parent()->platform(),
                               stream->parent()->device_ordinal());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  // Copy the DeviceMemoryBase values that contain the result array(s) into the
  // corresponding locations of the ShapedBuffer that is returned to the
  // caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElementWithStatus(
              [&buffers, &buffers_in_result, &result_buffer, this](
                  const ShapeIndex& index, size_t* buffer_entry) {
                const auto& sources = this->GetRootPointsToSet().element(index);
                // The points-to set is unambiguous, so it should be a
                // singleton.
                CHECK_EQ(1, sources.size());
                const LogicalBuffer* buffer_source = sources[0];
                HloInstruction* src = buffer_source->instruction();

                // The source for this result buffer can be a nested buffer
                // such as a tuple element.

                // The source instruction should have a non-parameter buffer
                // assigned.
                TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice slice,
                                    this->assignment_->GetUniqueSlice(
                                        src, buffer_source->index()));
                CHECK(!slice.allocation()->is_entry_computation_parameter());

                const BufferAllocation::Index buffer_index = slice.index();
                const se::DeviceMemoryBase& buffer = buffers[buffer_index];
                CHECK(!buffer.is_null() || buffer.size() == 0);
                *buffer_entry = result_buffer->mutable_buffers()->size();
                result_buffer->mutable_buffers()->push_back(buffer);
                buffers_in_result[buffer_index] = true;
                return Status::OK();
              }));

  // Free all buffers not in the result.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

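// Asynchronous execution: allocates buffers, then enqueues the computation
// onto the given HostStream and returns the (not yet populated) top-level
// output buffer immediately. The enqueued task deallocates the temporary
// buffers once the computation finishes.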
StatusOr<perftools::gputools::DeviceMemoryBase>
CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }

  auto* host_stream = dynamic_cast<perftools::gputools::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  se::DeviceMemoryBase top_level_output = buffers[result_slice.index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  LogLiveAddresses(marked_addresses);

  host_stream->EnqueueTask([this, run_options, arguments, buffers,
                            marked_addresses, memory_allocator, stream]() {
    // Failing a CHECK here is not great, but I don't see an obvious way to
    // return a failed Status asynchronously.
    TF_CHECK_OK(ExecuteComputeFunction(&run_options->run_options(), arguments,
                                       buffers,
                                       /*hlo_execution_profile=*/nullptr));
    TF_CHECK_OK(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                      marked_addresses));
  });

  return top_level_output;
}

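// Returns the number of bytes the CPU backend uses to represent a value of
// the given shape in memory.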
/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
  if (ShapeUtil::IsOpaque(shape)) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

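// Returns the points-to set of the root instruction of the entry computation.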
const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

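// Creates an HloCostAnalysis parameterized with this backend's shape size
// function.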
std::unique_ptr<HloCostAnalysis> CpuExecutable::CreateCostAnalysis() const {
  return MakeUnique<HloCostAnalysis>(ShapeSizeBytes);
}

}  // namespace cpu
}  // namespace xla