sync_task_manager.cc revision 6d86b77056ed63eb6871182f42a9fd5f07550f90
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/memory/scoped_ptr.h" 10#include "base/sequenced_task_runner.h" 11#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 12#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h" 13#include "chrome/browser/sync_file_system/sync_file_metadata.h" 14 15using fileapi::FileSystemURL; 16 17namespace sync_file_system { 18namespace drive_backend { 19 20namespace { 21 22class SyncTaskAdapter : public ExclusiveTask { 23 public: 24 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {} 25 virtual ~SyncTaskAdapter() {} 26 27 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE { 28 task_.Run(callback); 29 } 30 31 private: 32 SyncTaskManager::Task task_; 33 34 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter); 35}; 36 37} // namespace 38 39SyncTaskManager::PendingTask::PendingTask() {} 40 41SyncTaskManager::PendingTask::PendingTask( 42 const base::Closure& task, Priority pri, int seq) 43 : task(task), priority(pri), seq(seq) {} 44 45SyncTaskManager::PendingTask::~PendingTask() {} 46 47bool SyncTaskManager::PendingTaskComparator::operator()( 48 const PendingTask& left, 49 const PendingTask& right) const { 50 if (left.priority != right.priority) 51 return left.priority < right.priority; 52 return left.seq > right.seq; 53} 54 55SyncTaskManager::SyncTaskManager( 56 base::WeakPtr<Client> client, 57 size_t maximum_background_task, 58 base::SequencedTaskRunner* task_runner) 59 : client_(client), 60 maximum_background_task_(maximum_background_task), 61 pending_task_seq_(0), 62 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID), 63 task_runner_(task_runner) { 64} 65 66SyncTaskManager::~SyncTaskManager() { 67 client_.reset(); 68 token_.reset(); 69} 70 71void SyncTaskManager::Initialize(SyncStatusCode status) { 72 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 73 DCHECK(!token_); 74 NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()), 75 status); 76} 77 78void SyncTaskManager::ScheduleTask( 79 const tracked_objects::Location& from_here, 80 const Task& task, 81 Priority priority, 82 const SyncStatusCallback& callback) { 83 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 84 85 ScheduleSyncTask(from_here, 86 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 87 priority, 88 callback); 89} 90 91void SyncTaskManager::ScheduleSyncTask( 92 const tracked_objects::Location& from_here, 93 scoped_ptr<SyncTask> task, 94 Priority priority, 95 const SyncStatusCallback& callback) { 96 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 97 98 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 99 if (!token) { 100 PushPendingTask( 101 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here, 102 base::Passed(&task), priority, callback), 103 priority); 104 return; 105 } 106 RunTask(token.Pass(), task.Pass()); 107} 108 109bool SyncTaskManager::ScheduleTaskIfIdle( 110 const tracked_objects::Location& from_here, 111 const Task& task, 112 const SyncStatusCallback& callback) { 113 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 114 115 return ScheduleSyncTaskIfIdle( 116 from_here, 117 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 118 callback); 119} 120 121bool SyncTaskManager::ScheduleSyncTaskIfIdle( 122 const tracked_objects::Location& from_here, 123 scoped_ptr<SyncTask> task, 124 const SyncStatusCallback& callback) { 125 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 126 127 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 128 if (!token) 129 return false; 130 RunTask(token.Pass(), task.Pass()); 131 return true; 132} 133 134// static 135void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token, 136 SyncStatusCode status) { 137 DCHECK(token); 138 139 SyncTaskManager* manager = token->manager(); 140 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 141 DCHECK(!manager); 142 SyncStatusCallback callback = token->callback(); 143 token->clear_callback(); 144 callback.Run(status); 145 return; 146 } 147 148 if (manager) 149 manager->NotifyTaskDoneBody(token.Pass(), status); 150} 151 152// static 153void SyncTaskManager::UpdateBlockingFactor( 154 scoped_ptr<SyncTaskToken> current_task_token, 155 scoped_ptr<BlockingFactor> blocking_factor, 156 const Continuation& continuation) { 157 DCHECK(current_task_token); 158 159 SyncTaskManager* manager = current_task_token->manager(); 160 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 161 DCHECK(!manager); 162 continuation.Run(current_task_token.Pass()); 163 return; 164 } 165 166 if (!manager) 167 return; 168 169 scoped_ptr<SyncTaskToken> foreground_task_token; 170 scoped_ptr<SyncTaskToken> background_task_token; 171 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog(); 172 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID) 173 foreground_task_token = current_task_token.Pass(); 174 else 175 background_task_token = current_task_token.Pass(); 176 177 manager->UpdateBlockingFactorBody(foreground_task_token.Pass(), 178 background_task_token.Pass(), 179 task_log.Pass(), 180 blocking_factor.Pass(), 181 continuation); 182} 183 184bool SyncTaskManager::IsRunningTask(int64 token_id) const { 185 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 186 187 // If the client is gone, all task should be aborted. 188 if (!client_) 189 return false; 190 191 if (token_id == SyncTaskToken::kForegroundTaskTokenID) 192 return true; 193 194 return ContainsKey(running_background_tasks_, token_id); 195} 196 197void SyncTaskManager::DetachFromSequence() { 198 sequence_checker_.DetachFromSequence(); 199} 200 201void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token, 202 SyncStatusCode status) { 203 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 204 DCHECK(token); 205 206 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status 207 << " (" << SyncStatusCodeToString(status) << ")" 208 << " " << token_->location().ToString(); 209 210 if (token->blocking_factor()) { 211 dependency_manager_.Erase(token->blocking_factor()); 212 token->clear_blocking_factor(); 213 } 214 215 if (client_) { 216 if (token->has_task_log()) { 217 token->FinalizeTaskLog(SyncStatusCodeToString(status)); 218 client_->RecordTaskLog(token->PassTaskLog()); 219 } 220 } 221 222 scoped_ptr<SyncTask> task; 223 SyncStatusCallback callback = token->callback(); 224 token->clear_callback(); 225 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) { 226 token_ = token.Pass(); 227 task = running_foreground_task_.Pass(); 228 } else { 229 task = running_background_tasks_.take_and_erase(token->token_id()); 230 } 231 232 // Acquire the token to prevent a new task to jump into the queue. 233 token = token_.Pass(); 234 235 bool task_used_network = false; 236 if (task) 237 task_used_network = task->used_network(); 238 239 if (client_) 240 client_->NotifyLastOperationStatus(status, task_used_network); 241 242 if (!callback.is_null()) 243 callback.Run(status); 244 245 // Post MaybeStartNextForegroundTask rather than calling it directly to avoid 246 // making the call-chaing longer. 247 task_runner_->PostTask( 248 FROM_HERE, 249 base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask, 250 AsWeakPtr(), base::Passed(&token))); 251} 252 253void SyncTaskManager::UpdateBlockingFactorBody( 254 scoped_ptr<SyncTaskToken> foreground_task_token, 255 scoped_ptr<SyncTaskToken> background_task_token, 256 scoped_ptr<TaskLogger::TaskLog> task_log, 257 scoped_ptr<BlockingFactor> blocking_factor, 258 const Continuation& continuation) { 259 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 260 261 // Run the task directly if the parallelization is disabled. 262 if (!maximum_background_task_) { 263 DCHECK(foreground_task_token); 264 DCHECK(!background_task_token); 265 foreground_task_token->SetTaskLog(task_log.Pass()); 266 continuation.Run(foreground_task_token.Pass()); 267 return; 268 } 269 270 // Clear existing |blocking_factor| from |dependency_manager_| before 271 // getting |foreground_task_token|, so that we can avoid dead lock. 272 if (background_task_token && background_task_token->blocking_factor()) { 273 dependency_manager_.Erase(background_task_token->blocking_factor()); 274 background_task_token->clear_blocking_factor(); 275 } 276 277 // Try to get |foreground_task_token|. If it's not available, wait for 278 // current foreground task to finish. 279 if (!foreground_task_token) { 280 DCHECK(background_task_token); 281 foreground_task_token = GetToken(background_task_token->location(), 282 SyncStatusCallback()); 283 if (!foreground_task_token) { 284 PushPendingTask( 285 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 286 AsWeakPtr(), 287 base::Passed(&foreground_task_token), 288 base::Passed(&background_task_token), 289 base::Passed(&task_log), 290 base::Passed(&blocking_factor), 291 continuation), 292 PRIORITY_HIGH); 293 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>()); 294 return; 295 } 296 } 297 298 // Check if the task can run as a background task now. 299 // If there are too many task running or any other task blocks current 300 // task, wait for any other task to finish. 301 bool task_number_limit_exceeded = 302 !background_task_token && 303 running_background_tasks_.size() >= maximum_background_task_; 304 if (task_number_limit_exceeded || 305 !dependency_manager_.Insert(blocking_factor.get())) { 306 DCHECK(!running_background_tasks_.empty()); 307 DCHECK(pending_backgrounding_task_.is_null()); 308 309 // Wait for NotifyTaskDone to release a |blocking_factor|. 310 pending_backgrounding_task_ = 311 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 312 AsWeakPtr(), 313 base::Passed(&foreground_task_token), 314 base::Passed(&background_task_token), 315 base::Passed(&task_log), 316 base::Passed(&blocking_factor), 317 continuation); 318 return; 319 } 320 321 if (background_task_token) { 322 background_task_token->set_blocking_factor(blocking_factor.Pass()); 323 } else { 324 tracked_objects::Location from_here = foreground_task_token->location(); 325 SyncStatusCallback callback = foreground_task_token->callback(); 326 foreground_task_token->clear_callback(); 327 328 background_task_token = 329 SyncTaskToken::CreateForBackgroundTask( 330 AsWeakPtr(), 331 task_token_seq_++, 332 blocking_factor.Pass()); 333 background_task_token->UpdateTask(from_here, callback); 334 running_background_tasks_.set(background_task_token->token_id(), 335 running_foreground_task_.Pass()); 336 } 337 338 token_ = foreground_task_token.Pass(); 339 MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>()); 340 background_task_token->SetTaskLog(task_log.Pass()); 341 continuation.Run(background_task_token.Pass()); 342} 343 344scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken( 345 const tracked_objects::Location& from_here, 346 const SyncStatusCallback& callback) { 347 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 348 349 if (!token_) 350 return scoped_ptr<SyncTaskToken>(); 351 token_->UpdateTask(from_here, callback); 352 return token_.Pass(); 353} 354 355void SyncTaskManager::PushPendingTask( 356 const base::Closure& closure, Priority priority) { 357 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 358 359 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++)); 360} 361 362void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token, 363 scoped_ptr<SyncTask> task) { 364 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 365 DCHECK(!running_foreground_task_); 366 367 running_foreground_task_ = task.Pass(); 368 running_foreground_task_->RunPreflight(token.Pass()); 369} 370 371void SyncTaskManager::MaybeStartNextForegroundTask( 372 scoped_ptr<SyncTaskToken> token) { 373 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 374 375 if (token) { 376 DCHECK(!token_); 377 token_ = token.Pass(); 378 } 379 380 if (!pending_backgrounding_task_.is_null()) { 381 base::Closure closure = pending_backgrounding_task_; 382 pending_backgrounding_task_.Reset(); 383 closure.Run(); 384 return; 385 } 386 387 if (!token_) 388 return; 389 390 if (!pending_tasks_.empty()) { 391 base::Closure closure = pending_tasks_.top().task; 392 pending_tasks_.pop(); 393 closure.Run(); 394 return; 395 } 396 397 if (client_) 398 client_->MaybeScheduleNextTask(); 399} 400 401} // namespace drive_backend 402} // namespace sync_file_system 403