// sync_engine.cc revision 5c02ac1a9c1b504631c0a3d2b6e737b5d738bae1
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/threading/sequenced_worker_pool.h" 11#include "base/values.h" 12#include "chrome/browser/drive/drive_api_service.h" 13#include "chrome/browser/drive/drive_notification_manager.h" 14#include "chrome/browser/drive/drive_notification_manager_factory.h" 15#include "chrome/browser/drive/drive_service_interface.h" 16#include "chrome/browser/drive/drive_uploader.h" 17#include "chrome/browser/extensions/extension_service.h" 18#include "chrome/browser/profiles/profile.h" 19#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 20#include "chrome/browser/signin/signin_manager_factory.h" 21#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h" 22#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 23#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 24#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 25#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 26#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 27#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 28#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 29#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 30#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 31#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 32#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 33#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 34#include 
"chrome/browser/sync_file_system/file_status_observer.h" 35#include "chrome/browser/sync_file_system/logger.h" 36#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 37#include "components/signin/core/browser/profile_oauth2_token_service.h" 38#include "components/signin/core/browser/signin_manager.h" 39#include "content/public/browser/browser_thread.h" 40#include "extensions/browser/extension_system.h" 41#include "extensions/browser/extension_system_provider.h" 42#include "extensions/browser/extensions_browser_client.h" 43#include "extensions/common/extension.h" 44#include "google_apis/drive/drive_api_url_generator.h" 45#include "google_apis/drive/gdata_wapi_url_generator.h" 46#include "webkit/common/blob/scoped_file.h" 47#include "webkit/common/fileapi/file_system_util.h" 48 49namespace sync_file_system { 50 51class RemoteChangeProcessor; 52 53namespace drive_backend { 54 55class SyncEngine::WorkerObserver 56 : public SyncWorker::Observer { 57 public: 58 WorkerObserver(base::SequencedTaskRunner* ui_task_runner, 59 base::WeakPtr<SyncEngine> sync_engine) 60 : ui_task_runner_(ui_task_runner), 61 sync_engine_(sync_engine){ 62 } 63 64 virtual ~WorkerObserver() {} 65 66 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE { 67 ui_task_runner_->PostTask( 68 FROM_HERE, 69 base::Bind(&SyncEngine::OnPendingFileListUpdated, 70 sync_engine_, 71 item_count)); 72 } 73 74 virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url, 75 SyncFileStatus file_status, 76 SyncAction sync_action, 77 SyncDirection direction) OVERRIDE { 78 ui_task_runner_->PostTask( 79 FROM_HERE, 80 base::Bind(&SyncEngine::OnFileStatusChanged, 81 sync_engine_, 82 url, file_status, sync_action, direction)); 83 } 84 85 86 virtual void UpdateServiceState(RemoteServiceState state, 87 const std::string& description) OVERRIDE { 88 ui_task_runner_->PostTask( 89 FROM_HERE, 90 base::Bind(&SyncEngine::UpdateServiceState, 91 sync_engine_, state, description)); 92 } 93 94 private: 
95 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; 96 base::WeakPtr<SyncEngine> sync_engine_; 97 98 DISALLOW_COPY_AND_ASSIGN(WorkerObserver); 99}; 100 101namespace { 102 103void EmptyStatusCallback(SyncStatusCode status) {} 104 105} // namespace 106 107scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext( 108 content::BrowserContext* context) { 109 scoped_refptr<base::SequencedWorkerPool> worker_pool( 110 content::BrowserThread::GetBlockingPool()); 111 scoped_refptr<base::SequencedTaskRunner> drive_task_runner( 112 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 113 worker_pool->GetSequenceToken(), 114 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 115 116 Profile* profile = Profile::FromBrowserContext(context); 117 ProfileOAuth2TokenService* token_service = 118 ProfileOAuth2TokenServiceFactory::GetForProfile(profile); 119 scoped_ptr<drive::DriveServiceInterface> drive_service( 120 new drive::DriveAPIService( 121 token_service, 122 context->GetRequestContext(), 123 drive_task_runner.get(), 124 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction), 125 GURL(google_apis::DriveApiUrlGenerator:: 126 kBaseDownloadUrlForProduction), 127 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction), 128 std::string() /* custom_user_agent */)); 129 SigninManagerBase* signin_manager = 130 SigninManagerFactory::GetForProfile(profile); 131 drive_service->Initialize(signin_manager->GetAuthenticatedAccountId()); 132 133 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 134 new drive::DriveUploader(drive_service.get(), drive_task_runner.get())); 135 136 drive::DriveNotificationManager* notification_manager = 137 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 138 ExtensionService* extension_service = 139 extensions::ExtensionSystem::Get(context)->extension_service(); 140 141 scoped_refptr<base::SequencedTaskRunner> file_task_runner( 142 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 143 
worker_pool->GetSequenceToken(), 144 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 145 146 // TODO(peria): Create another task runner to manage SyncWorker. 147 scoped_refptr<base::SingleThreadTaskRunner> 148 worker_task_runner = base::MessageLoopProxy::current(); 149 150 scoped_ptr<drive_backend::SyncEngine> sync_engine( 151 new SyncEngine(drive_service.Pass(), 152 drive_uploader.Pass(), 153 worker_task_runner, 154 notification_manager, 155 extension_service, 156 signin_manager)); 157 sync_engine->Initialize( 158 GetSyncFileSystemDir(context->GetPath()), 159 file_task_runner.get(), 160 NULL); 161 162 return sync_engine.Pass(); 163} 164 165void SyncEngine::AppendDependsOnFactories( 166 std::set<BrowserContextKeyedServiceFactory*>* factories) { 167 DCHECK(factories); 168 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 169 factories->insert(SigninManagerFactory::GetInstance()); 170 factories->insert( 171 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 172} 173 174SyncEngine::~SyncEngine() { 175 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 176 GetDriveService()->RemoveObserver(this); 177 if (notification_manager_) 178 notification_manager_->RemoveObserver(this); 179 180 // TODO(tzik): Destroy |sync_worker_| and |worker_observer_| on the worker. 
181} 182 183void SyncEngine::Initialize(const base::FilePath& base_dir, 184 base::SequencedTaskRunner* file_task_runner, 185 leveldb::Env* env_override) { 186 scoped_ptr<SyncEngineContext> sync_engine_context( 187 new SyncEngineContext(drive_service_.get(), 188 drive_uploader_.get(), 189 base::MessageLoopProxy::current(), 190 worker_task_runner_, 191 file_task_runner)); 192 worker_observer_.reset( 193 new WorkerObserver(base::MessageLoopProxy::current(), 194 weak_ptr_factory_.GetWeakPtr())); 195 196 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr; 197 if (extension_service_) 198 extension_service_weak_ptr = extension_service_->AsWeakPtr(); 199 200 // TODO(peria): Use PostTask on |worker_task_runner_| to call this function. 201 sync_worker_ = SyncWorker::CreateOnWorker( 202 base_dir, 203 worker_observer_.get(), 204 extension_service_weak_ptr, 205 sync_engine_context.Pass(), 206 env_override); 207 208 if (notification_manager_) 209 notification_manager_->AddObserver(this); 210 GetDriveService()->AddObserver(this); 211 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 212} 213 214void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 215 service_observers_.AddObserver(observer); 216} 217 218void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 219 file_status_observers_.AddObserver(observer); 220} 221 222void SyncEngine::RegisterOrigin( 223 const GURL& origin, const SyncStatusCallback& callback) { 224 worker_task_runner_->PostTask( 225 FROM_HERE, 226 base::Bind(&SyncWorker::RegisterOrigin, 227 base::Unretained(sync_worker_.get()), 228 origin, 229 RelayCallbackToCurrentThread( 230 FROM_HERE, callback))); 231} 232 233void SyncEngine::EnableOrigin( 234 const GURL& origin, const SyncStatusCallback& callback) { 235 worker_task_runner_->PostTask( 236 FROM_HERE, 237 base::Bind(&SyncWorker::EnableOrigin, 238 base::Unretained(sync_worker_.get()), 239 origin, 240 RelayCallbackToCurrentThread( 241 FROM_HERE, 
callback))); 242} 243 244void SyncEngine::DisableOrigin( 245 const GURL& origin, const SyncStatusCallback& callback) { 246 worker_task_runner_->PostTask( 247 FROM_HERE, 248 base::Bind(&SyncWorker::DisableOrigin, 249 base::Unretained(sync_worker_.get()), 250 origin, 251 RelayCallbackToCurrentThread( 252 FROM_HERE, callback))); 253} 254 255void SyncEngine::UninstallOrigin( 256 const GURL& origin, 257 UninstallFlag flag, 258 const SyncStatusCallback& callback) { 259 worker_task_runner_->PostTask( 260 FROM_HERE, 261 base::Bind(&SyncWorker::UninstallOrigin, 262 base::Unretained(sync_worker_.get()), 263 origin, flag, 264 RelayCallbackToCurrentThread( 265 FROM_HERE, callback))); 266} 267 268void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 269 worker_task_runner_->PostTask( 270 FROM_HERE, 271 base::Bind(&SyncWorker::ProcessRemoteChange, 272 base::Unretained(sync_worker_.get()), 273 RelayCallbackToCurrentThread( 274 FROM_HERE, callback))); 275} 276 277void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) { 278 worker_task_runner_->PostTask( 279 FROM_HERE, 280 base::Bind(&SyncWorker::SetRemoteChangeProcessor, 281 base::Unretained(sync_worker_.get()), 282 processor)); 283} 284 285LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 286 return this; 287} 288 289bool SyncEngine::IsConflicting(const fileapi::FileSystemURL& url) { 290 // TODO(tzik): Implement this before we support manual conflict resolution. 291 return false; 292} 293 294RemoteServiceState SyncEngine::GetCurrentState() const { 295 // TODO(peria): Post task 296 return sync_worker_->GetCurrentState(); 297} 298 299void SyncEngine::GetOriginStatusMap(OriginStatusMap* status_map) { 300 // TODO(peria): Make this route asynchronous. 301 sync_worker_->GetOriginStatusMap(status_map); 302} 303 304scoped_ptr<base::ListValue> SyncEngine::DumpFiles(const GURL& origin) { 305 // TODO(peria): Make this route asynchronous. 
306 return sync_worker_->DumpFiles(origin); 307} 308 309scoped_ptr<base::ListValue> SyncEngine::DumpDatabase() { 310 // TODO(peria): Make this route asynchronous. 311 return sync_worker_->DumpDatabase(); 312} 313 314void SyncEngine::SetSyncEnabled(bool enabled) { 315 worker_task_runner_->PostTask( 316 FROM_HERE, 317 base::Bind(&SyncWorker::SetSyncEnabled, 318 base::Unretained(sync_worker_.get()), 319 enabled)); 320} 321 322void SyncEngine::UpdateSyncEnabled(bool enabled) { 323 const char* status_message = enabled ? "Sync is enabled" : "Sync is disabled"; 324 FOR_EACH_OBSERVER( 325 Observer, service_observers_, 326 OnRemoteServiceStateUpdated(GetCurrentState(), status_message)); 327} 328 329SyncStatusCode SyncEngine::SetDefaultConflictResolutionPolicy( 330 ConflictResolutionPolicy policy) { 331 // TODO(peria): Make this route asynchronous. 332 return sync_worker_->SetDefaultConflictResolutionPolicy(policy); 333} 334 335SyncStatusCode SyncEngine::SetConflictResolutionPolicy( 336 const GURL& origin, 337 ConflictResolutionPolicy policy) { 338 // TODO(peria): Make this route asynchronous. 339 return sync_worker_->SetConflictResolutionPolicy(origin, policy); 340} 341 342ConflictResolutionPolicy SyncEngine::GetDefaultConflictResolutionPolicy() 343 const { 344 // TODO(peria): Make this route asynchronous. 345 return sync_worker_->GetDefaultConflictResolutionPolicy(); 346} 347 348ConflictResolutionPolicy SyncEngine::GetConflictResolutionPolicy( 349 const GURL& origin) const { 350 // TODO(peria): Make this route asynchronous. 351 return sync_worker_->GetConflictResolutionPolicy(origin); 352} 353 354void SyncEngine::GetRemoteVersions( 355 const fileapi::FileSystemURL& url, 356 const RemoteVersionsCallback& callback) { 357 // TODO(tzik): Implement this before we support manual conflict resolution. 
358 callback.Run(SYNC_STATUS_FAILED, std::vector<Version>()); 359} 360 361void SyncEngine::DownloadRemoteVersion( 362 const fileapi::FileSystemURL& url, 363 const std::string& version_id, 364 const DownloadVersionCallback& callback) { 365 // TODO(tzik): Implement this before we support manual conflict resolution. 366 callback.Run(SYNC_STATUS_FAILED, webkit_blob::ScopedFile()); 367} 368 369void SyncEngine::PromoteDemotedChanges() { 370 MetadataDatabase* metadata_db = GetMetadataDatabase(); 371 if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) { 372 metadata_db->PromoteLowerPriorityTrackersToNormal(); 373 FOR_EACH_OBSERVER( 374 Observer, 375 service_observers_, 376 OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker())); 377 } 378} 379 380void SyncEngine::ApplyLocalChange( 381 const FileChange& local_change, 382 const base::FilePath& local_path, 383 const SyncFileMetadata& local_metadata, 384 const fileapi::FileSystemURL& url, 385 const SyncStatusCallback& callback) { 386 worker_task_runner_->PostTask( 387 FROM_HERE, 388 base::Bind(&SyncWorker::ApplyLocalChange, 389 base::Unretained(sync_worker_.get()), 390 local_change, 391 local_path, 392 local_metadata, 393 url, 394 RelayCallbackToCurrentThread( 395 FROM_HERE, callback))); 396} 397 398SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() { 399 // TODO(peria): Post task 400 return sync_worker_->GetSyncTaskManager(); 401} 402 403void SyncEngine::OnNotificationReceived() { 404 worker_task_runner_->PostTask( 405 FROM_HERE, 406 base::Bind(&SyncWorker::OnNotificationReceived, 407 base::Unretained(sync_worker_.get()))); 408} 409 410void SyncEngine::OnPushNotificationEnabled(bool) {} 411 412void SyncEngine::OnReadyToSendRequests() { 413 const std::string account_id = 414 signin_manager_ ? 
signin_manager_->GetAuthenticatedAccountId() : ""; 415 416 worker_task_runner_->PostTask( 417 FROM_HERE, 418 base::Bind(&SyncWorker::OnReadyToSendRequests, 419 base::Unretained(sync_worker_.get()), 420 account_id)); 421} 422 423void SyncEngine::OnRefreshTokenInvalid() { 424 worker_task_runner_->PostTask( 425 FROM_HERE, 426 base::Bind(&SyncWorker::OnRefreshTokenInvalid, 427 base::Unretained(sync_worker_.get()))); 428} 429 430void SyncEngine::OnNetworkChanged( 431 net::NetworkChangeNotifier::ConnectionType type) { 432 worker_task_runner_->PostTask( 433 FROM_HERE, 434 base::Bind(&SyncWorker::OnNetworkChanged, 435 base::Unretained(sync_worker_.get()), 436 type)); 437} 438 439drive::DriveServiceInterface* SyncEngine::GetDriveService() { 440 return drive_service_.get(); 441} 442 443drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() { 444 return drive_uploader_.get(); 445} 446 447MetadataDatabase* SyncEngine::GetMetadataDatabase() { 448 // TODO(peria): Post task 449 return sync_worker_->GetMetadataDatabase(); 450} 451 452SyncEngine::SyncEngine( 453 scoped_ptr<drive::DriveServiceInterface> drive_service, 454 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 455 base::SequencedTaskRunner* worker_task_runner, 456 drive::DriveNotificationManager* notification_manager, 457 ExtensionServiceInterface* extension_service, 458 SigninManagerBase* signin_manager) 459 : drive_service_(drive_service.Pass()), 460 drive_uploader_(drive_uploader.Pass()), 461 notification_manager_(notification_manager), 462 extension_service_(extension_service), 463 signin_manager_(signin_manager), 464 worker_task_runner_(worker_task_runner), 465 weak_ptr_factory_(this) {} 466 467void SyncEngine::OnPendingFileListUpdated(int item_count) { 468 FOR_EACH_OBSERVER( 469 Observer, 470 service_observers_, 471 OnRemoteChangeQueueUpdated(item_count)); 472} 473 474void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url, 475 SyncFileStatus file_status, 476 SyncAction sync_action, 477 
SyncDirection direction) { 478 FOR_EACH_OBSERVER(FileStatusObserver, 479 file_status_observers_, 480 OnFileStatusChanged( 481 url, file_status, sync_action, direction)); 482} 483 484void SyncEngine::UpdateServiceState(RemoteServiceState state, 485 const std::string& description) { 486 FOR_EACH_OBSERVER( 487 Observer, service_observers_, 488 OnRemoteServiceStateUpdated(state, description)); 489} 490 491void SyncEngine::UpdateRegisteredApps() { 492 if (!extension_service_) 493 return; 494 495 MetadataDatabase* metadata_db = GetMetadataDatabase(); 496 DCHECK(metadata_db); 497 std::vector<std::string> app_ids; 498 metadata_db->GetRegisteredAppIDs(&app_ids); 499 500 // Update the status of every origin using status from ExtensionService. 501 for (std::vector<std::string>::const_iterator itr = app_ids.begin(); 502 itr != app_ids.end(); ++itr) { 503 const std::string& app_id = *itr; 504 GURL origin = 505 extensions::Extension::GetBaseURLFromExtensionId(app_id); 506 if (!extension_service_->GetInstalledExtension(app_id)) { 507 // Extension has been uninstalled. 508 // (At this stage we can't know if it was unpacked extension or not, 509 // so just purge the remote folder.) 510 UninstallOrigin(origin, 511 RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE, 512 base::Bind(&EmptyStatusCallback)); 513 continue; 514 } 515 FileTracker tracker; 516 if (!metadata_db->FindAppRootTracker(app_id, &tracker)) { 517 // App will register itself on first run. 518 continue; 519 } 520 bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id); 521 bool is_app_root_tracker_enabled = 522 tracker.tracker_kind() == TRACKER_KIND_APP_ROOT; 523 if (is_app_enabled && !is_app_root_tracker_enabled) 524 EnableOrigin(origin, base::Bind(&EmptyStatusCallback)); 525 else if (!is_app_enabled && is_app_root_tracker_enabled) 526 DisableOrigin(origin, base::Bind(&EmptyStatusCallback)); 527 } 528} 529 530} // namespace drive_backend 531} // namespace sync_file_system 532