sync_engine.cc revision 5c02ac1a9c1b504631c0a3d2b6e737b5d738bae1
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/threading/sequenced_worker_pool.h"
11#include "base/values.h"
12#include "chrome/browser/drive/drive_api_service.h"
13#include "chrome/browser/drive/drive_notification_manager.h"
14#include "chrome/browser/drive/drive_notification_manager_factory.h"
15#include "chrome/browser/drive/drive_service_interface.h"
16#include "chrome/browser/drive/drive_uploader.h"
17#include "chrome/browser/extensions/extension_service.h"
18#include "chrome/browser/profiles/profile.h"
19#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
20#include "chrome/browser/signin/signin_manager_factory.h"
21#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
22#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
23#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
24#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
25#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
26#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
27#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
28#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
29#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
30#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
31#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
32#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
33#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
34#include "chrome/browser/sync_file_system/file_status_observer.h"
35#include "chrome/browser/sync_file_system/logger.h"
36#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
37#include "components/signin/core/browser/profile_oauth2_token_service.h"
38#include "components/signin/core/browser/signin_manager.h"
39#include "content/public/browser/browser_thread.h"
40#include "extensions/browser/extension_system.h"
41#include "extensions/browser/extension_system_provider.h"
42#include "extensions/browser/extensions_browser_client.h"
43#include "extensions/common/extension.h"
44#include "google_apis/drive/drive_api_url_generator.h"
45#include "google_apis/drive/gdata_wapi_url_generator.h"
46#include "webkit/common/blob/scoped_file.h"
47#include "webkit/common/fileapi/file_system_util.h"
48
49namespace sync_file_system {
50
51class RemoteChangeProcessor;
52
53namespace drive_backend {
54
55class SyncEngine::WorkerObserver
56    : public SyncWorker::Observer {
57 public:
58  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
59                 base::WeakPtr<SyncEngine> sync_engine)
60      : ui_task_runner_(ui_task_runner),
61        sync_engine_(sync_engine){
62  }
63
64  virtual ~WorkerObserver() {}
65
66  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
67    ui_task_runner_->PostTask(
68        FROM_HERE,
69        base::Bind(&SyncEngine::OnPendingFileListUpdated,
70                   sync_engine_,
71                   item_count));
72  }
73
74  virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
75                                   SyncFileStatus file_status,
76                                   SyncAction sync_action,
77                                   SyncDirection direction) OVERRIDE {
78    ui_task_runner_->PostTask(
79        FROM_HERE,
80        base::Bind(&SyncEngine::OnFileStatusChanged,
81                   sync_engine_,
82                   url, file_status, sync_action, direction));
83  }
84
85
86  virtual void UpdateServiceState(RemoteServiceState state,
87                                  const std::string& description) OVERRIDE {
88    ui_task_runner_->PostTask(
89        FROM_HERE,
90        base::Bind(&SyncEngine::UpdateServiceState,
91                   sync_engine_, state, description));
92  }
93
94 private:
95  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
96  base::WeakPtr<SyncEngine> sync_engine_;
97
98  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
99};
100
101namespace {
102
103void EmptyStatusCallback(SyncStatusCode status) {}
104
105}  // namespace
106
107scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
108    content::BrowserContext* context) {
109  scoped_refptr<base::SequencedWorkerPool> worker_pool(
110      content::BrowserThread::GetBlockingPool());
111  scoped_refptr<base::SequencedTaskRunner> drive_task_runner(
112      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
113          worker_pool->GetSequenceToken(),
114          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
115
116  Profile* profile = Profile::FromBrowserContext(context);
117  ProfileOAuth2TokenService* token_service =
118      ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
119  scoped_ptr<drive::DriveServiceInterface> drive_service(
120      new drive::DriveAPIService(
121          token_service,
122          context->GetRequestContext(),
123          drive_task_runner.get(),
124          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
125          GURL(google_apis::DriveApiUrlGenerator::
126               kBaseDownloadUrlForProduction),
127          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
128          std::string() /* custom_user_agent */));
129  SigninManagerBase* signin_manager =
130      SigninManagerFactory::GetForProfile(profile);
131  drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
132
133  scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
134      new drive::DriveUploader(drive_service.get(), drive_task_runner.get()));
135
136  drive::DriveNotificationManager* notification_manager =
137      drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
138  ExtensionService* extension_service =
139      extensions::ExtensionSystem::Get(context)->extension_service();
140
141  scoped_refptr<base::SequencedTaskRunner> file_task_runner(
142      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
143          worker_pool->GetSequenceToken(),
144          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
145
146  // TODO(peria): Create another task runner to manage SyncWorker.
147  scoped_refptr<base::SingleThreadTaskRunner>
148      worker_task_runner = base::MessageLoopProxy::current();
149
150  scoped_ptr<drive_backend::SyncEngine> sync_engine(
151      new SyncEngine(drive_service.Pass(),
152                     drive_uploader.Pass(),
153                     worker_task_runner,
154                     notification_manager,
155                     extension_service,
156                     signin_manager));
157  sync_engine->Initialize(
158      GetSyncFileSystemDir(context->GetPath()),
159      file_task_runner.get(),
160      NULL);
161
162  return sync_engine.Pass();
163}
164
165void SyncEngine::AppendDependsOnFactories(
166    std::set<BrowserContextKeyedServiceFactory*>* factories) {
167  DCHECK(factories);
168  factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
169  factories->insert(SigninManagerFactory::GetInstance());
170  factories->insert(
171      extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
172}
173
174SyncEngine::~SyncEngine() {
175  net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
176  GetDriveService()->RemoveObserver(this);
177  if (notification_manager_)
178    notification_manager_->RemoveObserver(this);
179
180  // TODO(tzik): Destroy |sync_worker_| and |worker_observer_| on the worker.
181}
182
183void SyncEngine::Initialize(const base::FilePath& base_dir,
184                            base::SequencedTaskRunner* file_task_runner,
185                            leveldb::Env* env_override) {
186  scoped_ptr<SyncEngineContext> sync_engine_context(
187      new SyncEngineContext(drive_service_.get(),
188                            drive_uploader_.get(),
189                            base::MessageLoopProxy::current(),
190                            worker_task_runner_,
191                            file_task_runner));
192  worker_observer_.reset(
193      new WorkerObserver(base::MessageLoopProxy::current(),
194                         weak_ptr_factory_.GetWeakPtr()));
195
196  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
197  if (extension_service_)
198    extension_service_weak_ptr = extension_service_->AsWeakPtr();
199
200  // TODO(peria): Use PostTask on |worker_task_runner_| to call this function.
201  sync_worker_ = SyncWorker::CreateOnWorker(
202      base_dir,
203      worker_observer_.get(),
204      extension_service_weak_ptr,
205      sync_engine_context.Pass(),
206      env_override);
207
208  if (notification_manager_)
209    notification_manager_->AddObserver(this);
210  GetDriveService()->AddObserver(this);
211  net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
212}
213
214void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
215  service_observers_.AddObserver(observer);
216}
217
218void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
219  file_status_observers_.AddObserver(observer);
220}
221
222void SyncEngine::RegisterOrigin(
223    const GURL& origin, const SyncStatusCallback& callback) {
224  worker_task_runner_->PostTask(
225      FROM_HERE,
226      base::Bind(&SyncWorker::RegisterOrigin,
227                 base::Unretained(sync_worker_.get()),
228                 origin,
229                 RelayCallbackToCurrentThread(
230                     FROM_HERE, callback)));
231}
232
233void SyncEngine::EnableOrigin(
234    const GURL& origin, const SyncStatusCallback& callback) {
235  worker_task_runner_->PostTask(
236      FROM_HERE,
237      base::Bind(&SyncWorker::EnableOrigin,
238                 base::Unretained(sync_worker_.get()),
239                 origin,
240                 RelayCallbackToCurrentThread(
241                     FROM_HERE, callback)));
242}
243
244void SyncEngine::DisableOrigin(
245    const GURL& origin, const SyncStatusCallback& callback) {
246  worker_task_runner_->PostTask(
247      FROM_HERE,
248      base::Bind(&SyncWorker::DisableOrigin,
249                 base::Unretained(sync_worker_.get()),
250                 origin,
251                 RelayCallbackToCurrentThread(
252                     FROM_HERE, callback)));
253}
254
255void SyncEngine::UninstallOrigin(
256    const GURL& origin,
257    UninstallFlag flag,
258    const SyncStatusCallback& callback) {
259  worker_task_runner_->PostTask(
260      FROM_HERE,
261      base::Bind(&SyncWorker::UninstallOrigin,
262                 base::Unretained(sync_worker_.get()),
263                 origin, flag,
264                 RelayCallbackToCurrentThread(
265                     FROM_HERE, callback)));
266}
267
268void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
269  worker_task_runner_->PostTask(
270      FROM_HERE,
271      base::Bind(&SyncWorker::ProcessRemoteChange,
272                 base::Unretained(sync_worker_.get()),
273                 RelayCallbackToCurrentThread(
274                     FROM_HERE, callback)));
275}
276
277void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
278  worker_task_runner_->PostTask(
279      FROM_HERE,
280      base::Bind(&SyncWorker::SetRemoteChangeProcessor,
281                 base::Unretained(sync_worker_.get()),
282                 processor));
283}
284
285LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
286  return this;
287}
288
289bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) {
290  // TODO(tzik): Implement this before we support manual conflict resolution.
291  return false;
292}
293
294RemoteServiceState SyncEngine::GetCurrentState() const {
295  // TODO(peria): Post task
296  return sync_worker_->GetCurrentState();
297}
298
299void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) {
300  // TODO(peria): Make this route asynchronous.
301  sync_worker_->GetOriginStatusMap(status_map);
302}
303
304scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) {
305  // TODO(peria): Make this route asynchronous.
306  return sync_worker_->DumpFiles(origin);
307}
308
309scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() {
310  // TODO(peria): Make this route asynchronous.
311  return sync_worker_->DumpDatabase();
312}
313
314void SyncEngine::SetSyncEnabled(bool enabled) {
315  worker_task_runner_->PostTask(
316      FROM_HERE,
317      base::Bind(&SyncWorker::SetSyncEnabled,
318                 base::Unretained(sync_worker_.get()),
319                 enabled));
320}
321
322void SyncEngine::UpdateSyncEnabled(bool enabled) {
323  const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled";
324  FOR_EACH_OBSERVER(
325      Observer, service_observers_,
326      OnRemoteServiceStateUpdated(GetCurrentState(), status_message));
327}
328
329SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy(
330    ConflictResolutionPolicy policy) {
331  // TODO(peria): Make this route asynchronous.
332  return sync_worker_->SetDefaultConflictResolutionPolicy(policy);
333}
334
335SyncStatusCode SyncEngine::SetConflictResolutionPolicy(
336    const GURL& origin,
337    ConflictResolutionPolicy policy) {
338  // TODO(peria): Make this route asynchronous.
339  return sync_worker_->SetConflictResolutionPolicy(origin, policy);
340}
341
342ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy()
343    const {
344  // TODO(peria): Make this route asynchronous.
345  return sync_worker_->GetDefaultConflictResolutionPolicy();
346}
347
348ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy(
349    const GURL& origin) const {
350  // TODO(peria): Make this route asynchronous.
351  return sync_worker_->GetConflictResolutionPolicy(origin);
352}
353
354void SyncEngine::GetRemoteVersions(
355    const fileapi::FileSystemURL& url,
356    const RemoteVersionsCallback& callback) {
357  // TODO(tzik): Implement this before we support manual conflict resolution.
358  callback.Run(SYNC_STATUS_FAILED, std::vector<Version>());
359}
360
361void SyncEngine::DownloadRemoteVersion(
362    const fileapi::FileSystemURL& url,
363    const std::string& version_id,
364    const DownloadVersionCallback& callback) {
365  // TODO(tzik): Implement this before we support manual conflict resolution.
366  callback.Run(SYNC_STATUS_FAILED, webkit_blob::ScopedFile());
367}
368
369void SyncEngine::PromoteDemotedChanges() {
370  MetadataDatabase* metadata_db = GetMetadataDatabase();
371  if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) {
372    metadata_db->PromoteLowerPriorityTrackersToNormal();
373    FOR_EACH_OBSERVER(
374        Observer,
375        service_observers_,
376        OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
377  }
378}
379
380void SyncEngine::ApplyLocalChange(
381    const FileChange& local_change,
382    const base::FilePath& local_path,
383    const SyncFileMetadata& local_metadata,
384    const fileapi::FileSystemURL& url,
385    const SyncStatusCallback& callback) {
386  worker_task_runner_->PostTask(
387      FROM_HERE,
388      base::Bind(&SyncWorker::ApplyLocalChange,
389                 base::Unretained(sync_worker_.get()),
390                 local_change,
391                 local_path,
392                 local_metadata,
393                 url,
394                 RelayCallbackToCurrentThread(
395                     FROM_HERE, callback)));
396}
397
398SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
399  // TODO(peria): Post task
400  return sync_worker_->GetSyncTaskManager();
401}
402
403void SyncEngine::OnNotificationReceived() {
404  worker_task_runner_->PostTask(
405      FROM_HERE,
406      base::Bind(&SyncWorker::OnNotificationReceived,
407                 base::Unretained(sync_worker_.get())));
408}
409
410void SyncEngine::OnPushNotificationEnabled(bool) {}
411
412void SyncEngine::OnReadyToSendRequests() {
413  const std::string account_id =
414      signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
415
416  worker_task_runner_->PostTask(
417      FROM_HERE,
418      base::Bind(&SyncWorker::OnReadyToSendRequests,
419                 base::Unretained(sync_worker_.get()),
420                 account_id));
421}
422
423void SyncEngine::OnRefreshTokenInvalid() {
424  worker_task_runner_->PostTask(
425      FROM_HERE,
426      base::Bind(&SyncWorker::OnRefreshTokenInvalid,
427                 base::Unretained(sync_worker_.get())));
428}
429
430void SyncEngine::OnNetworkChanged(
431    net::NetworkChangeNotifier::ConnectionType type) {
432  worker_task_runner_->PostTask(
433      FROM_HERE,
434      base::Bind(&SyncWorker::OnNetworkChanged,
435                 base::Unretained(sync_worker_.get()),
436                 type));
437}
438
439drive::DriveServiceInterface* SyncEngine::GetDriveService() {
440  return drive_service_.get();
441}
442
443drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
444  return drive_uploader_.get();
445}
446
447MetadataDatabase* SyncEngine::GetMetadataDatabase() {
448  // TODO(peria): Post task
449  return sync_worker_->GetMetadataDatabase();
450}
451
452SyncEngine::SyncEngine(
453    scoped_ptr<drive::DriveServiceInterface> drive_service,
454    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
455    base::SequencedTaskRunner* worker_task_runner,
456    drive::DriveNotificationManager* notification_manager,
457    ExtensionServiceInterface* extension_service,
458    SigninManagerBase* signin_manager)
459    : drive_service_(drive_service.Pass()),
460      drive_uploader_(drive_uploader.Pass()),
461      notification_manager_(notification_manager),
462      extension_service_(extension_service),
463      signin_manager_(signin_manager),
464      worker_task_runner_(worker_task_runner),
465      weak_ptr_factory_(this) {}
466
467void SyncEngine::OnPendingFileListUpdated(int item_count) {
468  FOR_EACH_OBSERVER(
469      Observer,
470      service_observers_,
471      OnRemoteChangeQueueUpdated(item_count));
472}
473
474void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
475                                     SyncFileStatus file_status,
476                                     SyncAction sync_action,
477                                     SyncDirection direction) {
478  FOR_EACH_OBSERVER(FileStatusObserver,
479                    file_status_observers_,
480                    OnFileStatusChanged(
481                        url, file_status, sync_action, direction));
482}
483
484void SyncEngine::UpdateServiceState(RemoteServiceState state,
485                                    const std::string& description) {
486  FOR_EACH_OBSERVER(
487      Observer, service_observers_,
488      OnRemoteServiceStateUpdated(state, description));
489}
490
491void SyncEngine::UpdateRegisteredApps() {
492  if (!extension_service_)
493    return;
494
495  MetadataDatabase* metadata_db = GetMetadataDatabase();
496  DCHECK(metadata_db);
497  std::vector<std::string> app_ids;
498  metadata_db->GetRegisteredAppIDs(&app_ids);
499
500  // Update the status of every origin using status from ExtensionService.
501  for (std::vector<std::string>::const_iterator itr = app_ids.begin();
502       itr != app_ids.end(); ++itr) {
503    const std::string& app_id = *itr;
504    GURL origin =
505        extensions::Extension::GetBaseURLFromExtensionId(app_id);
506    if (!extension_service_->GetInstalledExtension(app_id)) {
507      // Extension has been uninstalled.
508      // (At this stage we can't know if it was unpacked extension or not,
509      // so just purge the remote folder.)
510      UninstallOrigin(origin,
511                      RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE,
512                      base::Bind(&EmptyStatusCallback));
513      continue;
514    }
515    FileTracker tracker;
516    if (!metadata_db->FindAppRootTracker(app_id, &tracker)) {
517      // App will register itself on first run.
518      continue;
519    }
520    bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id);
521    bool is_app_root_tracker_enabled =
522        tracker.tracker_kind() == TRACKER_KIND_APP_ROOT;
523    if (is_app_enabled && !is_app_root_tracker_enabled)
524      EnableOrigin(origin, base::Bind(&EmptyStatusCallback));
525    else if (!is_app_enabled && is_app_root_tracker_enabled)
526      DisableOrigin(origin, base::Bind(&EmptyStatusCallback));
527  }
528}
529
530}  // namespace drive_backend
531}  // namespace sync_file_system
532