cpu_executable.cc revision 1e67c90e2caceeff82d09793d1ef5fa0300d219b
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "external/llvm/include/llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/hlo_module_config.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<BufferAssignment> assignment,
    std::unique_ptr<HloModule> hlo_module,
    std::unique_ptr<HloModuleConfig> module_config,
    const string& entry_function_name,
    std::unordered_map<const HloInstruction*, size_t> hlo_to_profile_idx)
    : Executable(std::move(hlo_module), std::move(module_config)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      hlo_to_profile_idx_(std::move(hlo_to_profile_idx)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so it needs to be
  // guarded by the mutex.
  compute_function_ = reinterpret_cast<ComputeFunctionType>(sym.getAddress());
}

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), mark addresses that are "live". The initial pointer itself is
// trivially live. If the shape of the buffer is a tuple, the analysis recurses
// into the tuple's elements and marks them live as well (since tuples keep
// pointers to buffers). address is an in-memory buffer address that contains
// some runtime XLA object. shape is its shape. marked_addresses is the set of
// live addresses to populate.
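// As an illustration: for a result of shape (f32[4], (f32[2], f32[3])), the
// top-level buffer holds two pointers, one to the f32[4] array and one to the
// nested tuple's buffer, which in turn holds pointers to the f32[2] and
// f32[3] arrays; marking the top-level address therefore marks five addresses
// live in total.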
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(
          element, ShapeUtil::GetTupleElementShape(shape, i), marked_addresses);
    }
  }
}

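// Allocates the buffers (temporaries and output) needed for one execution of
// the computation. Entry-computation-parameter and thread-local allocations
// are skipped, and slots in `buffers` that are already non-null are assumed to
// come from a preallocated result ShapedBuffer and are left untouched.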
Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());

  VLOG(3) << "result index: " << result_allocation->index();

  return Status::OK();
}

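// Flattens the argument ShapedBuffers into raw device pointers (tuple-shaped
// arguments are not supported here) and dispatches to the DeviceMemoryBase
// overload below.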
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  std::vector<se::DeviceMemoryBase> argument_buffers;
  for (int i = 0; i < arguments.size(); ++i) {
    TF_RET_CHECK(!ShapeUtil::IsTuple(arguments[i]->shape()));
    argument_buffers.push_back(arguments[i]->buffer(/*index=*/{}));
  }
  return ExecuteComputeFunction(run_options, argument_buffers, buffers,
                                hlo_execution_profile);
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** temps_array, uint64* profile_counters)
  //
  // result: Points at the result.
  // run_options: The ExecutableRunOptions object.
  // args_array: An array of pointers, each of which points to a parameter.
  //               The size of this array is determined by the function's arity
  //               (ProgramShape).
  // temps_array:  An array of pointers, each of which points to a temporary
  //               buffer the computation needs. The size of this array is
  //               determined by buffer analysis.
  // profile_counters: An array of cycle counters, one per profiled HLO
  //               instruction plus a final one for the computation as a whole,
  //               filled in by the generated code.
  //
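  // Under those conventions, a minimal sketch of what ComputeFunctionType
  // (the type compute_function_ was cast to in the constructor; declared in
  // cpu_executable.h) is assumed to look like:
  //
  //   using ComputeFunctionType =
  //       void (*)(void* result, const void* run_options,
  //                const void** args_array, void** temps_array,
  //                uint64* profile_counters);
  //
  // Sketch only; the authoritative declaration lives in the header.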
  std::vector<const void*> args_array;
  for (se::DeviceMemoryBase arg_mem : arguments) {
    args_array.push_back(arg_mem.opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  // Allocate profiling counters for each hlo instruction that we would like to
  // profile.  Allocate an additional profile counter for the entire
  // computation.
  std::vector<uint64> profile_counters(hlo_to_profile_idx_.size() + 1);

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());
  void* result_buffer = buffer_pointers[result_allocation->index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        "  func(void* result, void* params[%zu], void* temps[%zu], "
        "uint64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters.size());
    VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        "    params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        "    temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
                                           profile_counters.data());
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters.data());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));

    // The last profile counter is used for the computation as a whole.
    execution_profile_.set_compute_cycle_count(profile_counters.back());
  }

  if (hlo_execution_profile != nullptr) {
    hlo_execution_profile->set_total_cycles_executed(profile_counters.back());

    for (auto hlo_prof_idx : hlo_to_profile_idx_) {
      const HloInstruction* hlo = hlo_prof_idx.first;
      uint64 cycles_taken = profile_counters[hlo_prof_idx.second];
      hlo_execution_profile->AddProfileResult(hlo, cycles_taken);
    }
  }
  return Status::OK();
}

StatusOr<perftools::gputools::DeviceMemoryBase> CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());
  se::DeviceMemoryBase top_level_output = buffers[result_allocation->index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  VLOG(3) << "Live addresses in output marking found "
          << marked_addresses.size() << " addresses:\n"
          << tensorflow::str_util::Join(
                 marked_addresses, ", ", [](string* out, const void* address) {
                   tensorflow::strings::StrAppend(
                       out, tensorflow::strings::Printf("%p", address));
                 });

  // The computation is done; deallocate the temp buffers. Keep the buffers
  // marked live: they are referenced by the output of the computation and are
  // still needed by the service, which deallocates them later.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (marked_addresses.count(alloc.opaque()) == 0 &&
        alloc.opaque() != nullptr) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return top_level_output;
}

StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      ShapedBuffer::MakeShapedBuffer(
          module_config().entry_computation_layout().result_shape(),
          stream->parent()->platform(), stream->parent()->device_ordinal()));

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Copy DeviceMemoryBase values which contain the array(s) of the result into
  // the respective location in ShapedBuffer which is returned to the caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElement(
              [&buffers, &buffers_in_result, &result_buffer, this](
                  const ShapeIndex& index, bool is_leaf, size_t* buffer_entry) {
                if (is_leaf) {
                  const std::vector<const LogicalBuffer*>& sources =
                      this->GetRootPointsToSet().element(index);
                  // The points-to set is unambiguous so the set should be a
                  // singleton.
                  CHECK_EQ(1, sources.size());
                  const LogicalBuffer* buffer_source = sources[0];
                  HloInstruction* src = buffer_source->instruction();

                  // The source for this result buffer can be a nested buffer
                  // such as a tuple element.

                  // The source instruction should have a non-parameter buffer
                  // assigned.
                  TF_ASSIGN_OR_RETURN(const BufferAllocation* allocation,
                                      this->assignment_->GetUniqueAllocation(
                                          src, buffer_source->index()));
                  CHECK(!allocation->is_entry_computation_parameter());

                  CHECK(!buffers[allocation->index()].is_null() ||
                        buffers[allocation->index()].size() == 0);
                  result_buffer->mutable_buffers()->push_back(
                      buffers[allocation->index()]);
                  *buffer_entry = result_buffer->mutable_buffers()->size() - 1;
                  buffers_in_result[allocation->index()] = true;
                }
                return Status::OK();
              }));

  // Free all buffers not in the result.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

Status CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    ShapedBuffer* result_buffer, HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  // Every array element in the result of the computation must be unambiguously
  // produced by a single instruction.
  // This ensures that the buffers inside result_buffer can be assigned without
  // conflict to the respective instructions because there is a one-to-one
  // correspondence between hlo instructions and array buffers in the result.
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented(
        "Points-to set of root instruction is ambiguous or not distinct");
  }
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
  DCHECK(ShapeUtil::Compatible(result_buffer->shape(), result_shape()));

  // If two tuple elements point to the same buffer, one of the results in the
  // result buffer is considered the canonical location while the other result
  // points to it (instead of, say, making a copy of the result).
  // buffer_index_to_shape_index maps a buffer index to its canonical location
  // in the result buffer.
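  // As an illustration (hypothetical shapes): if two leaves of the result
  // tuple alias the same allocation, the first leaf visited below becomes the
  // canonical entry, and the second leaf's buffer_entry is rewritten to point
  // at that canonical entry rather than receiving its own buffer.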
  std::unordered_map<BufferAllocation::Index, size_t>
      buffer_index_to_shape_index;

  // Copy values from result_buffer to the index in "buffers". These buffers
  // will not be allocated in the call to AllocateBuffers.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElement(
              [&buffers, &buffers_in_result, &buffer_index_to_shape_index,
               result_buffer, this](const ShapeIndex& index, bool is_leaf,
                                    size_t* buffer_entry) {
                if (is_leaf) {
                  const std::vector<const LogicalBuffer*>& sources =
                      this->GetRootPointsToSet().element(index);
                  // The points-to set is unambiguous so the set should be a
                  // singleton.
                  CHECK_EQ(1, sources.size());
                  const LogicalBuffer* buffer_source = sources[0];
                  HloInstruction* src = buffer_source->instruction();

                  // The source for this result buffer can be a nested buffer
                  // such as a tuple element.

                  // The source instruction should have a non-parameter buffer
                  // assigned.
                  TF_ASSIGN_OR_RETURN(const BufferAllocation* allocation,
                                      this->assignment_->GetUniqueAllocation(
                                          src, buffer_source->index()));
                  CHECK(!allocation->is_entry_computation_parameter());

                  auto insert_result = buffer_index_to_shape_index.emplace(
                      allocation->index(), *buffer_entry);
                  if (insert_result.second) {
                    // The points-to set is distinct so this buffer should not
                    // have been assigned in a previous invocation of this
                    // lambda.
                    perftools::gputools::DeviceMemoryBase memory_base =
                        result_buffer->buffer(index);
                    CHECK(buffers[allocation->index()].is_null());
                    CHECK(!memory_base.is_null());
                    buffers[allocation->index()] = memory_base;
                    buffers_in_result[allocation->index()] = true;
                  } else {
                    // Record the fact that this tuple element is identical to
                    // some prior result.
                    *buffer_entry = insert_result.first->second;
                  }
                }
                return Status::OK();
              }));

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Free all buffers not in the result.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}

StatusOr<perftools::gputools::DeviceMemoryBase>
CpuExecutable::ExecuteAsyncOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) {
  // TODO(b/30671675): Implement asynchronous execution mode.
  return Unimplemented(
      "Asynchronous execution on stream is not yet supported on CPU.");
}

const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla