sync_task_manager.cc revision 46d4c2bc3267f3f028f39e7e311b0f89aba2e4fd
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/memory/scoped_ptr.h"
10#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
11#include "chrome/browser/sync_file_system/sync_file_metadata.h"
12
13using fileapi::FileSystemURL;
14
15namespace sync_file_system {
16namespace drive_backend {
17
18namespace {
19
20class SyncTaskAdapter : public ExclusiveTask {
21 public:
22  explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
23  virtual ~SyncTaskAdapter() {}
24
25  virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
26    task_.Run(callback);
27  }
28
29 private:
30  SyncTaskManager::Task task_;
31
32  DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
33};
34
35}  // namespace
36
37SyncTaskManager::PendingTask::PendingTask() {}
38
39SyncTaskManager::PendingTask::PendingTask(
40    const base::Closure& task, Priority pri, int seq)
41    : task(task), priority(pri), seq(seq) {}
42
43SyncTaskManager::PendingTask::~PendingTask() {}
44
45bool SyncTaskManager::PendingTaskComparator::operator()(
46    const PendingTask& left,
47    const PendingTask& right) const {
48  if (left.priority != right.priority)
49    return left.priority < right.priority;
50  return left.seq > right.seq;
51}
52
53SyncTaskManager::SyncTaskManager(
54    base::WeakPtr<Client> client,
55    size_t maximum_background_task)
56    : client_(client),
57      maximum_background_task_(maximum_background_task),
58      pending_task_seq_(0),
59      task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) {
60}
61
62SyncTaskManager::~SyncTaskManager() {
63  client_.reset();
64  token_.reset();
65}
66
67void SyncTaskManager::Initialize(SyncStatusCode status) {
68  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
69  DCHECK(!token_);
70  NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
71                 status);
72}
73
74void SyncTaskManager::ScheduleTask(
75    const tracked_objects::Location& from_here,
76    const Task& task,
77    Priority priority,
78    const SyncStatusCallback& callback) {
79  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
80
81  ScheduleSyncTask(from_here,
82                   scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
83                   priority,
84                   callback);
85}
86
87void SyncTaskManager::ScheduleSyncTask(
88    const tracked_objects::Location& from_here,
89    scoped_ptr<SyncTask> task,
90    Priority priority,
91    const SyncStatusCallback& callback) {
92  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
93
94  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
95  if (!token) {
96    PushPendingTask(
97        base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
98                   base::Passed(&task), priority, callback),
99        priority);
100    return;
101  }
102  RunTask(token.Pass(), task.Pass());
103}
104
105bool SyncTaskManager::ScheduleTaskIfIdle(
106        const tracked_objects::Location& from_here,
107        const Task& task,
108        const SyncStatusCallback& callback) {
109  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
110
111  return ScheduleSyncTaskIfIdle(
112      from_here,
113      scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
114      callback);
115}
116
117bool SyncTaskManager::ScheduleSyncTaskIfIdle(
118    const tracked_objects::Location& from_here,
119    scoped_ptr<SyncTask> task,
120    const SyncStatusCallback& callback) {
121  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
122
123  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
124  if (!token)
125    return false;
126  RunTask(token.Pass(), task.Pass());
127  return true;
128}
129
130// static
131void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
132                                     SyncStatusCode status) {
133  DCHECK(token);
134
135  SyncTaskManager* manager = token->manager();
136  if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
137    DCHECK(!manager);
138    SyncStatusCallback callback = token->callback();
139    token->clear_callback();
140    callback.Run(status);
141    return;
142  }
143
144  if (manager)
145    manager->NotifyTaskDoneBody(token.Pass(), status);
146}
147
148// static
149void SyncTaskManager::UpdateBlockingFactor(
150    scoped_ptr<SyncTaskToken> current_task_token,
151    scoped_ptr<BlockingFactor> blocking_factor,
152    const Continuation& continuation) {
153  DCHECK(current_task_token);
154
155  SyncTaskManager* manager = current_task_token->manager();
156  if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
157    DCHECK(!manager);
158    continuation.Run(current_task_token.Pass());
159    return;
160  }
161
162  if (!manager)
163    return;
164
165  scoped_ptr<SyncTaskToken> foreground_task_token;
166  scoped_ptr<SyncTaskToken> background_task_token;
167  scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
168  if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
169    foreground_task_token = current_task_token.Pass();
170  else
171    background_task_token = current_task_token.Pass();
172
173  manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
174                                    background_task_token.Pass(),
175                                    task_log.Pass(),
176                                    blocking_factor.Pass(),
177                                    continuation);
178}
179
180bool SyncTaskManager::IsRunningTask(int64 token_id) const {
181  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
182
183  // If the client is gone, all task should be aborted.
184  if (!client_)
185    return false;
186
187  if (token_id == SyncTaskToken::kForegroundTaskTokenID)
188    return true;
189
190  return ContainsKey(running_background_tasks_, token_id);
191}
192
193void SyncTaskManager::DetachFromSequence() {
194  sequence_checker_.DetachFromSequence();
195}
196
197void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
198                                         SyncStatusCode status) {
199  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
200  DCHECK(token);
201
202  DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
203           << " (" << SyncStatusCodeToString(status) << ")"
204           << " " << token_->location().ToString();
205
206  if (token->blocking_factor()) {
207    dependency_manager_.Erase(token->blocking_factor());
208    token->clear_blocking_factor();
209  }
210
211  if (client_) {
212    if (token->has_task_log()) {
213      token->FinalizeTaskLog(SyncStatusCodeToString(status));
214      client_->RecordTaskLog(token->PassTaskLog());
215    }
216  }
217
218  scoped_ptr<SyncTask> task;
219  SyncStatusCallback callback = token->callback();
220  token->clear_callback();
221  if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
222    token_ = token.Pass();
223    task = running_foreground_task_.Pass();
224  } else {
225    task = running_background_tasks_.take_and_erase(token->token_id());
226  }
227
228  bool task_used_network = false;
229  if (task)
230    task_used_network = task->used_network();
231
232  if (client_)
233    client_->NotifyLastOperationStatus(status, task_used_network);
234
235  if (!callback.is_null())
236    callback.Run(status);
237
238  StartNextTask();
239}
240
241void SyncTaskManager::UpdateBlockingFactorBody(
242    scoped_ptr<SyncTaskToken> foreground_task_token,
243    scoped_ptr<SyncTaskToken> background_task_token,
244    scoped_ptr<TaskLogger::TaskLog> task_log,
245    scoped_ptr<BlockingFactor> blocking_factor,
246    const Continuation& continuation) {
247  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
248
249  // Run the task directly if the parallelization is disabled.
250  if (!maximum_background_task_) {
251    DCHECK(foreground_task_token);
252    DCHECK(!background_task_token);
253    foreground_task_token->SetTaskLog(task_log.Pass());
254    continuation.Run(foreground_task_token.Pass());
255    return;
256  }
257
258  // Clear existing |blocking_factor| from |dependency_manager_| before
259  // getting |foreground_task_token|, so that we can avoid dead lock.
260  if (background_task_token && background_task_token->blocking_factor()) {
261    dependency_manager_.Erase(background_task_token->blocking_factor());
262    background_task_token->clear_blocking_factor();
263  }
264
265  // Try to get |foreground_task_token|.  If it's not available, wait for
266  // current foreground task to finish.
267  if (!foreground_task_token) {
268    DCHECK(background_task_token);
269    foreground_task_token = GetToken(background_task_token->location(),
270                                     SyncStatusCallback());
271    if (!foreground_task_token) {
272      PushPendingTask(
273          base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
274                     AsWeakPtr(),
275                     base::Passed(&foreground_task_token),
276                     base::Passed(&background_task_token),
277                     base::Passed(&task_log),
278                     base::Passed(&blocking_factor),
279                     continuation),
280          PRIORITY_HIGH);
281      StartNextTask();
282      return;
283    }
284  }
285
286  // Check if the task can run as a background task now.
287  // If there are too many task running or any other task blocks current
288  // task, wait for any other task to finish.
289  bool task_number_limit_exceeded =
290      !background_task_token &&
291      running_background_tasks_.size() >= maximum_background_task_;
292  if (task_number_limit_exceeded ||
293      !dependency_manager_.Insert(blocking_factor.get())) {
294    DCHECK(!running_background_tasks_.empty());
295    DCHECK(pending_backgrounding_task_.is_null());
296
297    // Wait for NotifyTaskDone to release a |blocking_factor|.
298    pending_backgrounding_task_ =
299        base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
300                   AsWeakPtr(),
301                   base::Passed(&foreground_task_token),
302                   base::Passed(&background_task_token),
303                   base::Passed(&task_log),
304                   base::Passed(&blocking_factor),
305                   continuation);
306    return;
307  }
308
309  if (background_task_token) {
310    background_task_token->set_blocking_factor(blocking_factor.Pass());
311  } else {
312    tracked_objects::Location from_here = foreground_task_token->location();
313    SyncStatusCallback callback = foreground_task_token->callback();
314    foreground_task_token->clear_callback();
315
316    background_task_token =
317        SyncTaskToken::CreateForBackgroundTask(
318            AsWeakPtr(),
319            task_token_seq_++,
320            blocking_factor.Pass());
321    background_task_token->UpdateTask(from_here, callback);
322    running_background_tasks_.set(background_task_token->token_id(),
323                                  running_foreground_task_.Pass());
324  }
325
326  token_ = foreground_task_token.Pass();
327  StartNextTask();
328  background_task_token->SetTaskLog(task_log.Pass());
329  continuation.Run(background_task_token.Pass());
330}
331
332scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
333    const tracked_objects::Location& from_here,
334    const SyncStatusCallback& callback) {
335  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
336
337  if (!token_)
338    return scoped_ptr<SyncTaskToken>();
339  token_->UpdateTask(from_here, callback);
340  return token_.Pass();
341}
342
343void SyncTaskManager::PushPendingTask(
344    const base::Closure& closure, Priority priority) {
345  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
346
347  pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
348}
349
350void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
351                              scoped_ptr<SyncTask> task) {
352  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
353  DCHECK(!running_foreground_task_);
354
355  running_foreground_task_ = task.Pass();
356  running_foreground_task_->RunPreflight(token.Pass());
357}
358
359void SyncTaskManager::StartNextTask() {
360  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
361
362  if (!pending_backgrounding_task_.is_null()) {
363    base::Closure closure = pending_backgrounding_task_;
364    pending_backgrounding_task_.Reset();
365    closure.Run();
366    return;
367  }
368
369  if (!pending_tasks_.empty()) {
370    base::Closure closure = pending_tasks_.top().task;
371    pending_tasks_.pop();
372    closure.Run();
373    return;
374  }
375
376  if (client_)
377    client_->MaybeScheduleNextTask();
378}
379
380}  // namespace drive_backend
381}  // namespace sync_file_system
382