sync_task_manager.cc revision cedac228d2dd51db4b79ea1e72c7f249408ee061
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/memory/scoped_ptr.h" 10#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h" 11#include "chrome/browser/sync_file_system/sync_file_metadata.h" 12 13using fileapi::FileSystemURL; 14 15namespace sync_file_system { 16namespace drive_backend { 17 18namespace { 19 20class SyncTaskAdapter : public ExclusiveTask { 21 public: 22 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {} 23 virtual ~SyncTaskAdapter() {} 24 25 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE { 26 task_.Run(callback); 27 } 28 29 private: 30 SyncTaskManager::Task task_; 31 32 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter); 33}; 34 35} // namespace 36 37SyncTaskManager::PendingTask::PendingTask() {} 38 39SyncTaskManager::PendingTask::PendingTask( 40 const base::Closure& task, Priority pri, int seq) 41 : task(task), priority(pri), seq(seq) {} 42 43SyncTaskManager::PendingTask::~PendingTask() {} 44 45bool SyncTaskManager::PendingTaskComparator::operator()( 46 const PendingTask& left, 47 const PendingTask& right) const { 48 if (left.priority != right.priority) 49 return left.priority < right.priority; 50 return left.seq > right.seq; 51} 52 53SyncTaskManager::SyncTaskManager( 54 base::WeakPtr<Client> client, 55 size_t maximum_background_task) 56 : client_(client), 57 maximum_background_task_(maximum_background_task), 58 pending_task_seq_(0), 59 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) { 60} 61 62SyncTaskManager::~SyncTaskManager() { 63 client_.reset(); 64 token_.reset(); 65} 66 67void SyncTaskManager::Initialize(SyncStatusCode status) { 68 DCHECK(!token_); 69 NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()), 70 status); 71} 72 73void SyncTaskManager::ScheduleTask( 74 const tracked_objects::Location& from_here, 75 const Task& task, 76 Priority priority, 77 const SyncStatusCallback& callback) { 78 ScheduleSyncTask(from_here, 79 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 80 priority, 81 callback); 82} 83 84void SyncTaskManager::ScheduleSyncTask( 85 const tracked_objects::Location& from_here, 86 scoped_ptr<SyncTask> task, 87 Priority priority, 88 const SyncStatusCallback& callback) { 89 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 90 if (!token) { 91 PushPendingTask( 92 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here, 93 base::Passed(&task), priority, callback), 94 priority); 95 return; 96 } 97 RunTask(token.Pass(), task.Pass()); 98} 99 100bool SyncTaskManager::ScheduleTaskIfIdle( 101 const tracked_objects::Location& from_here, 102 const Task& task, 103 const SyncStatusCallback& callback) { 104 return ScheduleSyncTaskIfIdle( 105 from_here, 106 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 107 callback); 108} 109 110bool SyncTaskManager::ScheduleSyncTaskIfIdle( 111 const tracked_objects::Location& from_here, 112 scoped_ptr<SyncTask> task, 113 const SyncStatusCallback& callback) { 114 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 115 if (!token) 116 return false; 117 RunTask(token.Pass(), task.Pass()); 118 return true; 119} 120 121// static 122void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token, 123 SyncStatusCode status) { 124 DCHECK(token); 125 126 SyncTaskManager* manager = token->manager(); 127 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 128 DCHECK(!manager); 129 SyncStatusCallback callback = token->callback(); 130 token->clear_callback(); 131 callback.Run(status); 132 return; 133 } 134 135 if (manager) 136 manager->NotifyTaskDoneBody(token.Pass(), status); 137} 138 139// static 140void SyncTaskManager::UpdateBlockingFactor( 141 scoped_ptr<SyncTaskToken> current_task_token, 142 scoped_ptr<BlockingFactor> blocking_factor, 143 const Continuation& continuation) { 144 DCHECK(current_task_token); 145 146 SyncTaskManager* manager = current_task_token->manager(); 147 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 148 DCHECK(!manager); 149 continuation.Run(current_task_token.Pass()); 150 return; 151 } 152 153 if (!manager) 154 return; 155 156 scoped_ptr<SyncTaskToken> foreground_task_token; 157 scoped_ptr<SyncTaskToken> background_task_token; 158 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog(); 159 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID) 160 foreground_task_token = current_task_token.Pass(); 161 else 162 background_task_token = current_task_token.Pass(); 163 164 manager->UpdateBlockingFactorBody(foreground_task_token.Pass(), 165 background_task_token.Pass(), 166 task_log.Pass(), 167 blocking_factor.Pass(), 168 continuation); 169} 170 171bool SyncTaskManager::IsRunningTask(int64 token_id) const { 172 // If the client is gone, all task should be aborted. 173 if (!client_) 174 return false; 175 176 if (token_id == SyncTaskToken::kForegroundTaskTokenID) 177 return true; 178 179 return ContainsKey(running_background_tasks_, token_id); 180} 181 182void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token, 183 SyncStatusCode status) { 184 DCHECK(token); 185 186 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status 187 << " (" << SyncStatusCodeToString(status) << ")" 188 << " " << token_->location().ToString(); 189 190 if (token->blocking_factor()) { 191 dependency_manager_.Erase(token->blocking_factor()); 192 token->clear_blocking_factor(); 193 } 194 195 if (client_) 196 client_->RecordTaskLog(token->PassTaskLog()); 197 198 scoped_ptr<SyncTask> task; 199 SyncStatusCallback callback = token->callback(); 200 token->clear_callback(); 201 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) { 202 token_ = token.Pass(); 203 task = running_foreground_task_.Pass(); 204 } else { 205 task = running_background_tasks_.take_and_erase(token->token_id()); 206 } 207 208 bool task_used_network = false; 209 if (task) 210 task_used_network = task->used_network(); 211 212 if (client_) 213 client_->NotifyLastOperationStatus(status, task_used_network); 214 215 if (!callback.is_null()) 216 callback.Run(status); 217 218 StartNextTask(); 219} 220 221void SyncTaskManager::UpdateBlockingFactorBody( 222 scoped_ptr<SyncTaskToken> foreground_task_token, 223 scoped_ptr<SyncTaskToken> background_task_token, 224 scoped_ptr<TaskLogger::TaskLog> task_log, 225 scoped_ptr<BlockingFactor> blocking_factor, 226 const Continuation& continuation) { 227 // Run the task directly if the parallelization is disabled. 228 if (!maximum_background_task_) { 229 DCHECK(foreground_task_token); 230 DCHECK(!background_task_token); 231 continuation.Run(foreground_task_token.Pass()); 232 return; 233 } 234 235 // Clear existing |blocking_factor| from |dependency_manager_| before 236 // getting |foreground_task_token|, so that we can avoid dead lock. 237 if (background_task_token && background_task_token->blocking_factor()) { 238 dependency_manager_.Erase(background_task_token->blocking_factor()); 239 background_task_token->clear_blocking_factor(); 240 } 241 242 // Try to get |foreground_task_token|. If it's not available, wait for 243 // current foreground task to finish. 244 if (!foreground_task_token) { 245 DCHECK(background_task_token); 246 foreground_task_token = GetToken(background_task_token->location(), 247 SyncStatusCallback()); 248 if (!foreground_task_token) { 249 PushPendingTask( 250 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 251 AsWeakPtr(), 252 base::Passed(&foreground_task_token), 253 base::Passed(&background_task_token), 254 base::Passed(&task_log), 255 base::Passed(&blocking_factor), 256 continuation), 257 PRIORITY_HIGH); 258 StartNextTask(); 259 return; 260 } 261 } 262 263 // Check if the task can run as a background task now. 264 // If there are too many task running or any other task blocks current 265 // task, wait for any other task to finish. 266 bool task_number_limit_exceeded = 267 !background_task_token && 268 running_background_tasks_.size() >= maximum_background_task_; 269 if (task_number_limit_exceeded || 270 !dependency_manager_.Insert(blocking_factor.get())) { 271 DCHECK(!running_background_tasks_.empty()); 272 DCHECK(pending_backgrounding_task_.is_null()); 273 274 // Wait for NotifyTaskDone to release a |blocking_factor|. 275 pending_backgrounding_task_ = 276 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 277 AsWeakPtr(), 278 base::Passed(&foreground_task_token), 279 base::Passed(&background_task_token), 280 base::Passed(&task_log), 281 base::Passed(&blocking_factor), 282 continuation); 283 return; 284 } 285 286 if (background_task_token) { 287 background_task_token->set_blocking_factor(blocking_factor.Pass()); 288 } else { 289 tracked_objects::Location from_here = foreground_task_token->location(); 290 SyncStatusCallback callback = foreground_task_token->callback(); 291 foreground_task_token->clear_callback(); 292 293 background_task_token = 294 SyncTaskToken::CreateForBackgroundTask( 295 AsWeakPtr(), 296 task_token_seq_++, 297 blocking_factor.Pass()); 298 background_task_token->UpdateTask(from_here, callback); 299 running_background_tasks_.set(background_task_token->token_id(), 300 running_foreground_task_.Pass()); 301 } 302 303 token_ = foreground_task_token.Pass(); 304 StartNextTask(); 305 background_task_token->SetTaskLog(task_log.Pass()); 306 continuation.Run(background_task_token.Pass()); 307} 308 309scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken( 310 const tracked_objects::Location& from_here, 311 const SyncStatusCallback& callback) { 312 if (!token_) 313 return scoped_ptr<SyncTaskToken>(); 314 token_->UpdateTask(from_here, callback); 315 return token_.Pass(); 316} 317 318void SyncTaskManager::PushPendingTask( 319 const base::Closure& closure, Priority priority) { 320 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++)); 321} 322 323void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token, 324 scoped_ptr<SyncTask> task) { 325 DCHECK(!running_foreground_task_); 326 327 token->SetTaskLog(make_scoped_ptr(new TaskLogger::TaskLog)); 328 329 running_foreground_task_ = task.Pass(); 330 running_foreground_task_->RunPreflight(token.Pass()); 331} 332 333void SyncTaskManager::StartNextTask() { 334 if (!pending_backgrounding_task_.is_null()) { 335 base::Closure closure = pending_backgrounding_task_; 336 pending_backgrounding_task_.Reset(); 337 closure.Run(); 338 return; 339 } 340 341 if (!pending_tasks_.empty()) { 342 base::Closure closure = pending_tasks_.top().task; 343 pending_tasks_.pop(); 344 closure.Run(); 345 return; 346 } 347 348 if (client_) 349 client_->MaybeScheduleNextTask(); 350} 351 352} // namespace drive_backend 353} // namespace sync_file_system 354