sync_task_manager.cc revision f8ee788a64d60abd8f2d742a5fdedde054ecd910
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/memory/scoped_ptr.h"
10#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
11#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
12#include "chrome/browser/sync_file_system/sync_file_metadata.h"
13
14using fileapi::FileSystemURL;
15
16namespace sync_file_system {
17namespace drive_backend {
18
19namespace {
20
21class SyncTaskAdapter : public ExclusiveTask {
22 public:
23  explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
24  virtual ~SyncTaskAdapter() {}
25
26  virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
27    task_.Run(callback);
28  }
29
30 private:
31  SyncTaskManager::Task task_;
32
33  DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
34};
35
36}  // namespace
37
38SyncTaskManager::PendingTask::PendingTask() {}
39
40SyncTaskManager::PendingTask::PendingTask(
41    const base::Closure& task, Priority pri, int seq)
42    : task(task), priority(pri), seq(seq) {}
43
44SyncTaskManager::PendingTask::~PendingTask() {}
45
46bool SyncTaskManager::PendingTaskComparator::operator()(
47    const PendingTask& left,
48    const PendingTask& right) const {
49  if (left.priority != right.priority)
50    return left.priority < right.priority;
51  return left.seq > right.seq;
52}
53
54SyncTaskManager::SyncTaskManager(
55    base::WeakPtr<Client> client,
56    size_t maximum_background_task)
57    : client_(client),
58      maximum_background_task_(maximum_background_task),
59      pending_task_seq_(0),
60      task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) {
61}
62
63SyncTaskManager::~SyncTaskManager() {
64  client_.reset();
65  token_.reset();
66}
67
68void SyncTaskManager::Initialize(SyncStatusCode status) {
69  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
70  DCHECK(!token_);
71  NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
72                 status);
73}
74
75void SyncTaskManager::ScheduleTask(
76    const tracked_objects::Location& from_here,
77    const Task& task,
78    Priority priority,
79    const SyncStatusCallback& callback) {
80  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
81
82  ScheduleSyncTask(from_here,
83                   scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
84                   priority,
85                   callback);
86}
87
88void SyncTaskManager::ScheduleSyncTask(
89    const tracked_objects::Location& from_here,
90    scoped_ptr<SyncTask> task,
91    Priority priority,
92    const SyncStatusCallback& callback) {
93  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
94
95  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
96  if (!token) {
97    PushPendingTask(
98        base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
99                   base::Passed(&task), priority, callback),
100        priority);
101    return;
102  }
103  RunTask(token.Pass(), task.Pass());
104}
105
106bool SyncTaskManager::ScheduleTaskIfIdle(
107        const tracked_objects::Location& from_here,
108        const Task& task,
109        const SyncStatusCallback& callback) {
110  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
111
112  return ScheduleSyncTaskIfIdle(
113      from_here,
114      scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
115      callback);
116}
117
118bool SyncTaskManager::ScheduleSyncTaskIfIdle(
119    const tracked_objects::Location& from_here,
120    scoped_ptr<SyncTask> task,
121    const SyncStatusCallback& callback) {
122  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
123
124  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
125  if (!token)
126    return false;
127  RunTask(token.Pass(), task.Pass());
128  return true;
129}
130
131// static
132void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
133                                     SyncStatusCode status) {
134  DCHECK(token);
135
136  SyncTaskManager* manager = token->manager();
137  if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
138    DCHECK(!manager);
139    SyncStatusCallback callback = token->callback();
140    token->clear_callback();
141    callback.Run(status);
142    return;
143  }
144
145  if (manager)
146    manager->NotifyTaskDoneBody(token.Pass(), status);
147}
148
149// static
150void SyncTaskManager::UpdateBlockingFactor(
151    scoped_ptr<SyncTaskToken> current_task_token,
152    scoped_ptr<BlockingFactor> blocking_factor,
153    const Continuation& continuation) {
154  DCHECK(current_task_token);
155
156  SyncTaskManager* manager = current_task_token->manager();
157  if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
158    DCHECK(!manager);
159    continuation.Run(current_task_token.Pass());
160    return;
161  }
162
163  if (!manager)
164    return;
165
166  scoped_ptr<SyncTaskToken> foreground_task_token;
167  scoped_ptr<SyncTaskToken> background_task_token;
168  scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
169  if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
170    foreground_task_token = current_task_token.Pass();
171  else
172    background_task_token = current_task_token.Pass();
173
174  manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
175                                    background_task_token.Pass(),
176                                    task_log.Pass(),
177                                    blocking_factor.Pass(),
178                                    continuation);
179}
180
181bool SyncTaskManager::IsRunningTask(int64 token_id) const {
182  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
183
184  // If the client is gone, all task should be aborted.
185  if (!client_)
186    return false;
187
188  if (token_id == SyncTaskToken::kForegroundTaskTokenID)
189    return true;
190
191  return ContainsKey(running_background_tasks_, token_id);
192}
193
194void SyncTaskManager::DetachFromSequence() {
195  sequence_checker_.DetachFromSequence();
196}
197
198void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
199                                         SyncStatusCode status) {
200  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
201  DCHECK(token);
202
203  DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
204           << " (" << SyncStatusCodeToString(status) << ")"
205           << " " << token_->location().ToString();
206
207  if (token->blocking_factor()) {
208    dependency_manager_.Erase(token->blocking_factor());
209    token->clear_blocking_factor();
210  }
211
212  if (client_) {
213    if (token->has_task_log()) {
214      token->FinalizeTaskLog(SyncStatusCodeToString(status));
215      client_->RecordTaskLog(token->PassTaskLog());
216    }
217  }
218
219  scoped_ptr<SyncTask> task;
220  SyncStatusCallback callback = token->callback();
221  token->clear_callback();
222  if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
223    token_ = token.Pass();
224    task = running_foreground_task_.Pass();
225  } else {
226    task = running_background_tasks_.take_and_erase(token->token_id());
227  }
228
229  bool task_used_network = false;
230  if (task)
231    task_used_network = task->used_network();
232
233  if (client_)
234    client_->NotifyLastOperationStatus(status, task_used_network);
235
236  if (!callback.is_null())
237    callback.Run(status);
238
239  StartNextTask();
240}
241
242void SyncTaskManager::UpdateBlockingFactorBody(
243    scoped_ptr<SyncTaskToken> foreground_task_token,
244    scoped_ptr<SyncTaskToken> background_task_token,
245    scoped_ptr<TaskLogger::TaskLog> task_log,
246    scoped_ptr<BlockingFactor> blocking_factor,
247    const Continuation& continuation) {
248  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
249
250  // Run the task directly if the parallelization is disabled.
251  if (!maximum_background_task_) {
252    DCHECK(foreground_task_token);
253    DCHECK(!background_task_token);
254    foreground_task_token->SetTaskLog(task_log.Pass());
255    continuation.Run(foreground_task_token.Pass());
256    return;
257  }
258
259  // Clear existing |blocking_factor| from |dependency_manager_| before
260  // getting |foreground_task_token|, so that we can avoid dead lock.
261  if (background_task_token && background_task_token->blocking_factor()) {
262    dependency_manager_.Erase(background_task_token->blocking_factor());
263    background_task_token->clear_blocking_factor();
264  }
265
266  // Try to get |foreground_task_token|.  If it's not available, wait for
267  // current foreground task to finish.
268  if (!foreground_task_token) {
269    DCHECK(background_task_token);
270    foreground_task_token = GetToken(background_task_token->location(),
271                                     SyncStatusCallback());
272    if (!foreground_task_token) {
273      PushPendingTask(
274          base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
275                     AsWeakPtr(),
276                     base::Passed(&foreground_task_token),
277                     base::Passed(&background_task_token),
278                     base::Passed(&task_log),
279                     base::Passed(&blocking_factor),
280                     continuation),
281          PRIORITY_HIGH);
282      StartNextTask();
283      return;
284    }
285  }
286
287  // Check if the task can run as a background task now.
288  // If there are too many task running or any other task blocks current
289  // task, wait for any other task to finish.
290  bool task_number_limit_exceeded =
291      !background_task_token &&
292      running_background_tasks_.size() >= maximum_background_task_;
293  if (task_number_limit_exceeded ||
294      !dependency_manager_.Insert(blocking_factor.get())) {
295    DCHECK(!running_background_tasks_.empty());
296    DCHECK(pending_backgrounding_task_.is_null());
297
298    // Wait for NotifyTaskDone to release a |blocking_factor|.
299    pending_backgrounding_task_ =
300        base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
301                   AsWeakPtr(),
302                   base::Passed(&foreground_task_token),
303                   base::Passed(&background_task_token),
304                   base::Passed(&task_log),
305                   base::Passed(&blocking_factor),
306                   continuation);
307    return;
308  }
309
310  if (background_task_token) {
311    background_task_token->set_blocking_factor(blocking_factor.Pass());
312  } else {
313    tracked_objects::Location from_here = foreground_task_token->location();
314    SyncStatusCallback callback = foreground_task_token->callback();
315    foreground_task_token->clear_callback();
316
317    background_task_token =
318        SyncTaskToken::CreateForBackgroundTask(
319            AsWeakPtr(),
320            task_token_seq_++,
321            blocking_factor.Pass());
322    background_task_token->UpdateTask(from_here, callback);
323    running_background_tasks_.set(background_task_token->token_id(),
324                                  running_foreground_task_.Pass());
325  }
326
327  token_ = foreground_task_token.Pass();
328  StartNextTask();
329  background_task_token->SetTaskLog(task_log.Pass());
330  continuation.Run(background_task_token.Pass());
331}
332
333scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
334    const tracked_objects::Location& from_here,
335    const SyncStatusCallback& callback) {
336  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
337
338  if (!token_)
339    return scoped_ptr<SyncTaskToken>();
340  token_->UpdateTask(from_here, callback);
341  return token_.Pass();
342}
343
344void SyncTaskManager::PushPendingTask(
345    const base::Closure& closure, Priority priority) {
346  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
347
348  pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
349}
350
351void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
352                              scoped_ptr<SyncTask> task) {
353  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
354  DCHECK(!running_foreground_task_);
355
356  running_foreground_task_ = task.Pass();
357  running_foreground_task_->RunPreflight(token.Pass());
358}
359
360void SyncTaskManager::StartNextTask() {
361  DCHECK(sequence_checker_.CalledOnValidSequencedThread());
362
363  if (!pending_backgrounding_task_.is_null()) {
364    base::Closure closure = pending_backgrounding_task_;
365    pending_backgrounding_task_.Reset();
366    closure.Run();
367    return;
368  }
369
370  if (!pending_tasks_.empty()) {
371    base::Closure closure = pending_tasks_.top().task;
372    pending_tasks_.pop();
373    closure.Run();
374    return;
375  }
376
377  if (client_)
378    client_->MaybeScheduleNextTask();
379}
380
381}  // namespace drive_backend
382}  // namespace sync_file_system
383