/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <deque>
#include <iterator>
#include <list>
#include <unordered_set>
#include <utility>
#include <vector>

#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/map_util.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/cpu/cpu_runtime.h"
#include "tensorflow/compiler/xla/service/cpu/shape_partition.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/hlo_opcode.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

ParallelCpuExecutable::ParallelCpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    std::unique_ptr<const HloInstructionMap<string>> function_names,
    std::unordered_map<const HloInstruction*, std::unique_ptr<unsigned char[]>>
        aligned_constants,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      function_names_(std::move(function_names)),
      aligned_constants_(std::move(aligned_constants)) {}

// Type of the computation function we expect in the JIT.
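// Its arguments are, in order: the result buffer, the ExecutableRunOptions,
// the array of operand buffers, the temps array (the table of buffer
// allocations), the per-dimension partition [start, limit) buffer (null when
// the instruction runs sequentially), and the profile counters array (null
// when profiling is disabled); see the invocation in Executor::Schedule()
// below.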
using ComputeFunctionType = void (*)(void*, const void*, const void**, void**,
                                     int64*, int64*);

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), mark addresses that are "live". The initial pointer itself is
// trivially live. If the shape of the buffer is a tuple, this analysis looks
// into the tuple's elements and marks them live as well (since tuples keep
// pointers to buffers) and also works recursively.
// address is an in-memory buffer address that contains some runtime XLA object.
// shape is its shape. marked_addresses is the set of live addresses to
// populate.
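// For example, for a tuple-shaped buffer (f32[4], f32[8]) stored at address A,
// this marks A itself as live and then follows the two element pointers stored
// at A[0] and A[1], marking each element buffer as live as well.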
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(
          element, ShapeUtil::GetTupleElementShape(shape, i), marked_addresses);
    }
  }
}

namespace {

// Executor manages the concurrent execution of 'functions' for instructions
// in 'pending' on 'thread_pool' (storing resulting data in 'results').
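// Typical usage (a sketch of what ExecuteComputeFunctions() below does):
//
//   Executor executor(functions, run_options, &pending, &results,
//                     temps_array, profile_counters_array, assignment);
//   TF_RETURN_IF_ERROR(executor.Run());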
class Executor {
 public:
  Executor(const HloInstructionMap<ComputeFunctionType>& functions,
           const ServiceExecutableRunOptions* run_options,
           std::list<HloInstruction*>* pending,
           HloInstructionMap<const void*>* results, void** temps_array,
           int64* profile_counters_array, const BufferAssignment* assignment)
      : functions_(functions),
        run_options_(run_options),
        pending_(pending),
        results_(results),
        temps_array_(temps_array),
        profile_counters_array_(profile_counters_array),
        thread_pool_(CHECK_NOTNULL(run_options_->xla_intra_op_thread_pool())),
        assignment_(assignment) {}

  // Executes the pending list of instructions on the thread pool.
  // Returns OK status on success, error status otherwise.
  Status Run();

 private:
  // Schedules a parallel invocation of the compute function for 'instruction'
  // on 'thread_pool_', storing the result in 'result_buffer'.
  // If 'partition_buffers' is non-null, the parallel task will be invoked on
  // the per-dimension partition [start, limit) values stored in
  // 'partition_buffers'.
  void Schedule(HloInstruction* instruction, int64* partition_buffers,
                void* result_buffer);

  // Returns true if 'instruction' has been assigned parallel tasks (returns
  // false otherwise).
  bool HasParallelTasks(HloInstruction* instruction);

  // Returns a newly allocated buffer containing the partition [start, limit)
  // bounds for each dimension in 'partition'.
  int64* GetPartitionBuffers(
      const std::vector<std::pair<int64, int64>>& partition);

  // Returns array of result buffers for all operands in 'instruction'.
  const void** GetOperandBuffers(HloInstruction* instruction);

  // Arguments passed into Executor.
  const HloInstructionMap<ComputeFunctionType>& functions_;
  const ServiceExecutableRunOptions* run_options_;
  std::list<HloInstruction*>* pending_;
  HloInstructionMap<const void*>* results_;
  void** temps_array_;
  int64* profile_counters_array_;
  tensorflow::thread::ThreadPool* thread_pool_;
  const BufferAssignment* assignment_;

  // Members used to manage instruction execution.
  tensorflow::mutex completion_queue_lock_;
  tensorflow::condition_variable completion_queue_cv_;
  std::deque<HloInstruction*> completion_queue_;
  int64 instructions_in_flight_ = 0;
  std::unordered_map<const HloInstruction*, int64> tasks_in_flight_;
};

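// Run() loops until the pending list is empty and no instructions remain in
// flight. Each pass dispatches every pending instruction whose operands are
// ready, then blocks on 'completion_queue_' for one finished instruction,
// publishes its result buffer in 'results_', and repeats so that newly
// runnable instructions can be scheduled.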
Status Executor::Run() {
  while (!pending_->empty() || instructions_in_flight_ > 0) {
    auto pending_it = pending_->begin();
    while (pending_it != pending_->end()) {
      HloInstruction* instruction = *pending_it;
      // Skip pending instructions whose operands aren't ready.
      if (std::any_of(instruction->operands().begin(),
                      instruction->operands().end(),
                      [&](HloInstruction* operand) {
                        return !ContainsKey(*results_, operand);
                      })) {
        ++pending_it;
        continue;
      }

      // Get the result buffer for 'instruction'.
      TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                          assignment_->GetUniqueTopLevelSlice(instruction));
      void* result_buffer =
          static_cast<char*>(temps_array_[result_slice.index()]) +
          result_slice.offset();

      if (HasParallelTasks(instruction)) {
        // 'instruction' has been assigned parallel task partitions.
        CHECK_EQ(HloOpcode::kCall, instruction->opcode());
        HloInstruction* root = instruction->to_apply()->root_instruction();

        // Create ShapePartitionIterator to iterate through all outer dimension
        // partitions of 'instruction'.
        ShapePartitionIterator partition_iterator(
            root->shape(), root->outer_dimension_partitions());

        const int64 partition_count =
            partition_iterator.GetTotalPartitionCount();

        // Record total parallel task count for 'instruction' before dispatch.
        {
          tensorflow::mutex_lock l(completion_queue_lock_);
          tasks_in_flight_.insert(std::make_pair(instruction, partition_count));
          VLOG(2) << "Schedule PARALLEL"
                  << " instruction: " << instruction->name()
                  << " instruction.callee: "
                  << instruction->to_apply()->root_instruction()->name()
                  << " partition_count: " << partition_count;
        }

        for (int64 i = 0; i < partition_count; ++i) {
          // Get partition [start, limit) for each dimension.
          auto partition_buffers =
              GetPartitionBuffers(partition_iterator.GetPartition(i));
          Schedule(instruction, partition_buffers, result_buffer);
        }

      } else {
        // Set tasks in-flight to '1' for sequential instruction execution.
        {
          tensorflow::mutex_lock l(completion_queue_lock_);
          tasks_in_flight_.insert(std::make_pair(instruction, 1));
          VLOG(2) << "Schedule SEQUENTIAL"
                  << " instruction: " << instruction->name()
                  << " instruction.callee: "
                  << instruction->to_apply()->root_instruction()->name();
        }
        Schedule(instruction, nullptr, result_buffer);
      }

      ++instructions_in_flight_;
      pending_it = pending_->erase(pending_it);
    }
    // Wait for a completed HLO instruction to be present in the queue.  We will
    // pop it out of the queue and make the result available to its users.
    HloInstruction* instruction;
    do {
      tensorflow::mutex_lock l(completion_queue_lock_);
      if (completion_queue_.empty()) {
        completion_queue_cv_.wait(l);
      }
      if (!completion_queue_.empty()) {
        instruction = completion_queue_.front();
        completion_queue_.pop_front();
        break;
      }
    } while (true);
    TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                        assignment_->GetUniqueTopLevelSlice(instruction));
    void* result_buffer =
        static_cast<char*>(temps_array_[result_slice.index()]) +
        result_slice.offset();
    InsertOrDie(results_, instruction, result_buffer);
    --instructions_in_flight_;
  }
  return Status::OK();
}

void Executor::Schedule(HloInstruction* instruction, int64* partition_buffers,
                        void* result_buffer) {
  // The thread pool entry takes ownership of |operand_buffers|.
  auto operand_buffers = GetOperandBuffers(instruction);

  auto function = FindOrDie(functions_, instruction);
  const auto* exec_run_options = &run_options_->run_options();
  thread_pool_->Schedule([this, instruction, result_buffer, operand_buffers,
                          partition_buffers, exec_run_options, function]() {
    function(result_buffer, exec_run_options, operand_buffers, temps_array_,
             partition_buffers, profile_counters_array_);

    delete[] operand_buffers;
    delete[] partition_buffers;
    // Push the completed HLO instruction onto the queue; the main
    // thread will pop it off and potentially launch more work which
    // uses the result.
    // TODO(b/27458679) Consider alternative task scheduling and synchronization
    // schemes. For example, we could avoid the overhead associated with the
    // condvar here if the thread just dequeued the next instruction to execute
    // on completion.
    {
      tensorflow::mutex_lock l(completion_queue_lock_);
      // Decrement in-flight task count for this completion.
      if (--FindOrDie(tasks_in_flight_, instruction) == 0) {
        completion_queue_.push_back(instruction);
        completion_queue_cv_.notify_all();
        tasks_in_flight_.erase(instruction);
      }
    }
  });
}

int64* Executor::GetPartitionBuffers(
    const std::vector<std::pair<int64, int64>>& partition) {
  // Pack the [start, limit) bounds for each dimension into 'partition_buffers'.
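  // For example, the partition {{0, 4}, {8, 2}} of (start, size) pairs yields
  // the buffer {0, 4, 8, 10}.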
  auto partition_buffers = new int64[partition.size() * 2];
  for (int i = 0; i < partition.size(); ++i) {
    partition_buffers[2 * i + 0] = partition[i].first;
    partition_buffers[2 * i + 1] = partition[i].first + partition[i].second;
  }
  return partition_buffers;
}

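// An instruction has parallel tasks iff it is a kCall whose called
// computation's root carries a non-empty outer_dimension_partitions()
// annotation; Run() dispatches one task per partition for such instructions.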
bool Executor::HasParallelTasks(HloInstruction* instruction) {
  return instruction->opcode() == HloOpcode::kCall &&
         !instruction->to_apply()
              ->root_instruction()
              ->outer_dimension_partitions()
              .empty();
}

const void** Executor::GetOperandBuffers(HloInstruction* instruction) {
  // We cannot use a move-only RAII type like std::unique_ptr because the
  // list of operands is allocated on the main thread and transferred to the
  // worker via the lambda passed to thread_pool_->Schedule().  In order for
  // the lambda to take ownership, we would need to use generalized lambda
  // capture, which is a C++14 feature.
  // TODO(b/27458679) Avoid dynamic allocations in Executor.
  auto operand_buffers = new const void*[instruction->operand_count()];
  std::transform(instruction->operands().begin(), instruction->operands().end(),
                 operand_buffers, [this](HloInstruction* operand) {
                   return FindOrDie(*results_, operand);
                 });
  return operand_buffers;
}

}  // namespace

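// Allocates a DeviceMemoryBase for every buffer allocation in the module's
// BufferAssignment, skipping entry-computation parameters and thread-local
// allocations and reusing any buffers the caller preallocated in 'buffers'.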
Status ParallelCpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}

Status ParallelCpuExecutable::ExecuteComputeFunctions(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // Allocate profiling counters for each hlo instruction that we would like to
  // profile.
  std::vector<int64>* profile_counters = nullptr;
  if (hlo_execution_profile) {
    profile_counters = hlo_execution_profile->mutable_profile_counters();
  }

  std::vector<void*> buffer_pointers;
  buffer_pointers.reserve(buffers.size());
  for (auto device_allocation : buffers) {
    buffer_pointers.push_back(device_allocation.opaque());
  }

  // Resolve functions for all the HLO instructions ahead of time.
  HloInstructionMap<ComputeFunctionType> functions;
  for (auto& entry : *function_names_) {
    tensorflow::mutex_lock lock(jit_mutex_);
    HloInstruction* instruction = entry.first;
    llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry.second);
    TF_RET_CHECK(sym);
    InsertOrDie(
        &functions, instruction,
        reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress())));
  }

  // Map containing pointers to result buffers for each instruction.
  HloInstructionMap<const void*> results;

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  std::list<HloInstruction*> pending;

  // Call the function for each HLO instruction in topological order.
  const HloComputation& entry_computation = *module().entry_computation();
  for (auto* instruction : entry_computation.MakeInstructionPostOrder()) {
    // Parameters and constants have no functions associated with them. Instead
    // just copy the existing buffer into the map containing instruction
    // results.
    if (instruction->opcode() == HloOpcode::kParameter) {
      InsertOrDie(
          &results, instruction,
          arguments[instruction->parameter_number()]->root_buffer().opaque());
    } else if (instruction->opcode() == HloOpcode::kConstant) {
      unsigned char* aligned_data =
          FindOrDie(aligned_constants_, instruction).get();
      InsertOrDie(&results, instruction, aligned_data);
    } else {
      TF_RET_CHECK(instruction->opcode() == HloOpcode::kCall);
      pending.push_back(instruction);
    }
  }

  // TODO(b/27458679) Manage scheduling based on in-flight concurrency limits.
  // For example, if we expect a library conv/matmul call to run at max
  // concurrency, we should not dispatch runnable instructions until the
  // library call is finished (to avoid expensive cache invalidation).
  Executor executor(
      functions, run_options, &pending, &results, buffer_pointers.data(),
      profile_counters ? profile_counters->data() : nullptr, assignment_.get());

  TF_RETURN_IF_ERROR(executor.Run());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
  }

  return Status::OK();
}

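// Runs the entry computation synchronously: allocates buffers, executes the
// JITed compute functions, wires the resulting device memory into the returned
// ShapedBuffer according to the root points-to set, and deallocates any
// temporary buffers that do not alias the result.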
StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  auto result_buffer = MakeUnique<ShapedBuffer>(
      /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
      stream->parent()->platform(), stream->parent()->device_ordinal());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunctions(run_options, arguments, buffers,
                                             hlo_execution_profile));

  // Copy the DeviceMemoryBase values that make up the result into the
  // respective locations in the ShapedBuffer returned to the caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);

        // The points to set is unambiguous so the set should be a singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as a
        // tuple element. The source instruction should have a non-parameter
        // buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        CHECK(!slice.allocation()->is_entry_computation_parameter());

        const BufferAllocation::Index buffer_index = slice.index();
        const se::DeviceMemoryBase& buffer = buffers[buffer_index];
        CHECK(!buffer.is_null() || buffer.size() == 0);
        *device_memory = buffer;
        buffers_in_result[buffer_index] = true;
        return Status::OK();
      }));

  // Free all buffers not in the result.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

StatusOr<std::unique_ptr<ShapedBuffer>>
ParallelCpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
  // TODO(b/30671675): Implement asynchronous execution mode.
  return Unimplemented(
      "Asynchronous execution on stream is not yet supported on CPU.");
}

const PointsToSet& ParallelCpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla