1c8b59c046895fa5b6d79f73e0b5817330fcfbfc1A. Unique TensorFlower/* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 200986d48bb646daab659503ad3a713919865f32dDerek Murray 300986d48bb646daab659503ad3a713919865f32dDerek MurrayLicensed under the Apache License, Version 2.0 (the "License"); 400986d48bb646daab659503ad3a713919865f32dDerek Murrayyou may not use this file except in compliance with the License. 500986d48bb646daab659503ad3a713919865f32dDerek MurrayYou may obtain a copy of the License at 600986d48bb646daab659503ad3a713919865f32dDerek Murray 700986d48bb646daab659503ad3a713919865f32dDerek Murray http://www.apache.org/licenses/LICENSE-2.0 800986d48bb646daab659503ad3a713919865f32dDerek Murray 900986d48bb646daab659503ad3a713919865f32dDerek MurrayUnless required by applicable law or agreed to in writing, software 1000986d48bb646daab659503ad3a713919865f32dDerek Murraydistributed under the License is distributed on an "AS IS" BASIS, 1100986d48bb646daab659503ad3a713919865f32dDerek MurrayWITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1200986d48bb646daab659503ad3a713919865f32dDerek MurraySee the License for the specific language governing permissions and 1300986d48bb646daab659503ad3a713919865f32dDerek Murraylimitations under the License. 1400986d48bb646daab659503ad3a713919865f32dDerek Murray==============================================================================*/ 1500986d48bb646daab659503ad3a713919865f32dDerek Murray 1600986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/distributed_runtime/worker_cache_partial.h" 1700986d48bb646daab659503ad3a713919865f32dDerek Murray 180f70b8e4b72109a2f99a6235da2f17ec142040adVijay Vasudevan#include "tensorflow/core/common_runtime/process_util.h" 1900986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/distributed_runtime/worker_interface.h" 2000986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/lib/core/errors.h" 2100986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/lib/core/status.h" 2200986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/platform/logging.h" 2300986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/platform/mutex.h" 2400986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/platform/types.h" 2500986d48bb646daab659503ad3a713919865f32dDerek Murray#include "tensorflow/core/util/device_name_utils.h" 2600986d48bb646daab659503ad3a713919865f32dDerek Murray 2700986d48bb646daab659503ad3a713919865f32dDerek Murraynamespace tensorflow { 2800986d48bb646daab659503ad3a713919865f32dDerek Murray 2979228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlowerbool WorkerCachePartial::GetDeviceLocalityNonBlocking( 3079228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower const string& device_name, DeviceLocality* locality) { 3100986d48bb646daab659503ad3a713919865f32dDerek Murray mutex_lock lock(mu_); // could use reader lock 32fc60eca2b2400a5d622c175393b2aa2f78ee7600horance auto iter = device_status_cache_.find(device_name); 3300986d48bb646daab659503ad3a713919865f32dDerek Murray if (iter != device_status_cache_.end()) { 3479228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower *locality = iter->second.locality(); 3500986d48bb646daab659503ad3a713919865f32dDerek Murray return true; 3600986d48bb646daab659503ad3a713919865f32dDerek Murray } 3700986d48bb646daab659503ad3a713919865f32dDerek Murray return false; 3800986d48bb646daab659503ad3a713919865f32dDerek Murray} 3900986d48bb646daab659503ad3a713919865f32dDerek Murray 4079228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlowervoid WorkerCachePartial::GetDeviceLocalityAsync(const string& device_name, 4179228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower DeviceLocality* locality, 4279228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower StatusCallback done) { 4379228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower if (!GetDeviceLocalityNonBlocking(device_name, locality)) { 4400986d48bb646daab659503ad3a713919865f32dDerek Murray // If cache entry was empty, make one try to fill it by RPC. 4579228c74e64a639aeb5692b442522d4aa279f885A. Unique TensorFlower SchedClosure([this, &device_name, locality, done]() { 4600986d48bb646daab659503ad3a713919865f32dDerek Murray Status s = RefreshDeviceStatus(device_name); 47fc60eca2b2400a5d622c175393b2aa2f78ee7600horance if (s.ok() && !GetDeviceLocalityNonBlocking(device_name, locality)) { 48fc60eca2b2400a5d622c175393b2aa2f78ee7600horance s = errors::Unavailable("No known remote device: ", device_name); 4900986d48bb646daab659503ad3a713919865f32dDerek Murray } 5000986d48bb646daab659503ad3a713919865f32dDerek Murray done(s); 5100986d48bb646daab659503ad3a713919865f32dDerek Murray }); 5200986d48bb646daab659503ad3a713919865f32dDerek Murray return; 5300986d48bb646daab659503ad3a713919865f32dDerek Murray } 5400986d48bb646daab659503ad3a713919865f32dDerek Murray done(Status::OK()); 5500986d48bb646daab659503ad3a713919865f32dDerek Murray} 5600986d48bb646daab659503ad3a713919865f32dDerek Murray 5700986d48bb646daab659503ad3a713919865f32dDerek MurrayStatus WorkerCachePartial::RefreshDeviceStatus(const string& device_name) { 5800986d48bb646daab659503ad3a713919865f32dDerek Murray string task; 5900986d48bb646daab659503ad3a713919865f32dDerek Murray string device; 6000986d48bb646daab659503ad3a713919865f32dDerek Murray Status s; 6100986d48bb646daab659503ad3a713919865f32dDerek Murray if (!DeviceNameUtils::SplitDeviceName(device_name, &task, &device)) { 6200986d48bb646daab659503ad3a713919865f32dDerek Murray s = errors::InvalidArgument("Bad device name to RefreshDeviceStatus: ", 6300986d48bb646daab659503ad3a713919865f32dDerek Murray device_name); 6400986d48bb646daab659503ad3a713919865f32dDerek Murray } 65fc60eca2b2400a5d622c175393b2aa2f78ee7600horance auto deleter = [this, &task](WorkerInterface* wi) { 66fc60eca2b2400a5d622c175393b2aa2f78ee7600horance ReleaseWorker(task, wi); 67fc60eca2b2400a5d622c175393b2aa2f78ee7600horance }; 686a2616e360daa08ad175d9856fd2e2fb1b4b2af5Derek Murray std::unique_ptr<WorkerInterface, decltype(deleter)> rwi(CreateWorker(task), 696a2616e360daa08ad175d9856fd2e2fb1b4b2af5Derek Murray deleter); 7000986d48bb646daab659503ad3a713919865f32dDerek Murray if (s.ok() && !rwi.get()) { 7100986d48bb646daab659503ad3a713919865f32dDerek Murray s = errors::Internal("RefreshDeviceStatus, unknown worker task: ", task); 7200986d48bb646daab659503ad3a713919865f32dDerek Murray } 7300986d48bb646daab659503ad3a713919865f32dDerek Murray 7400986d48bb646daab659503ad3a713919865f32dDerek Murray if (s.ok()) { 7500986d48bb646daab659503ad3a713919865f32dDerek Murray GetStatusRequest req; 7600986d48bb646daab659503ad3a713919865f32dDerek Murray GetStatusResponse resp; 7700986d48bb646daab659503ad3a713919865f32dDerek Murray s = rwi->GetStatus(&req, &resp); 7800986d48bb646daab659503ad3a713919865f32dDerek Murray if (s.ok()) { 7900986d48bb646daab659503ad3a713919865f32dDerek Murray mutex_lock lock(mu_); 8000986d48bb646daab659503ad3a713919865f32dDerek Murray for (auto& dev_attr : resp.device_attributes()) { 8100986d48bb646daab659503ad3a713919865f32dDerek Murray device_status_cache_[dev_attr.name()] = dev_attr; 8200986d48bb646daab659503ad3a713919865f32dDerek Murray } 8300986d48bb646daab659503ad3a713919865f32dDerek Murray } 8400986d48bb646daab659503ad3a713919865f32dDerek Murray } 8500986d48bb646daab659503ad3a713919865f32dDerek Murray return s; 8600986d48bb646daab659503ad3a713919865f32dDerek Murray} 8700986d48bb646daab659503ad3a713919865f32dDerek Murray 8800986d48bb646daab659503ad3a713919865f32dDerek Murrayvoid WorkerCachePartial::FlushStatusCache() { 8900986d48bb646daab659503ad3a713919865f32dDerek Murray mutex_lock lock(mu_); 9000986d48bb646daab659503ad3a713919865f32dDerek Murray device_status_cache_.clear(); 9100986d48bb646daab659503ad3a713919865f32dDerek Murray} 9200986d48bb646daab659503ad3a713919865f32dDerek Murray 9300986d48bb646daab659503ad3a713919865f32dDerek Murray} // namespace tensorflow 94