sync_task_manager.cc revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/memory/scoped_ptr.h"
10#include "base/sequenced_task_runner.h"
11#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
12#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
13#include "chrome/browser/sync_file_system/sync_file_metadata.h"
14
15using storage::FileSystemURL;
16
17namespace sync_file_system {
18namespace drive_backend {
19
20namespace {
21
22class SyncTaskAdapter : public ExclusiveTask {
23 public:
24  explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
25  virtual ~SyncTaskAdapter() {}
26
27  virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
28    task_.Run(callback);
29  }
30
31 private:
32  SyncTaskManager::Task task_;
33
34  DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
35};
36
37}  // namespace
38
39SyncTaskManager::PendingTask::PendingTask() {}
40
41SyncTaskManager::PendingTask::PendingTask(
42    const base::Closure& task, Priority pri, int seq)
43    : task(task), priority(pri), seq(seq) {}
44
45SyncTaskManager::PendingTask::~PendingTask() {}
46
47bool SyncTaskManager::PendingTaskComparator::operator()(
48    const PendingTask& left,
49    const PendingTask& right) const {
50  if (left.priority != right.priority)
51    return left.priority < right.priority;
52  return left.seq > right.seq;
53}
54
55SyncTaskManager::SyncTaskManager(
56    base::WeakPtr<Client> client,
57    size_t maximum_background_task,
58    base::SequencedTaskRunner* task_runner)
59    : client_(client),
60      maximum_background_task_(maximum_background_task),
61      pending_task_seq_(0),
62      task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID),
63      task_runner_(task_runner) {
64}
65
66SyncTaskManager::~SyncTaskManager() {
67  client_.reset();
68  token_.reset();
69}
70
71void SyncTaskManager::Initialize(SyncStatusCode status) {
72  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
73  DCHECK(!token_);
74  NotifyTaskDone(
75      SyncTaskToken::CreateForForegroundTask(AsWeakPtr(), task_runner_),
76      status);
77}
78
79void SyncTaskManager::ScheduleTask(
80    const tracked_objects::Location& from_here,
81    const Task& task,
82    Priority priority,
83    const SyncStatusCallback& callback) {
84  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
85
86  ScheduleSyncTask(from_here,
87                   scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
88                   priority,
89                   callback);
90}
91
92void SyncTaskManager::ScheduleSyncTask(
93    const tracked_objects::Location& from_here,
94    scoped_ptr<SyncTask> task,
95    Priority priority,
96    const SyncStatusCallback& callback) {
97  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
98
99  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
100  if (!token) {
101    PushPendingTask(
102        base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
103                   base::Passed(&task), priority, callback),
104        priority);
105    return;
106  }
107  RunTask(token.Pass(), task.Pass());
108}
109
110bool SyncTaskManager::ScheduleTaskIfIdle(
111        const tracked_objects::Location& from_here,
112        const Task& task,
113        const SyncStatusCallback& callback) {
114  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
115
116  return ScheduleSyncTaskIfIdle(
117      from_here,
118      scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
119      callback);
120}
121
122bool SyncTaskManager::ScheduleSyncTaskIfIdle(
123    const tracked_objects::Location& from_here,
124    scoped_ptr<SyncTask> task,
125    const SyncStatusCallback& callback) {
126  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
127
128  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
129  if (!token)
130    return false;
131  RunTask(token.Pass(), task.Pass());
132  return true;
133}
134
135// static
136void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
137                                     SyncStatusCode status) {
138  DCHECK(token);
139
140  SyncTaskManager* manager = token->manager();
141  if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
142    DCHECK(!manager);
143    SyncStatusCallback callback = token->callback();
144    token->clear_callback();
145    callback.Run(status);
146    return;
147  }
148
149  if (manager)
150    manager->NotifyTaskDoneBody(token.Pass(), status);
151}
152
153// static
154void SyncTaskManager::UpdateBlockingFactor(
155    scoped_ptr<SyncTaskToken> current_task_token,
156    scoped_ptr<BlockingFactor> blocking_factor,
157    const Continuation& continuation) {
158  DCHECK(current_task_token);
159
160  SyncTaskManager* manager = current_task_token->manager();
161  if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
162    DCHECK(!manager);
163    continuation.Run(current_task_token.Pass());
164    return;
165  }
166
167  if (!manager)
168    return;
169
170  scoped_ptr<SyncTaskToken> foreground_task_token;
171  scoped_ptr<SyncTaskToken> background_task_token;
172  scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
173  if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
174    foreground_task_token = current_task_token.Pass();
175  else
176    background_task_token = current_task_token.Pass();
177
178  manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
179                                    background_task_token.Pass(),
180                                    task_log.Pass(),
181                                    blocking_factor.Pass(),
182                                    continuation);
183}
184
185bool SyncTaskManager::IsRunningTask(int64 token_id) const {
186  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
187
188  // If the client is gone, all task should be aborted.
189  if (!client_)
190    return false;
191
192  if (token_id == SyncTaskToken::kForegroundTaskTokenID)
193    return true;
194
195  return ContainsKey(running_background_tasks_, token_id);
196}
197
198void SyncTaskManager::DetachFromSequence() {
199  sequence_checker_.DetachFromSequence();
200}
201
202void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
203                                         SyncStatusCode status) {
204  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
205  DCHECK(token);
206
207  DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
208           << " (" << SyncStatusCodeToString(status) << ")"
209           << " " << token->location().ToString();
210
211  if (token->blocking_factor()) {
212    dependency_manager_.Erase(token->blocking_factor());
213    token->clear_blocking_factor();
214  }
215
216  if (client_) {
217    if (token->has_task_log()) {
218      token->FinalizeTaskLog(SyncStatusCodeToString(status));
219      client_->RecordTaskLog(token->PassTaskLog());
220    }
221  }
222
223  scoped_ptr<SyncTask> task;
224  SyncStatusCallback callback = token->callback();
225  token->clear_callback();
226  if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
227    token_ = token.Pass();
228    task = running_foreground_task_.Pass();
229  } else {
230    task = running_background_tasks_.take_and_erase(token->token_id());
231  }
232
233  // Acquire the token to prevent a new task to jump into the queue.
234  token = token_.Pass();
235
236  bool task_used_network = false;
237  if (task)
238    task_used_network = task->used_network();
239
240  if (client_)
241    client_->NotifyLastOperationStatus(status, task_used_network);
242
243  if (!callback.is_null())
244    callback.Run(status);
245
246  // Post MaybeStartNextForegroundTask rather than calling it directly to avoid
247  // making the call-chaing longer.
248  task_runner_->PostTask(
249      FROM_HERE,
250      base::Bind(&SyncTaskManager::MaybeStartNextForegroundTask,
251                 AsWeakPtr(), base::Passed(&token)));
252}
253
254void SyncTaskManager::UpdateBlockingFactorBody(
255    scoped_ptr<SyncTaskToken> foreground_task_token,
256    scoped_ptr<SyncTaskToken> background_task_token,
257    scoped_ptr<TaskLogger::TaskLog> task_log,
258    scoped_ptr<BlockingFactor> blocking_factor,
259    const Continuation& continuation) {
260  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
261
262  // Run the task directly if the parallelization is disabled.
263  if (!maximum_background_task_) {
264    DCHECK(foreground_task_token);
265    DCHECK(!background_task_token);
266    foreground_task_token->SetTaskLog(task_log.Pass());
267    continuation.Run(foreground_task_token.Pass());
268    return;
269  }
270
271  // Clear existing |blocking_factor| from |dependency_manager_| before
272  // getting |foreground_task_token|, so that we can avoid dead lock.
273  if (background_task_token && background_task_token->blocking_factor()) {
274    dependency_manager_.Erase(background_task_token->blocking_factor());
275    background_task_token->clear_blocking_factor();
276  }
277
278  // Try to get |foreground_task_token|.  If it's not available, wait for
279  // current foreground task to finish.
280  if (!foreground_task_token) {
281    DCHECK(background_task_token);
282    foreground_task_token = GetToken(background_task_token->location(),
283                                     SyncStatusCallback());
284    if (!foreground_task_token) {
285      PushPendingTask(
286          base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
287                     AsWeakPtr(),
288                     base::Passed(&foreground_task_token),
289                     base::Passed(&background_task_token),
290                     base::Passed(&task_log),
291                     base::Passed(&blocking_factor),
292                     continuation),
293          PRIORITY_HIGH);
294      MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
295      return;
296    }
297  }
298
299  // Check if the task can run as a background task now.
300  // If there are too many task running or any other task blocks current
301  // task, wait for any other task to finish.
302  bool task_number_limit_exceeded =
303      !background_task_token &&
304      running_background_tasks_.size() >= maximum_background_task_;
305  if (task_number_limit_exceeded ||
306      !dependency_manager_.Insert(blocking_factor.get())) {
307    DCHECK(!running_background_tasks_.empty());
308    DCHECK(pending_backgrounding_task_.is_null());
309
310    // Wait for NotifyTaskDone to release a |blocking_factor|.
311    pending_backgrounding_task_ =
312        base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
313                   AsWeakPtr(),
314                   base::Passed(&foreground_task_token),
315                   base::Passed(&background_task_token),
316                   base::Passed(&task_log),
317                   base::Passed(&blocking_factor),
318                   continuation);
319    return;
320  }
321
322  if (background_task_token) {
323    background_task_token->set_blocking_factor(blocking_factor.Pass());
324  } else {
325    tracked_objects::Location from_here = foreground_task_token->location();
326    SyncStatusCallback callback = foreground_task_token->callback();
327    foreground_task_token->clear_callback();
328
329    background_task_token =
330        SyncTaskToken::CreateForBackgroundTask(
331            AsWeakPtr(),
332            task_runner_,
333            task_token_seq_++,
334            blocking_factor.Pass());
335    background_task_token->UpdateTask(from_here, callback);
336    running_background_tasks_.set(background_task_token->token_id(),
337                                  running_foreground_task_.Pass());
338  }
339
340  token_ = foreground_task_token.Pass();
341  MaybeStartNextForegroundTask(scoped_ptr<SyncTaskToken>());
342  background_task_token->SetTaskLog(task_log.Pass());
343  continuation.Run(background_task_token.Pass());
344}
345
346scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
347    const tracked_objects::Location& from_here,
348    const SyncStatusCallback& callback) {
349  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
350
351  if (!token_)
352    return scoped_ptr<SyncTaskToken>();
353  token_->UpdateTask(from_here, callback);
354  return token_.Pass();
355}
356
357void SyncTaskManager::PushPendingTask(
358    const base::Closure& closure, Priority priority) {
359  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
360
361  pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
362}
363
364void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
365                              scoped_ptr<SyncTask> task) {
366  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
367  DCHECK(!running_foreground_task_);
368
369  running_foreground_task_ = task.Pass();
370  running_foreground_task_->RunPreflight(token.Pass());
371}
372
373void SyncTaskManager::MaybeStartNextForegroundTask(
374    scoped_ptr<SyncTaskToken> token) {
375  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
376
377  if (token) {
378    DCHECK(!token_);
379    token_ = token.Pass();
380  }
381
382  if (!pending_backgrounding_task_.is_null()) {
383    base::Closure closure = pending_backgrounding_task_;
384    pending_backgrounding_task_.Reset();
385    closure.Run();
386    return;
387  }
388
389  if (!token_)
390    return;
391
392  if (!pending_tasks_.empty()) {
393    base::Closure closure = pending_tasks_.top().task;
394    pending_tasks_.pop();
395    closure.Run();
396    return;
397  }
398
399  if (client_)
400    client_->MaybeScheduleNextTask();
401}
402
403}  // namespace drive_backend
404}  // namespace sync_file_system
405