sync_engine.cc revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/metrics/histogram.h"
11#include "base/thread_task_runner_handle.h"
12#include "base/threading/sequenced_worker_pool.h"
13#include "base/time/time.h"
14#include "base/values.h"
15#include "chrome/browser/drive/drive_api_service.h"
16#include "chrome/browser/drive/drive_notification_manager.h"
17#include "chrome/browser/drive/drive_notification_manager_factory.h"
18#include "chrome/browser/drive/drive_service_interface.h"
19#include "chrome/browser/drive/drive_uploader.h"
20#include "chrome/browser/extensions/extension_service.h"
21#include "chrome/browser/profiles/profile.h"
22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
23#include "chrome/browser/signin/signin_manager_factory.h"
24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h"
43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
44#include "chrome/browser/sync_file_system/file_status_observer.h"
45#include "chrome/browser/sync_file_system/logger.h"
46#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
47#include "components/signin/core/browser/profile_oauth2_token_service.h"
48#include "components/signin/core/browser/signin_manager.h"
49#include "content/public/browser/browser_thread.h"
50#include "extensions/browser/extension_system.h"
51#include "extensions/browser/extension_system_provider.h"
52#include "extensions/browser/extensions_browser_client.h"
53#include "extensions/common/extension.h"
54#include "google_apis/drive/drive_api_url_generator.h"
55#include "google_apis/drive/gdata_wapi_url_generator.h"
56#include "net/url_request/url_request_context_getter.h"
57#include "webkit/common/blob/scoped_file.h"
58#include "webkit/common/fileapi/file_system_util.h"
59
60namespace sync_file_system {
61
62class RemoteChangeProcessor;
63
64namespace drive_backend {
65
66scoped_ptr<drive::DriveServiceInterface>
67SyncEngine::DriveServiceFactory::CreateDriveService(
68    OAuth2TokenService* oauth2_token_service,
69    net::URLRequestContextGetter* url_request_context_getter,
70    base::SequencedTaskRunner* blocking_task_runner) {
71  return scoped_ptr<drive::DriveServiceInterface>(
72      new drive::DriveAPIService(
73          oauth2_token_service,
74          url_request_context_getter,
75          blocking_task_runner,
76          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
77          GURL(google_apis::DriveApiUrlGenerator::
78               kBaseDownloadUrlForProduction),
79          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
80          std::string() /* custom_user_agent */));
81}
82
83class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer {
84 public:
85  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
86                 base::WeakPtr<SyncEngine> sync_engine)
87      : ui_task_runner_(ui_task_runner),
88        sync_engine_(sync_engine) {
89    sequence_checker_.DetachFromSequence();
90  }
91
92  virtual ~WorkerObserver() {
93    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
94  }
95
96  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
97    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
98      if (sync_engine_)
99        sync_engine_->OnPendingFileListUpdated(item_count);
100      return;
101    }
102
103    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
104    ui_task_runner_->PostTask(
105        FROM_HERE,
106        base::Bind(&SyncEngine::OnPendingFileListUpdated,
107                   sync_engine_,
108                   item_count));
109  }
110
111  virtual void OnFileStatusChanged(const storage::FileSystemURL& url,
112                                   SyncFileStatus file_status,
113                                   SyncAction sync_action,
114                                   SyncDirection direction) OVERRIDE {
115    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
116      if (sync_engine_)
117        sync_engine_->OnFileStatusChanged(
118            url, file_status, sync_action, direction);
119      return;
120    }
121
122    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
123    ui_task_runner_->PostTask(
124        FROM_HERE,
125        base::Bind(&SyncEngine::OnFileStatusChanged,
126                   sync_engine_,
127                   url, file_status, sync_action, direction));
128  }
129
130  virtual void UpdateServiceState(RemoteServiceState state,
131                                  const std::string& description) OVERRIDE {
132    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
133      if (sync_engine_)
134        sync_engine_->UpdateServiceState(state, description);
135      return;
136    }
137
138    DCHECK(sequence_checker_.CalledOnValidSequencedThread());
139    ui_task_runner_->PostTask(
140        FROM_HERE,
141        base::Bind(&SyncEngine::UpdateServiceState,
142                   sync_engine_, state, description));
143  }
144
145  void DetachFromSequence() {
146    sequence_checker_.DetachFromSequence();
147  }
148
149 private:
150  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
151  base::WeakPtr<SyncEngine> sync_engine_;
152
153  base::SequenceChecker sequence_checker_;
154
155  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
156};
157
158namespace {
159
160void DidRegisterOrigin(const base::TimeTicks& start_time,
161                       const SyncStatusCallback& callback,
162                       SyncStatusCode status) {
163  base::TimeDelta delta(base::TimeTicks::Now() - start_time);
164  HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta);
165  callback.Run(status);
166}
167
168template <typename T>
169void DeleteSoonHelper(scoped_ptr<T>) {}
170
171template <typename T>
172void DeleteSoon(const tracked_objects::Location& from_here,
173                base::TaskRunner* task_runner,
174                scoped_ptr<T> obj) {
175  if (!obj)
176    return;
177
178  T* obj_ptr = obj.get();
179  base::Closure deleter =
180      base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj));
181  if (!task_runner->PostTask(from_here, deleter)) {
182    obj_ptr->DetachFromSequence();
183    deleter.Run();
184  }
185}
186
187}  // namespace
188
189scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
190    content::BrowserContext* context,
191    TaskLogger* task_logger) {
192  scoped_refptr<base::SequencedWorkerPool> worker_pool =
193      content::BrowserThread::GetBlockingPool();
194
195  scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner =
196      base::ThreadTaskRunnerHandle::Get();
197  scoped_refptr<base::SequencedTaskRunner> worker_task_runner =
198      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
199          worker_pool->GetSequenceToken(),
200          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
201  scoped_refptr<base::SequencedTaskRunner> drive_task_runner =
202      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
203          worker_pool->GetSequenceToken(),
204          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN);
205
206  Profile* profile = Profile::FromBrowserContext(context);
207  drive::DriveNotificationManager* notification_manager =
208      drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
209  ExtensionService* extension_service =
210      extensions::ExtensionSystem::Get(context)->extension_service();
211  SigninManagerBase* signin_manager =
212      SigninManagerFactory::GetForProfile(profile);
213  OAuth2TokenService* token_service =
214      ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
215  scoped_refptr<net::URLRequestContextGetter> request_context =
216      context->GetRequestContext();
217
218  scoped_ptr<drive_backend::SyncEngine> sync_engine(
219      new SyncEngine(ui_task_runner,
220                     worker_task_runner,
221                     drive_task_runner,
222                     GetSyncFileSystemDir(context->GetPath()),
223                     task_logger,
224                     notification_manager,
225                     extension_service,
226                     signin_manager,
227                     token_service,
228                     request_context,
229                     make_scoped_ptr(new DriveServiceFactory()),
230                     NULL  /* env_override */));
231
232  sync_engine->Initialize();
233  return sync_engine.Pass();
234}
235
236void SyncEngine::AppendDependsOnFactories(
237    std::set<BrowserContextKeyedServiceFactory*>* factories) {
238  DCHECK(factories);
239  factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
240  factories->insert(SigninManagerFactory::GetInstance());
241  factories->insert(
242      extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
243  factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance());
244}
245
246SyncEngine::~SyncEngine() {
247  Reset();
248
249  net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
250  if (signin_manager_)
251    signin_manager_->RemoveObserver(this);
252  if (notification_manager_)
253    notification_manager_->RemoveObserver(this);
254}
255
256void SyncEngine::Reset() {
257  if (drive_service_)
258    drive_service_->RemoveObserver(this);
259
260  DeleteSoon(FROM_HERE, worker_task_runner_, sync_worker_.Pass());
261  DeleteSoon(FROM_HERE, worker_task_runner_, worker_observer_.Pass());
262  DeleteSoon(FROM_HERE, worker_task_runner_,
263             remote_change_processor_on_worker_.Pass());
264
265  drive_service_wrapper_.reset();
266  drive_service_.reset();
267  drive_uploader_wrapper_.reset();
268  drive_uploader_.reset();
269  remote_change_processor_wrapper_.reset();
270  callback_tracker_.AbortAll();
271}
272
273void SyncEngine::Initialize() {
274  Reset();
275
276  if (!signin_manager_ ||
277      signin_manager_->GetAuthenticatedAccountId().empty())
278    return;
279
280  DCHECK(drive_service_factory_);
281  scoped_ptr<drive::DriveServiceInterface> drive_service =
282      drive_service_factory_->CreateDriveService(
283          token_service_, request_context_, drive_task_runner_);
284  scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
285      new drive::DriveUploader(drive_service.get(), drive_task_runner_));
286
287  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
288                     scoped_ptr<SyncWorkerInterface>());
289}
290
291void SyncEngine::InitializeForTesting(
292    scoped_ptr<drive::DriveServiceInterface> drive_service,
293    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
294    scoped_ptr<SyncWorkerInterface> sync_worker) {
295  Reset();
296  InitializeInternal(drive_service.Pass(), drive_uploader.Pass(),
297                     sync_worker.Pass());
298}
299
300void SyncEngine::InitializeInternal(
301    scoped_ptr<drive::DriveServiceInterface> drive_service,
302    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
303    scoped_ptr<SyncWorkerInterface> sync_worker) {
304  drive_service_ = drive_service.Pass();
305  drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get()));
306
307  std::string account_id;
308  if (signin_manager_)
309    account_id = signin_manager_->GetAuthenticatedAccountId();
310  drive_service_->Initialize(account_id);
311
312  drive_uploader_ = drive_uploader.Pass();
313  drive_uploader_wrapper_.reset(
314      new DriveUploaderWrapper(drive_uploader_.get()));
315
316  // DriveServiceWrapper and DriveServiceOnWorker relay communications
317  // between DriveService and syncers in SyncWorker.
318  scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker(
319      new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
320                               ui_task_runner_,
321                               worker_task_runner_));
322  scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker(
323      new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
324                                ui_task_runner_,
325                                worker_task_runner_));
326  scoped_ptr<SyncEngineContext> sync_engine_context(
327      new SyncEngineContext(drive_service_on_worker.Pass(),
328                            drive_uploader_on_worker.Pass(),
329                            task_logger_,
330                            ui_task_runner_,
331                            worker_task_runner_));
332
333  worker_observer_.reset(
334      new WorkerObserver(ui_task_runner_, weak_ptr_factory_.GetWeakPtr()));
335
336  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
337  if (extension_service_)
338    extension_service_weak_ptr = extension_service_->AsWeakPtr();
339
340  if (!sync_worker) {
341    sync_worker.reset(new SyncWorker(
342        sync_file_system_dir_,
343        extension_service_weak_ptr,
344        env_override_));
345  }
346
347  sync_worker_ = sync_worker.Pass();
348  sync_worker_->AddObserver(worker_observer_.get());
349
350  worker_task_runner_->PostTask(
351      FROM_HERE,
352      base::Bind(&SyncWorkerInterface::Initialize,
353                 base::Unretained(sync_worker_.get()),
354                 base::Passed(&sync_engine_context)));
355  if (remote_change_processor_)
356    SetRemoteChangeProcessor(remote_change_processor_);
357
358  drive_service_->AddObserver(this);
359
360  service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE;
361  SetSyncEnabled(sync_enabled_);
362  OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType());
363  if (drive_service_->HasRefreshToken())
364    OnReadyToSendRequests();
365  else
366    OnRefreshTokenInvalid();
367}
368
369void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
370  service_observers_.AddObserver(observer);
371}
372
373void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
374  file_status_observers_.AddObserver(observer);
375}
376
377void SyncEngine::RegisterOrigin(const GURL& origin,
378                                const SyncStatusCallback& callback) {
379  if (!sync_worker_) {
380    // TODO(tzik): Record |origin| and retry the registration after late
381    // sign-in.  Then, return SYNC_STATUS_OK.
382    if (!signin_manager_ ||
383        signin_manager_->GetAuthenticatedAccountId().empty())
384      callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED);
385    else
386      callback.Run(SYNC_STATUS_ABORT);
387    return;
388  }
389
390  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
391      FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(),
392                            TrackCallback(callback)));
393
394  worker_task_runner_->PostTask(
395      FROM_HERE,
396      base::Bind(&SyncWorkerInterface::RegisterOrigin,
397                 base::Unretained(sync_worker_.get()),
398                 origin, relayed_callback));
399}
400
401void SyncEngine::EnableOrigin(
402    const GURL& origin, const SyncStatusCallback& callback) {
403  if (!sync_worker_) {
404    // It's safe to return OK immediately since this is also checked in
405    // SyncWorker initialization.
406    callback.Run(SYNC_STATUS_OK);
407    return;
408  }
409
410  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
411      FROM_HERE, TrackCallback(callback));
412
413  worker_task_runner_->PostTask(
414      FROM_HERE,
415      base::Bind(&SyncWorkerInterface::EnableOrigin,
416                 base::Unretained(sync_worker_.get()),
417                 origin, relayed_callback));
418}
419
420void SyncEngine::DisableOrigin(
421    const GURL& origin, const SyncStatusCallback& callback) {
422  if (!sync_worker_) {
423    // It's safe to return OK immediately since this is also checked in
424    // SyncWorker initialization.
425    callback.Run(SYNC_STATUS_OK);
426    return;
427  }
428
429  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
430      FROM_HERE, TrackCallback(callback));
431
432  worker_task_runner_->PostTask(
433      FROM_HERE,
434      base::Bind(&SyncWorkerInterface::DisableOrigin,
435                 base::Unretained(sync_worker_.get()),
436                 origin,
437                 relayed_callback));
438}
439
440void SyncEngine::UninstallOrigin(
441    const GURL& origin,
442    UninstallFlag flag,
443    const SyncStatusCallback& callback) {
444  if (!sync_worker_) {
445    // It's safe to return OK immediately since this is also checked in
446    // SyncWorker initialization.
447    callback.Run(SYNC_STATUS_OK);
448    return;
449  }
450
451  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
452      FROM_HERE, TrackCallback(callback));
453  worker_task_runner_->PostTask(
454      FROM_HERE,
455      base::Bind(&SyncWorkerInterface::UninstallOrigin,
456                 base::Unretained(sync_worker_.get()),
457                 origin, flag, relayed_callback));
458}
459
460void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
461  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
462    callback.Run(SYNC_STATUS_SYNC_DISABLED, storage::FileSystemURL());
463    return;
464  }
465
466  base::Closure abort_closure =
467      base::Bind(callback, SYNC_STATUS_ABORT, storage::FileSystemURL());
468
469  if (!sync_worker_) {
470    abort_closure.Run();
471    return;
472  }
473
474  SyncFileCallback tracked_callback = callback_tracker_.Register(
475      abort_closure, callback);
476  SyncFileCallback relayed_callback = RelayCallbackToCurrentThread(
477      FROM_HERE, tracked_callback);
478  worker_task_runner_->PostTask(
479      FROM_HERE,
480      base::Bind(&SyncWorkerInterface::ProcessRemoteChange,
481                 base::Unretained(sync_worker_.get()),
482                 relayed_callback));
483}
484
485void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
486  remote_change_processor_ = processor;
487
488  if (!sync_worker_)
489    return;
490
491  remote_change_processor_wrapper_.reset(
492      new RemoteChangeProcessorWrapper(processor));
493
494  remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
495      remote_change_processor_wrapper_->AsWeakPtr(),
496      ui_task_runner_, worker_task_runner_));
497
498  worker_task_runner_->PostTask(
499      FROM_HERE,
500      base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor,
501                 base::Unretained(sync_worker_.get()),
502                 remote_change_processor_on_worker_.get()));
503}
504
505LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
506  return this;
507}
508
509RemoteServiceState SyncEngine::GetCurrentState() const {
510  if (!sync_enabled_)
511    return REMOTE_SERVICE_DISABLED;
512  if (!has_refresh_token_)
513    return REMOTE_SERVICE_AUTHENTICATION_REQUIRED;
514  return service_state_;
515}
516
517void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) {
518  base::Closure abort_closure =
519      base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>()));
520
521  if (!sync_worker_) {
522    abort_closure.Run();
523    return;
524  }
525
526  StatusMapCallback tracked_callback =
527      callback_tracker_.Register(abort_closure, callback);
528  StatusMapCallback relayed_callback =
529      RelayCallbackToCurrentThread(FROM_HERE, tracked_callback);
530
531  worker_task_runner_->PostTask(
532      FROM_HERE,
533      base::Bind(&SyncWorkerInterface::GetOriginStatusMap,
534                 base::Unretained(sync_worker_.get()),
535                 relayed_callback));
536}
537
538void SyncEngine::DumpFiles(const GURL& origin,
539                           const ListCallback& callback) {
540  base::Closure abort_closure =
541      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
542
543  if (!sync_worker_) {
544    abort_closure.Run();
545    return;
546  }
547
548  ListCallback tracked_callback =
549      callback_tracker_.Register(abort_closure, callback);
550
551  PostTaskAndReplyWithResult(
552      worker_task_runner_,
553      FROM_HERE,
554      base::Bind(&SyncWorkerInterface::DumpFiles,
555                 base::Unretained(sync_worker_.get()),
556                 origin),
557      tracked_callback);
558}
559
560void SyncEngine::DumpDatabase(const ListCallback& callback) {
561  base::Closure abort_closure =
562      base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>()));
563
564  if (!sync_worker_) {
565    abort_closure.Run();
566    return;
567  }
568
569  ListCallback tracked_callback =
570      callback_tracker_.Register(abort_closure, callback);
571
572  PostTaskAndReplyWithResult(
573      worker_task_runner_,
574      FROM_HERE,
575      base::Bind(&SyncWorkerInterface::DumpDatabase,
576                 base::Unretained(sync_worker_.get())),
577      tracked_callback);
578}
579
580void SyncEngine::SetSyncEnabled(bool sync_enabled) {
581  sync_enabled_ = sync_enabled;
582
583  if (!sync_worker_)
584    return;
585
586  worker_task_runner_->PostTask(
587      FROM_HERE,
588      base::Bind(&SyncWorkerInterface::SetSyncEnabled,
589                 base::Unretained(sync_worker_.get()),
590                 sync_enabled));
591}
592
593void SyncEngine::PromoteDemotedChanges(const base::Closure& callback) {
594  if (!sync_worker_) {
595    callback.Run();
596    return;
597  }
598
599  base::Closure relayed_callback = RelayCallbackToCurrentThread(
600      FROM_HERE, callback_tracker_.Register(callback, callback));
601
602  worker_task_runner_->PostTask(
603      FROM_HERE,
604      base::Bind(&SyncWorkerInterface::PromoteDemotedChanges,
605                 base::Unretained(sync_worker_.get()),
606                 relayed_callback));
607}
608
609void SyncEngine::ApplyLocalChange(const FileChange& local_change,
610                                  const base::FilePath& local_path,
611                                  const SyncFileMetadata& local_metadata,
612                                  const storage::FileSystemURL& url,
613                                  const SyncStatusCallback& callback) {
614  if (GetCurrentState() == REMOTE_SERVICE_DISABLED) {
615    callback.Run(SYNC_STATUS_SYNC_DISABLED);
616    return;
617  }
618
619  if (!sync_worker_) {
620    callback.Run(SYNC_STATUS_ABORT);
621    return;
622  }
623
624  SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread(
625      FROM_HERE, TrackCallback(callback));
626  worker_task_runner_->PostTask(
627      FROM_HERE,
628      base::Bind(&SyncWorkerInterface::ApplyLocalChange,
629                 base::Unretained(sync_worker_.get()),
630                 local_change,
631                 local_path,
632                 local_metadata,
633                 url,
634                 relayed_callback));
635}
636
637void SyncEngine::OnNotificationReceived() {
638  if (!sync_worker_)
639    return;
640
641  worker_task_runner_->PostTask(
642      FROM_HERE,
643      base::Bind(&SyncWorkerInterface::ActivateService,
644                 base::Unretained(sync_worker_.get()),
645                 REMOTE_SERVICE_OK,
646                 "Got push notification for Drive"));
647}
648
649void SyncEngine::OnPushNotificationEnabled(bool) {}
650
651void SyncEngine::OnReadyToSendRequests() {
652  has_refresh_token_ = true;
653  if (!sync_worker_)
654    return;
655
656  worker_task_runner_->PostTask(
657      FROM_HERE,
658      base::Bind(&SyncWorkerInterface::ActivateService,
659                 base::Unretained(sync_worker_.get()),
660                 REMOTE_SERVICE_OK,
661                 "Authenticated"));
662}
663
664void SyncEngine::OnRefreshTokenInvalid() {
665  has_refresh_token_ = false;
666  if (!sync_worker_)
667    return;
668
669  worker_task_runner_->PostTask(
670      FROM_HERE,
671      base::Bind(&SyncWorkerInterface::DeactivateService,
672                 base::Unretained(sync_worker_.get()),
673                 "Found invalid refresh token."));
674}
675
676void SyncEngine::OnNetworkChanged(
677    net::NetworkChangeNotifier::ConnectionType type) {
678  if (!sync_worker_)
679    return;
680
681  bool network_available_old = network_available_;
682  network_available_ = (type != net::NetworkChangeNotifier::CONNECTION_NONE);
683
684  if (!network_available_old && network_available_) {
685    worker_task_runner_->PostTask(
686        FROM_HERE,
687        base::Bind(&SyncWorkerInterface::ActivateService,
688                   base::Unretained(sync_worker_.get()),
689                   REMOTE_SERVICE_OK,
690                   "Connected"));
691  } else if (network_available_old && !network_available_) {
692    worker_task_runner_->PostTask(
693        FROM_HERE,
694        base::Bind(&SyncWorkerInterface::DeactivateService,
695                   base::Unretained(sync_worker_.get()),
696                   "Disconnected"));
697  }
698
699}
700
701void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) {
702  Reset();
703  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
704                     "Failed to sign in.");
705}
706
707void SyncEngine::GoogleSigninSucceeded(const std::string& username,
708                                       const std::string& password) {
709  Initialize();
710}
711
712void SyncEngine::GoogleSignedOut(const std::string& username) {
713  Reset();
714  UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED,
715                     "User signed out.");
716}
717
718SyncEngine::SyncEngine(
719    base::SingleThreadTaskRunner* ui_task_runner,
720    base::SequencedTaskRunner* worker_task_runner,
721    base::SequencedTaskRunner* drive_task_runner,
722    const base::FilePath& sync_file_system_dir,
723    TaskLogger* task_logger,
724    drive::DriveNotificationManager* notification_manager,
725    ExtensionServiceInterface* extension_service,
726    SigninManagerBase* signin_manager,
727    OAuth2TokenService* token_service,
728    net::URLRequestContextGetter* request_context,
729    scoped_ptr<DriveServiceFactory> drive_service_factory,
730    leveldb::Env* env_override)
731    : ui_task_runner_(ui_task_runner),
732      worker_task_runner_(worker_task_runner),
733      drive_task_runner_(drive_task_runner),
734      sync_file_system_dir_(sync_file_system_dir),
735      task_logger_(task_logger),
736      notification_manager_(notification_manager),
737      extension_service_(extension_service),
738      signin_manager_(signin_manager),
739      token_service_(token_service),
740      request_context_(request_context),
741      drive_service_factory_(drive_service_factory.Pass()),
742      remote_change_processor_(NULL),
743      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
744      has_refresh_token_(false),
745      network_available_(false),
746      sync_enabled_(false),
747      env_override_(env_override),
748      weak_ptr_factory_(this) {
749  DCHECK(sync_file_system_dir_.IsAbsolute());
750  if (notification_manager_)
751    notification_manager_->AddObserver(this);
752  if (signin_manager_)
753    signin_manager_->AddObserver(this);
754  net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
755}
756
757void SyncEngine::OnPendingFileListUpdated(int item_count) {
758  FOR_EACH_OBSERVER(
759      SyncServiceObserver,
760      service_observers_,
761      OnRemoteChangeQueueUpdated(item_count));
762}
763
764void SyncEngine::OnFileStatusChanged(const storage::FileSystemURL& url,
765                                     SyncFileStatus file_status,
766                                     SyncAction sync_action,
767                                     SyncDirection direction) {
768  FOR_EACH_OBSERVER(FileStatusObserver,
769                    file_status_observers_,
770                    OnFileStatusChanged(
771                        url, file_status, sync_action, direction));
772}
773
774void SyncEngine::UpdateServiceState(RemoteServiceState state,
775                                    const std::string& description) {
776  service_state_ = state;
777
778  FOR_EACH_OBSERVER(
779      SyncServiceObserver, service_observers_,
780      OnRemoteServiceStateUpdated(GetCurrentState(), description));
781}
782
783SyncStatusCallback SyncEngine::TrackCallback(
784    const SyncStatusCallback& callback) {
785  return callback_tracker_.Register(
786      base::Bind(callback, SYNC_STATUS_ABORT),
787      callback);
788}
789
790}  // namespace drive_backend
791}  // namespace sync_file_system
792