sync_task_manager.cc revision f8ee788a64d60abd8f2d742a5fdedde054ecd910
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h" 6 7#include "base/bind.h" 8#include "base/location.h" 9#include "base/memory/scoped_ptr.h" 10#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 11#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h" 12#include "chrome/browser/sync_file_system/sync_file_metadata.h" 13 14using fileapi::FileSystemURL; 15 16namespace sync_file_system { 17namespace drive_backend { 18 19namespace { 20 21class SyncTaskAdapter : public ExclusiveTask { 22 public: 23 explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {} 24 virtual ~SyncTaskAdapter() {} 25 26 virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE { 27 task_.Run(callback); 28 } 29 30 private: 31 SyncTaskManager::Task task_; 32 33 DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter); 34}; 35 36} // namespace 37 38SyncTaskManager::PendingTask::PendingTask() {} 39 40SyncTaskManager::PendingTask::PendingTask( 41 const base::Closure& task, Priority pri, int seq) 42 : task(task), priority(pri), seq(seq) {} 43 44SyncTaskManager::PendingTask::~PendingTask() {} 45 46bool SyncTaskManager::PendingTaskComparator::operator()( 47 const PendingTask& left, 48 const PendingTask& right) const { 49 if (left.priority != right.priority) 50 return left.priority < right.priority; 51 return left.seq > right.seq; 52} 53 54SyncTaskManager::SyncTaskManager( 55 base::WeakPtr<Client> client, 56 size_t maximum_background_task) 57 : client_(client), 58 maximum_background_task_(maximum_background_task), 59 pending_task_seq_(0), 60 task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) { 61} 62 63SyncTaskManager::~SyncTaskManager() { 64 client_.reset(); 65 token_.reset(); 66} 67 68void SyncTaskManager::Initialize(SyncStatusCode status) { 69 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 70 DCHECK(!token_); 71 NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()), 72 status); 73} 74 75void SyncTaskManager::ScheduleTask( 76 const tracked_objects::Location& from_here, 77 const Task& task, 78 Priority priority, 79 const SyncStatusCallback& callback) { 80 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 81 82 ScheduleSyncTask(from_here, 83 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 84 priority, 85 callback); 86} 87 88void SyncTaskManager::ScheduleSyncTask( 89 const tracked_objects::Location& from_here, 90 scoped_ptr<SyncTask> task, 91 Priority priority, 92 const SyncStatusCallback& callback) { 93 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 94 95 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 96 if (!token) { 97 PushPendingTask( 98 base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here, 99 base::Passed(&task), priority, callback), 100 priority); 101 return; 102 } 103 RunTask(token.Pass(), task.Pass()); 104} 105 106bool SyncTaskManager::ScheduleTaskIfIdle( 107 const tracked_objects::Location& from_here, 108 const Task& task, 109 const SyncStatusCallback& callback) { 110 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 111 112 return ScheduleSyncTaskIfIdle( 113 from_here, 114 scoped_ptr<SyncTask>(new SyncTaskAdapter(task)), 115 callback); 116} 117 118bool SyncTaskManager::ScheduleSyncTaskIfIdle( 119 const tracked_objects::Location& from_here, 120 scoped_ptr<SyncTask> task, 121 const SyncStatusCallback& callback) { 122 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 123 124 scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback)); 125 if (!token) 126 return false; 127 RunTask(token.Pass(), task.Pass()); 128 return true; 129} 130 131// static 132void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token, 133 SyncStatusCode status) { 134 DCHECK(token); 135 136 SyncTaskManager* manager = token->manager(); 137 if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 138 DCHECK(!manager); 139 SyncStatusCallback callback = token->callback(); 140 token->clear_callback(); 141 callback.Run(status); 142 return; 143 } 144 145 if (manager) 146 manager->NotifyTaskDoneBody(token.Pass(), status); 147} 148 149// static 150void SyncTaskManager::UpdateBlockingFactor( 151 scoped_ptr<SyncTaskToken> current_task_token, 152 scoped_ptr<BlockingFactor> blocking_factor, 153 const Continuation& continuation) { 154 DCHECK(current_task_token); 155 156 SyncTaskManager* manager = current_task_token->manager(); 157 if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) { 158 DCHECK(!manager); 159 continuation.Run(current_task_token.Pass()); 160 return; 161 } 162 163 if (!manager) 164 return; 165 166 scoped_ptr<SyncTaskToken> foreground_task_token; 167 scoped_ptr<SyncTaskToken> background_task_token; 168 scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog(); 169 if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID) 170 foreground_task_token = current_task_token.Pass(); 171 else 172 background_task_token = current_task_token.Pass(); 173 174 manager->UpdateBlockingFactorBody(foreground_task_token.Pass(), 175 background_task_token.Pass(), 176 task_log.Pass(), 177 blocking_factor.Pass(), 178 continuation); 179} 180 181bool SyncTaskManager::IsRunningTask(int64 token_id) const { 182 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 183 184 // If the client is gone, all task should be aborted. 185 if (!client_) 186 return false; 187 188 if (token_id == SyncTaskToken::kForegroundTaskTokenID) 189 return true; 190 191 return ContainsKey(running_background_tasks_, token_id); 192} 193 194void SyncTaskManager::DetachFromSequence() { 195 sequence_checker_.DetachFromSequence(); 196} 197 198void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token, 199 SyncStatusCode status) { 200 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 201 DCHECK(token); 202 203 DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status 204 << " (" << SyncStatusCodeToString(status) << ")" 205 << " " << token_->location().ToString(); 206 207 if (token->blocking_factor()) { 208 dependency_manager_.Erase(token->blocking_factor()); 209 token->clear_blocking_factor(); 210 } 211 212 if (client_) { 213 if (token->has_task_log()) { 214 token->FinalizeTaskLog(SyncStatusCodeToString(status)); 215 client_->RecordTaskLog(token->PassTaskLog()); 216 } 217 } 218 219 scoped_ptr<SyncTask> task; 220 SyncStatusCallback callback = token->callback(); 221 token->clear_callback(); 222 if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) { 223 token_ = token.Pass(); 224 task = running_foreground_task_.Pass(); 225 } else { 226 task = running_background_tasks_.take_and_erase(token->token_id()); 227 } 228 229 bool task_used_network = false; 230 if (task) 231 task_used_network = task->used_network(); 232 233 if (client_) 234 client_->NotifyLastOperationStatus(status, task_used_network); 235 236 if (!callback.is_null()) 237 callback.Run(status); 238 239 StartNextTask(); 240} 241 242void SyncTaskManager::UpdateBlockingFactorBody( 243 scoped_ptr<SyncTaskToken> foreground_task_token, 244 scoped_ptr<SyncTaskToken> background_task_token, 245 scoped_ptr<TaskLogger::TaskLog> task_log, 246 scoped_ptr<BlockingFactor> blocking_factor, 247 const Continuation& continuation) { 248 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 249 250 // Run the task directly if the parallelization is disabled. 251 if (!maximum_background_task_) { 252 DCHECK(foreground_task_token); 253 DCHECK(!background_task_token); 254 foreground_task_token->SetTaskLog(task_log.Pass()); 255 continuation.Run(foreground_task_token.Pass()); 256 return; 257 } 258 259 // Clear existing |blocking_factor| from |dependency_manager_| before 260 // getting |foreground_task_token|, so that we can avoid dead lock. 261 if (background_task_token && background_task_token->blocking_factor()) { 262 dependency_manager_.Erase(background_task_token->blocking_factor()); 263 background_task_token->clear_blocking_factor(); 264 } 265 266 // Try to get |foreground_task_token|. If it's not available, wait for 267 // current foreground task to finish. 268 if (!foreground_task_token) { 269 DCHECK(background_task_token); 270 foreground_task_token = GetToken(background_task_token->location(), 271 SyncStatusCallback()); 272 if (!foreground_task_token) { 273 PushPendingTask( 274 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 275 AsWeakPtr(), 276 base::Passed(&foreground_task_token), 277 base::Passed(&background_task_token), 278 base::Passed(&task_log), 279 base::Passed(&blocking_factor), 280 continuation), 281 PRIORITY_HIGH); 282 StartNextTask(); 283 return; 284 } 285 } 286 287 // Check if the task can run as a background task now. 288 // If there are too many task running or any other task blocks current 289 // task, wait for any other task to finish. 290 bool task_number_limit_exceeded = 291 !background_task_token && 292 running_background_tasks_.size() >= maximum_background_task_; 293 if (task_number_limit_exceeded || 294 !dependency_manager_.Insert(blocking_factor.get())) { 295 DCHECK(!running_background_tasks_.empty()); 296 DCHECK(pending_backgrounding_task_.is_null()); 297 298 // Wait for NotifyTaskDone to release a |blocking_factor|. 299 pending_backgrounding_task_ = 300 base::Bind(&SyncTaskManager::UpdateBlockingFactorBody, 301 AsWeakPtr(), 302 base::Passed(&foreground_task_token), 303 base::Passed(&background_task_token), 304 base::Passed(&task_log), 305 base::Passed(&blocking_factor), 306 continuation); 307 return; 308 } 309 310 if (background_task_token) { 311 background_task_token->set_blocking_factor(blocking_factor.Pass()); 312 } else { 313 tracked_objects::Location from_here = foreground_task_token->location(); 314 SyncStatusCallback callback = foreground_task_token->callback(); 315 foreground_task_token->clear_callback(); 316 317 background_task_token = 318 SyncTaskToken::CreateForBackgroundTask( 319 AsWeakPtr(), 320 task_token_seq_++, 321 blocking_factor.Pass()); 322 background_task_token->UpdateTask(from_here, callback); 323 running_background_tasks_.set(background_task_token->token_id(), 324 running_foreground_task_.Pass()); 325 } 326 327 token_ = foreground_task_token.Pass(); 328 StartNextTask(); 329 background_task_token->SetTaskLog(task_log.Pass()); 330 continuation.Run(background_task_token.Pass()); 331} 332 333scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken( 334 const tracked_objects::Location& from_here, 335 const SyncStatusCallback& callback) { 336 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 337 338 if (!token_) 339 return scoped_ptr<SyncTaskToken>(); 340 token_->UpdateTask(from_here, callback); 341 return token_.Pass(); 342} 343 344void SyncTaskManager::PushPendingTask( 345 const base::Closure& closure, Priority priority) { 346 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 347 348 pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++)); 349} 350 351void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token, 352 scoped_ptr<SyncTask> task) { 353 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 354 DCHECK(!running_foreground_task_); 355 356 running_foreground_task_ = task.Pass(); 357 running_foreground_task_->RunPreflight(token.Pass()); 358} 359 360void SyncTaskManager::StartNextTask() { 361 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 362 363 if (!pending_backgrounding_task_.is_null()) { 364 base::Closure closure = pending_backgrounding_task_; 365 pending_backgrounding_task_.Reset(); 366 closure.Run(); 367 return; 368 } 369 370 if (!pending_tasks_.empty()) { 371 base::Closure closure = pending_tasks_.top().task; 372 pending_tasks_.pop(); 373 closure.Run(); 374 return; 375 } 376 377 if (client_) 378 client_->MaybeScheduleNextTask(); 379} 380 381} // namespace drive_backend 382} // namespace sync_file_system 383