sync_task_manager.cc revision 46d4c2bc3267f3f028f39e7e311b0f89aba2e4fd
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/memory/scoped_ptr.h" 10#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h" 11#include "chrome/browser/sync_file_system/sync_file_metadata.h" 12 13using fileapi::FileSystemURL; 14 15namespace sync_file_system { 16namespace drive_backend { 17 18namespace { 19 20class SyncTaskAdapter : public ExclusiveTask { 21 public: 22 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {} 23 virtual ~SyncTaskAdapter() {} 24 25 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE { 26 task_.Run(callback); 27 } 28 29 private: 30 SyncTaskManager::Task task_; 31 32 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter); 33}; 34 35} // namespace 36 37SyncTaskManager::PendingTask::PendingTask() {} 38 39SyncTaskManager::PendingTask::PendingTask( 40 const base::Closure& task, Priority pri, int seq) 41 : task(task), priority(pri), seq(seq) {} 42 43SyncTaskManager::PendingTask::~PendingTask() {} 44 45bool SyncTaskManager::PendingTaskComparator::operator()( 46 const PendingTask& left, 47 const PendingTask& right) const { 48 if (left.priority != right.priority) 49 return left.priority < right.priority; 50 return left.seq > right.seq; 51} 52 53SyncTaskManager::SyncTaskManager( 54 base::WeakPtr<Client> client, 55 size_t maximum_background_task) 56 : client_(client), 57 maximum_background_task_(maximum_background_task), 58 pending_task_seq_(0), 59 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) { 60} 61 62SyncTaskManager::~SyncTaskManager() { 63 client_.reset(); 64 token_.reset(); 65} 66 67void SyncTaskManager::Initialize(SyncStatusCode status) { 68 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 69 DCHECK(!token_); 70 NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()), 71 status); 72} 73 74void SyncTaskManager::ScheduleTask( 75 const tracked_objects::Location& from_here, 76 const Task& task, 77 Priority priority, 78 const SyncStatusCallback& callback) { 79 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 80 81 ScheduleSyncTask(from_here, 82 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 83 priority, 84 callback); 85} 86 87void SyncTaskManager::ScheduleSyncTask( 88 const tracked_objects::Location& from_here, 89 scoped_ptr<SyncTask> task, 90 Priority priority, 91 const SyncStatusCallback& callback) { 92 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 93 94 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 95 if (!token) { 96 PushPendingTask( 97 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here, 98 base::Passed(&task), priority, callback), 99 priority); 100 return; 101 } 102 RunTask(token.Pass(), task.Pass()); 103} 104 105bool SyncTaskManager::ScheduleTaskIfIdle( 106 const tracked_objects::Location& from_here, 107 const Task& task, 108 const SyncStatusCallback& callback) { 109 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 110 111 return ScheduleSyncTaskIfIdle( 112 from_here, 113 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 114 callback); 115} 116 117bool SyncTaskManager::ScheduleSyncTaskIfIdle( 118 const tracked_objects::Location& from_here, 119 scoped_ptr<SyncTask> task, 120 const SyncStatusCallback& callback) { 121 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 122 123 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 124 if (!token) 125 return false; 126 RunTask(token.Pass(), task.Pass()); 127 return true; 128} 129 130// static 131void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token, 132 SyncStatusCode status) { 133 DCHECK(token); 134 135 SyncTaskManager* manager = token->manager(); 136 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 137 DCHECK(!manager); 138 SyncStatusCallback callback = token->callback(); 139 token->clear_callback(); 140 callback.Run(status); 141 return; 142 } 143 144 if (manager) 145 manager->NotifyTaskDoneBody(token.Pass(), status); 146} 147 148// static 149void SyncTaskManager::UpdateBlockingFactor( 150 scoped_ptr<SyncTaskToken> current_task_token, 151 scoped_ptr<BlockingFactor> blocking_factor, 152 const Continuation& continuation) { 153 DCHECK(current_task_token); 154 155 SyncTaskManager* manager = current_task_token->manager(); 156 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 157 DCHECK(!manager); 158 continuation.Run(current_task_token.Pass()); 159 return; 160 } 161 162 if (!manager) 163 return; 164 165 scoped_ptr<SyncTaskToken> foreground_task_token; 166 scoped_ptr<SyncTaskToken> background_task_token; 167 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog(); 168 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID) 169 foreground_task_token = current_task_token.Pass(); 170 else 171 background_task_token = current_task_token.Pass(); 172 173 manager->UpdateBlockingFactorBody(foreground_task_token.Pass(), 174 background_task_token.Pass(), 175 task_log.Pass(), 176 blocking_factor.Pass(), 177 continuation); 178} 179 180bool SyncTaskManager::IsRunningTask(int64 token_id) const { 181 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 182 183 // If the client is gone, all task should be aborted. 184 if (!client_) 185 return false; 186 187 if (token_id == SyncTaskToken::kForegroundTaskTokenID) 188 return true; 189 190 return ContainsKey(running_background_tasks_, token_id); 191} 192 193void SyncTaskManager::DetachFromSequence() { 194 sequence_checker_.DetachFromSequence(); 195} 196 197void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token, 198 SyncStatusCode status) { 199 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 200 DCHECK(token); 201 202 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status 203 << " (" << SyncStatusCodeToString(status) << ")" 204 << " " << token_->location().ToString(); 205 206 if (token->blocking_factor()) { 207 dependency_manager_.Erase(token->blocking_factor()); 208 token->clear_blocking_factor(); 209 } 210 211 if (client_) { 212 if (token->has_task_log()) { 213 token->FinalizeTaskLog(SyncStatusCodeToString(status)); 214 client_->RecordTaskLog(token->PassTaskLog()); 215 } 216 } 217 218 scoped_ptr<SyncTask> task; 219 SyncStatusCallback callback = token->callback(); 220 token->clear_callback(); 221 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) { 222 token_ = token.Pass(); 223 task = running_foreground_task_.Pass(); 224 } else { 225 task = running_background_tasks_.take_and_erase(token->token_id()); 226 } 227 228 bool task_used_network = false; 229 if (task) 230 task_used_network = task->used_network(); 231 232 if (client_) 233 client_->NotifyLastOperationStatus(status, task_used_network); 234 235 if (!callback.is_null()) 236 callback.Run(status); 237 238 StartNextTask(); 239} 240 241void SyncTaskManager::UpdateBlockingFactorBody( 242 scoped_ptr<SyncTaskToken> foreground_task_token, 243 scoped_ptr<SyncTaskToken> background_task_token, 244 scoped_ptr<TaskLogger::TaskLog> task_log, 245 scoped_ptr<BlockingFactor> blocking_factor, 246 const Continuation& continuation) { 247 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 248 249 // Run the task directly if the parallelization is disabled. 250 if (!maximum_background_task_) { 251 DCHECK(foreground_task_token); 252 DCHECK(!background_task_token); 253 foreground_task_token->SetTaskLog(task_log.Pass()); 254 continuation.Run(foreground_task_token.Pass()); 255 return; 256 } 257 258 // Clear existing |blocking_factor| from |dependency_manager_| before 259 // getting |foreground_task_token|, so that we can avoid dead lock. 260 if (background_task_token && background_task_token->blocking_factor()) { 261 dependency_manager_.Erase(background_task_token->blocking_factor()); 262 background_task_token->clear_blocking_factor(); 263 } 264 265 // Try to get |foreground_task_token|. If it's not available, wait for 266 // current foreground task to finish. 267 if (!foreground_task_token) { 268 DCHECK(background_task_token); 269 foreground_task_token = GetToken(background_task_token->location(), 270 SyncStatusCallback()); 271 if (!foreground_task_token) { 272 PushPendingTask( 273 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 274 AsWeakPtr(), 275 base::Passed(&foreground_task_token), 276 base::Passed(&background_task_token), 277 base::Passed(&task_log), 278 base::Passed(&blocking_factor), 279 continuation), 280 PRIORITY_HIGH); 281 StartNextTask(); 282 return; 283 } 284 } 285 286 // Check if the task can run as a background task now. 287 // If there are too many task running or any other task blocks current 288 // task, wait for any other task to finish. 289 bool task_number_limit_exceeded = 290 !background_task_token && 291 running_background_tasks_.size() >= maximum_background_task_; 292 if (task_number_limit_exceeded || 293 !dependency_manager_.Insert(blocking_factor.get())) { 294 DCHECK(!running_background_tasks_.empty()); 295 DCHECK(pending_backgrounding_task_.is_null()); 296 297 // Wait for NotifyTaskDone to release a |blocking_factor|. 298 pending_backgrounding_task_ = 299 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 300 AsWeakPtr(), 301 base::Passed(&foreground_task_token), 302 base::Passed(&background_task_token), 303 base::Passed(&task_log), 304 base::Passed(&blocking_factor), 305 continuation); 306 return; 307 } 308 309 if (background_task_token) { 310 background_task_token->set_blocking_factor(blocking_factor.Pass()); 311 } else { 312 tracked_objects::Location from_here = foreground_task_token->location(); 313 SyncStatusCallback callback = foreground_task_token->callback(); 314 foreground_task_token->clear_callback(); 315 316 background_task_token = 317 SyncTaskToken::CreateForBackgroundTask( 318 AsWeakPtr(), 319 task_token_seq_++, 320 blocking_factor.Pass()); 321 background_task_token->UpdateTask(from_here, callback); 322 running_background_tasks_.set(background_task_token->token_id(), 323 running_foreground_task_.Pass()); 324 } 325 326 token_ = foreground_task_token.Pass(); 327 StartNextTask(); 328 background_task_token->SetTaskLog(task_log.Pass()); 329 continuation.Run(background_task_token.Pass()); 330} 331 332scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken( 333 const tracked_objects::Location& from_here, 334 const SyncStatusCallback& callback) { 335 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 336 337 if (!token_) 338 return scoped_ptr<SyncTaskToken>(); 339 token_->UpdateTask(from_here, callback); 340 return token_.Pass(); 341} 342 343void SyncTaskManager::PushPendingTask( 344 const base::Closure& closure, Priority priority) { 345 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 346 347 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++)); 348} 349 350void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token, 351 scoped_ptr<SyncTask> task) { 352 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 353 DCHECK(!running_foreground_task_); 354 355 running_foreground_task_ = task.Pass(); 356 running_foreground_task_->RunPreflight(token.Pass()); 357} 358 359void SyncTaskManager::StartNextTask() { 360 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 361 362 if (!pending_backgrounding_task_.is_null()) { 363 base::Closure closure = pending_backgrounding_task_; 364 pending_backgrounding_task_.Reset(); 365 closure.Run(); 366 return; 367 } 368 369 if (!pending_tasks_.empty()) { 370 base::Closure closure = pending_tasks_.top().task; 371 pending_tasks_.pop(); 372 closure.Run(); 373 return; 374 } 375 376 if (client_) 377 client_->MaybeScheduleNextTask(); 378} 379 380} // namespace drive_backend 381} // namespace sync_file_system 382