sync_task_manager.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/memory/scoped_ptr.h"
10#include "base/sequenced_task_runner.h"
11#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13#include "chrome/browser/sync_file_system/sync_file_metadata.h"
14
15using storage::FileSystemURL;
16
17namespace sync_file_system {
18namespace drive_backend {
19
20namespace {
21
22class SyncTaskAdapter : public ExclusiveTask {
23 public:
24  explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
25  virtual ~SyncTaskAdapter() {}
26
27  virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
28    task_.Run(callback);
29  }
30
31 private:
32  SyncTaskManager::Task task_;
33
34  DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
35};
36
37}  // namespace
38
39SyncTaskManager::PendingTask::PendingTask() {}
40
41SyncTaskManager::PendingTask::PendingTask(
42    const base::Closure& task, Priority pri, int seq)
43    : task(task), priority(pri), seq(seq) {}
44
45SyncTaskManager::PendingTask::~PendingTask() {}
46
47bool SyncTaskManager::PendingTaskComparator::operator()(
48    const PendingTask& left,
49    const PendingTask& right) const {
50  if (left.priority != right.priority)
51    return left.priority < right.priority;
52  return left.seq > right.seq;
53}
54
55SyncTaskManager::SyncTaskManager(
56    base::WeakPtr<Client> client,
57    size_t maximum_background_task,
58    const scoped_refptr<base::SequencedTaskRunner>& task_runner)
59    : client_(client),
60      maximum_background_task_(maximum_background_task),
61      pending_task_seq_(0),
62      task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
63      task_runner_(task_runner) {
64}
65
66SyncTaskManager::~SyncTaskManager() {
67  client_.reset();
68  token_.reset();
69}
70
71void SyncTaskManager::Initialize(SyncStatusCode status) {
72  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
73  DCHECK(!token_);
74  NotifyTaskDone(
75      SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_.get()),
76      status);
77}
78
79void SyncTaskManager::ScheduleTask(
80    const tracked_objects::Location& from_here,
81    const Task& task,
82    Priority priority,
83    const SyncStatusCallback& callback) {
84  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
85
86  ScheduleSyncTask(from_here,
87                   scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
88                   priority,
89                   callback);
90}
91
92void SyncTaskManager::ScheduleSyncTask(
93    const tracked_objects::Location& from_here,
94    scoped_ptr<SyncTask> task,
95    Priority priority,
96    const SyncStatusCallback& callback) {
97  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
98
99  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
100  if (!token) {
101    PushPendingTask(
102        base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
103                   base::Passed(&task), priority, callback),
104        priority);
105    return;
106  }
107  RunTask(token.Pass(), task.Pass());
108}
109
110bool SyncTaskManager::ScheduleTaskIfIdle(
111        const tracked_objects::Location& from_here,
112        const Task& task,
113        const SyncStatusCallback& callback) {
114  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
115
116  return ScheduleSyncTaskIfIdle(
117      from_here,
118      scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
119      callback);
120}
121
122bool SyncTaskManager::ScheduleSyncTaskIfIdle(
123    const tracked_objects::Location& from_here,
124    scoped_ptr<SyncTask> task,
125    const SyncStatusCallback& callback) {
126  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
127
128  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
129  if (!token)
130    return false;
131  RunTask(token.Pass(), task.Pass());
132  return true;
133}
134
135// static
136void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
137                                     SyncStatusCode status) {
138  DCHECK(token);
139
140  SyncTaskManager* manager = token->manager();
141  if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
142    DCHECK(!manager);
143    SyncStatusCallback callback = token->callback();
144    token->clear_callback();
145    callback.Run(status);
146    return;
147  }
148
149  if (manager)
150    manager->NotifyTaskDoneBody(token.Pass(), status);
151}
152
153// static
154void SyncTaskManager::UpdateTaskBlocker(
155    scoped_ptr<SyncTaskToken> current_task_token,
156    scoped_ptr<TaskBlocker> task_blocker,
157    const Continuation& continuation) {
158  DCHECK(current_task_token);
159
160  SyncTaskManager* manager = current_task_token->manager();
161  if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
162    DCHECK(!manager);
163    continuation.Run(current_task_token.Pass());
164    return;
165  }
166
167  if (!manager)
168    return;
169
170  scoped_ptr<SyncTaskToken> foreground_task_token;
171  scoped_ptr<SyncTaskToken> background_task_token;
172  scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
173  if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
174    foreground_task_token = current_task_token.Pass();
175  else
176    background_task_token = current_task_token.Pass();
177
178  manager->UpdateTaskBlockerBody(foreground_task_token.Pass(),
179                                 background_task_token.Pass(),
180                                 task_log.Pass(),
181                                 task_blocker.Pass(),
182                                 continuation);
183}
184
185bool SyncTaskManager::IsRunningTask(int64 token_id) const {
186  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
187
188  // If the client is gone, all task should be aborted.
189  if (!client_)
190    return false;
191
192  if (token_id == SyncTaskToken::kForegroundTaskTokenID)
193    return true;
194
195  return ContainsKey(running_background_tasks_, token_id);
196}
197
198void SyncTaskManager::DetachFromSequence() {
199  sequence_checker_.DetachFromSequence();
200}
201
202void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
203                                         SyncStatusCode status) {
204  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
205  DCHECK(token);
206
207  DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
208           << " (" << SyncStatusCodeToString(status) << ")"
209           << " " << token->location().ToString();
210
211  if (token->task_blocker()) {
212    dependency_manager_.Erase(token->task_blocker());
213    token->clear_task_blocker();
214  }
215
216  if (client_) {
217    if (token->has_task_log()) {
218      token->FinalizeTaskLog(SyncStatusCodeToString(status));
219      client_->RecordTaskLog(token->PassTaskLog());
220    }
221  }
222
223  scoped_ptr<SyncTask> task;
224  SyncStatusCallback callback = token->callback();
225  token->clear_callback();
226  if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
227    token_ = token.Pass();
228    task = running_foreground_task_.Pass();
229  } else {
230    task = running_background_tasks_.take_and_erase(token->token_id());
231  }
232
233  // Acquire the token to prevent a new task to jump into the queue.
234  token = token_.Pass();
235
236  bool task_used_network = false;
237  if (task)
238    task_used_network = task->used_network();
239
240  if (client_)
241    client_->NotifyLastOperationStatus(status, task_used_network);
242
243  if (!callback.is_null())
244    callback.Run(status);
245
246  // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
247  // making the call-chaing longer.
248  task_runner_->PostTask(
249      FROM_HERE,
250      base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
251                 AsWeakPtr(), base::Passed(&token)));
252}
253
254void SyncTaskManager::UpdateTaskBlockerBody(
255    scoped_ptr<SyncTaskToken> foreground_task_token,
256    scoped_ptr<SyncTaskToken> background_task_token,
257    scoped_ptr<TaskLogger::TaskLog> task_log,
258    scoped_ptr<TaskBlocker> task_blocker,
259    const Continuation& continuation) {
260  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
261
262  // Run the task directly if the parallelization is disabled.
263  if (!maximum_background_task_) {
264    DCHECK(foreground_task_token);
265    DCHECK(!background_task_token);
266    foreground_task_token->SetTaskLog(task_log.Pass());
267    continuation.Run(foreground_task_token.Pass());
268    return;
269  }
270
271  // Clear existing |task_blocker| from |dependency_manager_| before
272  // getting |foreground_task_token|, so that we can avoid dead lock.
273  if (background_task_token && background_task_token->task_blocker()) {
274    dependency_manager_.Erase(background_task_token->task_blocker());
275    background_task_token->clear_task_blocker();
276  }
277
278  // Try to get |foreground_task_token|.  If it's not available, wait for
279  // current foreground task to finish.
280  if (!foreground_task_token) {
281    DCHECK(background_task_token);
282    foreground_task_token = GetToken(background_task_token->location(),
283                                     SyncStatusCallback());
284    if (!foreground_task_token) {
285      PushPendingTask(
286          base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
287                     AsWeakPtr(),
288                     base::Passed(&foreground_task_token),
289                     base::Passed(&background_task_token),
290                     base::Passed(&task_log),
291                     base::Passed(&task_blocker),
292                     continuation),
293          PRIORITY_HIGH);
294      MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
295      return;
296    }
297  }
298
299  // Check if the task can run as a background task now.
300  // If there are too many task running or any other task blocks current
301  // task, wait for any other task to finish.
302  bool task_number_limit_exceeded =
303      !background_task_token &&
304      running_background_tasks_.size() >= maximum_background_task_;
305  if (task_number_limit_exceeded ||
306      !dependency_manager_.Insert(task_blocker.get())) {
307    DCHECK(!running_background_tasks_.empty());
308    DCHECK(pending_backgrounding_task_.is_null());
309
310    // Wait for NotifyTaskDone to release a |task_blocker|.
311    pending_backgrounding_task_ =
312        base::Bind(&SyncTaskManager::UpdateTaskBlockerBody,
313                   AsWeakPtr(),
314                   base::Passed(&foreground_task_token),
315                   base::Passed(&background_task_token),
316                   base::Passed(&task_log),
317                   base::Passed(&task_blocker),
318                   continuation);
319    return;
320  }
321
322  if (background_task_token) {
323    background_task_token->set_task_blocker(task_blocker.Pass());
324  } else {
325    tracked_objects::Location from_here = foreground_task_token->location();
326    SyncStatusCallback callback = foreground_task_token->callback();
327    foreground_task_token->clear_callback();
328
329    background_task_token =
330        SyncTaskToken::CreateForBackgroundTask(AsWeakPtr(),
331                                               task_runner_.get(),
332                                               task_token_seq_++,
333                                               task_blocker.Pass());
334    background_task_token->UpdateTask(from_here, callback);
335    running_background_tasks_.set(background_task_token->token_id(),
336                                  running_foreground_task_.Pass());
337  }
338
339  token_ = foreground_task_token.Pass();
340  MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
341  background_task_token->SetTaskLog(task_log.Pass());
342  continuation.Run(background_task_token.Pass());
343}
344
345scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
346    const tracked_objects::Location& from_here,
347    const SyncStatusCallback& callback) {
348  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
349
350  if (!token_)
351    return scoped_ptr<SyncTaskToken>();
352  token_->UpdateTask(from_here, callback);
353  return token_.Pass();
354}
355
356void SyncTaskManager::PushPendingTask(
357    const base::Closure& closure, Priority priority) {
358  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
359
360  pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
361}
362
363void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
364                              scoped_ptr<SyncTask> task) {
365  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
366  DCHECK(!running_foreground_task_);
367
368  running_foreground_task_ = task.Pass();
369  running_foreground_task_->RunPreflight(token.Pass());
370}
371
372void SyncTaskManager::MaybeStartNextForegroundTask(
373    scoped_ptr<SyncTaskToken> token) {
374  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
375
376  if (token) {
377    DCHECK(!token_);
378    token_ = token.Pass();
379  }
380
381  if (!pending_backgrounding_task_.is_null()) {
382    base::Closure closure = pending_backgrounding_task_;
383    pending_backgrounding_task_.Reset();
384    closure.Run();
385    return;
386  }
387
388  if (!token_)
389    return;
390
391  if (!pending_tasks_.empty()) {
392    base::Closure closure = pending_tasks_.top().task;
393    pending_tasks_.pop();
394    closure.Run();
395    return;
396  }
397
398  if (client_)
399    client_->MaybeScheduleNextTask();
400}
401
402}  // namespace drive_backend
403}  // namespace sync_file_system
404