sync_engine.cc revision cedac228d2dd51db4b79ea1e72c7f249408ee061
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/metrics/histogram.h"
11#include "base/threading/sequenced_worker_pool.h"
12#include "base/time/time.h"
13#include "base/values.h"
14#include "chrome/browser/drive/drive_api_service.h"
15#include "chrome/browser/drive/drive_notification_manager.h"
16#include "chrome/browser/drive/drive_notification_manager_factory.h"
17#include "chrome/browser/drive/drive_service_interface.h"
18#include "chrome/browser/drive/drive_uploader.h"
19#include "chrome/browser/extensions/extension_service.h"
20#include "chrome/browser/profiles/profile.h"
21#include "chrome/browser/signin/profile_oauth2_token_service_factory.h"
22#include "chrome/browser/signin/signin_manager_factory.h"
23#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h"
24#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h"
25#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h"
26#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h"
27#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h"
28#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h"
29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h"
30#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h"
31#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h"
32#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h"
33#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h"
34#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h"
35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h"
36#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h"
37#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h"
38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h"
39#include "chrome/browser/sync_file_system/drive_backend/sync_task.h"
40#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h"
41#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h"
42#include "chrome/browser/sync_file_system/file_status_observer.h"
43#include "chrome/browser/sync_file_system/logger.h"
44#include "chrome/browser/sync_file_system/syncable_file_system_util.h"
45#include "components/signin/core/browser/profile_oauth2_token_service.h"
46#include "components/signin/core/browser/signin_manager.h"
47#include "content/public/browser/browser_thread.h"
48#include "extensions/browser/extension_system.h"
49#include "extensions/browser/extension_system_provider.h"
50#include "extensions/browser/extensions_browser_client.h"
51#include "extensions/common/extension.h"
52#include "google_apis/drive/drive_api_url_generator.h"
53#include "google_apis/drive/gdata_wapi_url_generator.h"
54#include "webkit/common/blob/scoped_file.h"
55#include "webkit/common/fileapi/file_system_util.h"
56
57namespace sync_file_system {
58
59class RemoteChangeProcessor;
60
61namespace drive_backend {
62
63class SyncEngine::WorkerObserver : public SyncWorker::Observer {
64 public:
65  WorkerObserver(base::SequencedTaskRunner* ui_task_runner,
66                 base::WeakPtr<SyncEngine> sync_engine)
67      : ui_task_runner_(ui_task_runner),
68        sync_engine_(sync_engine) {}
69
70  virtual ~WorkerObserver() {}
71
72  virtual void OnPendingFileListUpdated(int item_count) OVERRIDE {
73    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
74      if (sync_engine_)
75        sync_engine_->OnPendingFileListUpdated(item_count);
76      return;
77    }
78
79    ui_task_runner_->PostTask(
80        FROM_HERE,
81        base::Bind(&SyncEngine::OnPendingFileListUpdated,
82                   sync_engine_,
83                   item_count));
84  }
85
86  virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url,
87                                   SyncFileStatus file_status,
88                                   SyncAction sync_action,
89                                   SyncDirection direction) OVERRIDE {
90    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
91      if (sync_engine_)
92        sync_engine_->OnFileStatusChanged(
93            url, file_status, sync_action, direction);
94      return;
95    }
96
97    ui_task_runner_->PostTask(
98        FROM_HERE,
99        base::Bind(&SyncEngine::OnFileStatusChanged,
100                   sync_engine_,
101                   url, file_status, sync_action, direction));
102  }
103
104  virtual void UpdateServiceState(RemoteServiceState state,
105                                  const std::string& description) OVERRIDE {
106    if (ui_task_runner_->RunsTasksOnCurrentThread()) {
107      if (sync_engine_)
108        sync_engine_->UpdateServiceState(state, description);
109      return;
110    }
111
112    ui_task_runner_->PostTask(
113        FROM_HERE,
114        base::Bind(&SyncEngine::UpdateServiceState,
115                   sync_engine_, state, description));
116  }
117
118 private:
119  scoped_refptr<base::SequencedTaskRunner> ui_task_runner_;
120  base::WeakPtr<SyncEngine> sync_engine_;
121
122  DISALLOW_COPY_AND_ASSIGN(WorkerObserver);
123};
124
125namespace {
126
127void EmptyStatusCallback(SyncStatusCode status) {}
128
129void DidRegisterOrigin(const base::TimeTicks& start_time,
130                       const SyncStatusCallback& callback,
131                       SyncStatusCode status) {
132  base::TimeDelta delta(base::TimeTicks::Now() - start_time);
133  HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta);
134  callback.Run(status);
135}
136
137}  // namespace
138
139scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext(
140    content::BrowserContext* context,
141    TaskLogger* task_logger) {
142  scoped_refptr<base::SequencedWorkerPool> worker_pool(
143      content::BrowserThread::GetBlockingPool());
144  scoped_refptr<base::SequencedTaskRunner> drive_task_runner(
145      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
146          worker_pool->GetSequenceToken(),
147          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
148
149  Profile* profile = Profile::FromBrowserContext(context);
150  ProfileOAuth2TokenService* token_service =
151      ProfileOAuth2TokenServiceFactory::GetForProfile(profile);
152  scoped_ptr<drive::DriveServiceInterface> drive_service(
153      new drive::DriveAPIService(
154          token_service,
155          context->GetRequestContext(),
156          drive_task_runner.get(),
157          GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction),
158          GURL(google_apis::DriveApiUrlGenerator::
159               kBaseDownloadUrlForProduction),
160          GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction),
161          std::string() /* custom_user_agent */));
162  SigninManagerBase* signin_manager =
163      SigninManagerFactory::GetForProfile(profile);
164  drive_service->Initialize(signin_manager->GetAuthenticatedAccountId());
165
166  scoped_ptr<drive::DriveUploaderInterface> drive_uploader(
167      new drive::DriveUploader(drive_service.get(), drive_task_runner.get()));
168
169  drive::DriveNotificationManager* notification_manager =
170      drive::DriveNotificationManagerFactory::GetForBrowserContext(context);
171  ExtensionService* extension_service =
172      extensions::ExtensionSystem::Get(context)->extension_service();
173
174  scoped_refptr<base::SequencedTaskRunner> file_task_runner(
175      worker_pool->GetSequencedTaskRunnerWithShutdownBehavior(
176          worker_pool->GetSequenceToken(),
177          base::SequencedWorkerPool::SKIP_ON_SHUTDOWN));
178
179  // TODO(peria): Create another task runner to manage SyncWorker.
180  scoped_refptr<base::SingleThreadTaskRunner>
181      worker_task_runner = base::MessageLoopProxy::current();
182
183  scoped_ptr<drive_backend::SyncEngine> sync_engine(
184      new SyncEngine(drive_service.Pass(),
185                     drive_uploader.Pass(),
186                     worker_task_runner,
187                     notification_manager,
188                     extension_service,
189                     signin_manager));
190  sync_engine->Initialize(GetSyncFileSystemDir(context->GetPath()),
191                          task_logger,
192                          file_task_runner.get(),
193                          NULL);
194
195  return sync_engine.Pass();
196}
197
198void SyncEngine::AppendDependsOnFactories(
199    std::set<BrowserContextKeyedServiceFactory*>* factories) {
200  DCHECK(factories);
201  factories->insert(drive::DriveNotificationManagerFactory::GetInstance());
202  factories->insert(SigninManagerFactory::GetInstance());
203  factories->insert(
204      extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory());
205}
206
207SyncEngine::~SyncEngine() {
208  net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this);
209  GetDriveService()->RemoveObserver(this);
210  if (notification_manager_)
211    notification_manager_->RemoveObserver(this);
212
213  WorkerObserver* worker_observer = worker_observer_.release();
214  if (!worker_task_runner_->DeleteSoon(FROM_HERE, worker_observer))
215    delete worker_observer;
216
217  SyncWorker* sync_worker = sync_worker_.release();
218  if (!worker_task_runner_->DeleteSoon(FROM_HERE, sync_worker))
219    delete sync_worker;
220}
221
222void SyncEngine::Initialize(const base::FilePath& base_dir,
223                            TaskLogger* task_logger,
224                            base::SequencedTaskRunner* file_task_runner,
225                            leveldb::Env* env_override) {
226  // DriveServiceWrapper and DriveServiceOnWorker relay communications
227  // between DriveService and syncers in SyncWorker.
228  scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker(
229      new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(),
230                               base::MessageLoopProxy::current(),
231                               worker_task_runner_));
232  scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker(
233      new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(),
234                                base::MessageLoopProxy::current(),
235                                worker_task_runner_));
236  scoped_ptr<SyncEngineContext> sync_engine_context(
237      new SyncEngineContext(drive_service_on_worker.Pass(),
238                            drive_uploader_on_worker.Pass(),
239                            task_logger,
240                            base::MessageLoopProxy::current(),
241                            worker_task_runner_,
242                            file_task_runner));
243
244  worker_observer_.reset(
245      new WorkerObserver(base::MessageLoopProxy::current(),
246                         weak_ptr_factory_.GetWeakPtr()));
247
248  base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr;
249  if (extension_service_)
250    extension_service_weak_ptr = extension_service_->AsWeakPtr();
251
252  sync_worker_.reset(new SyncWorker(
253      base_dir,
254      extension_service_weak_ptr,
255      sync_engine_context.Pass(),
256      env_override));
257  sync_worker_->AddObserver(worker_observer_.get());
258  worker_task_runner_->PostTask(
259      FROM_HERE,
260      base::Bind(&SyncWorker::Initialize,
261                 base::Unretained(sync_worker_.get())));
262
263  if (notification_manager_)
264    notification_manager_->AddObserver(this);
265  GetDriveService()->AddObserver(this);
266  net::NetworkChangeNotifier::AddNetworkChangeObserver(this);
267}
268
269void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) {
270  service_observers_.AddObserver(observer);
271}
272
273void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) {
274  file_status_observers_.AddObserver(observer);
275}
276
277void SyncEngine::RegisterOrigin(
278    const GURL& origin, const SyncStatusCallback& callback) {
279  worker_task_runner_->PostTask(
280      FROM_HERE,
281      base::Bind(&SyncWorker::RegisterOrigin,
282                 base::Unretained(sync_worker_.get()),
283                 origin,
284                 RelayCallbackToCurrentThread(
285                     FROM_HERE,
286                     base::Bind(&DidRegisterOrigin,
287                                base::TimeTicks::Now(),
288                                callback))));
289}
290
291void SyncEngine::EnableOrigin(
292    const GURL& origin, const SyncStatusCallback& callback) {
293  worker_task_runner_->PostTask(
294      FROM_HERE,
295      base::Bind(&SyncWorker::EnableOrigin,
296                 base::Unretained(sync_worker_.get()),
297                 origin,
298                 RelayCallbackToCurrentThread(
299                     FROM_HERE, callback)));
300}
301
302void SyncEngine::DisableOrigin(
303    const GURL& origin, const SyncStatusCallback& callback) {
304  worker_task_runner_->PostTask(
305      FROM_HERE,
306      base::Bind(&SyncWorker::DisableOrigin,
307                 base::Unretained(sync_worker_.get()),
308                 origin,
309                 RelayCallbackToCurrentThread(
310                     FROM_HERE, callback)));
311}
312
313void SyncEngine::UninstallOrigin(
314    const GURL& origin,
315    UninstallFlag flag,
316    const SyncStatusCallback& callback) {
317  worker_task_runner_->PostTask(
318      FROM_HERE,
319      base::Bind(&SyncWorker::UninstallOrigin,
320                 base::Unretained(sync_worker_.get()),
321                 origin, flag,
322                 RelayCallbackToCurrentThread(
323                     FROM_HERE, callback)));
324}
325
326void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) {
327  worker_task_runner_->PostTask(
328      FROM_HERE,
329      base::Bind(&SyncWorker::ProcessRemoteChange,
330                 base::Unretained(sync_worker_.get()),
331                 RelayCallbackToCurrentThread(
332                     FROM_HERE, callback)));
333}
334
335void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) {
336  remote_change_processor_ = processor;
337  remote_change_processor_wrapper_.reset(
338      new RemoteChangeProcessorWrapper(processor));
339
340  remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker(
341      remote_change_processor_wrapper_->AsWeakPtr(),
342      base::MessageLoopProxy::current(), /* ui_task_runner */
343      worker_task_runner_));
344
345  worker_task_runner_->PostTask(
346      FROM_HERE,
347      base::Bind(&SyncWorker::SetRemoteChangeProcessor,
348                 base::Unretained(sync_worker_.get()),
349                 remote_change_processor_on_worker_.get()));
350}
351
352LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() {
353  return this;
354}
355
356RemoteServiceState SyncEngine::GetCurrentState() const {
357  return service_state_;
358}
359
360void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) {
361  worker_task_runner_->PostTask(
362      FROM_HERE,
363      base::Bind(&SyncWorker::GetOriginStatusMap,
364                 base::Unretained(sync_worker_.get()),
365                 RelayCallbackToCurrentThread(FROM_HERE, callback)));
366}
367
368void SyncEngine::DumpFiles(const GURL& origin,
369                           const ListCallback& callback) {
370  PostTaskAndReplyWithResult(
371      worker_task_runner_,
372      FROM_HERE,
373      base::Bind(&SyncWorker::DumpFiles,
374                 base::Unretained(sync_worker_.get()),
375                 origin),
376      callback);
377}
378
379void SyncEngine::DumpDatabase(const ListCallback& callback) {
380  PostTaskAndReplyWithResult(
381      worker_task_runner_,
382      FROM_HERE,
383      base::Bind(&SyncWorker::DumpDatabase,
384                 base::Unretained(sync_worker_.get())),
385      callback);
386}
387
388void SyncEngine::SetSyncEnabled(bool enabled) {
389  worker_task_runner_->PostTask(
390      FROM_HERE,
391      base::Bind(&SyncWorker::SetSyncEnabled,
392                 base::Unretained(sync_worker_.get()),
393                 enabled));
394}
395
396void SyncEngine::PromoteDemotedChanges() {
397  MetadataDatabase* metadata_db = GetMetadataDatabase();
398  if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) {
399    metadata_db->PromoteLowerPriorityTrackersToNormal();
400    FOR_EACH_OBSERVER(
401        Observer,
402        service_observers_,
403        OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker()));
404  }
405}
406
407void SyncEngine::ApplyLocalChange(
408    const FileChange& local_change,
409    const base::FilePath& local_path,
410    const SyncFileMetadata& local_metadata,
411    const fileapi::FileSystemURL& url,
412    const SyncStatusCallback& callback) {
413  worker_task_runner_->PostTask(
414      FROM_HERE,
415      base::Bind(&SyncWorker::ApplyLocalChange,
416                 base::Unretained(sync_worker_.get()),
417                 local_change,
418                 local_path,
419                 local_metadata,
420                 url,
421                 RelayCallbackToCurrentThread(
422                     FROM_HERE, callback)));
423}
424
425SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() {
426  return sync_worker_->GetSyncTaskManager();
427}
428
429void SyncEngine::OnNotificationReceived() {
430  worker_task_runner_->PostTask(
431      FROM_HERE,
432      base::Bind(&SyncWorker::OnNotificationReceived,
433                 base::Unretained(sync_worker_.get())));
434}
435
436void SyncEngine::OnPushNotificationEnabled(bool) {}
437
438void SyncEngine::OnReadyToSendRequests() {
439  const std::string account_id =
440      signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : "";
441
442  worker_task_runner_->PostTask(
443      FROM_HERE,
444      base::Bind(&SyncWorker::OnReadyToSendRequests,
445                 base::Unretained(sync_worker_.get()),
446                 account_id));
447}
448
449void SyncEngine::OnRefreshTokenInvalid() {
450  worker_task_runner_->PostTask(
451      FROM_HERE,
452      base::Bind(&SyncWorker::OnRefreshTokenInvalid,
453                 base::Unretained(sync_worker_.get())));
454}
455
456void SyncEngine::OnNetworkChanged(
457    net::NetworkChangeNotifier::ConnectionType type) {
458  worker_task_runner_->PostTask(
459      FROM_HERE,
460      base::Bind(&SyncWorker::OnNetworkChanged,
461                 base::Unretained(sync_worker_.get()),
462                 type));
463}
464
465drive::DriveServiceInterface* SyncEngine::GetDriveService() {
466  return drive_service_.get();
467}
468
469drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() {
470  return drive_uploader_.get();
471}
472
473MetadataDatabase* SyncEngine::GetMetadataDatabase() {
474  // TODO(peria): Post task
475  return sync_worker_->GetMetadataDatabase();
476}
477
478SyncEngine::SyncEngine(
479    scoped_ptr<drive::DriveServiceInterface> drive_service,
480    scoped_ptr<drive::DriveUploaderInterface> drive_uploader,
481    base::SequencedTaskRunner* worker_task_runner,
482    drive::DriveNotificationManager* notification_manager,
483    ExtensionServiceInterface* extension_service,
484    SigninManagerBase* signin_manager)
485    : drive_service_(drive_service.Pass()),
486      drive_service_wrapper_(new DriveServiceWrapper(drive_service_.get())),
487      drive_uploader_(drive_uploader.Pass()),
488      drive_uploader_wrapper_(new DriveUploaderWrapper(drive_uploader_.get())),
489      service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE),
490      notification_manager_(notification_manager),
491      extension_service_(extension_service),
492      signin_manager_(signin_manager),
493      worker_task_runner_(worker_task_runner),
494      weak_ptr_factory_(this) {}
495
496void SyncEngine::OnPendingFileListUpdated(int item_count) {
497  FOR_EACH_OBSERVER(
498      Observer,
499      service_observers_,
500      OnRemoteChangeQueueUpdated(item_count));
501}
502
503void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url,
504                                     SyncFileStatus file_status,
505                                     SyncAction sync_action,
506                                     SyncDirection direction) {
507  FOR_EACH_OBSERVER(FileStatusObserver,
508                    file_status_observers_,
509                    OnFileStatusChanged(
510                        url, file_status, sync_action, direction));
511}
512
513void SyncEngine::UpdateServiceState(RemoteServiceState state,
514                                    const std::string& description) {
515  service_state_ = state;
516
517  FOR_EACH_OBSERVER(
518      Observer, service_observers_,
519      OnRemoteServiceStateUpdated(state, description));
520}
521
522void SyncEngine::UpdateRegisteredApps() {
523  if (!extension_service_)
524    return;
525
526  MetadataDatabase* metadata_db = GetMetadataDatabase();
527  DCHECK(metadata_db);
528  std::vector<std::string> app_ids;
529  metadata_db->GetRegisteredAppIDs(&app_ids);
530
531  // Update the status of every origin using status from ExtensionService.
532  for (std::vector<std::string>::const_iterator itr = app_ids.begin();
533       itr != app_ids.end(); ++itr) {
534    const std::string& app_id = *itr;
535    GURL origin =
536        extensions::Extension::GetBaseURLFromExtensionId(app_id);
537    if (!extension_service_->GetInstalledExtension(app_id)) {
538      // Extension has been uninstalled.
539      // (At this stage we can't know if it was unpacked extension or not,
540      // so just purge the remote folder.)
541      UninstallOrigin(origin,
542                      RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE,
543                      base::Bind(&EmptyStatusCallback));
544      continue;
545    }
546    FileTracker tracker;
547    if (!metadata_db->FindAppRootTracker(app_id, &tracker)) {
548      // App will register itself on first run.
549      continue;
550    }
551    bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id);
552    bool is_app_root_tracker_enabled =
553        tracker.tracker_kind() == TRACKER_KIND_APP_ROOT;
554    if (is_app_enabled && !is_app_root_tracker_enabled)
555      EnableOrigin(origin, base::Bind(&EmptyStatusCallback));
556    else if (!is_app_enabled && is_app_root_tracker_enabled)
557      DisableOrigin(origin, base::Bind(&EmptyStatusCallback));
558  }
559}
560
561}  // namespace drive_backend
562}  // namespace sync_file_system
563