sync_engine.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/metrics/histogram.h"
11#include "base/thread_task_runner_handle.h"
12#include "base/threading/sequenced_worker_pool.h"
13#include "base/time/time.h"
14#include "base/values.h"
15#include "chrome/browser/drive/drive_api_service.h"
16#include "chrome/browser/drive/drive_notification_manager.h"
17#include "chrome/browser/drive/drive_notification_manager_factory.h"
18#include "chrome/browser/drive/drive_service_interface.h"
19#include "chrome/browser/drive/drive_uploader.h"
20#include "chrome/browser/extensions/extension_service.h"
21#include "chrome/browser/profiles/profile.h"
22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
23#include "chrome/browser/signin/signin_manager_factory.h"
24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h"
43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
44#include "chrome/browser/sync_file_system/file_status_observer.h"
45#include "chrome/browser/sync_file_system/logger.h"
46#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
47#include "components/signin/core/browser/profile_oauth2_token_service.h"
48#include "components/signin/core/browser/signin_manager.h"
49#include "content/public/browser/browser_thread.h"
50#include "extensions/browser/extension_system.h"
51#include "extensions/browser/extension_system_provider.h"
52#include "extensions/browser/extensions_browser_client.h"
53#include "extensions/common/extension.h"
54#include "google_apis/drive/drive_api_url_generator.h"
55#include "google_apis/drive/gdata_wapi_url_generator.h"
56#include "net/url_request/url_request_context_getter.h"
57#include "storage/common/blob/scoped_file.h"
58#include "storage/common/fileapi/file_system_util.h"
59
60namespace sync_file_system {
61
62class RemoteChangeProcessor;
63
64namespace drive_backend {
65
66scoped_ptr<drive::DriveServiceInterface>
67SyncEngine::DriveServiceFactory::CreateDriveService(
68    OAuth2TokenService* oauth2_token_service,
69    net::URLRequestContextGetter* url_request_context_getter,
70    base::SequencedTaskRunner* blocking_task_runner) {
71  return scoped_ptr<drive::DriveServiceInterface>(
72      new drive::DriveAPIService(
73          oauth2_token_service,
74          url_request_context_getter,
75          blocking_task_runner,
76          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
77          GURL(google_apis::DriveApiUrlGenerator::
78               kBaseDownloadUrlForProduction),
79          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
80          std::string() /* custom_user_agent */));
81}
82
83class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer {
84 public:
85  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
86                 base::WeakPtr<SyncEngine> sync_engine)
87      : ui_task_runner_(ui_task_runner),
88        sync_engine_(sync_engine) {
89    sequence_checker_.DetachFromSequence();
90  }
91
92  virtual ~WorkerObserver() {
93    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
94  }
95
96  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
97    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
98      if (sync_engine_)
99        sync_engine_->OnPendingFileListUpdated(item_count);
100      return;
101    }
102
103    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
104    ui_task_runner_->PostTask(
105        FROM_HERE,
106        base::Bind(&SyncEngine::OnPendingFileListUpdated,
107                   sync_engine_,
108                   item_count));
109  }
110
111  virtual void OnFileStatusChanged(const storage::FileSystemURL& url,
112                                   SyncFileStatus file_status,
113                                   SyncAction sync_action,
114                                   SyncDirection direction) OVERRIDE {
115    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
116      if (sync_engine_)
117        sync_engine_->OnFileStatusChanged(
118            url, file_status, sync_action, direction);
119      return;
120    }
121
122    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
123    ui_task_runner_->PostTask(
124        FROM_HERE,
125        base::Bind(&SyncEngine::OnFileStatusChanged,
126                   sync_engine_,
127                   url, file_status, sync_action, direction));
128  }
129
130  virtual void UpdateServiceState(RemoteServiceState state,
131                                  const std::string& description) OVERRIDE {
132    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
133      if (sync_engine_)
134        sync_engine_->UpdateServiceState(state, description);
135      return;
136    }
137
138    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
139    ui_task_runner_->PostTask(
140        FROM_HERE,
141        base::Bind(&SyncEngine::UpdateServiceState,
142                   sync_engine_, state, description));
143  }
144
145  void DetachFromSequence() {
146    sequence_checker_.DetachFromSequence();
147  }
148
149 private:
150  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
151  base::WeakPtr<SyncEngine> sync_engine_;
152
153  base::SequenceChecker sequence_checker_;
154
155  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
156};
157
158namespace {
159
160void DidRegisterOrigin(const base::TimeTicks& start_time,
161                       const SyncStatusCallback& callback,
162                       SyncStatusCode status) {
163  base::TimeDelta delta(base::TimeTicks::Now() - start_time);
164  LOCAL_HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta);
165  callback.Run(status);
166}
167
168template <typename T>
169void DeleteSoonHelper(scoped_ptr<T>) {}
170
171template <typename T>
172void DeleteSoon(const tracked_objects::Location& from_here,
173                base::TaskRunner* task_runner,
174                scoped_ptr<T> obj) {
175  if (!obj)
176    return;
177
178  T* obj_ptr = obj.get();
179  base::Closure deleter =
180      base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj));
181  if (!task_runner->PostTask(from_here, deleter)) {
182    obj_ptr->DetachFromSequence();
183    deleter.Run();
184  }
185}
186
187}  // namespace
188
189scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
190    content::BrowserContext* context,
191    TaskLogger* task_logger) {
192  scoped_refptr<base::SequencedWorkerPool> worker_pool =
193      content::BrowserThread::GetBlockingPool();
194
195  scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner =
196      base::ThreadTaskRunnerHandle::Get();
197  scoped_refptr<base::SequencedTaskRunner> worker_task_runner =
198      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
199          worker_pool->GetSequenceToken(),
200          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
201  scoped_refptr<base::SequencedTaskRunner> drive_task_runner =
202      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
203          worker_pool->GetSequenceToken(),
204          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
205
206  Profile* profile = Profile::FromBrowserContext(context);
207  drive::DriveNotificationManager* notification_manager =
208      drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
209  ExtensionService* extension_service =
210      extensions::ExtensionSystem::Get(context)->extension_service();
211  SigninManagerBase* signin_manager =
212      SigninManagerFactory::GetForProfile(profile);
213  OAuth2TokenService* token_service =
214      ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
215  scoped_refptr<net::URLRequestContextGetter> request_context =
216      context->GetRequestContext();
217
218  scoped_ptr<drive_backend::SyncEngine> sync_engine(
219      new SyncEngine(ui_task_runner.get(),
220                     worker_task_runner.get(),
221                     drive_task_runner.get(),
222                     GetSyncFileSystemDir(context->GetPath()),
223                     task_logger,
224                     notification_manager,
225                     extension_service,
226                     signin_manager,
227                     token_service,
228                     request_context.get(),
229                     make_scoped_ptr(new DriveServiceFactory()),
230                     NULL /* env_override */));
231
232  sync_engine->Initialize();
233  return sync_engine.Pass();
234}
235
236void SyncEngine::AppendDependsOnFactories(
237    std::set<BrowserContextKeyedServiceFactory*>* factories) {
238  DCHECK(factories);
239  factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
240  factories->insert(SigninManagerFactory::GetInstance());
241  factories->insert(
242      extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
243  factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance());
244}
245
246SyncEngine::~SyncEngine() {
247  Reset();
248
249  net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
250  if (signin_manager_)
251    signin_manager_->RemoveObserver(this);
252  if (notification_manager_)
253    notification_manager_->RemoveObserver(this);
254}
255
256void SyncEngine::Reset() {
257  if (drive_service_)
258    drive_service_->RemoveObserver(this);
259
260  DeleteSoon(FROM_HERE, worker_task_runner_.get(), sync_worker_.Pass());
261  DeleteSoon(FROM_HERE, worker_task_runner_.get(), worker_observer_.Pass());
262  DeleteSoon(FROM_HERE,
263             worker_task_runner_.get(),
264             remote_change_processor_on_worker_.Pass());
265
266  drive_service_wrapper_.reset();
267  drive_service_.reset();
268  drive_uploader_wrapper_.reset();
269  drive_uploader_.reset();
270  remote_change_processor_wrapper_.reset();
271  callback_tracker_.AbortAll();
272}
273
274void SyncEngine::Initialize() {
275  Reset();
276
277  if (!signin_manager_ || !signin_manager_->IsAuthenticated())
278    return;
279
280  DCHECK(drive_service_factory_);
281  scoped_ptr<drive::DriveServiceInterface> drive_service =
282      drive_service_factory_->CreateDriveService(
283          token_service_, request_context_.get(), drive_task_runner_.get());
284  scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
285      new drive::DriveUploader(drive_service.get(), drive_task_runner_.get()));
286
287  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
288                     scoped_ptr<SyncWorkerInterface>());
289}
290
291void SyncEngine::InitializeForTesting(
292    scoped_ptr<drive::DriveServiceInterface> drive_service,
293    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
294    scoped_ptr<SyncWorkerInterface> sync_worker) {
295  Reset();
296  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
297                     sync_worker.Pass());
298}
299
300void SyncEngine::InitializeInternal(
301    scoped_ptr<drive::DriveServiceInterface> drive_service,
302    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
303    scoped_ptr<SyncWorkerInterface> sync_worker) {
304  drive_service_ = drive_service.Pass();
305  drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get()));
306
307  std::string account_id;
308  if (signin_manager_)
309    account_id = signin_manager_->GetAuthenticatedAccountId();
310  drive_service_->Initialize(account_id);
311
312  drive_uploader_ = drive_uploader.Pass();
313  drive_uploader_wrapper_.reset(
314      new DriveUploaderWrapper(drive_uploader_.get()));
315
316  // DriveServiceWrapper and DriveServiceOnWorker relay communications
317  // between DriveService and syncers in SyncWorker.
318  scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker(
319      new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
320                               ui_task_runner_.get(),
321                               worker_task_runner_.get()));
322  scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker(
323      new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
324                                ui_task_runner_.get(),
325                                worker_task_runner_.get()));
326  scoped_ptr<SyncEngineContext> sync_engine_context(
327      new SyncEngineContext(drive_service_on_worker.Pass(),
328                            drive_uploader_on_worker.Pass(),
329                            task_logger_,
330                            ui_task_runner_.get(),
331                            worker_task_runner_.get()));
332
333  worker_observer_.reset(new WorkerObserver(ui_task_runner_.get(),
334                                            weak_ptr_factory_.GetWeakPtr()));
335
336  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
337  if (extension_service_)
338    extension_service_weak_ptr = extension_service_->AsWeakPtr();
339
340  if (!sync_worker) {
341    sync_worker.reset(new SyncWorker(
342        sync_file_system_dir_,
343        extension_service_weak_ptr,
344        env_override_));
345  }
346
347  sync_worker_ = sync_worker.Pass();
348  sync_worker_->AddObserver(worker_observer_.get());
349
350  worker_task_runner_->PostTask(
351      FROM_HERE,
352      base::Bind(&SyncWorkerInterface::Initialize,
353                 base::Unretained(sync_worker_.get()),
354                 base::Passed(&sync_engine_context)));
355  if (remote_change_processor_)
356    SetRemoteChangeProcessor(remote_change_processor_);
357
358  drive_service_->AddObserver(this);
359
360  service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE;
361  OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType());
362  if (drive_service_->HasRefreshToken())
363    OnReadyToSendRequests();
364  else
365    OnRefreshTokenInvalid();
366}
367
368void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
369  service_observers_.AddObserver(observer);
370}
371
372void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
373  file_status_observers_.AddObserver(observer);
374}
375
376void SyncEngine::RegisterOrigin(const GURL& origin,
377                                const SyncStatusCallback& callback) {
378  if (!sync_worker_) {
379    // TODO(tzik): Record |origin| and retry the registration after late
380    // sign-in.  Then, return SYNC_STATUS_OK.
381    if (!signin_manager_ || !signin_manager_->IsAuthenticated())
382      callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED);
383    else
384      callback.Run(SYNC_STATUS_ABORT);
385    return;
386  }
387
388  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
389      FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(),
390                            TrackCallback(callback)));
391
392  worker_task_runner_->PostTask(
393      FROM_HERE,
394      base::Bind(&SyncWorkerInterface::RegisterOrigin,
395                 base::Unretained(sync_worker_.get()),
396                 origin, relayed_callback));
397}
398
399void SyncEngine::EnableOrigin(
400    const GURL& origin, const SyncStatusCallback& callback) {
401  if (!sync_worker_) {
402    // It's safe to return OK immediately since this is also checked in
403    // SyncWorker initialization.
404    callback.Run(SYNC_STATUS_OK);
405    return;
406  }
407
408  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
409      FROM_HERE, TrackCallback(callback));
410
411  worker_task_runner_->PostTask(
412      FROM_HERE,
413      base::Bind(&SyncWorkerInterface::EnableOrigin,
414                 base::Unretained(sync_worker_.get()),
415                 origin, relayed_callback));
416}
417
418void SyncEngine::DisableOrigin(
419    const GURL& origin, const SyncStatusCallback& callback) {
420  if (!sync_worker_) {
421    // It's safe to return OK immediately since this is also checked in
422    // SyncWorker initialization.
423    callback.Run(SYNC_STATUS_OK);
424    return;
425  }
426
427  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
428      FROM_HERE, TrackCallback(callback));
429
430  worker_task_runner_->PostTask(
431      FROM_HERE,
432      base::Bind(&SyncWorkerInterface::DisableOrigin,
433                 base::Unretained(sync_worker_.get()),
434                 origin,
435                 relayed_callback));
436}
437
438void SyncEngine::UninstallOrigin(
439    const GURL& origin,
440    UninstallFlag flag,
441    const SyncStatusCallback& callback) {
442  if (!sync_worker_) {
443    // It's safe to return OK immediately since this is also checked in
444    // SyncWorker initialization.
445    callback.Run(SYNC_STATUS_OK);
446    return;
447  }
448
449  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
450      FROM_HERE, TrackCallback(callback));
451  worker_task_runner_->PostTask(
452      FROM_HERE,
453      base::Bind(&SyncWorkerInterface::UninstallOrigin,
454                 base::Unretained(sync_worker_.get()),
455                 origin, flag, relayed_callback));
456}
457
458void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
459  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
460    callback.Run(SYNC_STATUS_SYNC_DISABLED, storage::FileSystemURL());
461    return;
462  }
463
464  base::Closure abort_closure =
465      base::Bind(callback, SYNC_STATUS_ABORT, storage::FileSystemURL());
466
467  if (!sync_worker_) {
468    abort_closure.Run();
469    return;
470  }
471
472  SyncFileCallback tracked_callback = callback_tracker_.Register(
473      abort_closure, callback);
474  SyncFileCallback relayed_callback = RelayCallbackToCurrentThread(
475      FROM_HERE, tracked_callback);
476  worker_task_runner_->PostTask(
477      FROM_HERE,
478      base::Bind(&SyncWorkerInterface::ProcessRemoteChange,
479                 base::Unretained(sync_worker_.get()),
480                 relayed_callback));
481}
482
483void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
484  remote_change_processor_ = processor;
485
486  if (!sync_worker_)
487    return;
488
489  remote_change_processor_wrapper_.reset(
490      new RemoteChangeProcessorWrapper(processor));
491
492  remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
493      remote_change_processor_wrapper_->AsWeakPtr(),
494      ui_task_runner_.get(),
495      worker_task_runner_.get()));
496
497  worker_task_runner_->PostTask(
498      FROM_HERE,
499      base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor,
500                 base::Unretained(sync_worker_.get()),
501                 remote_change_processor_on_worker_.get()));
502}
503
504LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
505  return this;
506}
507
508RemoteServiceState SyncEngine::GetCurrentState() const {
509  if (!sync_enabled_)
510    return REMOTE_SERVICE_DISABLED;
511  if (!has_refresh_token_)
512    return REMOTE_SERVICE_AUTHENTICATION_REQUIRED;
513  return service_state_;
514}
515
516void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) {
517  base::Closure abort_closure =
518      base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>()));
519
520  if (!sync_worker_) {
521    abort_closure.Run();
522    return;
523  }
524
525  StatusMapCallback tracked_callback =
526      callback_tracker_.Register(abort_closure, callback);
527  StatusMapCallback relayed_callback =
528      RelayCallbackToCurrentThread(FROM_HERE, tracked_callback);
529
530  worker_task_runner_->PostTask(
531      FROM_HERE,
532      base::Bind(&SyncWorkerInterface::GetOriginStatusMap,
533                 base::Unretained(sync_worker_.get()),
534                 relayed_callback));
535}
536
537void SyncEngine::DumpFiles(const GURL& origin,
538                           const ListCallback& callback) {
539  base::Closure abort_closure =
540      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
541
542  if (!sync_worker_) {
543    abort_closure.Run();
544    return;
545  }
546
547  ListCallback tracked_callback =
548      callback_tracker_.Register(abort_closure, callback);
549
550  PostTaskAndReplyWithResult(worker_task_runner_.get(),
551                             FROM_HERE,
552                             base::Bind(&SyncWorkerInterface::DumpFiles,
553                                        base::Unretained(sync_worker_.get()),
554                                        origin),
555                             tracked_callback);
556}
557
558void SyncEngine::DumpDatabase(const ListCallback& callback) {
559  base::Closure abort_closure =
560      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
561
562  if (!sync_worker_) {
563    abort_closure.Run();
564    return;
565  }
566
567  ListCallback tracked_callback =
568      callback_tracker_.Register(abort_closure, callback);
569
570  PostTaskAndReplyWithResult(worker_task_runner_.get(),
571                             FROM_HERE,
572                             base::Bind(&SyncWorkerInterface::DumpDatabase,
573                                        base::Unretained(sync_worker_.get())),
574                             tracked_callback);
575}
576
577void SyncEngine::SetSyncEnabled(bool sync_enabled) {
578  if (sync_enabled_ == sync_enabled)
579    return;
580  sync_enabled_ = sync_enabled;
581
582  if (sync_enabled_) {
583    if (!sync_worker_)
584      Initialize();
585
586    // Have no login credential.
587    if (!sync_worker_)
588      return;
589
590    worker_task_runner_->PostTask(
591        FROM_HERE,
592        base::Bind(&SyncWorkerInterface::SetSyncEnabled,
593                   base::Unretained(sync_worker_.get()),
594                   sync_enabled_));
595    return;
596  }
597
598  if (!sync_worker_)
599    return;
600
601  // TODO(tzik): Consider removing SyncWorkerInterface::SetSyncEnabled and
602  // let SyncEngine handle the flag.
603  worker_task_runner_->PostTask(
604      FROM_HERE,
605      base::Bind(&SyncWorkerInterface::SetSyncEnabled,
606                 base::Unretained(sync_worker_.get()),
607                 sync_enabled_));
608  Reset();
609}
610
611void SyncEngine::PromoteDemotedChanges(const base::Closure& callback) {
612  if (!sync_worker_) {
613    callback.Run();
614    return;
615  }
616
617  base::Closure relayed_callback = RelayCallbackToCurrentThread(
618      FROM_HERE, callback_tracker_.Register(callback, callback));
619
620  worker_task_runner_->PostTask(
621      FROM_HERE,
622      base::Bind(&SyncWorkerInterface::PromoteDemotedChanges,
623                 base::Unretained(sync_worker_.get()),
624                 relayed_callback));
625}
626
627void SyncEngine::ApplyLocalChange(const FileChange& local_change,
628                                  const base::FilePath& local_path,
629                                  const SyncFileMetadata& local_metadata,
630                                  const storage::FileSystemURL& url,
631                                  const SyncStatusCallback& callback) {
632  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
633    callback.Run(SYNC_STATUS_SYNC_DISABLED);
634    return;
635  }
636
637  if (!sync_worker_) {
638    callback.Run(SYNC_STATUS_ABORT);
639    return;
640  }
641
642  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
643      FROM_HERE, TrackCallback(callback));
644  worker_task_runner_->PostTask(
645      FROM_HERE,
646      base::Bind(&SyncWorkerInterface::ApplyLocalChange,
647                 base::Unretained(sync_worker_.get()),
648                 local_change,
649                 local_path,
650                 local_metadata,
651                 url,
652                 relayed_callback));
653}
654
655void SyncEngine::OnNotificationReceived() {
656  if (!sync_worker_)
657    return;
658
659  worker_task_runner_->PostTask(
660      FROM_HERE,
661      base::Bind(&SyncWorkerInterface::ActivateService,
662                 base::Unretained(sync_worker_.get()),
663                 REMOTE_SERVICE_OK,
664                 "Got push notification for Drive"));
665}
666
667void SyncEngine::OnPushNotificationEnabled(bool /* enabled */) {}
668
669void SyncEngine::OnReadyToSendRequests() {
670  has_refresh_token_ = true;
671  if (!sync_worker_)
672    return;
673
674  worker_task_runner_->PostTask(
675      FROM_HERE,
676      base::Bind(&SyncWorkerInterface::ActivateService,
677                 base::Unretained(sync_worker_.get()),
678                 REMOTE_SERVICE_OK,
679                 "Authenticated"));
680}
681
682void SyncEngine::OnRefreshTokenInvalid() {
683  has_refresh_token_ = false;
684  if (!sync_worker_)
685    return;
686
687  worker_task_runner_->PostTask(
688      FROM_HERE,
689      base::Bind(&SyncWorkerInterface::DeactivateService,
690                 base::Unretained(sync_worker_.get()),
691                 "Found invalid refresh token."));
692}
693
694void SyncEngine::OnNetworkChanged(
695    net::NetworkChangeNotifier::ConnectionType type) {
696  if (!sync_worker_)
697    return;
698
699  bool network_available_old = network_available_;
700  network_available_ = (type != net::NetworkChangeNotifier::CONNECTION_NONE);
701
702  if (!network_available_old && network_available_) {
703    worker_task_runner_->PostTask(
704        FROM_HERE,
705        base::Bind(&SyncWorkerInterface::ActivateService,
706                   base::Unretained(sync_worker_.get()),
707                   REMOTE_SERVICE_OK,
708                   "Connected"));
709  } else if (network_available_old && !network_available_) {
710    worker_task_runner_->PostTask(
711        FROM_HERE,
712        base::Bind(&SyncWorkerInterface::DeactivateService,
713                   base::Unretained(sync_worker_.get()),
714                   "Disconnected"));
715  }
716}
717
718void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) {
719  Reset();
720  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
721                     "Failed to sign in.");
722}
723
724void SyncEngine::GoogleSigninSucceeded(const std::string& account_id,
725                                       const std::string& username,
726                                       const std::string& password) {
727  Initialize();
728}
729
730void SyncEngine::GoogleSignedOut(const std::string& account_id,
731                                 const std::string& username) {
732  Reset();
733  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
734                     "User signed out.");
735}
736
737SyncEngine::SyncEngine(
738    const scoped_refptr<base::SingleThreadTaskRunner>& ui_task_runner,
739    const scoped_refptr<base::SequencedTaskRunner>& worker_task_runner,
740    const scoped_refptr<base::SequencedTaskRunner>& drive_task_runner,
741    const base::FilePath& sync_file_system_dir,
742    TaskLogger* task_logger,
743    drive::DriveNotificationManager* notification_manager,
744    ExtensionServiceInterface* extension_service,
745    SigninManagerBase* signin_manager,
746    OAuth2TokenService* token_service,
747    net::URLRequestContextGetter* request_context,
748    scoped_ptr<DriveServiceFactory> drive_service_factory,
749    leveldb::Env* env_override)
750    : ui_task_runner_(ui_task_runner),
751      worker_task_runner_(worker_task_runner),
752      drive_task_runner_(drive_task_runner),
753      sync_file_system_dir_(sync_file_system_dir),
754      task_logger_(task_logger),
755      notification_manager_(notification_manager),
756      extension_service_(extension_service),
757      signin_manager_(signin_manager),
758      token_service_(token_service),
759      request_context_(request_context),
760      drive_service_factory_(drive_service_factory.Pass()),
761      remote_change_processor_(NULL),
762      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
763      has_refresh_token_(false),
764      network_available_(false),
765      sync_enabled_(false),
766      env_override_(env_override),
767      weak_ptr_factory_(this) {
768  DCHECK(sync_file_system_dir_.IsAbsolute());
769  if (notification_manager_)
770    notification_manager_->AddObserver(this);
771  if (signin_manager_)
772    signin_manager_->AddObserver(this);
773  net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
774}
775
776void SyncEngine::OnPendingFileListUpdated(int item_count) {
777  FOR_EACH_OBSERVER(
778      SyncServiceObserver,
779      service_observers_,
780      OnRemoteChangeQueueUpdated(item_count));
781}
782
783void SyncEngine::OnFileStatusChanged(const storage::FileSystemURL& url,
784                                     SyncFileStatus file_status,
785                                     SyncAction sync_action,
786                                     SyncDirection direction) {
787  FOR_EACH_OBSERVER(FileStatusObserver,
788                    file_status_observers_,
789                    OnFileStatusChanged(
790                        url, file_status, sync_action, direction));
791}
792
793void SyncEngine::UpdateServiceState(RemoteServiceState state,
794                                    const std::string& description) {
795  service_state_ = state;
796
797  FOR_EACH_OBSERVER(
798      SyncServiceObserver, service_observers_,
799      OnRemoteServiceStateUpdated(GetCurrentState(), description));
800}
801
802SyncStatusCallback SyncEngine::TrackCallback(
803    const SyncStatusCallback& callback) {
804  return callback_tracker_.Register(
805      base::Bind(callback, SYNC_STATUS_ABORT),
806      callback);
807}
808
809}  // namespace drive_backend
810}  // namespace sync_file_system
811