local_file_sync_context.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/local/local_file_sync_context.h"
6
7#include "base/bind.h"
8#include "base/files/file_util.h"
9#include "base/location.h"
10#include "base/single_thread_task_runner.h"
11#include "base/stl_util.h"
12#include "base/task_runner_util.h"
13#include "chrome/browser/sync_file_system/file_change.h"
14#include "chrome/browser/sync_file_system/local/local_file_change_tracker.h"
15#include "chrome/browser/sync_file_system/local/local_origin_change_observer.h"
16#include "chrome/browser/sync_file_system/local/root_delete_helper.h"
17#include "chrome/browser/sync_file_system/local/sync_file_system_backend.h"
18#include "chrome/browser/sync_file_system/local/syncable_file_operation_runner.h"
19#include "chrome/browser/sync_file_system/logger.h"
20#include "chrome/browser/sync_file_system/sync_file_metadata.h"
21#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
22#include "storage/browser/fileapi/file_system_context.h"
23#include "storage/browser/fileapi/file_system_file_util.h"
24#include "storage/browser/fileapi/file_system_operation_context.h"
25#include "storage/browser/fileapi/file_system_operation_runner.h"
26#include "storage/common/blob/scoped_file.h"
27#include "storage/common/fileapi/file_system_util.h"
28
29using storage::FileSystemContext;
30using storage::FileSystemFileUtil;
31using storage::FileSystemOperation;
32using storage::FileSystemOperationContext;
33using storage::FileSystemURL;
34
35namespace sync_file_system {
36
37namespace {
38
39const int kMaxConcurrentSyncableOperation = 3;
40const int kNotifyChangesDurationInSec = 1;
41const int kMaxURLsToFetchForLocalSync = 5;
42
43const base::FilePath::CharType kSnapshotDir[] = FILE_PATH_LITERAL("snapshots");
44
45}  // namespace
46
47LocalFileSyncContext::LocalFileSyncContext(
48    const base::FilePath& base_path,
49    leveldb::Env* env_override,
50    base::SingleThreadTaskRunner* ui_task_runner,
51    base::SingleThreadTaskRunner* io_task_runner)
52    : local_base_path_(base_path.Append(FILE_PATH_LITERAL("local"))),
53      env_override_(env_override),
54      ui_task_runner_(ui_task_runner),
55      io_task_runner_(io_task_runner),
56      shutdown_on_ui_(false),
57      shutdown_on_io_(false),
58      mock_notify_changes_duration_in_sec_(-1) {
59  DCHECK(base_path.IsAbsolute());
60  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
61}
62
63void LocalFileSyncContext::MaybeInitializeFileSystemContext(
64    const GURL& source_url,
65    FileSystemContext* file_system_context,
66    const SyncStatusCallback& callback) {
67  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
68  if (ContainsKey(file_system_contexts_, file_system_context)) {
69    // The context has been already initialized. Just dispatch the callback
70    // with SYNC_STATUS_OK.
71    ui_task_runner_->PostTask(FROM_HERE, base::Bind(callback, SYNC_STATUS_OK));
72    return;
73  }
74
75  StatusCallbackQueue& callback_queue =
76      pending_initialize_callbacks_[file_system_context];
77  callback_queue.push_back(callback);
78  if (callback_queue.size() > 1)
79    return;
80
81  // The sync service always expects the origin (app) is initialized
82  // for writable way (even when MaybeInitializeFileSystemContext is called
83  // from read-only OpenFileSystem), so open the filesystem with
84  // CREATE_IF_NONEXISTENT here.
85  storage::FileSystemBackend::OpenFileSystemCallback open_filesystem_callback =
86      base::Bind(&LocalFileSyncContext::InitializeFileSystemContextOnIOThread,
87                 this,
88                 source_url,
89                 make_scoped_refptr(file_system_context));
90  io_task_runner_->PostTask(
91      FROM_HERE,
92      base::Bind(&storage::SandboxFileSystemBackendDelegate::OpenFileSystem,
93                 base::Unretained(file_system_context->sandbox_delegate()),
94                 source_url,
95                 storage::kFileSystemTypeSyncable,
96                 storage::OPEN_FILE_SYSTEM_CREATE_IF_NONEXISTENT,
97                 open_filesystem_callback,
98                 GURL()));
99}
100
101void LocalFileSyncContext::ShutdownOnUIThread() {
102  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
103  shutdown_on_ui_ = true;
104  io_task_runner_->PostTask(
105      FROM_HERE,
106      base::Bind(&LocalFileSyncContext::ShutdownOnIOThread, this));
107}
108
109void LocalFileSyncContext::GetFileForLocalSync(
110    FileSystemContext* file_system_context,
111    const LocalFileSyncInfoCallback& callback) {
112  DCHECK(file_system_context);
113  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
114
115  base::PostTaskAndReplyWithResult(
116      file_system_context->default_file_task_runner(),
117      FROM_HERE,
118      base::Bind(&LocalFileSyncContext::GetNextURLsForSyncOnFileThread,
119                 this, make_scoped_refptr(file_system_context)),
120      base::Bind(&LocalFileSyncContext::TryPrepareForLocalSync,
121                 this, make_scoped_refptr(file_system_context), callback));
122}
123
124void LocalFileSyncContext::ClearChangesForURL(
125    FileSystemContext* file_system_context,
126    const FileSystemURL& url,
127    const base::Closure& done_callback) {
128  // This is initially called on UI thread and to be relayed to FILE thread.
129  DCHECK(file_system_context);
130  if (!file_system_context->default_file_task_runner()->
131          RunsTasksOnCurrentThread()) {
132    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
133    file_system_context->default_file_task_runner()->PostTask(
134        FROM_HERE,
135        base::Bind(&LocalFileSyncContext::ClearChangesForURL,
136                   this, make_scoped_refptr(file_system_context),
137                   url, done_callback));
138    return;
139  }
140
141  SyncFileSystemBackend* backend =
142      SyncFileSystemBackend::GetBackend(file_system_context);
143  DCHECK(backend);
144  DCHECK(backend->change_tracker());
145  backend->change_tracker()->ClearChangesForURL(url);
146
147  // Call the completion callback on UI thread.
148  ui_task_runner_->PostTask(FROM_HERE, done_callback);
149}
150
151void LocalFileSyncContext::FinalizeSnapshotSync(
152    storage::FileSystemContext* file_system_context,
153    const storage::FileSystemURL& url,
154    SyncStatusCode sync_finish_status,
155    const base::Closure& done_callback) {
156  DCHECK(file_system_context);
157  DCHECK(url.is_valid());
158  if (!file_system_context->default_file_task_runner()->
159          RunsTasksOnCurrentThread()) {
160    file_system_context->default_file_task_runner()->PostTask(
161        FROM_HERE,
162        base::Bind(&LocalFileSyncContext::FinalizeSnapshotSync,
163                   this, make_scoped_refptr(file_system_context),
164                   url, sync_finish_status, done_callback));
165    return;
166  }
167
168  SyncFileSystemBackend* backend =
169      SyncFileSystemBackend::GetBackend(file_system_context);
170  DCHECK(backend);
171  DCHECK(backend->change_tracker());
172
173  if (sync_finish_status == SYNC_STATUS_OK ||
174      sync_finish_status == SYNC_STATUS_HAS_CONFLICT) {
175    // Commit the in-memory mirror change.
176    backend->change_tracker()->ResetToMirrorAndCommitChangesForURL(url);
177  } else {
178    // Abort in-memory mirror change.
179    backend->change_tracker()->RemoveMirrorAndCommitChangesForURL(url);
180  }
181
182  // We've been keeping it in writing mode, so clear the writing counter
183  // to unblock sync activities.
184  io_task_runner_->PostTask(
185      FROM_HERE, base::Bind(
186          &LocalFileSyncContext::FinalizeSnapshotSyncOnIOThread, this, url));
187
188  // Call the completion callback on UI thread.
189  ui_task_runner_->PostTask(FROM_HERE, done_callback);
190}
191
192void LocalFileSyncContext::FinalizeExclusiveSync(
193    storage::FileSystemContext* file_system_context,
194    const storage::FileSystemURL& url,
195    bool clear_local_changes,
196    const base::Closure& done_callback) {
197  DCHECK(file_system_context);
198  if (!url.is_valid()) {
199    done_callback.Run();
200    return;
201  }
202
203  if (clear_local_changes) {
204    ClearChangesForURL(file_system_context, url,
205                       base::Bind(&LocalFileSyncContext::FinalizeExclusiveSync,
206                                  this, make_scoped_refptr(file_system_context),
207                                  url, false, done_callback));
208    return;
209  }
210
211  io_task_runner_->PostTask(
212      FROM_HERE,
213      base::Bind(&LocalFileSyncContext::ClearSyncFlagOnIOThread,
214                 this, url, false /* for_snapshot_sync */));
215
216  done_callback.Run();
217}
218
219void LocalFileSyncContext::PrepareForSync(
220    FileSystemContext* file_system_context,
221    const FileSystemURL& url,
222    SyncMode sync_mode,
223    const LocalFileSyncInfoCallback& callback) {
224  // This is initially called on UI thread and to be relayed to IO thread.
225  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
226    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
227    io_task_runner_->PostTask(
228        FROM_HERE,
229        base::Bind(&LocalFileSyncContext::PrepareForSync, this,
230                   make_scoped_refptr(file_system_context), url,
231                   sync_mode, callback));
232    return;
233  }
234  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
235  const bool syncable = sync_status()->IsSyncable(url);
236  // Disable writing if it's ready to be synced.
237  if (syncable)
238    sync_status()->StartSyncing(url);
239  ui_task_runner_->PostTask(
240      FROM_HERE,
241      base::Bind(&LocalFileSyncContext::DidGetWritingStatusForSync,
242                 this, make_scoped_refptr(file_system_context),
243                 syncable ? SYNC_STATUS_OK :
244                            SYNC_STATUS_FILE_BUSY,
245                 url, sync_mode, callback));
246}
247
248void LocalFileSyncContext::RegisterURLForWaitingSync(
249    const FileSystemURL& url,
250    const base::Closure& on_syncable_callback) {
251  // This is initially called on UI thread and to be relayed to IO thread.
252  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
253    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
254    io_task_runner_->PostTask(
255        FROM_HERE,
256        base::Bind(&LocalFileSyncContext::RegisterURLForWaitingSync,
257                   this, url, on_syncable_callback));
258    return;
259  }
260  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
261  if (shutdown_on_io_)
262    return;
263  if (sync_status()->IsSyncable(url)) {
264    // No need to register; fire the callback now.
265    ui_task_runner_->PostTask(FROM_HERE, on_syncable_callback);
266    return;
267  }
268  url_waiting_sync_on_io_ = url;
269  url_syncable_callback_ = on_syncable_callback;
270}
271
272void LocalFileSyncContext::ApplyRemoteChange(
273    FileSystemContext* file_system_context,
274    const FileChange& change,
275    const base::FilePath& local_path,
276    const FileSystemURL& url,
277    const SyncStatusCallback& callback) {
278  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
279    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
280    io_task_runner_->PostTask(
281        FROM_HERE,
282        base::Bind(&LocalFileSyncContext::ApplyRemoteChange, this,
283                   make_scoped_refptr(file_system_context),
284                   change, local_path, url, callback));
285    return;
286  }
287  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
288  DCHECK(!sync_status()->IsWritable(url));
289  DCHECK(!sync_status()->IsWriting(url));
290
291  FileSystemOperation::StatusCallback operation_callback;
292  switch (change.change()) {
293    case FileChange::FILE_CHANGE_DELETE:
294      HandleRemoteDelete(file_system_context, url, callback);
295      return;
296    case FileChange::FILE_CHANGE_ADD_OR_UPDATE:
297      HandleRemoteAddOrUpdate(
298          file_system_context, change, local_path, url, callback);
299      return;
300  }
301  NOTREACHED();
302  callback.Run(SYNC_STATUS_FAILED);
303}
304
305void LocalFileSyncContext::HandleRemoteDelete(
306    FileSystemContext* file_system_context,
307    const FileSystemURL& url,
308    const SyncStatusCallback& callback) {
309  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
310      file_system_context, url);
311
312  // Handle root directory case differently.
313  if (storage::VirtualPath::IsRootPath(url.path())) {
314    DCHECK(!root_delete_helper_);
315    root_delete_helper_.reset(new RootDeleteHelper(
316        file_system_context, sync_status(), url,
317        base::Bind(&LocalFileSyncContext::DidApplyRemoteChange,
318                   this, url, callback)));
319    root_delete_helper_->Run();
320    return;
321  }
322
323  file_system_context->operation_runner()->Remove(
324      url_for_sync, true /* recursive */,
325      base::Bind(&LocalFileSyncContext::DidApplyRemoteChange,
326                 this, url, callback));
327}
328
329void LocalFileSyncContext::HandleRemoteAddOrUpdate(
330    FileSystemContext* file_system_context,
331    const FileChange& change,
332    const base::FilePath& local_path,
333    const FileSystemURL& url,
334    const SyncStatusCallback& callback) {
335  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
336      file_system_context, url);
337
338  if (storage::VirtualPath::IsRootPath(url.path())) {
339    DidApplyRemoteChange(url, callback, base::File::FILE_OK);
340    return;
341  }
342
343  file_system_context->operation_runner()->Remove(
344      url_for_sync, true /* recursive */,
345      base::Bind(
346          &LocalFileSyncContext::DidRemoveExistingEntryForRemoteAddOrUpdate,
347          this,
348          make_scoped_refptr(file_system_context),
349          change,
350          local_path,
351          url,
352          callback));
353}
354
355void LocalFileSyncContext::DidRemoveExistingEntryForRemoteAddOrUpdate(
356    FileSystemContext* file_system_context,
357    const FileChange& change,
358    const base::FilePath& local_path,
359    const FileSystemURL& url,
360    const SyncStatusCallback& callback,
361    base::File::Error error) {
362  // Remove() may fail if the target entry does not exist (which is ok),
363  // so we ignore |error| here.
364
365  if (shutdown_on_io_) {
366    callback.Run(SYNC_FILE_ERROR_ABORT);
367    return;
368  }
369
370  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
371  DCHECK(!sync_status()->IsWritable(url));
372  DCHECK(!sync_status()->IsWriting(url));
373
374  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
375      file_system_context, url);
376  FileSystemOperation::StatusCallback operation_callback = base::Bind(
377      &LocalFileSyncContext::DidApplyRemoteChange, this, url, callback);
378
379  DCHECK_EQ(FileChange::FILE_CHANGE_ADD_OR_UPDATE, change.change());
380  switch (change.file_type()) {
381    case SYNC_FILE_TYPE_FILE: {
382      DCHECK(!local_path.empty());
383      base::FilePath dir_path = storage::VirtualPath::DirName(url.path());
384      if (dir_path.empty() ||
385          storage::VirtualPath::DirName(dir_path) == dir_path) {
386        // Copying into the root directory.
387        file_system_context->operation_runner()->CopyInForeignFile(
388            local_path, url_for_sync, operation_callback);
389      } else {
390        FileSystemURL dir_url = file_system_context->CreateCrackedFileSystemURL(
391            url_for_sync.origin(),
392            url_for_sync.mount_type(),
393            storage::VirtualPath::DirName(url_for_sync.virtual_path()));
394        file_system_context->operation_runner()->CreateDirectory(
395            dir_url,
396            false /* exclusive */,
397            true /* recursive */,
398            base::Bind(&LocalFileSyncContext::DidCreateDirectoryForCopyIn,
399                       this,
400                       make_scoped_refptr(file_system_context),
401                       local_path,
402                       url,
403                       operation_callback));
404      }
405      break;
406    }
407    case SYNC_FILE_TYPE_DIRECTORY:
408      file_system_context->operation_runner()->CreateDirectory(
409          url_for_sync, false /* exclusive */, true /* recursive */,
410          operation_callback);
411      break;
412    case SYNC_FILE_TYPE_UNKNOWN:
413      NOTREACHED() << "File type unknown for ADD_OR_UPDATE change";
414  }
415}
416
417void LocalFileSyncContext::RecordFakeLocalChange(
418    FileSystemContext* file_system_context,
419    const FileSystemURL& url,
420    const FileChange& change,
421    const SyncStatusCallback& callback) {
422  // This is called on UI thread and to be relayed to FILE thread.
423  DCHECK(file_system_context);
424  if (!file_system_context->default_file_task_runner()->
425          RunsTasksOnCurrentThread()) {
426    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
427    file_system_context->default_file_task_runner()->PostTask(
428        FROM_HERE,
429        base::Bind(&LocalFileSyncContext::RecordFakeLocalChange,
430                   this, make_scoped_refptr(file_system_context),
431                   url, change, callback));
432    return;
433  }
434
435  SyncFileSystemBackend* backend =
436      SyncFileSystemBackend::GetBackend(file_system_context);
437  DCHECK(backend);
438  DCHECK(backend->change_tracker());
439  backend->change_tracker()->MarkDirtyOnDatabase(url);
440  backend->change_tracker()->RecordChange(url, change);
441
442  // Fire the callback on UI thread.
443  ui_task_runner_->PostTask(FROM_HERE,
444                            base::Bind(callback,
445                                       SYNC_STATUS_OK));
446}
447
448void LocalFileSyncContext::GetFileMetadata(
449    FileSystemContext* file_system_context,
450    const FileSystemURL& url,
451    const SyncFileMetadataCallback& callback) {
452  // This is initially called on UI thread and to be relayed to IO thread.
453  if (!io_task_runner_->RunsTasksOnCurrentThread()) {
454    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
455    io_task_runner_->PostTask(
456        FROM_HERE,
457        base::Bind(&LocalFileSyncContext::GetFileMetadata, this,
458                   make_scoped_refptr(file_system_context), url, callback));
459    return;
460  }
461  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
462
463  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
464      file_system_context, url);
465  file_system_context->operation_runner()->GetMetadata(
466      url_for_sync, base::Bind(&LocalFileSyncContext::DidGetFileMetadata,
467                      this, callback));
468}
469
470void LocalFileSyncContext::HasPendingLocalChanges(
471    FileSystemContext* file_system_context,
472    const FileSystemURL& url,
473    const HasPendingLocalChangeCallback& callback) {
474  // This gets called on UI thread and relays the task on FILE thread.
475  DCHECK(file_system_context);
476  if (!file_system_context->default_file_task_runner()->
477          RunsTasksOnCurrentThread()) {
478    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
479    file_system_context->default_file_task_runner()->PostTask(
480        FROM_HERE,
481        base::Bind(&LocalFileSyncContext::HasPendingLocalChanges,
482                   this, make_scoped_refptr(file_system_context),
483                   url, callback));
484    return;
485  }
486
487  SyncFileSystemBackend* backend =
488      SyncFileSystemBackend::GetBackend(file_system_context);
489  DCHECK(backend);
490  DCHECK(backend->change_tracker());
491  FileChangeList changes;
492  backend->change_tracker()->GetChangesForURL(url, &changes);
493
494  // Fire the callback on UI thread.
495  ui_task_runner_->PostTask(FROM_HERE,
496                            base::Bind(callback,
497                                       SYNC_STATUS_OK,
498                                       !changes.empty()));
499}
500
501void LocalFileSyncContext::PromoteDemotedChanges(
502    const GURL& origin,
503    storage::FileSystemContext* file_system_context,
504    const base::Closure& callback) {
505  // This is initially called on UI thread and to be relayed to FILE thread.
506  DCHECK(file_system_context);
507  if (!file_system_context->default_file_task_runner()->
508          RunsTasksOnCurrentThread()) {
509    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
510    file_system_context->default_file_task_runner()->PostTask(
511        FROM_HERE,
512        base::Bind(&LocalFileSyncContext::PromoteDemotedChanges,
513                   this, origin, make_scoped_refptr(file_system_context),
514                   callback));
515    return;
516  }
517
518  SyncFileSystemBackend* backend =
519      SyncFileSystemBackend::GetBackend(file_system_context);
520  DCHECK(backend);
521  DCHECK(backend->change_tracker());
522  if (!backend->change_tracker()->PromoteDemotedChanges()) {
523    ui_task_runner_->PostTask(FROM_HERE, callback);
524    return;
525  }
526
527  io_task_runner_->PostTask(
528      FROM_HERE,
529      base::Bind(&LocalFileSyncContext::UpdateChangesForOrigin,
530                 this, origin, callback));
531}
532
533void LocalFileSyncContext::UpdateChangesForOrigin(
534    const GURL& origin,
535    const base::Closure& callback) {
536  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
537  if (shutdown_on_io_)
538    return;
539  origins_with_pending_changes_.insert(origin);
540  ScheduleNotifyChangesUpdatedOnIOThread(callback);
541}
542
543void LocalFileSyncContext::AddOriginChangeObserver(
544    LocalOriginChangeObserver* observer) {
545  origin_change_observers_.AddObserver(observer);
546}
547
548void LocalFileSyncContext::RemoveOriginChangeObserver(
549    LocalOriginChangeObserver* observer) {
550  origin_change_observers_.RemoveObserver(observer);
551}
552
553base::WeakPtr<SyncableFileOperationRunner>
554LocalFileSyncContext::operation_runner() const {
555  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
556  if (operation_runner_)
557    return operation_runner_->AsWeakPtr();
558  return base::WeakPtr<SyncableFileOperationRunner>();
559}
560
561LocalFileSyncStatus* LocalFileSyncContext::sync_status() const {
562  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
563  return sync_status_.get();
564}
565
566void LocalFileSyncContext::OnSyncEnabled(const FileSystemURL& url) {
567  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
568  if (shutdown_on_io_)
569    return;
570  UpdateChangesForOrigin(url.origin(), NoopClosure());
571  if (url_syncable_callback_.is_null() ||
572      sync_status()->IsWriting(url_waiting_sync_on_io_)) {
573    return;
574  }
575  // TODO(kinuko): may want to check how many pending tasks we have.
576  ui_task_runner_->PostTask(FROM_HERE, url_syncable_callback_);
577  url_syncable_callback_.Reset();
578}
579
580void LocalFileSyncContext::OnWriteEnabled(const FileSystemURL& url) {
581  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
582  // Nothing to do for now.
583}
584
585LocalFileSyncContext::~LocalFileSyncContext() {
586}
587
588void LocalFileSyncContext::ScheduleNotifyChangesUpdatedOnIOThread(
589    const base::Closure& callback) {
590  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
591  if (shutdown_on_io_)
592    return;
593  if (base::Time::Now() > last_notified_changes_ + NotifyChangesDuration()) {
594    NotifyAvailableChangesOnIOThread(callback);
595  } else if (!timer_on_io_->IsRunning()) {
596    timer_on_io_->Start(
597        FROM_HERE, NotifyChangesDuration(),
598        base::Bind(&LocalFileSyncContext::NotifyAvailableChangesOnIOThread,
599                   base::Unretained(this), callback));
600  }
601}
602
603void LocalFileSyncContext::NotifyAvailableChangesOnIOThread(
604    const base::Closure& callback) {
605  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
606  if (shutdown_on_io_)
607    return;
608  ui_task_runner_->PostTask(
609      FROM_HERE,
610      base::Bind(&LocalFileSyncContext::NotifyAvailableChanges,
611                 this, origins_with_pending_changes_, callback));
612  last_notified_changes_ = base::Time::Now();
613  origins_with_pending_changes_.clear();
614}
615
616void LocalFileSyncContext::NotifyAvailableChanges(
617    const std::set<GURL>& origins,
618    const base::Closure& callback) {
619  FOR_EACH_OBSERVER(LocalOriginChangeObserver, origin_change_observers_,
620                    OnChangesAvailableInOrigins(origins));
621  callback.Run();
622}
623
624void LocalFileSyncContext::ShutdownOnIOThread() {
625  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
626  shutdown_on_io_ = true;
627  operation_runner_.reset();
628  root_delete_helper_.reset();
629  sync_status_.reset();
630  timer_on_io_.reset();
631}
632
633void LocalFileSyncContext::InitializeFileSystemContextOnIOThread(
634    const GURL& source_url,
635    FileSystemContext* file_system_context,
636    const GURL& /* root */,
637    const std::string& /* name */,
638    base::File::Error error) {
639  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
640  if (shutdown_on_io_)
641    error = base::File::FILE_ERROR_ABORT;
642  if (error != base::File::FILE_OK) {
643    DidInitialize(source_url, file_system_context,
644                  FileErrorToSyncStatusCode(error));
645    return;
646  }
647  DCHECK(file_system_context);
648  SyncFileSystemBackend* backend =
649      SyncFileSystemBackend::GetBackend(file_system_context);
650  DCHECK(backend);
651  if (!backend->change_tracker()) {
652    // Create and initialize LocalFileChangeTracker and call back this method
653    // later again.
654    std::set<GURL>* origins_with_changes = new std::set<GURL>;
655    scoped_ptr<LocalFileChangeTracker>* tracker_ptr(
656        new scoped_ptr<LocalFileChangeTracker>);
657    base::PostTaskAndReplyWithResult(
658        file_system_context->default_file_task_runner(),
659        FROM_HERE,
660        base::Bind(&LocalFileSyncContext::InitializeChangeTrackerOnFileThread,
661                   this, tracker_ptr,
662                   make_scoped_refptr(file_system_context),
663                   origins_with_changes),
664        base::Bind(&LocalFileSyncContext::DidInitializeChangeTrackerOnIOThread,
665                   this, base::Owned(tracker_ptr),
666                   source_url,
667                   make_scoped_refptr(file_system_context),
668                   base::Owned(origins_with_changes)));
669    return;
670  }
671  if (!operation_runner_) {
672    DCHECK(!sync_status_);
673    DCHECK(!timer_on_io_);
674    sync_status_.reset(new LocalFileSyncStatus);
675    timer_on_io_.reset(new base::OneShotTimer<LocalFileSyncContext>);
676    operation_runner_.reset(new SyncableFileOperationRunner(
677            kMaxConcurrentSyncableOperation,
678            sync_status_.get()));
679    sync_status_->AddObserver(this);
680  }
681  backend->set_sync_context(this);
682  DidInitialize(source_url, file_system_context,
683                SYNC_STATUS_OK);
684}
685
686SyncStatusCode LocalFileSyncContext::InitializeChangeTrackerOnFileThread(
687    scoped_ptr<LocalFileChangeTracker>* tracker_ptr,
688    FileSystemContext* file_system_context,
689    std::set<GURL>* origins_with_changes) {
690  DCHECK(file_system_context);
691  DCHECK(tracker_ptr);
692  DCHECK(origins_with_changes);
693  tracker_ptr->reset(new LocalFileChangeTracker(
694          file_system_context->partition_path(),
695          env_override_,
696          file_system_context->default_file_task_runner()));
697  const SyncStatusCode status = (*tracker_ptr)->Initialize(file_system_context);
698  if (status != SYNC_STATUS_OK)
699    return status;
700
701  // Get all origins that have pending changes.
702  FileSystemURLQueue urls;
703  (*tracker_ptr)->GetNextChangedURLs(&urls, 0);
704  for (FileSystemURLQueue::iterator iter = urls.begin();
705       iter != urls.end(); ++iter) {
706    origins_with_changes->insert(iter->origin());
707  }
708
709  // Creates snapshot directory.
710  base::CreateDirectory(local_base_path_.Append(kSnapshotDir));
711
712  return status;
713}
714
715void LocalFileSyncContext::DidInitializeChangeTrackerOnIOThread(
716    scoped_ptr<LocalFileChangeTracker>* tracker_ptr,
717    const GURL& source_url,
718    FileSystemContext* file_system_context,
719    std::set<GURL>* origins_with_changes,
720    SyncStatusCode status) {
721  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
722  DCHECK(file_system_context);
723  DCHECK(origins_with_changes);
724  if (shutdown_on_io_)
725    status = SYNC_STATUS_ABORT;
726  if (status != SYNC_STATUS_OK) {
727    DidInitialize(source_url, file_system_context, status);
728    return;
729  }
730
731  SyncFileSystemBackend* backend =
732      SyncFileSystemBackend::GetBackend(file_system_context);
733  DCHECK(backend);
734  backend->SetLocalFileChangeTracker(tracker_ptr->Pass());
735
736  origins_with_pending_changes_.insert(origins_with_changes->begin(),
737                                       origins_with_changes->end());
738  ScheduleNotifyChangesUpdatedOnIOThread(NoopClosure());
739
740  InitializeFileSystemContextOnIOThread(source_url, file_system_context,
741                                        GURL(), std::string(),
742                                        base::File::FILE_OK);
743}
744
745void LocalFileSyncContext::DidInitialize(
746    const GURL& source_url,
747    FileSystemContext* file_system_context,
748    SyncStatusCode status) {
749  if (!ui_task_runner_->RunsTasksOnCurrentThread()) {
750    ui_task_runner_->PostTask(
751        FROM_HERE,
752        base::Bind(&LocalFileSyncContext::DidInitialize,
753                   this, source_url,
754                   make_scoped_refptr(file_system_context), status));
755    return;
756  }
757  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
758  DCHECK(!ContainsKey(file_system_contexts_, file_system_context));
759  DCHECK(ContainsKey(pending_initialize_callbacks_, file_system_context));
760
761  SyncFileSystemBackend* backend =
762      SyncFileSystemBackend::GetBackend(file_system_context);
763  DCHECK(backend);
764  DCHECK(backend->change_tracker());
765
766  file_system_contexts_.insert(file_system_context);
767
768  StatusCallbackQueue& callback_queue =
769      pending_initialize_callbacks_[file_system_context];
770  for (StatusCallbackQueue::iterator iter = callback_queue.begin();
771       iter != callback_queue.end(); ++iter) {
772    ui_task_runner_->PostTask(FROM_HERE, base::Bind(*iter, status));
773  }
774  pending_initialize_callbacks_.erase(file_system_context);
775}
776
777scoped_ptr<LocalFileSyncContext::FileSystemURLQueue>
778LocalFileSyncContext::GetNextURLsForSyncOnFileThread(
779    FileSystemContext* file_system_context) {
780  DCHECK(file_system_context);
781  DCHECK(file_system_context->default_file_task_runner()->
782             RunsTasksOnCurrentThread());
783  SyncFileSystemBackend* backend =
784      SyncFileSystemBackend::GetBackend(file_system_context);
785  DCHECK(backend);
786  DCHECK(backend->change_tracker());
787  scoped_ptr<FileSystemURLQueue> urls(new FileSystemURLQueue);
788  backend->change_tracker()->GetNextChangedURLs(
789      urls.get(), kMaxURLsToFetchForLocalSync);
790  for (FileSystemURLQueue::iterator iter = urls->begin();
791       iter != urls->end(); ++iter)
792    backend->change_tracker()->DemoteChangesForURL(*iter);
793
794  return urls.Pass();
795}
796
797void LocalFileSyncContext::TryPrepareForLocalSync(
798    FileSystemContext* file_system_context,
799    const LocalFileSyncInfoCallback& callback,
800    scoped_ptr<FileSystemURLQueue> urls) {
801  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
802  DCHECK(urls);
803
804  if (shutdown_on_ui_) {
805    callback.Run(SYNC_STATUS_ABORT, LocalFileSyncInfo(), storage::ScopedFile());
806    return;
807  }
808
809  if (urls->empty()) {
810    callback.Run(SYNC_STATUS_NO_CHANGE_TO_SYNC,
811                 LocalFileSyncInfo(),
812                 storage::ScopedFile());
813    return;
814  }
815
816  const FileSystemURL url = urls->front();
817  urls->pop_front();
818
819  PrepareForSync(
820      file_system_context, url, SYNC_SNAPSHOT,
821      base::Bind(&LocalFileSyncContext::DidTryPrepareForLocalSync,
822                 this, make_scoped_refptr(file_system_context),
823                 base::Passed(&urls), callback));
824}
825
826void LocalFileSyncContext::DidTryPrepareForLocalSync(
827    FileSystemContext* file_system_context,
828    scoped_ptr<FileSystemURLQueue> remaining_urls,
829    const LocalFileSyncInfoCallback& callback,
830    SyncStatusCode status,
831    const LocalFileSyncInfo& sync_file_info,
832    storage::ScopedFile snapshot) {
833  DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
834  if (status != SYNC_STATUS_FILE_BUSY) {
835    PromoteDemotedChangesForURLs(file_system_context,
836                                 remaining_urls.Pass());
837    callback.Run(status, sync_file_info, snapshot.Pass());
838    return;
839  }
840
841  PromoteDemotedChangesForURL(file_system_context, sync_file_info.url);
842
843  // Recursively call TryPrepareForLocalSync with remaining_urls.
844  TryPrepareForLocalSync(file_system_context, callback, remaining_urls.Pass());
845}
846
847void LocalFileSyncContext::PromoteDemotedChangesForURL(
848    FileSystemContext* file_system_context,
849    const FileSystemURL& url) {
850  DCHECK(file_system_context);
851  if (!file_system_context->default_file_task_runner()->
852          RunsTasksOnCurrentThread()) {
853    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
854    if (shutdown_on_ui_)
855      return;
856    file_system_context->default_file_task_runner()->PostTask(
857        FROM_HERE,
858        base::Bind(&LocalFileSyncContext::PromoteDemotedChangesForURL,
859                   this, make_scoped_refptr(file_system_context), url));
860    return;
861  }
862
863  SyncFileSystemBackend* backend =
864      SyncFileSystemBackend::GetBackend(file_system_context);
865  DCHECK(backend);
866  DCHECK(backend->change_tracker());
867  backend->change_tracker()->PromoteDemotedChangesForURL(url);
868}
869
870void LocalFileSyncContext::PromoteDemotedChangesForURLs(
871    FileSystemContext* file_system_context,
872    scoped_ptr<FileSystemURLQueue> urls) {
873  DCHECK(file_system_context);
874  if (!file_system_context->default_file_task_runner()->
875          RunsTasksOnCurrentThread()) {
876    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
877    if (shutdown_on_ui_)
878      return;
879    file_system_context->default_file_task_runner()->PostTask(
880        FROM_HERE,
881        base::Bind(&LocalFileSyncContext::PromoteDemotedChangesForURLs,
882                   this, make_scoped_refptr(file_system_context),
883                   base::Passed(&urls)));
884    return;
885  }
886
887  for (FileSystemURLQueue::iterator iter = urls->begin();
888       iter != urls->end(); ++iter)
889    PromoteDemotedChangesForURL(file_system_context, *iter);
890}
891
892void LocalFileSyncContext::DidGetWritingStatusForSync(
893    FileSystemContext* file_system_context,
894    SyncStatusCode status,
895    const FileSystemURL& url,
896    SyncMode sync_mode,
897    const LocalFileSyncInfoCallback& callback) {
898  // This gets called on UI thread and relays the task on FILE thread.
899  DCHECK(file_system_context);
900  if (!file_system_context->default_file_task_runner()->
901          RunsTasksOnCurrentThread()) {
902    DCHECK(ui_task_runner_->RunsTasksOnCurrentThread());
903    if (shutdown_on_ui_) {
904      callback.Run(
905          SYNC_STATUS_ABORT, LocalFileSyncInfo(), storage::ScopedFile());
906      return;
907    }
908    file_system_context->default_file_task_runner()->PostTask(
909        FROM_HERE,
910        base::Bind(&LocalFileSyncContext::DidGetWritingStatusForSync,
911                   this, make_scoped_refptr(file_system_context),
912                   status, url, sync_mode, callback));
913    return;
914  }
915
916  SyncFileSystemBackend* backend =
917      SyncFileSystemBackend::GetBackend(file_system_context);
918  DCHECK(backend);
919  DCHECK(backend->change_tracker());
920  FileChangeList changes;
921  backend->change_tracker()->GetChangesForURL(url, &changes);
922
923  base::FilePath platform_path;
924  base::File::Info file_info;
925  FileSystemFileUtil* file_util =
926      file_system_context->sandbox_delegate()->sync_file_util();
927  DCHECK(file_util);
928
929  base::File::Error file_error = file_util->GetFileInfo(
930      make_scoped_ptr(
931          new FileSystemOperationContext(file_system_context)).get(),
932      url,
933      &file_info,
934      &platform_path);
935
936  storage::ScopedFile snapshot;
937  if (file_error == base::File::FILE_OK && sync_mode == SYNC_SNAPSHOT) {
938    base::FilePath snapshot_path;
939    base::CreateTemporaryFileInDir(local_base_path_.Append(kSnapshotDir),
940                                   &snapshot_path);
941    if (base::CopyFile(platform_path, snapshot_path)) {
942      platform_path = snapshot_path;
943      snapshot =
944          storage::ScopedFile(snapshot_path,
945                              storage::ScopedFile::DELETE_ON_SCOPE_OUT,
946                              file_system_context->default_file_task_runner());
947    }
948  }
949
950  if (status == SYNC_STATUS_OK &&
951      file_error != base::File::FILE_OK &&
952      file_error != base::File::FILE_ERROR_NOT_FOUND) {
953    status = FileErrorToSyncStatusCode(file_error);
954}
955
956  DCHECK(!file_info.is_symbolic_link);
957
958  SyncFileType file_type = SYNC_FILE_TYPE_FILE;
959  if (file_error == base::File::FILE_ERROR_NOT_FOUND)
960    file_type = SYNC_FILE_TYPE_UNKNOWN;
961  else if (file_info.is_directory)
962    file_type = SYNC_FILE_TYPE_DIRECTORY;
963
964  LocalFileSyncInfo sync_file_info;
965  sync_file_info.url = url;
966  sync_file_info.local_file_path = platform_path;
967  sync_file_info.metadata.file_type = file_type;
968  sync_file_info.metadata.size = file_info.size;
969  sync_file_info.metadata.last_modified = file_info.last_modified;
970  sync_file_info.changes = changes;
971
972  if (status == SYNC_STATUS_OK && sync_mode == SYNC_SNAPSHOT) {
973    if (!changes.empty()) {
974      // Now we create an empty mirror change record for URL (and we record
975      // changes to both mirror and original records during sync), so that
976      // we can reset to the mirror when the sync succeeds.
977      backend->change_tracker()->CreateFreshMirrorForURL(url);
978    }
979
980    // 'Unlock' the file for snapshot sync.
981    // (But keep it in writing status so that no other sync starts on
982    // the same URL)
983    io_task_runner_->PostTask(
984        FROM_HERE,
985        base::Bind(&LocalFileSyncContext::ClearSyncFlagOnIOThread,
986                   this, url, true /* for_snapshot_sync */));
987  }
988
989  ui_task_runner_->PostTask(FROM_HERE,
990                            base::Bind(callback, status, sync_file_info,
991                                       base::Passed(&snapshot)));
992}
993
994void LocalFileSyncContext::ClearSyncFlagOnIOThread(
995    const FileSystemURL& url,
996    bool for_snapshot_sync) {
997  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
998  if (shutdown_on_io_)
999    return;
1000  sync_status()->EndSyncing(url);
1001
1002  if (for_snapshot_sync) {
1003    // The caller will hold shared lock on this one.
1004    sync_status()->StartWriting(url);
1005    return;
1006  }
1007
1008  // Since a sync has finished the number of changes must have been updated.
1009  UpdateChangesForOrigin(url.origin(), NoopClosure());
1010}
1011
1012void LocalFileSyncContext::FinalizeSnapshotSyncOnIOThread(
1013    const FileSystemURL& url) {
1014  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1015  if (shutdown_on_io_)
1016    return;
1017  sync_status()->EndWriting(url);
1018
1019  // Since a sync has finished the number of changes must have been updated.
1020  UpdateChangesForOrigin(url.origin(), NoopClosure());
1021}
1022
1023void LocalFileSyncContext::DidApplyRemoteChange(
1024    const FileSystemURL& url,
1025    const SyncStatusCallback& callback_on_ui,
1026    base::File::Error file_error) {
1027  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1028  root_delete_helper_.reset();
1029  ui_task_runner_->PostTask(
1030      FROM_HERE,
1031      base::Bind(callback_on_ui, FileErrorToSyncStatusCode(file_error)));
1032}
1033
1034void LocalFileSyncContext::DidGetFileMetadata(
1035    const SyncFileMetadataCallback& callback,
1036    base::File::Error file_error,
1037    const base::File::Info& file_info) {
1038  DCHECK(io_task_runner_->RunsTasksOnCurrentThread());
1039  SyncFileMetadata metadata;
1040  if (file_error == base::File::FILE_OK) {
1041    metadata.file_type = file_info.is_directory ?
1042        SYNC_FILE_TYPE_DIRECTORY : SYNC_FILE_TYPE_FILE;
1043    metadata.size = file_info.size;
1044    metadata.last_modified = file_info.last_modified;
1045  }
1046  ui_task_runner_->PostTask(
1047      FROM_HERE,
1048      base::Bind(callback, FileErrorToSyncStatusCode(file_error), metadata));
1049}
1050
1051base::TimeDelta LocalFileSyncContext::NotifyChangesDuration() {
1052  if (mock_notify_changes_duration_in_sec_ >= 0)
1053    return base::TimeDelta::FromSeconds(mock_notify_changes_duration_in_sec_);
1054  return base::TimeDelta::FromSeconds(kNotifyChangesDurationInSec);
1055}
1056
1057void LocalFileSyncContext::DidCreateDirectoryForCopyIn(
1058    FileSystemContext* file_system_context,
1059    const base::FilePath& local_path,
1060    const FileSystemURL& dest_url,
1061    const StatusCallback& callback,
1062    base::File::Error error) {
1063  if (error != base::File::FILE_OK) {
1064    callback.Run(error);
1065    return;
1066  }
1067
1068  FileSystemURL url_for_sync = CreateSyncableFileSystemURLForSync(
1069      file_system_context, dest_url);
1070  file_system_context->operation_runner()->CopyInForeignFile(
1071      local_path, url_for_sync, callback);
1072}
1073
1074}  // namespace sync_file_system
1075