cpu_executable.cc revision 1e67c90e2caceeff82d09793d1ef5fa0300d219b
1/* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3Licensed under the Apache License, Version 2.0 (the "License"); 4you may not use this file except in compliance with the License. 5You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9Unless required by applicable law or agreed to in writing, software 10distributed under the License is distributed on an "AS IS" BASIS, 11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12See the License for the specific language governing permissions and 13limitations under the License. 14==============================================================================*/ 15 16#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h" 17 18#include <stdint.h> 19#include <algorithm> 20#include <set> 21#include <unordered_set> 22#include <utility> 23#include <vector> 24 25#include "external/llvm/include/llvm/ExecutionEngine/Orc/IRCompileLayer.h" 26#include "tensorflow/compiler/xla/service/buffer_assignment.h" 27#include "tensorflow/compiler/xla/service/computation_layout.h" 28#include "tensorflow/compiler/xla/service/hlo_computation.h" 29#include "tensorflow/compiler/xla/service/hlo_module.h" 30#include "tensorflow/compiler/xla/service/hlo_module_config.h" 31#include "tensorflow/compiler/xla/service/logical_buffer.h" 32#include "tensorflow/compiler/xla/service/shaped_buffer.h" 33#include "tensorflow/compiler/xla/shape_tree.h" 34#include "tensorflow/compiler/xla/shape_util.h" 35#include "tensorflow/compiler/xla/status_macros.h" 36#include "tensorflow/compiler/xla/types.h" 37#include "tensorflow/compiler/xla/util.h" 38#include "tensorflow/compiler/xla/xla_data.pb.h" 39#include "tensorflow/core/lib/strings/str_util.h" 40#include "tensorflow/core/lib/strings/strcat.h" 41#include "tensorflow/core/lib/strings/stringprintf.h" 42#include "tensorflow/core/platform/env.h" 43#include "tensorflow/core/platform/logging.h" 44#include "tensorflow/core/platform/mem.h" 
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

// Takes ownership of the JIT (which holds the compiled code), the buffer
// assignment, the HLO module and its config. Resolves the entry function
// symbol eagerly so that execution does not need to consult the JIT's
// symbol table.
CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<BufferAssignment> assignment,
    std::unique_ptr<HloModule> hlo_module,
    std::unique_ptr<HloModuleConfig> module_config,
    const string& entry_function_name,
    std::unordered_map<const HloInstruction*, size_t> hlo_to_profile_idx)
    : Executable(std::move(hlo_module), std::move(module_config)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      hlo_to_profile_idx_(std::move(hlo_to_profile_idx)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so resolving it once
  // here keeps that work off the execution path.
  // NOTE(review): an earlier comment said getAddress "needs to be guarded by
  // the mutex", but no lock is taken here -- presumably construction is
  // externally serialized; confirm before relying on concurrent construction.
  compute_function_ = reinterpret_cast<ComputeFunctionType>(sym.getAddress());
}

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), mark addresses that are "live". The initial pointer itself is
// trivially live. If the shape of the buffer is a tuple, this analysis looks
// into the tuple's elements and marks them live as well (since tuples keep
// pointers to buffers) and also works recursively. address is an in-memory
// buffer address that contains some runtime XLA object. shape is its
// shape. marked_addresses is the set of live addresses to populate.
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  // A tuple buffer is laid out as an array of pointers to its element
  // buffers, so element i's address is read from address_buffer[i].
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(
          element, ShapeUtil::GetTupleElementShape(shape, i), marked_addresses);
    }
  }
}

// Allocates device memory for every allocation in the BufferAssignment that
// actually needs backing storage here: parameters are provided by the caller
// and thread-local buffers are allocated inside the JITed code, so both are
// skipped. Entries of *buffers that are already non-null (preallocated by the
// caller, e.g. from a result ShapedBuffer) are left untouched.
Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  // The result value is only used for logging; this also verifies (via the
  // returned status) that a unique top-level output allocation exists.
  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());

  VLOG(3) << "result index: " << result_allocation->index();

  return Status::OK();
}

// Adapter overload: flattens ShapedBuffer arguments (which must not be
// tuple-shaped) into their single top-level device buffers and forwards to
// the DeviceMemoryBase overload below.
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  std::vector<se::DeviceMemoryBase> argument_buffers;
  for (int i = 0; i < arguments.size(); ++i) {
    TF_RET_CHECK(!ShapeUtil::IsTuple(arguments[i]->shape()));
    argument_buffers.push_back(arguments[i]->buffer(/*index=*/{}));
  }
  return ExecuteComputeFunction(run_options, argument_buffers, buffers,
                                hlo_execution_profile);
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** temps_array)
  //
  //  result: Points at the result.
  //  run_options: the ExecutableRunOptions object.
  //  args_array: An array of pointers, each of which points to a parameter.
  //    The size of this array is determined by the function's arity
  //    (ProgramShape).
  //  temps_array: An array of pointers, each of which points to a temporary
  //    buffer the computation needs. The size of this array is
  //    determined by buffer analysis.
  //
  std::vector<const void*> args_array;
  for (se::DeviceMemoryBase arg_mem : arguments) {
    args_array.push_back(arg_mem.opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  // Allocate profiling counters for each hlo instruction that we would like to
  // profile. Allocate an additional profile counter for the entire
  // computation.
  std::vector<uint64> profile_counters(hlo_to_profile_idx_.size() + 1);

  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());
  void* result_buffer = buffer_pointers[result_allocation->index()];
  // Guard with VLOG_IS_ON so the string formatting below is skipped entirely
  // when verbose logging is off.
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        " func(void* result, void* params[%zu], void* temps[%zu], "
        "uint64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters.size());
    VLOG(3) << tensorflow::strings::Printf(" result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out, tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        " params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        " temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(" profile_counters = %p",
                                           profile_counters.data());
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters.data());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    // execution_profile_ is shared across concurrent executions, so updates
    // to it are serialized under mutex_.
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    // Clamp to at least 1ns so downstream consumers never see a zero time.
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));

    // The last profile counter is used for the computation as a whole.
    execution_profile_.set_compute_cycle_count(profile_counters.back());
  }

  if (hlo_execution_profile != nullptr) {
    hlo_execution_profile->set_total_cycles_executed(profile_counters.back());

    for (auto hlo_prof_idx : hlo_to_profile_idx_) {
      const HloInstruction* hlo = hlo_prof_idx.first;
      uint64 cycles_taken = profile_counters[hlo_prof_idx.second];
      hlo_execution_profile->AddProfileResult(hlo, cycles_taken);
    }
  }
  return Status::OK();
}

// Executes the computation and returns the top-level output buffer. All
// temporary buffers that are not reachable from the output are deallocated
// before returning; buffers reachable from the output become the caller's
// (service's) responsibility to free.
StatusOr<perftools::gputools::DeviceMemoryBase> CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation* result_allocation,
                      assignment_->GetUniqueTopLevelOutputAllocation());
  se::DeviceMemoryBase top_level_output = buffers[result_allocation->index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  VLOG(3) << "Live addresses in output marking found "
          << marked_addresses.size() << " addresses:\n"
          << tensorflow::str_util::Join(
                 marked_addresses, ", ", [](string* out, const void* address) {
                   tensorflow::strings::StrAppend(
                       out, tensorflow::strings::Printf("%p", address));
                 });

  // Computation is done - deallocate temp buffers. Keep those marked live
  // because they are referenced by the output of the computation and are needed
  // by the service. They will be deallocated by the service.
  // NOTE(review): `i` deduces to int while buffers.size() is size_t; harmless
  // at realistic sizes but triggers sign-compare warnings.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (marked_addresses.count(alloc.opaque()) == 0 &&
        alloc.opaque() != nullptr) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return top_level_output;
}

// Executes the computation and returns the result as a freshly-created
// ShapedBuffer whose leaf entries reference the allocations that actually
// hold the result. Requires an unambiguous points-to set at the root.
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_ASSIGN_OR_RETURN(
      std::unique_ptr<ShapedBuffer> result_buffer,
      ShapedBuffer::MakeShapedBuffer(
          module_config().entry_computation_layout().result_shape(),
          stream->parent()->platform(), stream->parent()->device_ordinal()));

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Copy DeviceMemoryBase values which contain the array(s) of the result into
  // the respective location in ShapedBuffer which is returned to the caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElement(
              [&buffers, &buffers_in_result, &result_buffer, this](
                  const ShapeIndex& index, bool is_leaf, size_t* buffer_entry) {
                if (is_leaf) {
                  const std::vector<const LogicalBuffer*>& sources =
                      this->GetRootPointsToSet().element(index);
                  // The points to set is unambiguous so the set should be a
                  // singleton.
                  CHECK_EQ(1, sources.size());
                  const LogicalBuffer* buffer_source = sources[0];
                  HloInstruction* src = buffer_source->instruction();

                  // The source for this result buffer can be a nested buffer
                  // such as a tuple element.

                  // The source instruction should have a non-parameter buffer
                  // assigned.
                  TF_ASSIGN_OR_RETURN(const BufferAllocation* allocation,
                                      this->assignment_->GetUniqueAllocation(
                                          src, buffer_source->index()));
                  CHECK(!allocation->is_entry_computation_parameter());

                  // A null buffer is only acceptable for a zero-sized
                  // allocation.
                  CHECK(!buffers[allocation->index()].is_null() ||
                        buffers[allocation->index()].size() == 0);
                  result_buffer->mutable_buffers()->push_back(
                      buffers[allocation->index()]);
                  *buffer_entry = result_buffer->mutable_buffers()->size() - 1;
                  buffers_in_result[allocation->index()] = true;
                }
                return Status::OK();
              }));

  // Free all buffers not in the result.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

// Executes the computation into a caller-provided result ShapedBuffer: the
// result buffers supplied by the caller are wired into the allocation table
// before execution so the JITed code writes results directly into them.
Status CpuExecutable::ExecuteOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    ShapedBuffer* result_buffer, HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  // Every array element in the result of the computation must be unambiguously
  // produced by a single instruction.
  // This ensures that the buffers inside result_buffer can be assigned without
  // conflict to the respective instructions because there is a one-to-one
  // correspondence between hlo instructions and array buffers in the result.
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented(
        "Points-to set of root instruction is ambiguous or not distinct");
  }
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());
  DCHECK(ShapeUtil::Compatible(result_buffer->shape(), result_shape()));

  // If two tuple elements point to the same buffer, one of the results in the
  // result buffer is considered the canonical location while the other result
  // points to it (instead of, say, making a copy of the result).
  // buffer_index_to_shape_index maps a buffer index to its canonical location
  // in the result buffer.
  std::unordered_map<BufferAllocation::Index, size_t>
      buffer_index_to_shape_index;

  // Copy values from result_buffer to the index in "buffers". These buffers
  // will not be allocated in the call to AllocateBuffers.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(), false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElement(
              [&buffers, &buffers_in_result, &buffer_index_to_shape_index,
               result_buffer, this](const ShapeIndex& index, bool is_leaf,
                                    size_t* buffer_entry) {
                if (is_leaf) {
                  const std::vector<const LogicalBuffer*>& sources =
                      this->GetRootPointsToSet().element(index);
                  // The points to set is unambiguous so the set should be a
                  // singleton.
                  CHECK_EQ(1, sources.size());
                  const LogicalBuffer* buffer_source = sources[0];
                  HloInstruction* src = buffer_source->instruction();

                  // The source for this result buffer can be a nested buffer
                  // such as a tuple element.

                  // The source instruction should have a non-parameter buffer
                  // assigned.
                  TF_ASSIGN_OR_RETURN(const BufferAllocation* allocation,
                                      this->assignment_->GetUniqueAllocation(
                                          src, buffer_source->index()));
                  CHECK(!allocation->is_entry_computation_parameter());

                  auto insert_result = buffer_index_to_shape_index.emplace(
                      allocation->index(), *buffer_entry);
                  if (insert_result.second) {
                    // The points-to set is distinct so this buffer should not
                    // have been assigned in a previous invocation of this
                    // lambda.
                    perftools::gputools::DeviceMemoryBase memory_base =
                        result_buffer->buffer(index);
                    CHECK(buffers[allocation->index()].is_null());
                    CHECK(!memory_base.is_null());
                    buffers[allocation->index()] = memory_base;
                    buffers_in_result[allocation->index()] = true;
                  } else {
                    // Record the fact that this tuple element is identical to
                    // some prior result.
                    *buffer_entry = insert_result.first->second;
                  }
                }
                return Status::OK();
              }));

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  TF_RETURN_IF_ERROR(ExecuteComputeFunction(run_options, arguments, buffers,
                                            hlo_execution_profile));

  // Free all buffers not in the result.
  for (auto i = 0; i < buffers.size(); ++i) {
    auto alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}

StatusOr<perftools::gputools::DeviceMemoryBase>
CpuExecutable::ExecuteAsyncOnStream(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) {
  // TODO(b/30671675): Implement asynchronous execution mode.
  return Unimplemented(
      "Asynchronous execution on stream is not yet supported on CPU.");
}

// Returns the points-to set of the entry computation's root instruction, as
// computed by the points-to analysis held by the buffer assignment.
const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

}  // namespace cpu
}  // namespace xla