/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/parallel_cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <deque>
#include <iterator>
#include <list>
#include <unordered_set>
#include <utility>
#include <vector>

#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/map_util.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/cpu/cpu_runtime.h"
#include "tensorflow/compiler/xla/service/cpu/shape_partition.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/hlo_opcode.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

ParallelCpuExecutable::ParallelCpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    std::unique_ptr<const HloInstructionMap<string>> function_names,
    std::unordered_map<const HloInstruction*, std::unique_ptr<unsigned char[]>>
        aligned_constants,
    std::unique_ptr<HloProfilePrinterData> hlo_profile_printer_data,
    std::unique_ptr<HloProfileIndexMap> hlo_profile_index_map)
    : Executable(std::move(hlo_module), std::move(hlo_profile_printer_data),
                 std::move(hlo_profile_index_map)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      function_names_(std::move(function_names)),
      aligned_constants_(std::move(aligned_constants)) {}

// Type of the computation function we expect in the JIT.
using ComputeFunctionType = void (*)(void*, const void*, const void**, void**,
                                     int64*, int64*);
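// The parameters above correspond, in order, to the arguments passed by
// Executor::Schedule below: the result buffer, the ExecutableRunOptions, the
// array of operand buffers, the array of temp buffers, the per-dimension
// partition [start, limit) bounds (null when the instruction has no parallel
// tasks), and the profile counters array (null when profiling is disabled).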

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), mark addresses that are "live". The initial pointer itself is
// trivially live. If the shape of the buffer is a tuple, this analysis looks
// into the tuple's elements and marks them live as well (since tuples keep
// pointers to buffers); it works recursively.
// 'address' is an in-memory buffer address that contains some runtime XLA
// object. 'shape' is its shape. 'marked_addresses' is the set of live
// addresses to populate.
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(element,
                                ShapeUtil::GetTupleElementShape(shape, i),
                                marked_addresses);
    }
  }
}

namespace {

// Executor manages the concurrent execution of 'functions' for instructions
// in 'pending' on 'thread_pool' (storing resulting data in 'results').
class Executor {
 public:
  Executor(const HloInstructionMap<ComputeFunctionType>& functions,
           const ServiceExecutableRunOptions* run_options,
           std::list<HloInstruction*>* pending,
           HloInstructionMap<const void*>* results, void** temps_array,
           int64* profile_counters_array, const BufferAssignment* assignment)
      : functions_(functions),
        run_options_(run_options),
        pending_(pending),
        results_(results),
        temps_array_(temps_array),
        profile_counters_array_(profile_counters_array),
        thread_pool_(CHECK_NOTNULL(run_options_->xla_intra_op_thread_pool())),
        assignment_(assignment) {}

  // Executes the pending list of instructions on the thread pool.
  // Returns OK status on success, error status otherwise.
  Status Run();

 private:
  // Schedules a parallel invocation of the compute function for 'instruction'
  // on 'thread_pool_', storing the result in 'result_buffer'.
  // If 'partition_buffers' is non-null, the parallel task will be invoked on
  // the per-dimension partition [start, limit) values stored in
  // 'partition_buffers'.
  void Schedule(HloInstruction* instruction, int64* partition_buffers,
                void* result_buffer);

  // Returns true if 'instruction' has been assigned parallel tasks (returns
  // false otherwise).
  bool HasParallelTasks(HloInstruction* instruction);

  // Returns in 'partition_buffers' the partition [start, limit) for each
  // dimension.
  int64* GetPartitionBuffers(
      const std::vector<std::pair<int64, int64>>& partition);

  // Returns an array of result buffers for all operands of 'instruction'.
  const void** GetOperandBuffers(HloInstruction* instruction);

  // Arguments passed into Executor.
  const HloInstructionMap<ComputeFunctionType>& functions_;
  const ServiceExecutableRunOptions* run_options_;
  std::list<HloInstruction*>* pending_;
  HloInstructionMap<const void*>* results_;
  void** temps_array_;
  int64* profile_counters_array_;
  tensorflow::thread::ThreadPool* thread_pool_;
  const BufferAssignment* assignment_;

  // Members used to manage instruction execution.
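  // Worker threads push an instruction onto 'completion_queue_' (guarded by
  // 'completion_queue_lock_') once its last in-flight task completes, then
  // signal 'completion_queue_cv_'; Run() pops completed instructions on the
  // calling thread and publishes their results.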
  tensorflow::mutex completion_queue_lock_;
  tensorflow::condition_variable completion_queue_cv_;
  std::deque<HloInstruction*> completion_queue_;
  int64 instructions_in_flight_ = 0;
  std::unordered_map<const HloInstruction*, int64> tasks_in_flight_;
};

Status Executor::Run() {
  while (!pending_->empty() || instructions_in_flight_ > 0) {
    auto pending_it = pending_->begin();
    while (pending_it != pending_->end()) {
      HloInstruction* instruction = *pending_it;
      // Skip pending instructions whose operands aren't ready.
      if (std::any_of(instruction->operands().begin(),
                      instruction->operands().end(),
                      [&](HloInstruction* operand) {
                        return !ContainsKey(*results_, operand);
                      })) {
        ++pending_it;
        continue;
      }

      // Get 'result_buffer' reference to result buffer for 'instruction'.
      TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                          assignment_->GetUniqueTopLevelSlice(instruction));
      void* result_buffer =
          static_cast<char*>(temps_array_[result_slice.index()]) +
          result_slice.offset();

      if (HasParallelTasks(instruction)) {
        // 'instruction' has been assigned parallel task partitions.
        CHECK_EQ(HloOpcode::kCall, instruction->opcode());
        HloInstruction* root = instruction->to_apply()->root_instruction();

        // Create ShapePartitionIterator to iterate through all outer dimension
        // partitions of 'instruction'.
        ShapePartitionIterator partition_iterator(
            root->shape(), root->outer_dimension_partitions());

        const int64 partition_count =
            partition_iterator.GetTotalPartitionCount();

        // Record total parallel task count for 'instruction' before dispatch.
        {
          tensorflow::mutex_lock l(completion_queue_lock_);
          tasks_in_flight_.insert(
              std::make_pair(instruction, partition_count));
          VLOG(2) << "Schedule PARALLEL"
                  << " instruction: " << instruction->name()
                  << " instruction.callee: "
                  << instruction->to_apply()->root_instruction()->name()
                  << " partition_count: " << partition_count;
        }

        for (int64 i = 0; i < partition_count; ++i) {
          // Get partition [start, limit) for each dimension.
          auto partition_buffers =
              GetPartitionBuffers(partition_iterator.GetPartition(i));
          Schedule(instruction, partition_buffers, result_buffer);
        }

      } else {
        // Set tasks in-flight to '1' for sequential instruction execution.
        {
          tensorflow::mutex_lock l(completion_queue_lock_);
          tasks_in_flight_.insert(std::make_pair(instruction, 1));
          VLOG(2) << "Schedule SEQUENTIAL"
                  << " instruction: " << instruction->name()
                  << " instruction.callee: "
                  << instruction->to_apply()->root_instruction()->name();
        }
        Schedule(instruction, nullptr, result_buffer);
      }

      ++instructions_in_flight_;
      pending_it = pending_->erase(pending_it);
    }
    // Wait for a completed HLO instruction to be present in the queue. We will
    // pop it out of the queue and make the result available to its users.
    HloInstruction* instruction;
    do {
      tensorflow::mutex_lock l(completion_queue_lock_);
      if (completion_queue_.empty()) {
        completion_queue_cv_.wait(l);
      }
      if (!completion_queue_.empty()) {
        instruction = completion_queue_.front();
        completion_queue_.pop_front();
        break;
      }
    } while (true);
    TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                        assignment_->GetUniqueTopLevelSlice(instruction));
    void* result_buffer =
        static_cast<char*>(temps_array_[result_slice.index()]) +
        result_slice.offset();
    InsertOrDie(results_, instruction, result_buffer);
    --instructions_in_flight_;
  }
  return Status::OK();
}

void Executor::Schedule(HloInstruction* instruction, int64* partition_buffers,
                        void* result_buffer) {
  // The thread pool entry takes ownership of |operand_buffers|.
  auto operand_buffers = GetOperandBuffers(instruction);

  auto function = FindOrDie(functions_, instruction);
  const auto* exec_run_options = &run_options_->run_options();
  thread_pool_->Schedule([this, instruction, result_buffer, operand_buffers,
                          partition_buffers, exec_run_options, function]() {
    function(result_buffer, exec_run_options, operand_buffers, temps_array_,
             partition_buffers, profile_counters_array_);

    delete[] operand_buffers;
    delete[] partition_buffers;
    // Push the completed HLO instruction on the queue; the main thread will
    // pop it off and potentially launch more work which uses the result.
    // TODO(b/27458679) Consider alternative task scheduling and
    // synchronization schemes. For example, we could avoid the overhead
    // associated with the condvar here if the thread just dequeued the next
    // instruction to execute on completion.
    {
      tensorflow::mutex_lock l(completion_queue_lock_);
      // Decrement in-flight task count for this completion.
      if (--FindOrDie(tasks_in_flight_, instruction) == 0) {
        completion_queue_.push_back(instruction);
        completion_queue_cv_.notify_all();
        tasks_in_flight_.erase(instruction);
      }
    }
  });
}

int64* Executor::GetPartitionBuffers(
    const std::vector<std::pair<int64, int64>>& partition) {
  // Return in 'partition_buffers' the partition [start, limit) for each
  // dimension. Each input pair is (start, size); dimension 'i' occupies
  // indices 2*i (start) and 2*i + 1 (start + size, the exclusive limit).
  auto partition_buffers = new int64[partition.size() * 2];
  for (int i = 0; i < partition.size(); ++i) {
    partition_buffers[2 * i + 0] = partition[i].first;
    partition_buffers[2 * i + 1] = partition[i].first + partition[i].second;
  }
  return partition_buffers;
}

bool Executor::HasParallelTasks(HloInstruction* instruction) {
  return instruction->opcode() == HloOpcode::kCall &&
         !instruction->to_apply()
              ->root_instruction()
              ->outer_dimension_partitions()
              .empty();
}

const void** Executor::GetOperandBuffers(HloInstruction* instruction) {
  // We cannot use a move-only RAII type like std::unique_ptr because the
  // list of operands is allocated on the main thread and transferred to the
  // worker via the lambda passed to enqueue_function. In order for the
  // lambda to take ownership, we would need to use generalized lambda
  // capture, which is a feature new to C++14.
  // TODO(b/27458679) Avoid dynamic allocations in Executor.
  auto operand_buffers = new const void*[instruction->operand_count()];
  std::transform(instruction->operands().begin(),
                 instruction->operands().end(), operand_buffers,
                 [this](HloInstruction* operand) {
                   return FindOrDie(*results_, operand);
                 });
  return operand_buffers;
}

}  // namespace

Status ParallelCpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}

Status ParallelCpuExecutable::ExecuteComputeFunctions(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // Allocate profiling counters for each HLO instruction that we would like to
  // profile.
  std::vector<int64>* profile_counters = nullptr;
  if (hlo_execution_profile) {
    profile_counters = hlo_execution_profile->mutable_profile_counters();
  }

  std::vector<void*> buffer_pointers;
  buffer_pointers.reserve(buffers.size());
  for (auto device_allocation : buffers) {
    buffer_pointers.push_back(device_allocation.opaque());
  }

  // Resolve functions for all the HLO instructions ahead of time.
  HloInstructionMap<ComputeFunctionType> functions;
  for (auto& entry : *function_names_) {
    tensorflow::mutex_lock lock(jit_mutex_);
    HloInstruction* instruction = entry.first;
    llvm::JITSymbol sym = jit_->FindCompiledSymbol(entry.second);
    TF_RET_CHECK(sym);
    InsertOrDie(
        &functions, instruction,
        reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress())));
  }

  // Map containing pointers to result buffers for each instruction.
  HloInstructionMap<const void*> results;

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  std::list<HloInstruction*> pending;

  // Call the function for each HLO instruction in topological order.
  const HloComputation& entry_computation = *module().entry_computation();
  for (auto* instruction : entry_computation.MakeInstructionPostOrder()) {
    // Parameters and constants have no functions associated with them.
    // Instead just copy the existing buffer into the map containing
    // instruction results.
    if (instruction->opcode() == HloOpcode::kParameter) {
      InsertOrDie(
          &results, instruction,
          arguments[instruction->parameter_number()]->root_buffer().opaque());
    } else if (instruction->opcode() == HloOpcode::kConstant) {
      unsigned char* aligned_data =
          FindOrDie(aligned_constants_, instruction).get();
      InsertOrDie(&results, instruction, aligned_data);
    } else {
      TF_RET_CHECK(instruction->opcode() == HloOpcode::kCall);
      pending.push_back(instruction);
    }
  }

  // TODO(b/27458679) Manage scheduling based on in-flight concurrency limits.
  // For example, if we expect a library conv/matmul call to run at max
  // concurrency, we should not dispatch runnable instructions until the
  // library call is finished (to avoid expensive cache invalidation).
  Executor executor(
      functions, run_options, &pending, &results, buffer_pointers.data(),
      profile_counters ? profile_counters->data() : nullptr,
      assignment_.get());

  TF_RETURN_IF_ERROR(executor.Run());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));
  }

  return Status::OK();
}

StatusOr<std::unique_ptr<ShapedBuffer>> ParallelCpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  auto result_buffer = MakeUnique<ShapedBuffer>(
      /*on_host_shape=*/result_shape(), /*on_device_shape=*/result_shape(),
      stream->parent()->platform(), stream->parent()->device_ordinal());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunctions(run_options, arguments, buffers,
                                             hlo_execution_profile));

  // Copy the DeviceMemoryBase values that make up the result into the
  // respective locations in the ShapedBuffer which is returned to the caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(result_buffer->buffers().ForEachMutableElementWithStatus(
      [&](const ShapeIndex& index, se::DeviceMemoryBase* device_memory) {
        const auto& sources = this->GetRootPointsToSet().element(index);

        // The points-to set is unambiguous, so the set should be a singleton.
        CHECK_EQ(1, sources.size());
        const LogicalBuffer* buffer_source = sources[0];
        HloInstruction* src = buffer_source->instruction();

        // The source for this result buffer can be a nested buffer such as a
        // tuple element. The source instruction should have a non-parameter
        // buffer assigned.
        TF_ASSIGN_OR_RETURN(
            const BufferAllocation::Slice slice,
            this->assignment_->GetUniqueSlice(src, buffer_source->index()));
        CHECK(!slice.allocation()->is_entry_computation_parameter());

        const BufferAllocation::Index buffer_index = slice.index();
        const se::DeviceMemoryBase& buffer = buffers[buffer_index];
        CHECK(!buffer.is_null() || buffer.size() == 0);
        *device_memory = buffer;
        buffers_in_result[buffer_index] = true;
        return Status::OK();
      }));

  // Free all buffers not in the result.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

StatusOr<std::unique_ptr<ShapedBuffer>>
ParallelCpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments) {
  // TODO(b/30671675): Implement asynchronous execution mode.
  return Unimplemented(
      "Asynchronous execution on stream is not yet supported on CPU.");
}

const PointsToSet& ParallelCpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla