sync_engine.cc revision 116680a4aac90f2aa7413d9095a592090648e557
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/metrics/histogram.h"
11#include "base/thread_task_runner_handle.h"
12#include "base/threading/sequenced_worker_pool.h"
13#include "base/time/time.h"
14#include "base/values.h"
15#include "chrome/browser/drive/drive_api_service.h"
16#include "chrome/browser/drive/drive_notification_manager.h"
17#include "chrome/browser/drive/drive_notification_manager_factory.h"
18#include "chrome/browser/drive/drive_service_interface.h"
19#include "chrome/browser/drive/drive_uploader.h"
20#include "chrome/browser/extensions/extension_service.h"
21#include "chrome/browser/profiles/profile.h"
22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
23#include "chrome/browser/signin/signin_manager_factory.h"
24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h"
43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
44#include "chrome/browser/sync_file_system/file_status_observer.h"
45#include "chrome/browser/sync_file_system/logger.h"
46#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
47#include "components/signin/core/browser/profile_oauth2_token_service.h"
48#include "components/signin/core/browser/signin_manager.h"
49#include "content/public/browser/browser_thread.h"
50#include "extensions/browser/extension_system.h"
51#include "extensions/browser/extension_system_provider.h"
52#include "extensions/browser/extensions_browser_client.h"
53#include "extensions/common/extension.h"
54#include "google_apis/drive/drive_api_url_generator.h"
55#include "google_apis/drive/gdata_wapi_url_generator.h"
56#include "net/url_request/url_request_context_getter.h"
57#include "webkit/common/blob/scoped_file.h"
58#include "webkit/common/fileapi/file_system_util.h"
59
60namespace sync_file_system {
61
62class RemoteChangeProcessor;
63
64namespace drive_backend {
65
66class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer {
67 public:
68  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
69                 base::WeakPtr<SyncEngine> sync_engine)
70      : ui_task_runner_(ui_task_runner),
71        sync_engine_(sync_engine) {
72    sequence_checker_.DetachFromSequence();
73  }
74
75  virtual ~WorkerObserver() {
76    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
77  }
78
79  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
80    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
81      if (sync_engine_)
82        sync_engine_->OnPendingFileListUpdated(item_count);
83      return;
84    }
85
86    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
87    ui_task_runner_->PostTask(
88        FROM_HERE,
89        base::Bind(&SyncEngine::OnPendingFileListUpdated,
90                   sync_engine_,
91                   item_count));
92  }
93
94  virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
95                                   SyncFileStatus file_status,
96                                   SyncAction sync_action,
97                                   SyncDirection direction) OVERRIDE {
98    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
99      if (sync_engine_)
100        sync_engine_->OnFileStatusChanged(
101            url, file_status, sync_action, direction);
102      return;
103    }
104
105    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
106    ui_task_runner_->PostTask(
107        FROM_HERE,
108        base::Bind(&SyncEngine::OnFileStatusChanged,
109                   sync_engine_,
110                   url, file_status, sync_action, direction));
111  }
112
113  virtual void UpdateServiceState(RemoteServiceState state,
114                                  const std::string& description) OVERRIDE {
115    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
116      if (sync_engine_)
117        sync_engine_->UpdateServiceState(state, description);
118      return;
119    }
120
121    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
122    ui_task_runner_->PostTask(
123        FROM_HERE,
124        base::Bind(&SyncEngine::UpdateServiceState,
125                   sync_engine_, state, description));
126  }
127
128  void DetachFromSequence() {
129    sequence_checker_.DetachFromSequence();
130  }
131
132 private:
133  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
134  base::WeakPtr<SyncEngine> sync_engine_;
135
136  base::SequenceChecker sequence_checker_;
137
138  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
139};
140
141namespace {
142
143void DidRegisterOrigin(const base::TimeTicks& start_time,
144                       const SyncStatusCallback& callback,
145                       SyncStatusCode status) {
146  base::TimeDelta delta(base::TimeTicks::Now() - start_time);
147  HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta);
148  callback.Run(status);
149}
150
151template <typename T>
152void DeleteSoonHelper(scoped_ptr<T>) {}
153
154template <typename T>
155void DeleteSoon(const tracked_objects::Location& from_here,
156                base::TaskRunner* task_runner,
157                scoped_ptr<T> obj) {
158  if (!obj)
159    return;
160
161  T* obj_ptr = obj.get();
162  base::Closure deleter =
163      base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj));
164  if (!task_runner->PostTask(from_here, deleter)) {
165    obj_ptr->DetachFromSequence();
166    deleter.Run();
167  }
168}
169
170}  // namespace
171
172scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
173    content::BrowserContext* context,
174    TaskLogger* task_logger) {
175  scoped_refptr<base::SequencedWorkerPool> worker_pool =
176      content::BrowserThread::GetBlockingPool();
177
178  scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner =
179      base::ThreadTaskRunnerHandle::Get();
180  scoped_refptr<base::SequencedTaskRunner> worker_task_runner =
181      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
182          worker_pool->GetSequenceToken(),
183          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
184  scoped_refptr<base::SequencedTaskRunner> file_task_runner =
185      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
186          worker_pool->GetSequenceToken(),
187          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
188  scoped_refptr<base::SequencedTaskRunner> drive_task_runner =
189      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
190          worker_pool->GetSequenceToken(),
191          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
192
193  Profile* profile = Profile::FromBrowserContext(context);
194  drive::DriveNotificationManager* notification_manager =
195      drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
196  ExtensionService* extension_service =
197      extensions::ExtensionSystem::Get(context)->extension_service();
198  SigninManagerBase* signin_manager =
199      SigninManagerFactory::GetForProfile(profile);
200  ProfileOAuth2TokenService* token_service =
201      ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
202  scoped_refptr<net::URLRequestContextGetter> request_context =
203      context->GetRequestContext();
204
205  scoped_ptr<drive_backend::SyncEngine> sync_engine(
206      new SyncEngine(ui_task_runner,
207                     worker_task_runner,
208                     file_task_runner,
209                     drive_task_runner,
210                     GetSyncFileSystemDir(context->GetPath()),
211                     task_logger,
212                     notification_manager,
213                     extension_service,
214                     signin_manager,
215                     token_service,
216                     request_context,
217                     NULL  /* env_override */));
218
219  sync_engine->Initialize();
220  return sync_engine.Pass();
221}
222
223void SyncEngine::AppendDependsOnFactories(
224    std::set<BrowserContextKeyedServiceFactory*>* factories) {
225  DCHECK(factories);
226  factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
227  factories->insert(SigninManagerFactory::GetInstance());
228  factories->insert(
229      extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
230  factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance());
231}
232
233SyncEngine::~SyncEngine() {
234  Reset();
235
236  net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
237  if (signin_manager_)
238    signin_manager_->RemoveObserver(this);
239  if (notification_manager_)
240    notification_manager_->RemoveObserver(this);
241}
242
243void SyncEngine::Reset() {
244  if (drive_service_)
245    drive_service_->RemoveObserver(this);
246
247  DeleteSoon(FROM_HERE, worker_task_runner_, sync_worker_.Pass());
248  DeleteSoon(FROM_HERE, worker_task_runner_, worker_observer_.Pass());
249  DeleteSoon(FROM_HERE, worker_task_runner_,
250             remote_change_processor_on_worker_.Pass());
251
252  drive_service_wrapper_.reset();
253  drive_service_.reset();
254  drive_uploader_wrapper_.reset();
255  drive_uploader_.reset();
256  remote_change_processor_wrapper_.reset();
257  callback_tracker_.AbortAll();
258}
259
260void SyncEngine::Initialize() {
261  Reset();
262
263  if (!signin_manager_ ||
264      signin_manager_->GetAuthenticatedAccountId().empty())
265    return;
266
267  scoped_ptr<drive::DriveServiceInterface> drive_service(
268      new drive::DriveAPIService(
269          token_service_,
270          request_context_,
271          drive_task_runner_,
272          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
273          GURL(google_apis::DriveApiUrlGenerator::
274               kBaseDownloadUrlForProduction),
275          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
276          std::string() /* custom_user_agent */));
277  scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
278      new drive::DriveUploader(drive_service.get(), drive_task_runner_));
279
280  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
281                     scoped_ptr<SyncWorkerInterface>());
282}
283
284void SyncEngine::InitializeForTesting(
285    scoped_ptr<drive::DriveServiceInterface> drive_service,
286    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
287    scoped_ptr<SyncWorkerInterface> sync_worker) {
288  Reset();
289  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
290                     sync_worker.Pass());
291}
292
293void SyncEngine::InitializeInternal(
294    scoped_ptr<drive::DriveServiceInterface> drive_service,
295    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
296    scoped_ptr<SyncWorkerInterface> sync_worker) {
297  drive_service_ = drive_service.Pass();
298  drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get()));
299
300  std::string account_id;
301  if (signin_manager_)
302    account_id = signin_manager_->GetAuthenticatedAccountId();
303  drive_service_->Initialize(account_id);
304
305  drive_uploader_ = drive_uploader.Pass();
306  drive_uploader_wrapper_.reset(
307      new DriveUploaderWrapper(drive_uploader_.get()));
308
309  // DriveServiceWrapper and DriveServiceOnWorker relay communications
310  // between DriveService and syncers in SyncWorker.
311  scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker(
312      new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
313                               ui_task_runner_,
314                               worker_task_runner_));
315  scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker(
316      new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
317                                ui_task_runner_,
318                                worker_task_runner_));
319  scoped_ptr<SyncEngineContext> sync_engine_context(
320      new SyncEngineContext(drive_service_on_worker.Pass(),
321                            drive_uploader_on_worker.Pass(),
322                            task_logger_,
323                            ui_task_runner_,
324                            worker_task_runner_,
325                            file_task_runner_));
326
327  worker_observer_.reset(
328      new WorkerObserver(ui_task_runner_, weak_ptr_factory_.GetWeakPtr()));
329
330  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
331  if (extension_service_)
332    extension_service_weak_ptr = extension_service_->AsWeakPtr();
333
334  if (!sync_worker) {
335    sync_worker.reset(new SyncWorker(
336        sync_file_system_dir_,
337        extension_service_weak_ptr,
338        env_override_));
339  }
340
341  sync_worker_ = sync_worker.Pass();
342  sync_worker_->AddObserver(worker_observer_.get());
343
344  worker_task_runner_->PostTask(
345      FROM_HERE,
346      base::Bind(&SyncWorkerInterface::Initialize,
347                 base::Unretained(sync_worker_.get()),
348                 base::Passed(&sync_engine_context)));
349  if (remote_change_processor_)
350    SetRemoteChangeProcessor(remote_change_processor_);
351
352  drive_service_->AddObserver(this);
353
354  service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE;
355  SetSyncEnabled(sync_enabled_);
356  OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType());
357}
358
359void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
360  service_observers_.AddObserver(observer);
361}
362
363void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
364  file_status_observers_.AddObserver(observer);
365}
366
367void SyncEngine::RegisterOrigin(const GURL& origin,
368                                const SyncStatusCallback& callback) {
369  if (!sync_worker_) {
370    // TODO(tzik): Record |origin| and retry the registration after late
371    // sign-in.  Then, return SYNC_STATUS_OK.
372    if (!signin_manager_ ||
373        signin_manager_->GetAuthenticatedAccountId().empty())
374      callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED);
375    else
376      callback.Run(SYNC_STATUS_ABORT);
377    return;
378  }
379
380  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
381      FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(),
382                            TrackCallback(callback)));
383
384  worker_task_runner_->PostTask(
385      FROM_HERE,
386      base::Bind(&SyncWorkerInterface::RegisterOrigin,
387                 base::Unretained(sync_worker_.get()),
388                 origin, relayed_callback));
389}
390
391void SyncEngine::EnableOrigin(
392    const GURL& origin, const SyncStatusCallback& callback) {
393  if (!sync_worker_) {
394    // It's safe to return OK immediately since this is also checked in
395    // SyncWorker initialization.
396    callback.Run(SYNC_STATUS_OK);
397    return;
398  }
399
400  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
401      FROM_HERE, TrackCallback(callback));
402
403  worker_task_runner_->PostTask(
404      FROM_HERE,
405      base::Bind(&SyncWorkerInterface::EnableOrigin,
406                 base::Unretained(sync_worker_.get()),
407                 origin, relayed_callback));
408}
409
410void SyncEngine::DisableOrigin(
411    const GURL& origin, const SyncStatusCallback& callback) {
412  if (!sync_worker_) {
413    // It's safe to return OK immediately since this is also checked in
414    // SyncWorker initialization.
415    callback.Run(SYNC_STATUS_OK);
416    return;
417  }
418
419  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
420      FROM_HERE, TrackCallback(callback));
421
422  worker_task_runner_->PostTask(
423      FROM_HERE,
424      base::Bind(&SyncWorkerInterface::DisableOrigin,
425                 base::Unretained(sync_worker_.get()),
426                 origin,
427                 relayed_callback));
428}
429
430void SyncEngine::UninstallOrigin(
431    const GURL& origin,
432    UninstallFlag flag,
433    const SyncStatusCallback& callback) {
434  if (!sync_worker_) {
435    // It's safe to return OK immediately since this is also checked in
436    // SyncWorker initialization.
437    callback.Run(SYNC_STATUS_OK);
438    return;
439  }
440
441  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
442      FROM_HERE, TrackCallback(callback));
443  worker_task_runner_->PostTask(
444      FROM_HERE,
445      base::Bind(&SyncWorkerInterface::UninstallOrigin,
446                 base::Unretained(sync_worker_.get()),
447                 origin, flag, relayed_callback));
448}
449
450void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
451  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
452    callback.Run(SYNC_STATUS_SYNC_DISABLED, fileapi::FileSystemURL());
453    return;
454  }
455
456  base::Closure abort_closure =
457      base::Bind(callback, SYNC_STATUS_ABORT, fileapi::FileSystemURL());
458
459  if (!sync_worker_) {
460    abort_closure.Run();
461    return;
462  }
463
464  SyncFileCallback tracked_callback = callback_tracker_.Register(
465      abort_closure, callback);
466  SyncFileCallback relayed_callback = RelayCallbackToCurrentThread(
467      FROM_HERE, tracked_callback);
468  worker_task_runner_->PostTask(
469      FROM_HERE,
470      base::Bind(&SyncWorkerInterface::ProcessRemoteChange,
471                 base::Unretained(sync_worker_.get()),
472                 relayed_callback));
473}
474
475void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
476  remote_change_processor_ = processor;
477
478  if (!sync_worker_)
479    return;
480
481  remote_change_processor_wrapper_.reset(
482      new RemoteChangeProcessorWrapper(processor));
483
484  remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
485      remote_change_processor_wrapper_->AsWeakPtr(),
486      ui_task_runner_, worker_task_runner_));
487
488  worker_task_runner_->PostTask(
489      FROM_HERE,
490      base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor,
491                 base::Unretained(sync_worker_.get()),
492                 remote_change_processor_on_worker_.get()));
493}
494
495LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
496  return this;
497}
498
499RemoteServiceState SyncEngine::GetCurrentState() const {
500  if (!sync_enabled_)
501    return REMOTE_SERVICE_DISABLED;
502  return service_state_;
503}
504
505void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) {
506  base::Closure abort_closure =
507      base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>()));
508
509  if (!sync_worker_) {
510    abort_closure.Run();
511    return;
512  }
513
514  StatusMapCallback tracked_callback =
515      callback_tracker_.Register(abort_closure, callback);
516  StatusMapCallback relayed_callback =
517      RelayCallbackToCurrentThread(FROM_HERE, tracked_callback);
518
519  worker_task_runner_->PostTask(
520      FROM_HERE,
521      base::Bind(&SyncWorkerInterface::GetOriginStatusMap,
522                 base::Unretained(sync_worker_.get()),
523                 relayed_callback));
524}
525
526void SyncEngine::DumpFiles(const GURL& origin,
527                           const ListCallback& callback) {
528  base::Closure abort_closure =
529      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
530
531  if (!sync_worker_) {
532    abort_closure.Run();
533    return;
534  }
535
536  ListCallback tracked_callback =
537      callback_tracker_.Register(abort_closure, callback);
538
539  PostTaskAndReplyWithResult(
540      worker_task_runner_,
541      FROM_HERE,
542      base::Bind(&SyncWorkerInterface::DumpFiles,
543                 base::Unretained(sync_worker_.get()),
544                 origin),
545      tracked_callback);
546}
547
548void SyncEngine::DumpDatabase(const ListCallback& callback) {
549  base::Closure abort_closure =
550      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
551
552  if (!sync_worker_) {
553    abort_closure.Run();
554    return;
555  }
556
557  ListCallback tracked_callback =
558      callback_tracker_.Register(abort_closure, callback);
559
560  PostTaskAndReplyWithResult(
561      worker_task_runner_,
562      FROM_HERE,
563      base::Bind(&SyncWorkerInterface::DumpDatabase,
564                 base::Unretained(sync_worker_.get())),
565      tracked_callback);
566}
567
568void SyncEngine::SetSyncEnabled(bool sync_enabled) {
569  sync_enabled_ = sync_enabled;
570
571  if (!sync_worker_)
572    return;
573
574  worker_task_runner_->PostTask(
575      FROM_HERE,
576      base::Bind(&SyncWorkerInterface::SetSyncEnabled,
577                 base::Unretained(sync_worker_.get()),
578                 sync_enabled));
579}
580
581void SyncEngine::PromoteDemotedChanges() {
582  if (!sync_worker_)
583    return;
584
585  worker_task_runner_->PostTask(
586      FROM_HERE,
587      base::Bind(&SyncWorkerInterface::PromoteDemotedChanges,
588                 base::Unretained(sync_worker_.get())));
589}
590
591void SyncEngine::ApplyLocalChange(
592    const FileChange& local_change,
593    const base::FilePath& local_path,
594    const SyncFileMetadata& local_metadata,
595    const fileapi::FileSystemURL& url,
596    const SyncStatusCallback& callback) {
597  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
598    callback.Run(SYNC_STATUS_SYNC_DISABLED);
599    return;
600  }
601
602  if (!sync_worker_) {
603    callback.Run(SYNC_STATUS_ABORT);
604    return;
605  }
606
607  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
608      FROM_HERE, TrackCallback(callback));
609  worker_task_runner_->PostTask(
610      FROM_HERE,
611      base::Bind(&SyncWorkerInterface::ApplyLocalChange,
612                 base::Unretained(sync_worker_.get()),
613                 local_change,
614                 local_path,
615                 local_metadata,
616                 url,
617                 relayed_callback));
618}
619
620void SyncEngine::OnNotificationReceived() {
621  if (!sync_worker_)
622    return;
623
624  worker_task_runner_->PostTask(
625      FROM_HERE,
626      base::Bind(&SyncWorkerInterface::OnNotificationReceived,
627                 base::Unretained(sync_worker_.get())));
628}
629
630void SyncEngine::OnPushNotificationEnabled(bool) {}
631
632void SyncEngine::OnReadyToSendRequests() {
633  if (!sync_worker_)
634    return;
635
636  worker_task_runner_->PostTask(
637      FROM_HERE,
638      base::Bind(&SyncWorkerInterface::OnReadyToSendRequests,
639                 base::Unretained(sync_worker_.get())));
640}
641
642void SyncEngine::OnRefreshTokenInvalid() {
643  if (!sync_worker_)
644    return;
645
646  worker_task_runner_->PostTask(
647      FROM_HERE,
648      base::Bind(&SyncWorkerInterface::OnRefreshTokenInvalid,
649                 base::Unretained(sync_worker_.get())));
650}
651
652void SyncEngine::OnNetworkChanged(
653    net::NetworkChangeNotifier::ConnectionType type) {
654  if (!sync_worker_)
655    return;
656
657  worker_task_runner_->PostTask(
658      FROM_HERE,
659      base::Bind(&SyncWorkerInterface::OnNetworkChanged,
660                 base::Unretained(sync_worker_.get()),
661                 type));
662}
663
664void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) {
665  Reset();
666  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
667                     "Failed to sign in.");
668}
669
670void SyncEngine::GoogleSigninSucceeded(const std::string& username,
671                                       const std::string& password) {
672  Initialize();
673}
674
675void SyncEngine::GoogleSignedOut(const std::string& username) {
676  Reset();
677  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
678                     "User signed out.");
679}
680
681SyncEngine::SyncEngine(
682    base::SingleThreadTaskRunner* ui_task_runner,
683    base::SequencedTaskRunner* worker_task_runner,
684    base::SequencedTaskRunner* file_task_runner,
685    base::SequencedTaskRunner* drive_task_runner,
686    const base::FilePath& sync_file_system_dir,
687    TaskLogger* task_logger,
688    drive::DriveNotificationManager* notification_manager,
689    ExtensionServiceInterface* extension_service,
690    SigninManagerBase* signin_manager,
691    ProfileOAuth2TokenService* token_service,
692    net::URLRequestContextGetter* request_context,
693    leveldb::Env* env_override)
694    : ui_task_runner_(ui_task_runner),
695      worker_task_runner_(worker_task_runner),
696      file_task_runner_(file_task_runner),
697      drive_task_runner_(drive_task_runner),
698      sync_file_system_dir_(sync_file_system_dir),
699      task_logger_(task_logger),
700      notification_manager_(notification_manager),
701      extension_service_(extension_service),
702      signin_manager_(signin_manager),
703      token_service_(token_service),
704      request_context_(request_context),
705      remote_change_processor_(NULL),
706      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
707      sync_enabled_(false),
708      env_override_(env_override),
709      weak_ptr_factory_(this) {
710  DCHECK(sync_file_system_dir_.IsAbsolute());
711  if (notification_manager_)
712    notification_manager_->AddObserver(this);
713  if (signin_manager_)
714    signin_manager_->AddObserver(this);
715  net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
716}
717
718void SyncEngine::OnPendingFileListUpdated(int item_count) {
719  FOR_EACH_OBSERVER(
720      SyncServiceObserver,
721      service_observers_,
722      OnRemoteChangeQueueUpdated(item_count));
723}
724
725void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
726                                     SyncFileStatus file_status,
727                                     SyncAction sync_action,
728                                     SyncDirection direction) {
729  FOR_EACH_OBSERVER(FileStatusObserver,
730                    file_status_observers_,
731                    OnFileStatusChanged(
732                        url, file_status, sync_action, direction));
733}
734
735void SyncEngine::UpdateServiceState(RemoteServiceState state,
736                                    const std::string& description) {
737  service_state_ = state;
738
739  FOR_EACH_OBSERVER(
740      SyncServiceObserver, service_observers_,
741      OnRemoteServiceStateUpdated(state, description));
742}
743
744SyncStatusCallback SyncEngine::TrackCallback(
745    const SyncStatusCallback& callback) {
746  return callback_tracker_.Register(
747      base::Bind(callback, SYNC_STATUS_ABORT),
748      callback);
749}
750
751}  // namespace drive_backend
752}  // namespace sync_file_system
753