sync_task_manager.cc revision cedac228d2dd51db4b79ea1e72c7f249408ee061
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_task_manager.h"
6
7#include "base/bind.h"
8#include "base/location.h"
9#include "base/memory/scoped_ptr.h"
10#include "chrome/browser/sync_file_system/drive_backend/sync_task_token.h"
11#include "chrome/browser/sync_file_system/sync_file_metadata.h"
12
13using fileapi::FileSystemURL;
14
15namespace sync_file_system {
16namespace drive_backend {
17
18namespace {
19
20class SyncTaskAdapter : public ExclusiveTask {
21 public:
22  explicit SyncTaskAdapter(const SyncTaskManager::Task& task) : task_(task) {}
23  virtual ~SyncTaskAdapter() {}
24
25  virtual void RunExclusive(const SyncStatusCallback& callback) OVERRIDE {
26    task_.Run(callback);
27  }
28
29 private:
30  SyncTaskManager::Task task_;
31
32  DISALLOW_COPY_AND_ASSIGN(SyncTaskAdapter);
33};
34
35}  // namespace
36
37SyncTaskManager::PendingTask::PendingTask() {}
38
39SyncTaskManager::PendingTask::PendingTask(
40    const base::Closure& task, Priority pri, int seq)
41    : task(task), priority(pri), seq(seq) {}
42
43SyncTaskManager::PendingTask::~PendingTask() {}
44
45bool SyncTaskManager::PendingTaskComparator::operator()(
46    const PendingTask& left,
47    const PendingTask& right) const {
48  if (left.priority != right.priority)
49    return left.priority < right.priority;
50  return left.seq > right.seq;
51}
52
53SyncTaskManager::SyncTaskManager(
54    base::WeakPtr<Client> client,
55    size_t maximum_background_task)
56    : client_(client),
57      maximum_background_task_(maximum_background_task),
58      pending_task_seq_(0),
59      task_token_seq_(SyncTaskToken::kMinimumBackgroundTaskTokenID) {
60}
61
62SyncTaskManager::~SyncTaskManager() {
63  client_.reset();
64  token_.reset();
65}
66
67void SyncTaskManager::Initialize(SyncStatusCode status) {
68  DCHECK(!token_);
69  NotifyTaskDone(SyncTaskToken::CreateForForegroundTask(AsWeakPtr()),
70                 status);
71}
72
73void SyncTaskManager::ScheduleTask(
74    const tracked_objects::Location& from_here,
75    const Task& task,
76    Priority priority,
77    const SyncStatusCallback& callback) {
78  ScheduleSyncTask(from_here,
79                   scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
80                   priority,
81                   callback);
82}
83
84void SyncTaskManager::ScheduleSyncTask(
85    const tracked_objects::Location& from_here,
86    scoped_ptr<SyncTask> task,
87    Priority priority,
88    const SyncStatusCallback& callback) {
89  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
90  if (!token) {
91    PushPendingTask(
92        base::Bind(&SyncTaskManager::ScheduleSyncTask, AsWeakPtr(), from_here,
93                   base::Passed(&task), priority, callback),
94        priority);
95    return;
96  }
97  RunTask(token.Pass(), task.Pass());
98}
99
100bool SyncTaskManager::ScheduleTaskIfIdle(
101        const tracked_objects::Location& from_here,
102        const Task& task,
103        const SyncStatusCallback& callback) {
104  return ScheduleSyncTaskIfIdle(
105      from_here,
106      scoped_ptr<SyncTask>(new SyncTaskAdapter(task)),
107      callback);
108}
109
110bool SyncTaskManager::ScheduleSyncTaskIfIdle(
111    const tracked_objects::Location& from_here,
112    scoped_ptr<SyncTask> task,
113    const SyncStatusCallback& callback) {
114  scoped_ptr<SyncTaskToken> token(GetToken(from_here, callback));
115  if (!token)
116    return false;
117  RunTask(token.Pass(), task.Pass());
118  return true;
119}
120
121// static
122void SyncTaskManager::NotifyTaskDone(scoped_ptr<SyncTaskToken> token,
123                                     SyncStatusCode status) {
124  DCHECK(token);
125
126  SyncTaskManager* manager = token->manager();
127  if (token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
128    DCHECK(!manager);
129    SyncStatusCallback callback = token->callback();
130    token->clear_callback();
131    callback.Run(status);
132    return;
133  }
134
135  if (manager)
136    manager->NotifyTaskDoneBody(token.Pass(), status);
137}
138
139// static
140void SyncTaskManager::UpdateBlockingFactor(
141    scoped_ptr<SyncTaskToken> current_task_token,
142    scoped_ptr<BlockingFactor> blocking_factor,
143    const Continuation& continuation) {
144  DCHECK(current_task_token);
145
146  SyncTaskManager* manager = current_task_token->manager();
147  if (current_task_token->token_id() == SyncTaskToken::kTestingTaskTokenID) {
148    DCHECK(!manager);
149    continuation.Run(current_task_token.Pass());
150    return;
151  }
152
153  if (!manager)
154    return;
155
156  scoped_ptr<SyncTaskToken> foreground_task_token;
157  scoped_ptr<SyncTaskToken> background_task_token;
158  scoped_ptr<TaskLogger::TaskLog> task_log = current_task_token->PassTaskLog();
159  if (current_task_token->token_id() == SyncTaskToken::kForegroundTaskTokenID)
160    foreground_task_token = current_task_token.Pass();
161  else
162    background_task_token = current_task_token.Pass();
163
164  manager->UpdateBlockingFactorBody(foreground_task_token.Pass(),
165                                    background_task_token.Pass(),
166                                    task_log.Pass(),
167                                    blocking_factor.Pass(),
168                                    continuation);
169}
170
171bool SyncTaskManager::IsRunningTask(int64 token_id) const {
172  // If the client is gone, all task should be aborted.
173  if (!client_)
174    return false;
175
176  if (token_id == SyncTaskToken::kForegroundTaskTokenID)
177    return true;
178
179  return ContainsKey(running_background_tasks_, token_id);
180}
181
182void SyncTaskManager::NotifyTaskDoneBody(scoped_ptr<SyncTaskToken> token,
183                                         SyncStatusCode status) {
184  DCHECK(token);
185
186  DVLOG(3) << "NotifyTaskDone: " << "finished with status=" << status
187           << " (" << SyncStatusCodeToString(status) << ")"
188           << " " << token_->location().ToString();
189
190  if (token->blocking_factor()) {
191    dependency_manager_.Erase(token->blocking_factor());
192    token->clear_blocking_factor();
193  }
194
195  if (client_)
196    client_->RecordTaskLog(token->PassTaskLog());
197
198  scoped_ptr<SyncTask> task;
199  SyncStatusCallback callback = token->callback();
200  token->clear_callback();
201  if (token->token_id() == SyncTaskToken::kForegroundTaskTokenID) {
202    token_ = token.Pass();
203    task = running_foreground_task_.Pass();
204  } else {
205    task = running_background_tasks_.take_and_erase(token->token_id());
206  }
207
208  bool task_used_network = false;
209  if (task)
210    task_used_network = task->used_network();
211
212  if (client_)
213    client_->NotifyLastOperationStatus(status, task_used_network);
214
215  if (!callback.is_null())
216    callback.Run(status);
217
218  StartNextTask();
219}
220
221void SyncTaskManager::UpdateBlockingFactorBody(
222    scoped_ptr<SyncTaskToken> foreground_task_token,
223    scoped_ptr<SyncTaskToken> background_task_token,
224    scoped_ptr<TaskLogger::TaskLog> task_log,
225    scoped_ptr<BlockingFactor> blocking_factor,
226    const Continuation& continuation) {
227  // Run the task directly if the parallelization is disabled.
228  if (!maximum_background_task_) {
229    DCHECK(foreground_task_token);
230    DCHECK(!background_task_token);
231    continuation.Run(foreground_task_token.Pass());
232    return;
233  }
234
235  // Clear existing |blocking_factor| from |dependency_manager_| before
236  // getting |foreground_task_token|, so that we can avoid dead lock.
237  if (background_task_token && background_task_token->blocking_factor()) {
238    dependency_manager_.Erase(background_task_token->blocking_factor());
239    background_task_token->clear_blocking_factor();
240  }
241
242  // Try to get |foreground_task_token|.  If it's not available, wait for
243  // current foreground task to finish.
244  if (!foreground_task_token) {
245    DCHECK(background_task_token);
246    foreground_task_token = GetToken(background_task_token->location(),
247                                     SyncStatusCallback());
248    if (!foreground_task_token) {
249      PushPendingTask(
250          base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
251                     AsWeakPtr(),
252                     base::Passed(&foreground_task_token),
253                     base::Passed(&background_task_token),
254                     base::Passed(&task_log),
255                     base::Passed(&blocking_factor),
256                     continuation),
257          PRIORITY_HIGH);
258      StartNextTask();
259      return;
260    }
261  }
262
263  // Check if the task can run as a background task now.
264  // If there are too many task running or any other task blocks current
265  // task, wait for any other task to finish.
266  bool task_number_limit_exceeded =
267      !background_task_token &&
268      running_background_tasks_.size() >= maximum_background_task_;
269  if (task_number_limit_exceeded ||
270      !dependency_manager_.Insert(blocking_factor.get())) {
271    DCHECK(!running_background_tasks_.empty());
272    DCHECK(pending_backgrounding_task_.is_null());
273
274    // Wait for NotifyTaskDone to release a |blocking_factor|.
275    pending_backgrounding_task_ =
276        base::Bind(&SyncTaskManager::UpdateBlockingFactorBody,
277                   AsWeakPtr(),
278                   base::Passed(&foreground_task_token),
279                   base::Passed(&background_task_token),
280                   base::Passed(&task_log),
281                   base::Passed(&blocking_factor),
282                   continuation);
283    return;
284  }
285
286  if (background_task_token) {
287    background_task_token->set_blocking_factor(blocking_factor.Pass());
288  } else {
289    tracked_objects::Location from_here = foreground_task_token->location();
290    SyncStatusCallback callback = foreground_task_token->callback();
291    foreground_task_token->clear_callback();
292
293    background_task_token =
294        SyncTaskToken::CreateForBackgroundTask(
295            AsWeakPtr(),
296            task_token_seq_++,
297            blocking_factor.Pass());
298    background_task_token->UpdateTask(from_here, callback);
299    running_background_tasks_.set(background_task_token->token_id(),
300                                  running_foreground_task_.Pass());
301  }
302
303  token_ = foreground_task_token.Pass();
304  StartNextTask();
305  background_task_token->SetTaskLog(task_log.Pass());
306  continuation.Run(background_task_token.Pass());
307}
308
309scoped_ptr<SyncTaskToken> SyncTaskManager::GetToken(
310    const tracked_objects::Location& from_here,
311    const SyncStatusCallback& callback) {
312  if (!token_)
313    return scoped_ptr<SyncTaskToken>();
314  token_->UpdateTask(from_here, callback);
315  return token_.Pass();
316}
317
318void SyncTaskManager::PushPendingTask(
319    const base::Closure& closure, Priority priority) {
320  pending_tasks_.push(PendingTask(closure, priority, pending_task_seq_++));
321}
322
323void SyncTaskManager::RunTask(scoped_ptr<SyncTaskToken> token,
324                              scoped_ptr<SyncTask> task) {
325  DCHECK(!running_foreground_task_);
326
327  token->SetTaskLog(make_scoped_ptr(new TaskLogger::TaskLog));
328
329  running_foreground_task_ = task.Pass();
330  running_foreground_task_->RunPreflight(token.Pass());
331}
332
333void SyncTaskManager::StartNextTask() {
334  if (!pending_backgrounding_task_.is_null()) {
335    base::Closure closure = pending_backgrounding_task_;
336    pending_backgrounding_task_.Reset();
337    closure.Run();
338    return;
339  }
340
341  if (!pending_tasks_.empty()) {
342    base::Closure closure = pending_tasks_.top().task;
343    pending_tasks_.pop();
344    closure.Run();
345    return;
346  }
347
348  if (client_)
349    client_->MaybeScheduleNextTask();
350}
351
352}  // namespace drive_backend
353}  // namespace sync_file_system
354