cpu_executable.cc revision cf245240ca90e6b552415f720342ae1acd326590
/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/compiler/xla/service/cpu/cpu_executable.h"

#include <stdint.h>
#include <algorithm>
#include <set>
#include <unordered_set>
#include <utility>
#include <vector>

#include "llvm/ExecutionEngine/Orc/IRCompileLayer.h"
#include "tensorflow/compiler/xla/service/buffer_assignment.h"
#include "tensorflow/compiler/xla/service/computation_layout.h"
#include "tensorflow/compiler/xla/service/hlo_computation.h"
#include "tensorflow/compiler/xla/service/hlo_module.h"
#include "tensorflow/compiler/xla/service/logical_buffer.h"
#include "tensorflow/compiler/xla/service/shaped_buffer.h"
#include "tensorflow/compiler/xla/shape_tree.h"
#include "tensorflow/compiler/xla/shape_util.h"
#include "tensorflow/compiler/xla/status_macros.h"
#include "tensorflow/compiler/xla/types.h"
#include "tensorflow/compiler/xla/util.h"
#include "tensorflow/compiler/xla/xla_data.pb.h"
#include "tensorflow/core/lib/strings/str_util.h"
#include "tensorflow/core/lib/strings/strcat.h"
#include "tensorflow/core/lib/strings/stringprintf.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mem.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/stream_executor/host/host_stream.h"

namespace se = ::perftools::gputools;

namespace xla {
namespace cpu {

CpuExecutable::CpuExecutable(
    std::unique_ptr<SimpleOrcJIT> jit,
    std::unique_ptr<const BufferAssignment> assignment,
    std::unique_ptr<const HloModule> hlo_module,
    const string& entry_function_name,
    std::unordered_map<const HloInstruction*, size_t> hlo_to_profile_idx)
    : Executable(std::move(hlo_module)),
      jit_(std::move(jit)),
      assignment_(std::move(assignment)),
      hlo_to_profile_idx_(std::move(hlo_to_profile_idx)) {
  // Resolve symbols in the constructor rather than at execution time to avoid
  // races because FindSymbol is not thread safe.
  llvm::JITSymbol sym = jit_->FindSymbol(entry_function_name);
  // We expect to find the symbol provided with entry_function_name; otherwise
  // this is an internal error.
  CHECK(sym) << "Symbol " << entry_function_name << " not found.";
  // getAddress can do work under the hood in the jit, so resolve the address
  // here as well, while we are still single-threaded.
  compute_function_ =
      reinterpret_cast<ComputeFunctionType>(cantFail(sym.getAddress()));
}

// Given a pointer to an output buffer (following the CPU JIT calling
// conventions), mark addresses that are "live". The initial pointer itself is
// trivially live. If the shape of the buffer is a tuple, this analysis looks
// into the tuple's elements and marks them live as well (since tuples keep
// pointers to buffers), recursing into nested tuples. address is an in-memory
// buffer address that contains some runtime XLA object. shape is its shape.
// marked_addresses is the set of live addresses to populate.
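// For example (illustrative shapes, not from any particular module): if the
// output shape is the tuple (f32[10], (f32[20], f32[30])), the top-level
// buffer holds two pointers, one to the f32[10] array and one to the inner
// tuple buffer, which itself holds two pointers to the leaf arrays. All five
// addresses end up in marked_addresses.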
static void MarkLiveAddressesInOutput(
    const void* address, const Shape& shape,
    std::unordered_set<const void*>* marked_addresses) {
  marked_addresses->insert(address);
  const uintptr_t* address_buffer = static_cast<const uintptr_t*>(address);
  if (ShapeUtil::IsTuple(shape)) {
    for (int i = 0; i < ShapeUtil::TupleElementCount(shape); ++i) {
      const uintptr_t* element_address = address_buffer + i;
      const void* element = reinterpret_cast<const void*>(*element_address);
      MarkLiveAddressesInOutput(
          element, ShapeUtil::GetTupleElementShape(shape, i),
          marked_addresses);
    }
  }
}

Status CpuExecutable::AllocateBuffers(
    DeviceMemoryAllocator* memory_allocator, int device_ordinal,
    std::vector<perftools::gputools::DeviceMemoryBase>* buffers) {
  CHECK_EQ(buffers->size(), assignment_->Allocations().size());
  VLOG(3) << "Allocating " << assignment_->Allocations().size()
          << " allocations for module " << module().name();
  for (BufferAllocation::Index i = 0; i < assignment_->Allocations().size();
       ++i) {
    auto& allocation = assignment_->GetAllocation(i);

    VLOG(3) << allocation.ToString();

    if (allocation.is_entry_computation_parameter()) {
      VLOG(3) << "allocation #" << i << " is a parameter";
      continue;
    }

    if (allocation.is_thread_local()) {
      VLOG(3) << "buffer #" << i << " is thread-local";
      continue;
    }

    int64 buffer_size = allocation.size();
    if (!(*buffers)[i].is_null()) {
      VLOG(3) << "buffer #" << i
              << " is in the preallocated result ShapedBuffer";
    } else {
      TF_ASSIGN_OR_RETURN((*buffers)[i], memory_allocator->Allocate(
                                             device_ordinal, buffer_size));

      VLOG(3) << "buffer #" << i << " allocated " << buffer_size << " bytes ["
              << (*buffers)[i].opaque() << "]";
    }

    // Since the output buffer and all the temporary buffers were written into
    // by the JITed code, msan has no way of knowing their memory was
    // initialized. Mark them initialized so that msan doesn't flag loads from
    // these buffers.
    TF_ANNOTATE_MEMORY_IS_INITIALIZED((*buffers)[i].opaque(), buffer_size);
  }

  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  VLOG(3) << "result index: " << result_slice.index();

  return Status::OK();
}
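
// This overload unwraps the top-level buffer of each ShapedBuffer argument
// and forwards to the DeviceMemoryBase overload below.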
Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  std::vector<se::DeviceMemoryBase> argument_buffers;
  argument_buffers.reserve(arguments.size());
  for (const auto* argument : arguments) {
    argument_buffers.push_back(argument->buffer(/*index=*/{}));
  }
  return ExecuteComputeFunction(run_options, argument_buffers, buffers,
                                hlo_execution_profile);
}

Status CpuExecutable::ExecuteComputeFunction(
    const ExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    HloExecutionProfile* hlo_execution_profile) {
  // The calling convention for JITed functions is:
  //
  //  void function(void* result, const void* run_options, void** args_array,
  //                void** temps_array, uint64* profile_counters)
  //
  //  result: Points at the result.
  //  run_options: the ExecutableRunOptions object.
  //  args_array: An array of pointers, each of which points to a parameter.
  //    The size of this array is determined by the function's arity
  //    (ProgramShape).
  //  temps_array: An array of pointers, each of which points to a temporary
  //    buffer the computation needs. The size of this array is determined by
  //    buffer analysis.
  //  profile_counters: An array of cycle counters, one per profiled HLO
  //    instruction plus one for the computation as a whole.
  //
  std::vector<const void*> args_array;
  for (se::DeviceMemoryBase arg_mem : arguments) {
    args_array.push_back(arg_mem.opaque());
  }

  uint64 start_micros = tensorflow::Env::Default()->NowMicros();

  // Allocate profiling counters for each hlo instruction that we would like
  // to profile. Allocate an additional profile counter for the entire
  // computation.
  std::vector<uint64> profile_counters(hlo_to_profile_idx_.size() + 1);
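
  // Purely illustrative (not code the compiler emits as written): for an
  // entry computation that adds two f32[4] parameters, the JITed function
  // invoked below behaves roughly like
  //
  //   void entry(void* result, const void* run_options, void** args,
  //              void** temps, uint64* profile_counters) {
  //     const float* lhs = static_cast<const float*>(args[0]);
  //     const float* rhs = static_cast<const float*>(args[1]);
  //     float* out = static_cast<float*>(result);
  //     for (int i = 0; i < 4; ++i) out[i] = lhs[i] + rhs[i];
  //   }
  //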
  // Call the computation function following the calling convention.
  std::vector<void*> buffer_pointers;
  for (auto& buffer : buffers) {
    buffer_pointers.push_back(const_cast<void*>(buffer.opaque()));
  }
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  void* result_buffer = buffer_pointers[result_slice.index()];
  if (VLOG_IS_ON(3)) {
    VLOG(3) << "Executing compute function:";
    VLOG(3) << tensorflow::strings::Printf(
        "  func(void* result, void* params[%zu], void* temps[%zu], "
        "uint64 profile_counters[%zu])",
        args_array.size(), buffer_pointers.size(), profile_counters.size());
    VLOG(3) << tensorflow::strings::Printf("    result = %p", result_buffer);
    auto ptr_printer = [](string* out, const void* p) {
      tensorflow::strings::StrAppend(out,
                                     tensorflow::strings::Printf("%p", p));
    };
    VLOG(3) << tensorflow::strings::Printf(
        "    params = [%s]",
        tensorflow::str_util::Join(args_array, ", ", ptr_printer).c_str());
    VLOG(3) << tensorflow::strings::Printf(
        "    temps = [%s]",
        tensorflow::str_util::Join(buffer_pointers, ", ", ptr_printer)
            .c_str());
    VLOG(3) << tensorflow::strings::Printf("    profile_counters = %p",
                                           profile_counters.data());
  }

  compute_function_(result_buffer, run_options, args_array.data(),
                    buffer_pointers.data(), profile_counters.data());

  uint64 end_micros = tensorflow::Env::Default()->NowMicros();

  {
    tensorflow::mutex_lock lock(mutex_);
    const double nanoseconds = (end_micros - start_micros) * 1000.0;
    execution_profile_.set_compute_time_ns(std::max(nanoseconds, 1.0));

    // The last profile counter is used for the computation as a whole.
    execution_profile_.set_compute_cycle_count(profile_counters.back());
  }

  if (hlo_execution_profile != nullptr) {
    hlo_execution_profile->set_total_cycles_executed(
        *module().entry_computation(), profile_counters.back());

    for (auto hlo_prof_idx : hlo_to_profile_idx_) {
      const HloInstruction* hlo = hlo_prof_idx.first;
      uint64 cycles_taken = profile_counters[hlo_prof_idx.second];
      hlo_execution_profile->SetCyclesTakenBy(hlo, cycles_taken);
    }
  }
  return Status::OK();
}

static void LogLiveAddresses(
    const std::unordered_set<const void*>& marked_addresses) {
  VLOG(3) << "Live addresses in output marking found "
          << marked_addresses.size() << " addresses:\n"
          << tensorflow::str_util::Join(
                 marked_addresses, ", ", [](string* out, const void* address) {
                   tensorflow::strings::StrAppend(
                       out, tensorflow::strings::Printf("%p", address));
                 });
}

static Status DeallocateTempBuffers(
    DeviceMemoryAllocator* allocator, se::Stream* stream,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> buffers,
    const std::unordered_set<const void*>& marked_addresses) {
  // Keep those marked live because they are referenced by the output of the
  // computation and are needed by the service. They will be deallocated by
  // the service.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (marked_addresses.count(alloc.opaque()) == 0 && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(
          allocator->Deallocate(stream->parent()->device_ordinal(), &alloc));
    }
  }

  return Status::OK();
}
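
// Both ExecuteOnStream overloads below follow the same overall sequence:
// allocate the buffers chosen by buffer assignment, run the JITed compute
// function over them, work out which buffers the result refers to, and
// deallocate the rest.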
StatusOr<perftools::gputools::DeviceMemoryBase> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  se::DeviceMemoryBase top_level_output = buffers[result_slice.index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  LogLiveAddresses(marked_addresses);
  TF_RETURN_IF_ERROR(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                           marked_addresses));

  return top_level_output;
}
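
// Note on the ambiguity check below: the points-to set of the root
// instruction is ambiguous when the buffer backing some piece of the output
// cannot be determined statically (for example, when the root selects
// between two tuple-shaped values at runtime). Building a ShapedBuffer
// result requires an unambiguous buffer for every element of the shape.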
StatusOr<std::unique_ptr<ShapedBuffer>> CpuExecutable::ExecuteOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<const ShapedBuffer*> arguments,
    HloExecutionProfile* hlo_execution_profile) {
  if (GetRootPointsToSet().IsAmbiguous()) {
    return Unimplemented("Points-to set of root instruction is ambiguous");
  }

  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  auto result_buffer =
      MakeUnique<ShapedBuffer>(result_shape(), stream->parent()->platform(),
                               stream->parent()->device_ordinal());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));
  TF_RETURN_IF_ERROR(ExecuteComputeFunction(
      &run_options->run_options(), arguments, buffers, hlo_execution_profile));

  // Copy DeviceMemoryBase values which contain the array(s) of the result
  // into the respective location in ShapedBuffer which is returned to the
  // caller.
  std::vector<bool> buffers_in_result(assignment_->Allocations().size(),
                                      false);
  TF_RETURN_IF_ERROR(
      result_buffer->mutable_shape_index_to_buffer_entry()
          ->ForEachMutableElementWithStatus(
              [&buffers, &buffers_in_result, &result_buffer, this](
                  const ShapeIndex& index, size_t* buffer_entry) {
                const auto& sources = this->GetRootPointsToSet().element(index);
                // The points-to set is unambiguous, so the set should be a
                // singleton.
                CHECK_EQ(1, sources.size());
                const LogicalBuffer* buffer_source = sources[0];
                HloInstruction* src = buffer_source->instruction();

                // The source for this result buffer can be a nested buffer
                // such as a tuple element.

                // The source instruction should have a non-parameter buffer
                // assigned.
                TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice slice,
                                    this->assignment_->GetUniqueSlice(
                                        src, buffer_source->index()));
                CHECK(!slice.allocation()->is_entry_computation_parameter());

                const BufferAllocation::Index buffer_index = slice.index();
                const se::DeviceMemoryBase& buffer = buffers[buffer_index];
                CHECK(!buffer.is_null() || buffer.size() == 0);
                *buffer_entry = result_buffer->mutable_buffers()->size();
                result_buffer->mutable_buffers()->push_back(buffer);
                buffers_in_result[buffer_index] = true;
                return Status::OK();
              }));

  // Free all buffers not in the result.
  for (size_t i = 0; i < buffers.size(); ++i) {
    se::DeviceMemoryBase alloc = buffers[i];
    if (!buffers_in_result[i] && !alloc.is_null()) {
      VLOG(3) << "CpuExecutable deallocating buffer #" << i << " ["
              << alloc.opaque() << "]";
      TF_RETURN_IF_ERROR(memory_allocator->Deallocate(
          stream->parent()->device_ordinal(), &alloc));
    }
  }

  return std::move(result_buffer);
}

StatusOr<perftools::gputools::DeviceMemoryBase>
CpuExecutable::ExecuteAsyncOnStream(
    const ServiceExecutableRunOptions* run_options,
    tensorflow::gtl::ArraySlice<se::DeviceMemoryBase> arguments) {
  if (hlo_profiling_enabled()) {
    return Unimplemented(
        "Asynchronous execution on stream with hlo profiling is not yet "
        "supported on CPU.");
  }

  auto* host_stream = dynamic_cast<perftools::gputools::host::HostStream*>(
      run_options->stream()->implementation());
  se::Stream* stream = run_options->stream();
  DeviceMemoryAllocator* memory_allocator = run_options->allocator();
  std::vector<se::DeviceMemoryBase> buffers(assignment_->Allocations().size());

  TF_RETURN_IF_ERROR(AllocateBuffers(
      memory_allocator, stream->parent()->device_ordinal(), &buffers));

  // Mark the buffers that are actually live (used in the output) when the
  // computation finishes executing.
  std::unordered_set<const void*> marked_addresses;
  TF_ASSIGN_OR_RETURN(const BufferAllocation::Slice result_slice,
                      assignment_->GetUniqueTopLevelOutputSlice());
  se::DeviceMemoryBase top_level_output = buffers[result_slice.index()];
  MarkLiveAddressesInOutput(top_level_output.opaque(), result_shape(),
                            &marked_addresses);

  LogLiveAddresses(marked_addresses);

  host_stream->EnqueueTask([this, run_options, arguments, buffers,
                            marked_addresses, memory_allocator, stream]() {
    // Failing a CHECK here is not great, but there is no obvious way to
    // return a failed Status asynchronously.
    TF_CHECK_OK(ExecuteComputeFunction(&run_options->run_options(), arguments,
                                       buffers,
                                       /*hlo_execution_profile=*/nullptr));
    TF_CHECK_OK(DeallocateTempBuffers(memory_allocator, stream, buffers,
                                      marked_addresses));
  });

  return top_level_output;
}

/*static*/ int64 CpuExecutable::ShapeSizeBytes(const Shape& shape) {
  // On the CPU, opaques are pointers.
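  // Illustrative sizes on a 64-bit host: an opaque shape is sizeof(void*) =
  // 8 bytes; f32[2,3] is 2 * 3 * 4 = 24 bytes; a tuple of N elements is
  // N * 8 bytes, since a tuple buffer is an array of pointers to its element
  // buffers.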
  if (ShapeUtil::IsOpaque(shape)) {
    return sizeof(void*);
  }
  return ShapeUtil::ByteSizeOf(shape, sizeof(void*));
}

const PointsToSet& CpuExecutable::GetRootPointsToSet() const {
  return assignment_->points_to_analysis().GetPointsToSet(
      module().entry_computation()->root_instruction());
}

std::unique_ptr<HloCostAnalysis> CpuExecutable::CreateCostAnalysis() const {
  return MakeUnique<HloCostAnalysis>(ShapeSizeBytes);
}

}  // namespace cpu
}  // namespace xla