local_file_sync_context.cc revision 5f1c94371a64b3196d4be9466099bb892df9b88e
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/local/local_file_sync_context.h"
6
7#include "base/bind.h"
8#include "base/file_util.h"
9#include "base/location.h"
10#include "base/single_thread_task_runner.h"
11#include "base/stl_util.h"
12#include "base/task_runner_util.h"
13#include "chrome/browser/sync_file_system/file_change.h"
14#include "chrome/browser/sync_file_system/local/local_file_change_tracker.h"
15#include "chrome/browser/sync_file_system/local/local_origin_change_observer.h"
16#include "chrome/browser/sync_file_system/local/root_delete_helper.h"
17#include "chrome/browser/sync_file_system/local/sync_file_system_backend.h"
18#include "chrome/browser/sync_file_system/local/syncable_file_operation_runner.h"
19#include "chrome/browser/sync_file_system/logger.h"
20#include "chrome/browser/sync_file_system/sync_file_metadata.h"
21#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
22#include "webkit/browser/fileapi/file_system_context.h"
23#include "webkit/browser/fileapi/file_system_file_util.h"
24#include "webkit/browser/fileapi/file_system_operation_context.h"
25#include "webkit/browser/fileapi/file_system_operation_runner.h"
26#include "webkit/common/blob/scoped_file.h"
27#include "webkit/common/fileapi/file_system_util.h"
28
29using fileapi::FileSystemContext;
30using fileapi::FileSystemFileUtil;
31using fileapi::FileSystemOperation;
32using fileapi::FileSystemOperationContext;
33using fileapi::FileSystemURL;
34
35namespace sync_file_system {
36
37namespace {
38
39const int kMaxConcurrentSyncableOperation = 3;
40const int kNotifyChangesDurationInSec = 1;
41const int kMaxURLsToFetchForLocalSync = 5;
42
43const base::FilePath::CharType kSnapshotDir[] = FILE_PATH_LITERAL("snapshots");
44
45}  // namespace
46
47LocalFileSyncContext::LocalFileSyncContext(
48    const base::FilePath& base_path,
49    leveldb::Env* env_override,
50    base::SingleThreadTaskRunner* ui_task_runner,
51    base::SingleThreadTaskRunner* io_task_runner)
52    : local_base_path_(base_path.Append(FILE_PATH_LITERAL("local"))),
53      env_override_(env_override),
54      ui_task_runner_(ui_task_runner),
55      io_task_runner_(io_task_runner),
56      shutdown_on_ui_(false),
57      shutdown_on_io_(false),
58      mock_notify_changes_duration_in_sec_(-1) {
59  DCHECK(base_path.IsAbsolute());
60  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
61}
62
63void LocalFileSyncContext::MaybeInitializeFileSystemContext(
64    const GURL& source_url,
65    FileSystemContext* file_system_context,
66    const SyncStatusCallback& callback) {
67  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
68  if (ContainsKey(file_system_contexts_, file_system_context)) {
69    // The context has been already initialized. Just dispatch the callback
70    // with SYNC_STATUS_OK.
71    ui_task_runner_->PostTask(FROM_HERE, base::Bind(callback, SYNC_STATUS_OK));
72    return;
73  }
74
75  StatusCallbackQueue& callback_queue =
76      pending_initialize_callbacks_[file_system_context];
77  callback_queue.push_back(callback);
78  if (callback_queue.size() > 1)
79    return;
80
81  // The sync service always expects the origin (app) is initialized
82  // for writable way (even when MaybeInitializeFileSystemContext is called
83  // from read-only OpenFileSystem), so open the filesystem with
84  // CREATE_IF_NONEXISTENT here.
85  fileapi::FileSystemBackend::OpenFileSystemCallback open_filesystem_callback =
86      base::Bind(&LocalFileSyncContext::InitializeFileSystemContextOnIOThread,
87                 this, source_url, make_scoped_refptr(file_system_context));
88  io_task_runner_->PostTask(
89      FROM_HERE,
90      base::Bind(&fileapi::SandboxFileSystemBackendDelegate::OpenFileSystem,
91                 base::Unretained(file_system_context->sandbox_delegate()),
92                 source_url, fileapi::kFileSystemTypeSyncable,
93                 fileapi::OPEN_FILE_SYSTEM_CREATE_IF_NONEXISTENT,
94                 open_filesystem_callback, GURL()));
95}
96
97void LocalFileSyncContext::ShutdownOnUIThread() {
98  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
99  shutdown_on_ui_ = true;
100  io_task_runner_->PostTask(
101      FROM_HERE,
102      base::Bind(&LocalFileSyncContext::ShutdownOnIOThread, this));
103}
104
105void LocalFileSyncContext::GetFileForLocalSync(
106    FileSystemContext* file_system_context,
107    const LocalFileSyncInfoCallback& callback) {
108  DCHECK(file_system_context);
109  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
110
111  base::PostTaskAndReplyWithResult(
112      file_system_context->default_file_task_runner(),
113      FROM_HERE,
114      base::Bind(&LocalFileSyncContext::GetNextURLsForSyncOnFileThread,
115                 this, make_scoped_refptr(file_system_context)),
116      base::Bind(&LocalFileSyncContext::TryPrepareForLocalSync,
117                 this, make_scoped_refptr(file_system_context), callback));
118}
119
120void LocalFileSyncContext::ClearChangesForURL(
121    FileSystemContext* file_system_context,
122    const FileSystemURL& url,
123    const base::Closure& done_callback) {
124  // This is initially called on UI thread and to be relayed to FILE thread.
125  DCHECK(file_system_context);
126  if (!file_system_context->default_file_task_runner()->
127          RunsTasksOnCurrentThread()) {
128    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
129    file_system_context->default_file_task_runner()->PostTask(
130        FROM_HERE,
131        base::Bind(&LocalFileSyncContext::ClearChangesForURL,
132                   this, make_scoped_refptr(file_system_context),
133                   url, done_callback));
134    return;
135  }
136
137  SyncFileSystemBackend* backend =
138      SyncFileSystemBackend::GetBackend(file_system_context);
139  DCHECK(backend);
140  DCHECK(backend->change_tracker());
141  backend->change_tracker()->ClearChangesForURL(url);
142
143  // Call the completion callback on UI thread.
144  ui_task_runner_->PostTask(FROM_HERE, done_callback);
145}
146
147void LocalFileSyncContext::FinalizeSnapshotSync(
148    fileapi::FileSystemContext* file_system_context,
149    const fileapi::FileSystemURL& url,
150    SyncStatusCode sync_finish_status,
151    const base::Closure& done_callback) {
152  DCHECK(file_system_context);
153  DCHECK(url.is_valid());
154  if (!file_system_context->default_file_task_runner()->
155          RunsTasksOnCurrentThread()) {
156    file_system_context->default_file_task_runner()->PostTask(
157        FROM_HERE,
158        base::Bind(&LocalFileSyncContext::FinalizeSnapshotSync,
159                   this, make_scoped_refptr(file_system_context),
160                   url, sync_finish_status, done_callback));
161    return;
162  }
163
164  SyncFileSystemBackend* backend =
165      SyncFileSystemBackend::GetBackend(file_system_context);
166  DCHECK(backend);
167  DCHECK(backend->change_tracker());
168
169  if (sync_finish_status == SYNC_STATUS_OK ||
170      sync_finish_status == SYNC_STATUS_HAS_CONFLICT) {
171    // Commit the in-memory mirror change.
172    backend->change_tracker()->ResetToMirrorAndCommitChangesForURL(url);
173  } else {
174    // Abort in-memory mirror change.
175    backend->change_tracker()->RemoveMirrorAndCommitChangesForURL(url);
176  }
177
178  // We've been keeping it in writing mode, so clear the writing counter
179  // to unblock sync activities.
180  io_task_runner_->PostTask(
181      FROM_HERE, base::Bind(
182          &LocalFileSyncContext::FinalizeSnapshotSyncOnIOThread, this, url));
183
184  // Call the completion callback on UI thread.
185  ui_task_runner_->PostTask(FROM_HERE, done_callback);
186}
187
188void LocalFileSyncContext::FinalizeExclusiveSync(
189    fileapi::FileSystemContext* file_system_context,
190    const fileapi::FileSystemURL& url,
191    bool clear_local_changes,
192    const base::Closure& done_callback) {
193  DCHECK(file_system_context);
194  if (!url.is_valid()) {
195    done_callback.Run();
196    return;
197  }
198
199  if (clear_local_changes) {
200    ClearChangesForURL(file_system_context, url,
201                       base::Bind(&LocalFileSyncContext::FinalizeExclusiveSync,
202                                  this, make_scoped_refptr(file_system_context),
203                                  url, false, done_callback));
204    return;
205  }
206
207  io_task_runner_->PostTask(
208      FROM_HERE,
209      base::Bind(&LocalFileSyncContext::ClearSyncFlagOnIOThread,
210                 this, url, false /* for_snapshot_sync */));
211
212  done_callback.Run();
213}
214
215void LocalFileSyncContext::PrepareForSync(
216    FileSystemContext* file_system_context,
217    const FileSystemURL& url,
218    SyncMode sync_mode,
219    const LocalFileSyncInfoCallback& callback) {
220  // This is initially called on UI thread and to be relayed to IO thread.
221  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
222    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
223    io_task_runner_->PostTask(
224        FROM_HERE,
225        base::Bind(&LocalFileSyncContext::PrepareForSync, this,
226                   make_scoped_refptr(file_system_context), url,
227                   sync_mode, callback));
228    return;
229  }
230  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
231  const bool syncable = sync_status()->IsSyncable(url);
232  // Disable writing if it's ready to be synced.
233  if (syncable)
234    sync_status()->StartSyncing(url);
235  ui_task_runner_->PostTask(
236      FROM_HERE,
237      base::Bind(&LocalFileSyncContext::DidGetWritingStatusForSync,
238                 this, make_scoped_refptr(file_system_context),
239                 syncable ? SYNC_STATUS_OK :
240                            SYNC_STATUS_FILE_BUSY,
241                 url, sync_mode, callback));
242}
243
244void LocalFileSyncContext::RegisterURLForWaitingSync(
245    const FileSystemURL& url,
246    const base::Closure& on_syncable_callback) {
247  // This is initially called on UI thread and to be relayed to IO thread.
248  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
249    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
250    io_task_runner_->PostTask(
251        FROM_HERE,
252        base::Bind(&LocalFileSyncContext::RegisterURLForWaitingSync,
253                   this, url, on_syncable_callback));
254    return;
255  }
256  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
257  if (shutdown_on_io_)
258    return;
259  if (sync_status()->IsSyncable(url)) {
260    // No need to register; fire the callback now.
261    ui_task_runner_->PostTask(FROM_HERE, on_syncable_callback);
262    return;
263  }
264  url_waiting_sync_on_io_ = url;
265  url_syncable_callback_ = on_syncable_callback;
266}
267
268void LocalFileSyncContext::ApplyRemoteChange(
269    FileSystemContext* file_system_context,
270    const FileChange& change,
271    const base::FilePath& local_path,
272    const FileSystemURL& url,
273    const SyncStatusCallback& callback) {
274  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
275    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
276    io_task_runner_->PostTask(
277        FROM_HERE,
278        base::Bind(&LocalFileSyncContext::ApplyRemoteChange, this,
279                   make_scoped_refptr(file_system_context),
280                   change, local_path, url, callback));
281    return;
282  }
283  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
284  DCHECK(!sync_status()->IsWritable(url));
285  DCHECK(!sync_status()->IsWriting(url));
286
287  FileSystemOperation::StatusCallback operation_callback;
288  switch (change.change()) {
289    case FileChange::FILE_CHANGE_DELETE:
290      HandleRemoteDelete(file_system_context, url, callback);
291      return;
292    case FileChange::FILE_CHANGE_ADD_OR_UPDATE:
293      HandleRemoteAddOrUpdate(
294          file_system_context, change, local_path, url, callback);
295      return;
296  }
297  NOTREACHED();
298  callback.Run(SYNC_STATUS_FAILED);
299}
300
301void LocalFileSyncContext::HandleRemoteDelete(
302    FileSystemContext* file_system_context,
303    const FileSystemURL& url,
304    const SyncStatusCallback& callback) {
305  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
306      file_system_context, url);
307
308  // Handle root directory case differently.
309  if (fileapi::VirtualPath::IsRootPath(url.path())) {
310    DCHECK(!root_delete_helper_);
311    root_delete_helper_.reset(new RootDeleteHelper(
312        file_system_context, sync_status(), url,
313        base::Bind(&LocalFileSyncContext::DidApplyRemoteChange,
314                   this, url, callback)));
315    root_delete_helper_->Run();
316    return;
317  }
318
319  file_system_context->operation_runner()->Remove(
320      url_for_sync, true /* recursive */,
321      base::Bind(&LocalFileSyncContext::DidApplyRemoteChange,
322                 this, url, callback));
323}
324
325void LocalFileSyncContext::HandleRemoteAddOrUpdate(
326    FileSystemContext* file_system_context,
327    const FileChange& change,
328    const base::FilePath& local_path,
329    const FileSystemURL& url,
330    const SyncStatusCallback& callback) {
331  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
332      file_system_context, url);
333
334  if (fileapi::VirtualPath::IsRootPath(url.path())) {
335    DidApplyRemoteChange(url, callback, base::File::FILE_OK);
336    return;
337  }
338
339  file_system_context->operation_runner()->Remove(
340      url_for_sync, true /* recursive */,
341      base::Bind(
342          &LocalFileSyncContext::DidRemoveExistingEntryForRemoteAddOrUpdate,
343          this,
344          make_scoped_refptr(file_system_context),
345          change,
346          local_path,
347          url,
348          callback));
349}
350
351void LocalFileSyncContext::DidRemoveExistingEntryForRemoteAddOrUpdate(
352    FileSystemContext* file_system_context,
353    const FileChange& change,
354    const base::FilePath& local_path,
355    const FileSystemURL& url,
356    const SyncStatusCallback& callback,
357    base::File::Error error) {
358  // Remove() may fail if the target entry does not exist (which is ok),
359  // so we ignore |error| here.
360
361  if (shutdown_on_io_) {
362    callback.Run(SYNC_FILE_ERROR_ABORT);
363    return;
364  }
365
366  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
367  DCHECK(!sync_status()->IsWritable(url));
368  DCHECK(!sync_status()->IsWriting(url));
369
370  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
371      file_system_context, url);
372  FileSystemOperation::StatusCallback operation_callback = base::Bind(
373      &LocalFileSyncContext::DidApplyRemoteChange, this, url, callback);
374
375  DCHECK_EQ(FileChange::FILE_CHANGE_ADD_OR_UPDATE, change.change());
376  switch (change.file_type()) {
377    case SYNC_FILE_TYPE_FILE: {
378      DCHECK(!local_path.empty());
379      base::FilePath dir_path = fileapi::VirtualPath::DirName(url.path());
380      if (dir_path.empty() ||
381          fileapi::VirtualPath::DirName(dir_path) == dir_path) {
382        // Copying into the root directory.
383        file_system_context->operation_runner()->CopyInForeignFile(
384            local_path, url_for_sync, operation_callback);
385      } else {
386        FileSystemURL dir_url = file_system_context->CreateCrackedFileSystemURL(
387            url_for_sync.origin(),
388            url_for_sync.mount_type(),
389            fileapi::VirtualPath::DirName(url_for_sync.virtual_path()));
390        file_system_context->operation_runner()->CreateDirectory(
391            dir_url,
392            false /* exclusive */,
393            true /* recursive */,
394            base::Bind(&LocalFileSyncContext::DidCreateDirectoryForCopyIn,
395                       this,
396                       make_scoped_refptr(file_system_context),
397                       local_path,
398                       url,
399                       operation_callback));
400      }
401      break;
402    }
403    case SYNC_FILE_TYPE_DIRECTORY:
404      file_system_context->operation_runner()->CreateDirectory(
405          url_for_sync, false /* exclusive */, true /* recursive */,
406          operation_callback);
407      break;
408    case SYNC_FILE_TYPE_UNKNOWN:
409      NOTREACHED() << "File type unknown for ADD_OR_UPDATE change";
410  }
411}
412
413void LocalFileSyncContext::RecordFakeLocalChange(
414    FileSystemContext* file_system_context,
415    const FileSystemURL& url,
416    const FileChange& change,
417    const SyncStatusCallback& callback) {
418  // This is called on UI thread and to be relayed to FILE thread.
419  DCHECK(file_system_context);
420  if (!file_system_context->default_file_task_runner()->
421          RunsTasksOnCurrentThread()) {
422    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
423    file_system_context->default_file_task_runner()->PostTask(
424        FROM_HERE,
425        base::Bind(&LocalFileSyncContext::RecordFakeLocalChange,
426                   this, make_scoped_refptr(file_system_context),
427                   url, change, callback));
428    return;
429  }
430
431  SyncFileSystemBackend* backend =
432      SyncFileSystemBackend::GetBackend(file_system_context);
433  DCHECK(backend);
434  DCHECK(backend->change_tracker());
435  backend->change_tracker()->MarkDirtyOnDatabase(url);
436  backend->change_tracker()->RecordChange(url, change);
437
438  // Fire the callback on UI thread.
439  ui_task_runner_->PostTask(FROM_HERE,
440                            base::Bind(callback,
441                                       SYNC_STATUS_OK));
442}
443
444void LocalFileSyncContext::GetFileMetadata(
445    FileSystemContext* file_system_context,
446    const FileSystemURL& url,
447    const SyncFileMetadataCallback& callback) {
448  // This is initially called on UI thread and to be relayed to IO thread.
449  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
450    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
451    io_task_runner_->PostTask(
452        FROM_HERE,
453        base::Bind(&LocalFileSyncContext::GetFileMetadata, this,
454                   make_scoped_refptr(file_system_context), url, callback));
455    return;
456  }
457  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
458
459  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
460      file_system_context, url);
461  file_system_context->operation_runner()->GetMetadata(
462      url_for_sync, base::Bind(&LocalFileSyncContext::DidGetFileMetadata,
463                      this, callback));
464}
465
466void LocalFileSyncContext::HasPendingLocalChanges(
467    FileSystemContext* file_system_context,
468    const FileSystemURL& url,
469    const HasPendingLocalChangeCallback& callback) {
470  // This gets called on UI thread and relays the task on FILE thread.
471  DCHECK(file_system_context);
472  if (!file_system_context->default_file_task_runner()->
473          RunsTasksOnCurrentThread()) {
474    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
475    file_system_context->default_file_task_runner()->PostTask(
476        FROM_HERE,
477        base::Bind(&LocalFileSyncContext::HasPendingLocalChanges,
478                   this, make_scoped_refptr(file_system_context),
479                   url, callback));
480    return;
481  }
482
483  SyncFileSystemBackend* backend =
484      SyncFileSystemBackend::GetBackend(file_system_context);
485  DCHECK(backend);
486  DCHECK(backend->change_tracker());
487  FileChangeList changes;
488  backend->change_tracker()->GetChangesForURL(url, &changes);
489
490  // Fire the callback on UI thread.
491  ui_task_runner_->PostTask(FROM_HERE,
492                            base::Bind(callback,
493                                       SYNC_STATUS_OK,
494                                       !changes.empty()));
495}
496
497void LocalFileSyncContext::PromoteDemotedChanges(
498    const GURL& origin,
499    fileapi::FileSystemContext* file_system_context,
500    const base::Closure& callback) {
501  // This is initially called on UI thread and to be relayed to FILE thread.
502  DCHECK(file_system_context);
503  if (!file_system_context->default_file_task_runner()->
504          RunsTasksOnCurrentThread()) {
505    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
506    file_system_context->default_file_task_runner()->PostTask(
507        FROM_HERE,
508        base::Bind(&LocalFileSyncContext::PromoteDemotedChanges,
509                   this, origin, make_scoped_refptr(file_system_context),
510                   callback));
511    return;
512  }
513
514  SyncFileSystemBackend* backend =
515      SyncFileSystemBackend::GetBackend(file_system_context);
516  DCHECK(backend);
517  DCHECK(backend->change_tracker());
518  if (!backend->change_tracker()->PromoteDemotedChanges()) {
519    ui_task_runner_->PostTask(FROM_HERE, callback);
520    return;
521  }
522
523  io_task_runner_->PostTask(
524      FROM_HERE,
525      base::Bind(&LocalFileSyncContext::UpdateChangesForOrigin,
526                 this, origin, callback));
527}
528
529void LocalFileSyncContext::UpdateChangesForOrigin(
530    const GURL& origin,
531    const base::Closure& callback) {
532  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
533  if (shutdown_on_io_)
534    return;
535  origins_with_pending_changes_.insert(origin);
536  ScheduleNotifyChangesUpdatedOnIOThread(callback);
537}
538
539void LocalFileSyncContext::AddOriginChangeObserver(
540    LocalOriginChangeObserver* observer) {
541  origin_change_observers_.AddObserver(observer);
542}
543
544void LocalFileSyncContext::RemoveOriginChangeObserver(
545    LocalOriginChangeObserver* observer) {
546  origin_change_observers_.RemoveObserver(observer);
547}
548
549base::WeakPtr<SyncableFileOperationRunner>
550LocalFileSyncContext::operation_runner() const {
551  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
552  if (operation_runner_)
553    return operation_runner_->AsWeakPtr();
554  return base::WeakPtr<SyncableFileOperationRunner>();
555}
556
557LocalFileSyncStatus* LocalFileSyncContext::sync_status() const {
558  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
559  return sync_status_.get();
560}
561
562void LocalFileSyncContext::OnSyncEnabled(const FileSystemURL& url) {
563  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
564  if (shutdown_on_io_)
565    return;
566  UpdateChangesForOrigin(url.origin(), NoopClosure());
567  if (url_syncable_callback_.is_null() ||
568      sync_status()->IsWriting(url_waiting_sync_on_io_)) {
569    return;
570  }
571  // TODO(kinuko): may want to check how many pending tasks we have.
572  ui_task_runner_->PostTask(FROM_HERE, url_syncable_callback_);
573  url_syncable_callback_.Reset();
574}
575
576void LocalFileSyncContext::OnWriteEnabled(const FileSystemURL& url) {
577  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
578  // Nothing to do for now.
579}
580
581LocalFileSyncContext::~LocalFileSyncContext() {
582}
583
584void LocalFileSyncContext::ScheduleNotifyChangesUpdatedOnIOThread(
585    const base::Closure& callback) {
586  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
587  if (shutdown_on_io_)
588    return;
589  if (base::Time::Now() > last_notified_changes_ + NotifyChangesDuration()) {
590    NotifyAvailableChangesOnIOThread(callback);
591  } else if (!timer_on_io_->IsRunning()) {
592    timer_on_io_->Start(
593        FROM_HERE, NotifyChangesDuration(),
594        base::Bind(&LocalFileSyncContext::NotifyAvailableChangesOnIOThread,
595                   base::Unretained(this), callback));
596  }
597}
598
599void LocalFileSyncContext::NotifyAvailableChangesOnIOThread(
600    const base::Closure& callback) {
601  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
602  if (shutdown_on_io_)
603    return;
604  ui_task_runner_->PostTask(
605      FROM_HERE,
606      base::Bind(&LocalFileSyncContext::NotifyAvailableChanges,
607                 this, origins_with_pending_changes_, callback));
608  last_notified_changes_ = base::Time::Now();
609  origins_with_pending_changes_.clear();
610}
611
612void LocalFileSyncContext::NotifyAvailableChanges(
613    const std::set<GURL>& origins,
614    const base::Closure& callback) {
615  FOR_EACH_OBSERVER(LocalOriginChangeObserver, origin_change_observers_,
616                    OnChangesAvailableInOrigins(origins));
617  callback.Run();
618}
619
620void LocalFileSyncContext::ShutdownOnIOThread() {
621  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
622  shutdown_on_io_ = true;
623  operation_runner_.reset();
624  root_delete_helper_.reset();
625  sync_status_.reset();
626  timer_on_io_.reset();
627}
628
629void LocalFileSyncContext::InitializeFileSystemContextOnIOThread(
630    const GURL& source_url,
631    FileSystemContext* file_system_context,
632    const GURL& /* root */,
633    const std::string& /* name */,
634    base::File::Error error) {
635  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
636  if (shutdown_on_io_)
637    error = base::File::FILE_ERROR_ABORT;
638  if (error != base::File::FILE_OK) {
639    DidInitialize(source_url, file_system_context,
640                  FileErrorToSyncStatusCode(error));
641    return;
642  }
643  DCHECK(file_system_context);
644  SyncFileSystemBackend* backend =
645      SyncFileSystemBackend::GetBackend(file_system_context);
646  DCHECK(backend);
647  if (!backend->change_tracker()) {
648    // Create and initialize LocalFileChangeTracker and call back this method
649    // later again.
650    std::set<GURL>* origins_with_changes = new std::set<GURL>;
651    scoped_ptr<LocalFileChangeTracker>* tracker_ptr(
652        new scoped_ptr<LocalFileChangeTracker>);
653    base::PostTaskAndReplyWithResult(
654        file_system_context->default_file_task_runner(),
655        FROM_HERE,
656        base::Bind(&LocalFileSyncContext::InitializeChangeTrackerOnFileThread,
657                   this, tracker_ptr,
658                   make_scoped_refptr(file_system_context),
659                   origins_with_changes),
660        base::Bind(&LocalFileSyncContext::DidInitializeChangeTrackerOnIOThread,
661                   this, base::Owned(tracker_ptr),
662                   source_url,
663                   make_scoped_refptr(file_system_context),
664                   base::Owned(origins_with_changes)));
665    return;
666  }
667  if (!operation_runner_) {
668    DCHECK(!sync_status_);
669    DCHECK(!timer_on_io_);
670    sync_status_.reset(new LocalFileSyncStatus);
671    timer_on_io_.reset(new base::OneShotTimer<LocalFileSyncContext>);
672    operation_runner_.reset(new SyncableFileOperationRunner(
673            kMaxConcurrentSyncableOperation,
674            sync_status_.get()));
675    sync_status_->AddObserver(this);
676  }
677  backend->set_sync_context(this);
678  DidInitialize(source_url, file_system_context,
679                SYNC_STATUS_OK);
680}
681
682SyncStatusCode LocalFileSyncContext::InitializeChangeTrackerOnFileThread(
683    scoped_ptr<LocalFileChangeTracker>* tracker_ptr,
684    FileSystemContext* file_system_context,
685    std::set<GURL>* origins_with_changes) {
686  DCHECK(file_system_context);
687  DCHECK(tracker_ptr);
688  DCHECK(origins_with_changes);
689  tracker_ptr->reset(new LocalFileChangeTracker(
690          file_system_context->partition_path(),
691          env_override_,
692          file_system_context->default_file_task_runner()));
693  const SyncStatusCode status = (*tracker_ptr)->Initialize(file_system_context);
694  if (status != SYNC_STATUS_OK)
695    return status;
696
697  // Get all origins that have pending changes.
698  FileSystemURLQueue urls;
699  (*tracker_ptr)->GetNextChangedURLs(&urls, 0);
700  for (FileSystemURLQueue::iterator iter = urls.begin();
701       iter != urls.end(); ++iter) {
702    origins_with_changes->insert(iter->origin());
703  }
704
705  // Creates snapshot directory.
706  base::CreateDirectory(local_base_path_.Append(kSnapshotDir));
707
708  return status;
709}
710
711void LocalFileSyncContext::DidInitializeChangeTrackerOnIOThread(
712    scoped_ptr<LocalFileChangeTracker>* tracker_ptr,
713    const GURL& source_url,
714    FileSystemContext* file_system_context,
715    std::set<GURL>* origins_with_changes,
716    SyncStatusCode status) {
717  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
718  DCHECK(file_system_context);
719  DCHECK(origins_with_changes);
720  if (shutdown_on_io_)
721    status = SYNC_STATUS_ABORT;
722  if (status != SYNC_STATUS_OK) {
723    DidInitialize(source_url, file_system_context, status);
724    return;
725  }
726
727  SyncFileSystemBackend* backend =
728      SyncFileSystemBackend::GetBackend(file_system_context);
729  DCHECK(backend);
730  backend->SetLocalFileChangeTracker(tracker_ptr->Pass());
731
732  origins_with_pending_changes_.insert(origins_with_changes->begin(),
733                                       origins_with_changes->end());
734  ScheduleNotifyChangesUpdatedOnIOThread(NoopClosure());
735
736  InitializeFileSystemContextOnIOThread(source_url, file_system_context,
737                                        GURL(), std::string(),
738                                        base::File::FILE_OK);
739}
740
741void LocalFileSyncContext::DidInitialize(
742    const GURL& source_url,
743    FileSystemContext* file_system_context,
744    SyncStatusCode status) {
745  if (!ui_task_runner_->RunsTasksOnCurrentThread()) {
746    ui_task_runner_->PostTask(
747        FROM_HERE,
748        base::Bind(&LocalFileSyncContext::DidInitialize,
749                   this, source_url,
750                   make_scoped_refptr(file_system_context), status));
751    return;
752  }
753  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
754  DCHECK(!ContainsKey(file_system_contexts_, file_system_context));
755  DCHECK(ContainsKey(pending_initialize_callbacks_, file_system_context));
756
757  SyncFileSystemBackend* backend =
758      SyncFileSystemBackend::GetBackend(file_system_context);
759  DCHECK(backend);
760  DCHECK(backend->change_tracker());
761
762  file_system_contexts_.insert(file_system_context);
763
764  StatusCallbackQueue& callback_queue =
765      pending_initialize_callbacks_[file_system_context];
766  for (StatusCallbackQueue::iterator iter = callback_queue.begin();
767       iter != callback_queue.end(); ++iter) {
768    ui_task_runner_->PostTask(FROM_HERE, base::Bind(*iter, status));
769  }
770  pending_initialize_callbacks_.erase(file_system_context);
771}
772
773scoped_ptr<LocalFileSyncContext::FileSystemURLQueue>
774LocalFileSyncContext::GetNextURLsForSyncOnFileThread(
775    FileSystemContext* file_system_context) {
776  DCHECK(file_system_context);
777  DCHECK(file_system_context->default_file_task_runner()->
778             RunsTasksOnCurrentThread());
779  SyncFileSystemBackend* backend =
780      SyncFileSystemBackend::GetBackend(file_system_context);
781  DCHECK(backend);
782  DCHECK(backend->change_tracker());
783  scoped_ptr<FileSystemURLQueue> urls(new FileSystemURLQueue);
784  backend->change_tracker()->GetNextChangedURLs(
785      urls.get(), kMaxURLsToFetchForLocalSync);
786  for (FileSystemURLQueue::iterator iter = urls->begin();
787       iter != urls->end(); ++iter)
788    backend->change_tracker()->DemoteChangesForURL(*iter);
789
790  return urls.Pass();
791}
792
793void LocalFileSyncContext::TryPrepareForLocalSync(
794    FileSystemContext* file_system_context,
795    const LocalFileSyncInfoCallback& callback,
796    scoped_ptr<FileSystemURLQueue> urls) {
797  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
798  DCHECK(urls);
799
800  if (shutdown_on_ui_) {
801    callback.Run(SYNC_STATUS_ABORT, LocalFileSyncInfo(),
802                 webkit_blob::ScopedFile());
803    return;
804  }
805
806  if (urls->empty()) {
807    callback.Run(SYNC_STATUS_NO_CHANGE_TO_SYNC, LocalFileSyncInfo(),
808                 webkit_blob::ScopedFile());
809    return;
810  }
811
812  const FileSystemURL url = urls->front();
813  urls->pop_front();
814
815  PrepareForSync(
816      file_system_context, url, SYNC_SNAPSHOT,
817      base::Bind(&LocalFileSyncContext::DidTryPrepareForLocalSync,
818                 this, make_scoped_refptr(file_system_context),
819                 base::Passed(&urls), callback));
820}
821
822void LocalFileSyncContext::DidTryPrepareForLocalSync(
823    FileSystemContext* file_system_context,
824    scoped_ptr<FileSystemURLQueue> remaining_urls,
825    const LocalFileSyncInfoCallback& callback,
826    SyncStatusCode status,
827    const LocalFileSyncInfo& sync_file_info,
828    webkit_blob::ScopedFile snapshot) {
829  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
830  if (status != SYNC_STATUS_FILE_BUSY) {
831    PromoteDemotedChangesForURLs(file_system_context,
832                                 remaining_urls.Pass());
833    callback.Run(status, sync_file_info, snapshot.Pass());
834    return;
835  }
836
837  PromoteDemotedChangesForURL(file_system_context, sync_file_info.url);
838
839  // Recursively call TryPrepareForLocalSync with remaining_urls.
840  TryPrepareForLocalSync(file_system_context, callback, remaining_urls.Pass());
841}
842
843void LocalFileSyncContext::PromoteDemotedChangesForURL(
844    FileSystemContext* file_system_context,
845    const FileSystemURL& url) {
846  DCHECK(file_system_context);
847  if (!file_system_context->default_file_task_runner()->
848          RunsTasksOnCurrentThread()) {
849    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
850    if (shutdown_on_ui_)
851      return;
852    file_system_context->default_file_task_runner()->PostTask(
853        FROM_HERE,
854        base::Bind(&LocalFileSyncContext::PromoteDemotedChangesForURL,
855                   this, make_scoped_refptr(file_system_context), url));
856    return;
857  }
858
859  SyncFileSystemBackend* backend =
860      SyncFileSystemBackend::GetBackend(file_system_context);
861  DCHECK(backend);
862  DCHECK(backend->change_tracker());
863  backend->change_tracker()->PromoteDemotedChangesForURL(url);
864}
865
866void LocalFileSyncContext::PromoteDemotedChangesForURLs(
867    FileSystemContext* file_system_context,
868    scoped_ptr<FileSystemURLQueue> urls) {
869  DCHECK(file_system_context);
870  if (!file_system_context->default_file_task_runner()->
871          RunsTasksOnCurrentThread()) {
872    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
873    if (shutdown_on_ui_)
874      return;
875    file_system_context->default_file_task_runner()->PostTask(
876        FROM_HERE,
877        base::Bind(&LocalFileSyncContext::PromoteDemotedChangesForURLs,
878                   this, make_scoped_refptr(file_system_context),
879                   base::Passed(&urls)));
880    return;
881  }
882
883  for (FileSystemURLQueue::iterator iter = urls->begin();
884       iter != urls->end(); ++iter)
885    PromoteDemotedChangesForURL(file_system_context, *iter);
886}
887
888void LocalFileSyncContext::DidGetWritingStatusForSync(
889    FileSystemContext* file_system_context,
890    SyncStatusCode status,
891    const FileSystemURL& url,
892    SyncMode sync_mode,
893    const LocalFileSyncInfoCallback& callback) {
894  // This gets called on UI thread and relays the task on FILE thread.
895  DCHECK(file_system_context);
896  if (!file_system_context->default_file_task_runner()->
897          RunsTasksOnCurrentThread()) {
898    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
899    if (shutdown_on_ui_) {
900      callback.Run(SYNC_STATUS_ABORT, LocalFileSyncInfo(),
901                   webkit_blob::ScopedFile());
902      return;
903    }
904    file_system_context->default_file_task_runner()->PostTask(
905        FROM_HERE,
906        base::Bind(&LocalFileSyncContext::DidGetWritingStatusForSync,
907                   this, make_scoped_refptr(file_system_context),
908                   status, url, sync_mode, callback));
909    return;
910  }
911
912  SyncFileSystemBackend* backend =
913      SyncFileSystemBackend::GetBackend(file_system_context);
914  DCHECK(backend);
915  DCHECK(backend->change_tracker());
916  FileChangeList changes;
917  backend->change_tracker()->GetChangesForURL(url, &changes);
918
919  base::FilePath platform_path;
920  base::File::Info file_info;
921  FileSystemFileUtil* file_util =
922      file_system_context->sandbox_delegate()->sync_file_util();
923  DCHECK(file_util);
924
925  base::File::Error file_error = file_util->GetFileInfo(
926      make_scoped_ptr(
927          new FileSystemOperationContext(file_system_context)).get(),
928      url,
929      &file_info,
930      &platform_path);
931
932  webkit_blob::ScopedFile snapshot;
933  if (file_error == base::File::FILE_OK && sync_mode == SYNC_SNAPSHOT) {
934    base::FilePath snapshot_path;
935    base::CreateTemporaryFileInDir(local_base_path_.Append(kSnapshotDir),
936                                   &snapshot_path);
937    if (base::CopyFile(platform_path, snapshot_path)) {
938      platform_path = snapshot_path;
939      snapshot = webkit_blob::ScopedFile(
940          snapshot_path,
941          webkit_blob::ScopedFile::DELETE_ON_SCOPE_OUT,
942          file_system_context->default_file_task_runner());
943    }
944  }
945
946  if (status == SYNC_STATUS_OK &&
947      file_error != base::File::FILE_OK &&
948      file_error != base::File::FILE_ERROR_NOT_FOUND) {
949    status = FileErrorToSyncStatusCode(file_error);
950}
951
952  DCHECK(!file_info.is_symbolic_link);
953
954  SyncFileType file_type = SYNC_FILE_TYPE_FILE;
955  if (file_error == base::File::FILE_ERROR_NOT_FOUND)
956    file_type = SYNC_FILE_TYPE_UNKNOWN;
957  else if (file_info.is_directory)
958    file_type = SYNC_FILE_TYPE_DIRECTORY;
959
960  LocalFileSyncInfo sync_file_info;
961  sync_file_info.url = url;
962  sync_file_info.local_file_path = platform_path;
963  sync_file_info.metadata.file_type = file_type;
964  sync_file_info.metadata.size = file_info.size;
965  sync_file_info.metadata.last_modified = file_info.last_modified;
966  sync_file_info.changes = changes;
967
968  if (status == SYNC_STATUS_OK && sync_mode == SYNC_SNAPSHOT) {
969    if (!changes.empty()) {
970      // Now we create an empty mirror change record for URL (and we record
971      // changes to both mirror and original records during sync), so that
972      // we can reset to the mirror when the sync succeeds.
973      backend->change_tracker()->CreateFreshMirrorForURL(url);
974    }
975
976    // 'Unlock' the file for snapshot sync.
977    // (But keep it in writing status so that no other sync starts on
978    // the same URL)
979    io_task_runner_->PostTask(
980        FROM_HERE,
981        base::Bind(&LocalFileSyncContext::ClearSyncFlagOnIOThread,
982                   this, url, true /* for_snapshot_sync */));
983  }
984
985  ui_task_runner_->PostTask(FROM_HERE,
986                            base::Bind(callback, status, sync_file_info,
987                                       base::Passed(&snapshot)));
988}
989
990void LocalFileSyncContext::ClearSyncFlagOnIOThread(
991    const FileSystemURL& url,
992    bool for_snapshot_sync) {
993  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
994  if (shutdown_on_io_)
995    return;
996  sync_status()->EndSyncing(url);
997
998  if (for_snapshot_sync) {
999    // The caller will hold shared lock on this one.
1000    sync_status()->StartWriting(url);
1001    return;
1002  }
1003
1004  // Since a sync has finished the number of changes must have been updated.
1005  UpdateChangesForOrigin(url.origin(), NoopClosure());
1006}
1007
1008void LocalFileSyncContext::FinalizeSnapshotSyncOnIOThread(
1009    const FileSystemURL& url) {
1010  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1011  if (shutdown_on_io_)
1012    return;
1013  sync_status()->EndWriting(url);
1014
1015  // Since a sync has finished the number of changes must have been updated.
1016  UpdateChangesForOrigin(url.origin(), NoopClosure());
1017}
1018
1019void LocalFileSyncContext::DidApplyRemoteChange(
1020    const FileSystemURL& url,
1021    const SyncStatusCallback& callback_on_ui,
1022    base::File::Error file_error) {
1023  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1024  root_delete_helper_.reset();
1025  ui_task_runner_->PostTask(
1026      FROM_HERE,
1027      base::Bind(callback_on_ui, FileErrorToSyncStatusCode(file_error)));
1028}
1029
1030void LocalFileSyncContext::DidGetFileMetadata(
1031    const SyncFileMetadataCallback& callback,
1032    base::File::Error file_error,
1033    const base::File::Info& file_info) {
1034  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1035  SyncFileMetadata metadata;
1036  if (file_error == base::File::FILE_OK) {
1037    metadata.file_type = file_info.is_directory ?
1038        SYNC_FILE_TYPE_DIRECTORY : SYNC_FILE_TYPE_FILE;
1039    metadata.size = file_info.size;
1040    metadata.last_modified = file_info.last_modified;
1041  }
1042  ui_task_runner_->PostTask(
1043      FROM_HERE,
1044      base::Bind(callback, FileErrorToSyncStatusCode(file_error), metadata));
1045}
1046
1047base::TimeDelta LocalFileSyncContext::NotifyChangesDuration() {
1048  if (mock_notify_changes_duration_in_sec_ >= 0)
1049    return base::TimeDelta::FromSeconds(mock_notify_changes_duration_in_sec_);
1050  return base::TimeDelta::FromSeconds(kNotifyChangesDurationInSec);
1051}
1052
1053void LocalFileSyncContext::DidCreateDirectoryForCopyIn(
1054    FileSystemContext* file_system_context,
1055    const base::FilePath& local_path,
1056    const FileSystemURL& dest_url,
1057    const StatusCallback& callback,
1058    base::File::Error error) {
1059  if (error != base::File::FILE_OK) {
1060    callback.Run(error);
1061    return;
1062  }
1063
1064  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
1065      file_system_context, dest_url);
1066  file_system_context->operation_runner()->CopyInForeignFile(
1067      local_path, url_for_sync, callback);
1068}
1069
1070}  // namespace sync_file_system
1071