sync_engine.cc revision cedac228d2dd51db4b79ea1e72c7f249408ee061
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/metrics/histogram.h" 11#include "base/threading/sequenced_worker_pool.h" 12#include "base/time/time.h" 13#include "base/values.h" 14#include "chrome/browser/drive/drive_api_service.h" 15#include "chrome/browser/drive/drive_notification_manager.h" 16#include "chrome/browser/drive/drive_notification_manager_factory.h" 17#include "chrome/browser/drive/drive_service_interface.h" 18#include "chrome/browser/drive/drive_uploader.h" 19#include "chrome/browser/extensions/extension_service.h" 20#include "chrome/browser/profiles/profile.h" 21#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 22#include "chrome/browser/signin/signin_manager_factory.h" 23#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h" 24#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 25#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 26#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h" 27#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h" 28#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h" 29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h" 30#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 31#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 32#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 33#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 34#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h" 35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h" 36#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 37#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 39#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 40#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 41#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 42#include "chrome/browser/sync_file_system/file_status_observer.h" 43#include "chrome/browser/sync_file_system/logger.h" 44#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 45#include "components/signin/core/browser/profile_oauth2_token_service.h" 46#include "components/signin/core/browser/signin_manager.h" 47#include "content/public/browser/browser_thread.h" 48#include "extensions/browser/extension_system.h" 49#include "extensions/browser/extension_system_provider.h" 50#include "extensions/browser/extensions_browser_client.h" 51#include "extensions/common/extension.h" 52#include "google_apis/drive/drive_api_url_generator.h" 53#include "google_apis/drive/gdata_wapi_url_generator.h" 54#include "webkit/common/blob/scoped_file.h" 55#include "webkit/common/fileapi/file_system_util.h" 56 57namespace sync_file_system { 58 59class RemoteChangeProcessor; 60 61namespace drive_backend { 62 63class SyncEngine::WorkerObserver : public SyncWorker::Observer { 64 public: 65 WorkerObserver(base::SequencedTaskRunner* ui_task_runner, 66 base::WeakPtr<SyncEngine> sync_engine) 67 : ui_task_runner_(ui_task_runner), 68 sync_engine_(sync_engine) {} 69 70 virtual ~WorkerObserver() {} 71 72 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE { 73 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 74 if (sync_engine_) 75 sync_engine_->OnPendingFileListUpdated(item_count); 76 return; 77 } 78 79 ui_task_runner_->PostTask( 80 FROM_HERE, 81 base::Bind(&SyncEngine::OnPendingFileListUpdated, 82 sync_engine_, 83 item_count)); 84 } 85 86 virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url, 87 SyncFileStatus file_status, 88 SyncAction sync_action, 89 SyncDirection direction) OVERRIDE { 90 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 91 if (sync_engine_) 92 sync_engine_->OnFileStatusChanged( 93 url, file_status, sync_action, direction); 94 return; 95 } 96 97 ui_task_runner_->PostTask( 98 FROM_HERE, 99 base::Bind(&SyncEngine::OnFileStatusChanged, 100 sync_engine_, 101 url, file_status, sync_action, direction)); 102 } 103 104 virtual void UpdateServiceState(RemoteServiceState state, 105 const std::string& description) OVERRIDE { 106 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 107 if (sync_engine_) 108 sync_engine_->UpdateServiceState(state, description); 109 return; 110 } 111 112 ui_task_runner_->PostTask( 113 FROM_HERE, 114 base::Bind(&SyncEngine::UpdateServiceState, 115 sync_engine_, state, description)); 116 } 117 118 private: 119 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; 120 base::WeakPtr<SyncEngine> sync_engine_; 121 122 DISALLOW_COPY_AND_ASSIGN(WorkerObserver); 123}; 124 125namespace { 126 127void EmptyStatusCallback(SyncStatusCode status) {} 128 129void DidRegisterOrigin(const base::TimeTicks& start_time, 130 const SyncStatusCallback& callback, 131 SyncStatusCode status) { 132 base::TimeDelta delta(base::TimeTicks::Now() - start_time); 133 HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta); 134 callback.Run(status); 135} 136 137} // namespace 138 139scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext( 140 content::BrowserContext* context, 141 TaskLogger* task_logger) { 142 scoped_refptr<base::SequencedWorkerPool> worker_pool( 143 content::BrowserThread::GetBlockingPool()); 144 scoped_refptr<base::SequencedTaskRunner> drive_task_runner( 145 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 146 worker_pool->GetSequenceToken(), 147 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 148 149 Profile* profile = Profile::FromBrowserContext(context); 150 ProfileOAuth2TokenService* token_service = 151 ProfileOAuth2TokenServiceFactory::GetForProfile(profile); 152 scoped_ptr<drive::DriveServiceInterface> drive_service( 153 new drive::DriveAPIService( 154 token_service, 155 context->GetRequestContext(), 156 drive_task_runner.get(), 157 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction), 158 GURL(google_apis::DriveApiUrlGenerator:: 159 kBaseDownloadUrlForProduction), 160 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction), 161 std::string() /* custom_user_agent */)); 162 SigninManagerBase* signin_manager = 163 SigninManagerFactory::GetForProfile(profile); 164 drive_service->Initialize(signin_manager->GetAuthenticatedAccountId()); 165 166 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 167 new drive::DriveUploader(drive_service.get(), drive_task_runner.get())); 168 169 drive::DriveNotificationManager* notification_manager = 170 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 171 ExtensionService* extension_service = 172 extensions::ExtensionSystem::Get(context)->extension_service(); 173 174 scoped_refptr<base::SequencedTaskRunner> file_task_runner( 175 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 176 worker_pool->GetSequenceToken(), 177 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN)); 178 179 // TODO(peria): Create another task runner to manage SyncWorker. 180 scoped_refptr<base::SingleThreadTaskRunner> 181 worker_task_runner = base::MessageLoopProxy::current(); 182 183 scoped_ptr<drive_backend::SyncEngine> sync_engine( 184 new SyncEngine(drive_service.Pass(), 185 drive_uploader.Pass(), 186 worker_task_runner, 187 notification_manager, 188 extension_service, 189 signin_manager)); 190 sync_engine->Initialize(GetSyncFileSystemDir(context->GetPath()), 191 task_logger, 192 file_task_runner.get(), 193 NULL); 194 195 return sync_engine.Pass(); 196} 197 198void SyncEngine::AppendDependsOnFactories( 199 std::set<BrowserContextKeyedServiceFactory*>* factories) { 200 DCHECK(factories); 201 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 202 factories->insert(SigninManagerFactory::GetInstance()); 203 factories->insert( 204 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 205} 206 207SyncEngine::~SyncEngine() { 208 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 209 GetDriveService()->RemoveObserver(this); 210 if (notification_manager_) 211 notification_manager_->RemoveObserver(this); 212 213 WorkerObserver* worker_observer = worker_observer_.release(); 214 if (!worker_task_runner_->DeleteSoon(FROM_HERE, worker_observer)) 215 delete worker_observer; 216 217 SyncWorker* sync_worker = sync_worker_.release(); 218 if (!worker_task_runner_->DeleteSoon(FROM_HERE, sync_worker)) 219 delete sync_worker; 220} 221 222void SyncEngine::Initialize(const base::FilePath& base_dir, 223 TaskLogger* task_logger, 224 base::SequencedTaskRunner* file_task_runner, 225 leveldb::Env* env_override) { 226 // DriveServiceWrapper and DriveServiceOnWorker relay communications 227 // between DriveService and syncers in SyncWorker. 228 scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker( 229 new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(), 230 base::MessageLoopProxy::current(), 231 worker_task_runner_)); 232 scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker( 233 new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(), 234 base::MessageLoopProxy::current(), 235 worker_task_runner_)); 236 scoped_ptr<SyncEngineContext> sync_engine_context( 237 new SyncEngineContext(drive_service_on_worker.Pass(), 238 drive_uploader_on_worker.Pass(), 239 task_logger, 240 base::MessageLoopProxy::current(), 241 worker_task_runner_, 242 file_task_runner)); 243 244 worker_observer_.reset( 245 new WorkerObserver(base::MessageLoopProxy::current(), 246 weak_ptr_factory_.GetWeakPtr())); 247 248 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr; 249 if (extension_service_) 250 extension_service_weak_ptr = extension_service_->AsWeakPtr(); 251 252 sync_worker_.reset(new SyncWorker( 253 base_dir, 254 extension_service_weak_ptr, 255 sync_engine_context.Pass(), 256 env_override)); 257 sync_worker_->AddObserver(worker_observer_.get()); 258 worker_task_runner_->PostTask( 259 FROM_HERE, 260 base::Bind(&SyncWorker::Initialize, 261 base::Unretained(sync_worker_.get()))); 262 263 if (notification_manager_) 264 notification_manager_->AddObserver(this); 265 GetDriveService()->AddObserver(this); 266 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 267} 268 269void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 270 service_observers_.AddObserver(observer); 271} 272 273void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 274 file_status_observers_.AddObserver(observer); 275} 276 277void SyncEngine::RegisterOrigin( 278 const GURL& origin, const SyncStatusCallback& callback) { 279 worker_task_runner_->PostTask( 280 FROM_HERE, 281 base::Bind(&SyncWorker::RegisterOrigin, 282 base::Unretained(sync_worker_.get()), 283 origin, 284 RelayCallbackToCurrentThread( 285 FROM_HERE, 286 base::Bind(&DidRegisterOrigin, 287 base::TimeTicks::Now(), 288 callback)))); 289} 290 291void SyncEngine::EnableOrigin( 292 const GURL& origin, const SyncStatusCallback& callback) { 293 worker_task_runner_->PostTask( 294 FROM_HERE, 295 base::Bind(&SyncWorker::EnableOrigin, 296 base::Unretained(sync_worker_.get()), 297 origin, 298 RelayCallbackToCurrentThread( 299 FROM_HERE, callback))); 300} 301 302void SyncEngine::DisableOrigin( 303 const GURL& origin, const SyncStatusCallback& callback) { 304 worker_task_runner_->PostTask( 305 FROM_HERE, 306 base::Bind(&SyncWorker::DisableOrigin, 307 base::Unretained(sync_worker_.get()), 308 origin, 309 RelayCallbackToCurrentThread( 310 FROM_HERE, callback))); 311} 312 313void SyncEngine::UninstallOrigin( 314 const GURL& origin, 315 UninstallFlag flag, 316 const SyncStatusCallback& callback) { 317 worker_task_runner_->PostTask( 318 FROM_HERE, 319 base::Bind(&SyncWorker::UninstallOrigin, 320 base::Unretained(sync_worker_.get()), 321 origin, flag, 322 RelayCallbackToCurrentThread( 323 FROM_HERE, callback))); 324} 325 326void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 327 worker_task_runner_->PostTask( 328 FROM_HERE, 329 base::Bind(&SyncWorker::ProcessRemoteChange, 330 base::Unretained(sync_worker_.get()), 331 RelayCallbackToCurrentThread( 332 FROM_HERE, callback))); 333} 334 335void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) { 336 remote_change_processor_ = processor; 337 remote_change_processor_wrapper_.reset( 338 new RemoteChangeProcessorWrapper(processor)); 339 340 remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker( 341 remote_change_processor_wrapper_->AsWeakPtr(), 342 base::MessageLoopProxy::current(), /* ui_task_runner */ 343 worker_task_runner_)); 344 345 worker_task_runner_->PostTask( 346 FROM_HERE, 347 base::Bind(&SyncWorker::SetRemoteChangeProcessor, 348 base::Unretained(sync_worker_.get()), 349 remote_change_processor_on_worker_.get())); 350} 351 352LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 353 return this; 354} 355 356RemoteServiceState SyncEngine::GetCurrentState() const { 357 return service_state_; 358} 359 360void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) { 361 worker_task_runner_->PostTask( 362 FROM_HERE, 363 base::Bind(&SyncWorker::GetOriginStatusMap, 364 base::Unretained(sync_worker_.get()), 365 RelayCallbackToCurrentThread(FROM_HERE, callback))); 366} 367 368void SyncEngine::DumpFiles(const GURL& origin, 369 const ListCallback& callback) { 370 PostTaskAndReplyWithResult( 371 worker_task_runner_, 372 FROM_HERE, 373 base::Bind(&SyncWorker::DumpFiles, 374 base::Unretained(sync_worker_.get()), 375 origin), 376 callback); 377} 378 379void SyncEngine::DumpDatabase(const ListCallback& callback) { 380 PostTaskAndReplyWithResult( 381 worker_task_runner_, 382 FROM_HERE, 383 base::Bind(&SyncWorker::DumpDatabase, 384 base::Unretained(sync_worker_.get())), 385 callback); 386} 387 388void SyncEngine::SetSyncEnabled(bool enabled) { 389 worker_task_runner_->PostTask( 390 FROM_HERE, 391 base::Bind(&SyncWorker::SetSyncEnabled, 392 base::Unretained(sync_worker_.get()), 393 enabled)); 394} 395 396void SyncEngine::PromoteDemotedChanges() { 397 MetadataDatabase* metadata_db = GetMetadataDatabase(); 398 if (metadata_db && metadata_db->HasLowPriorityDirtyTracker()) { 399 metadata_db->PromoteLowerPriorityTrackersToNormal(); 400 FOR_EACH_OBSERVER( 401 Observer, 402 service_observers_, 403 OnRemoteChangeQueueUpdated(metadata_db->CountDirtyTracker())); 404 } 405} 406 407void SyncEngine::ApplyLocalChange( 408 const FileChange& local_change, 409 const base::FilePath& local_path, 410 const SyncFileMetadata& local_metadata, 411 const fileapi::FileSystemURL& url, 412 const SyncStatusCallback& callback) { 413 worker_task_runner_->PostTask( 414 FROM_HERE, 415 base::Bind(&SyncWorker::ApplyLocalChange, 416 base::Unretained(sync_worker_.get()), 417 local_change, 418 local_path, 419 local_metadata, 420 url, 421 RelayCallbackToCurrentThread( 422 FROM_HERE, callback))); 423} 424 425SyncTaskManager* SyncEngine::GetSyncTaskManagerForTesting() { 426 return sync_worker_->GetSyncTaskManager(); 427} 428 429void SyncEngine::OnNotificationReceived() { 430 worker_task_runner_->PostTask( 431 FROM_HERE, 432 base::Bind(&SyncWorker::OnNotificationReceived, 433 base::Unretained(sync_worker_.get()))); 434} 435 436void SyncEngine::OnPushNotificationEnabled(bool) {} 437 438void SyncEngine::OnReadyToSendRequests() { 439 const std::string account_id = 440 signin_manager_ ? signin_manager_->GetAuthenticatedAccountId() : ""; 441 442 worker_task_runner_->PostTask( 443 FROM_HERE, 444 base::Bind(&SyncWorker::OnReadyToSendRequests, 445 base::Unretained(sync_worker_.get()), 446 account_id)); 447} 448 449void SyncEngine::OnRefreshTokenInvalid() { 450 worker_task_runner_->PostTask( 451 FROM_HERE, 452 base::Bind(&SyncWorker::OnRefreshTokenInvalid, 453 base::Unretained(sync_worker_.get()))); 454} 455 456void SyncEngine::OnNetworkChanged( 457 net::NetworkChangeNotifier::ConnectionType type) { 458 worker_task_runner_->PostTask( 459 FROM_HERE, 460 base::Bind(&SyncWorker::OnNetworkChanged, 461 base::Unretained(sync_worker_.get()), 462 type)); 463} 464 465drive::DriveServiceInterface* SyncEngine::GetDriveService() { 466 return drive_service_.get(); 467} 468 469drive::DriveUploaderInterface* SyncEngine::GetDriveUploader() { 470 return drive_uploader_.get(); 471} 472 473MetadataDatabase* SyncEngine::GetMetadataDatabase() { 474 // TODO(peria): Post task 475 return sync_worker_->GetMetadataDatabase(); 476} 477 478SyncEngine::SyncEngine( 479 scoped_ptr<drive::DriveServiceInterface> drive_service, 480 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 481 base::SequencedTaskRunner* worker_task_runner, 482 drive::DriveNotificationManager* notification_manager, 483 ExtensionServiceInterface* extension_service, 484 SigninManagerBase* signin_manager) 485 : drive_service_(drive_service.Pass()), 486 drive_service_wrapper_(new DriveServiceWrapper(drive_service_.get())), 487 drive_uploader_(drive_uploader.Pass()), 488 drive_uploader_wrapper_(new DriveUploaderWrapper(drive_uploader_.get())), 489 service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE), 490 notification_manager_(notification_manager), 491 extension_service_(extension_service), 492 signin_manager_(signin_manager), 493 worker_task_runner_(worker_task_runner), 494 weak_ptr_factory_(this) {} 495 496void SyncEngine::OnPendingFileListUpdated(int item_count) { 497 FOR_EACH_OBSERVER( 498 Observer, 499 service_observers_, 500 OnRemoteChangeQueueUpdated(item_count)); 501} 502 503void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url, 504 SyncFileStatus file_status, 505 SyncAction sync_action, 506 SyncDirection direction) { 507 FOR_EACH_OBSERVER(FileStatusObserver, 508 file_status_observers_, 509 OnFileStatusChanged( 510 url, file_status, sync_action, direction)); 511} 512 513void SyncEngine::UpdateServiceState(RemoteServiceState state, 514 const std::string& description) { 515 service_state_ = state; 516 517 FOR_EACH_OBSERVER( 518 Observer, service_observers_, 519 OnRemoteServiceStateUpdated(state, description)); 520} 521 522void SyncEngine::UpdateRegisteredApps() { 523 if (!extension_service_) 524 return; 525 526 MetadataDatabase* metadata_db = GetMetadataDatabase(); 527 DCHECK(metadata_db); 528 std::vector<std::string> app_ids; 529 metadata_db->GetRegisteredAppIDs(&app_ids); 530 531 // Update the status of every origin using status from ExtensionService. 532 for (std::vector<std::string>::const_iterator itr = app_ids.begin(); 533 itr != app_ids.end(); ++itr) { 534 const std::string& app_id = *itr; 535 GURL origin = 536 extensions::Extension::GetBaseURLFromExtensionId(app_id); 537 if (!extension_service_->GetInstalledExtension(app_id)) { 538 // Extension has been uninstalled. 539 // (At this stage we can't know if it was unpacked extension or not, 540 // so just purge the remote folder.) 541 UninstallOrigin(origin, 542 RemoteFileSyncService::UNINSTALL_AND_PURGE_REMOTE, 543 base::Bind(&EmptyStatusCallback)); 544 continue; 545 } 546 FileTracker tracker; 547 if (!metadata_db->FindAppRootTracker(app_id, &tracker)) { 548 // App will register itself on first run. 549 continue; 550 } 551 bool is_app_enabled = extension_service_->IsExtensionEnabled(app_id); 552 bool is_app_root_tracker_enabled = 553 tracker.tracker_kind() == TRACKER_KIND_APP_ROOT; 554 if (is_app_enabled && !is_app_root_tracker_enabled) 555 EnableOrigin(origin, base::Bind(&EmptyStatusCallback)); 556 else if (!is_app_enabled && is_app_root_tracker_enabled) 557 DisableOrigin(origin, base::Bind(&EmptyStatusCallback)); 558 } 559} 560 561} // namespace drive_backend 562} // namespace sync_file_system 563