sync_engine.cc revision 116680a4aac90f2aa7413d9095a592090648e557
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/metrics/histogram.h" 11#include "base/thread_task_runner_handle.h" 12#include "base/threading/sequenced_worker_pool.h" 13#include "base/time/time.h" 14#include "base/values.h" 15#include "chrome/browser/drive/drive_api_service.h" 16#include "chrome/browser/drive/drive_notification_manager.h" 17#include "chrome/browser/drive/drive_notification_manager_factory.h" 18#include "chrome/browser/drive/drive_service_interface.h" 19#include "chrome/browser/drive/drive_uploader.h" 20#include "chrome/browser/extensions/extension_service.h" 21#include "chrome/browser/profiles/profile.h" 22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 23#include "chrome/browser/signin/signin_manager_factory.h" 24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h" 25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h" 28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h" 29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h" 30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h" 31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 35#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h" 36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h" 37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h" 43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 44#include "chrome/browser/sync_file_system/file_status_observer.h" 45#include "chrome/browser/sync_file_system/logger.h" 46#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 47#include "components/signin/core/browser/profile_oauth2_token_service.h" 48#include "components/signin/core/browser/signin_manager.h" 49#include "content/public/browser/browser_thread.h" 50#include "extensions/browser/extension_system.h" 51#include "extensions/browser/extension_system_provider.h" 52#include "extensions/browser/extensions_browser_client.h" 53#include "extensions/common/extension.h" 54#include "google_apis/drive/drive_api_url_generator.h" 55#include "google_apis/drive/gdata_wapi_url_generator.h" 56#include "net/url_request/url_request_context_getter.h" 57#include "webkit/common/blob/scoped_file.h" 58#include "webkit/common/fileapi/file_system_util.h" 59 60namespace sync_file_system { 61 62class RemoteChangeProcessor; 63 64namespace drive_backend { 65 66class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer { 67 public: 68 WorkerObserver(base::SequencedTaskRunner* ui_task_runner, 69 base::WeakPtr<SyncEngine> sync_engine) 70 : ui_task_runner_(ui_task_runner), 71 sync_engine_(sync_engine) { 72 sequence_checker_.DetachFromSequence(); 73 } 74 75 virtual ~WorkerObserver() { 76 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 77 } 78 79 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE { 80 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 81 if (sync_engine_) 82 sync_engine_->OnPendingFileListUpdated(item_count); 83 return; 84 } 85 86 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 87 ui_task_runner_->PostTask( 88 FROM_HERE, 89 base::Bind(&SyncEngine::OnPendingFileListUpdated, 90 sync_engine_, 91 item_count)); 92 } 93 94 virtual void OnFileStatusChanged(const fileapi::FileSystemURL& url, 95 SyncFileStatus file_status, 96 SyncAction sync_action, 97 SyncDirection direction) OVERRIDE { 98 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 99 if (sync_engine_) 100 sync_engine_->OnFileStatusChanged( 101 url, file_status, sync_action, direction); 102 return; 103 } 104 105 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 106 ui_task_runner_->PostTask( 107 FROM_HERE, 108 base::Bind(&SyncEngine::OnFileStatusChanged, 109 sync_engine_, 110 url, file_status, sync_action, direction)); 111 } 112 113 virtual void UpdateServiceState(RemoteServiceState state, 114 const std::string& description) OVERRIDE { 115 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 116 if (sync_engine_) 117 sync_engine_->UpdateServiceState(state, description); 118 return; 119 } 120 121 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 122 ui_task_runner_->PostTask( 123 FROM_HERE, 124 base::Bind(&SyncEngine::UpdateServiceState, 125 sync_engine_, state, description)); 126 } 127 128 void DetachFromSequence() { 129 sequence_checker_.DetachFromSequence(); 130 } 131 132 private: 133 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; 134 base::WeakPtr<SyncEngine> sync_engine_; 135 136 base::SequenceChecker sequence_checker_; 137 138 DISALLOW_COPY_AND_ASSIGN(WorkerObserver); 139}; 140 141namespace { 142 143void DidRegisterOrigin(const base::TimeTicks& start_time, 144 const SyncStatusCallback& callback, 145 SyncStatusCode status) { 146 base::TimeDelta delta(base::TimeTicks::Now() - start_time); 147 HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta); 148 callback.Run(status); 149} 150 151template <typename T> 152void DeleteSoonHelper(scoped_ptr<T>) {} 153 154template <typename T> 155void DeleteSoon(const tracked_objects::Location& from_here, 156 base::TaskRunner* task_runner, 157 scoped_ptr<T> obj) { 158 if (!obj) 159 return; 160 161 T* obj_ptr = obj.get(); 162 base::Closure deleter = 163 base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj)); 164 if (!task_runner->PostTask(from_here, deleter)) { 165 obj_ptr->DetachFromSequence(); 166 deleter.Run(); 167 } 168} 169 170} // namespace 171 172scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext( 173 content::BrowserContext* context, 174 TaskLogger* task_logger) { 175 scoped_refptr<base::SequencedWorkerPool> worker_pool = 176 content::BrowserThread::GetBlockingPool(); 177 178 scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner = 179 base::ThreadTaskRunnerHandle::Get(); 180 scoped_refptr<base::SequencedTaskRunner> worker_task_runner = 181 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 182 worker_pool->GetSequenceToken(), 183 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 184 scoped_refptr<base::SequencedTaskRunner> file_task_runner = 185 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 186 worker_pool->GetSequenceToken(), 187 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 188 scoped_refptr<base::SequencedTaskRunner> drive_task_runner = 189 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 190 worker_pool->GetSequenceToken(), 191 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 192 193 Profile* profile = Profile::FromBrowserContext(context); 194 drive::DriveNotificationManager* notification_manager = 195 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 196 ExtensionService* extension_service = 197 extensions::ExtensionSystem::Get(context)->extension_service(); 198 SigninManagerBase* signin_manager = 199 SigninManagerFactory::GetForProfile(profile); 200 ProfileOAuth2TokenService* token_service = 201 ProfileOAuth2TokenServiceFactory::GetForProfile(profile); 202 scoped_refptr<net::URLRequestContextGetter> request_context = 203 context->GetRequestContext(); 204 205 scoped_ptr<drive_backend::SyncEngine> sync_engine( 206 new SyncEngine(ui_task_runner, 207 worker_task_runner, 208 file_task_runner, 209 drive_task_runner, 210 GetSyncFileSystemDir(context->GetPath()), 211 task_logger, 212 notification_manager, 213 extension_service, 214 signin_manager, 215 token_service, 216 request_context, 217 NULL /* env_override */)); 218 219 sync_engine->Initialize(); 220 return sync_engine.Pass(); 221} 222 223void SyncEngine::AppendDependsOnFactories( 224 std::set<BrowserContextKeyedServiceFactory*>* factories) { 225 DCHECK(factories); 226 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 227 factories->insert(SigninManagerFactory::GetInstance()); 228 factories->insert( 229 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 230 factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance()); 231} 232 233SyncEngine::~SyncEngine() { 234 Reset(); 235 236 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 237 if (signin_manager_) 238 signin_manager_->RemoveObserver(this); 239 if (notification_manager_) 240 notification_manager_->RemoveObserver(this); 241} 242 243void SyncEngine::Reset() { 244 if (drive_service_) 245 drive_service_->RemoveObserver(this); 246 247 DeleteSoon(FROM_HERE, worker_task_runner_, sync_worker_.Pass()); 248 DeleteSoon(FROM_HERE, worker_task_runner_, worker_observer_.Pass()); 249 DeleteSoon(FROM_HERE, worker_task_runner_, 250 remote_change_processor_on_worker_.Pass()); 251 252 drive_service_wrapper_.reset(); 253 drive_service_.reset(); 254 drive_uploader_wrapper_.reset(); 255 drive_uploader_.reset(); 256 remote_change_processor_wrapper_.reset(); 257 callback_tracker_.AbortAll(); 258} 259 260void SyncEngine::Initialize() { 261 Reset(); 262 263 if (!signin_manager_ || 264 signin_manager_->GetAuthenticatedAccountId().empty()) 265 return; 266 267 scoped_ptr<drive::DriveServiceInterface> drive_service( 268 new drive::DriveAPIService( 269 token_service_, 270 request_context_, 271 drive_task_runner_, 272 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction), 273 GURL(google_apis::DriveApiUrlGenerator:: 274 kBaseDownloadUrlForProduction), 275 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction), 276 std::string() /* custom_user_agent */)); 277 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 278 new drive::DriveUploader(drive_service.get(), drive_task_runner_)); 279 280 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 281 scoped_ptr<SyncWorkerInterface>()); 282} 283 284void SyncEngine::InitializeForTesting( 285 scoped_ptr<drive::DriveServiceInterface> drive_service, 286 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 287 scoped_ptr<SyncWorkerInterface> sync_worker) { 288 Reset(); 289 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 290 sync_worker.Pass()); 291} 292 293void SyncEngine::InitializeInternal( 294 scoped_ptr<drive::DriveServiceInterface> drive_service, 295 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 296 scoped_ptr<SyncWorkerInterface> sync_worker) { 297 drive_service_ = drive_service.Pass(); 298 drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get())); 299 300 std::string account_id; 301 if (signin_manager_) 302 account_id = signin_manager_->GetAuthenticatedAccountId(); 303 drive_service_->Initialize(account_id); 304 305 drive_uploader_ = drive_uploader.Pass(); 306 drive_uploader_wrapper_.reset( 307 new DriveUploaderWrapper(drive_uploader_.get())); 308 309 // DriveServiceWrapper and DriveServiceOnWorker relay communications 310 // between DriveService and syncers in SyncWorker. 311 scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker( 312 new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(), 313 ui_task_runner_, 314 worker_task_runner_)); 315 scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker( 316 new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(), 317 ui_task_runner_, 318 worker_task_runner_)); 319 scoped_ptr<SyncEngineContext> sync_engine_context( 320 new SyncEngineContext(drive_service_on_worker.Pass(), 321 drive_uploader_on_worker.Pass(), 322 task_logger_, 323 ui_task_runner_, 324 worker_task_runner_, 325 file_task_runner_)); 326 327 worker_observer_.reset( 328 new WorkerObserver(ui_task_runner_, weak_ptr_factory_.GetWeakPtr())); 329 330 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr; 331 if (extension_service_) 332 extension_service_weak_ptr = extension_service_->AsWeakPtr(); 333 334 if (!sync_worker) { 335 sync_worker.reset(new SyncWorker( 336 sync_file_system_dir_, 337 extension_service_weak_ptr, 338 env_override_)); 339 } 340 341 sync_worker_ = sync_worker.Pass(); 342 sync_worker_->AddObserver(worker_observer_.get()); 343 344 worker_task_runner_->PostTask( 345 FROM_HERE, 346 base::Bind(&SyncWorkerInterface::Initialize, 347 base::Unretained(sync_worker_.get()), 348 base::Passed(&sync_engine_context))); 349 if (remote_change_processor_) 350 SetRemoteChangeProcessor(remote_change_processor_); 351 352 drive_service_->AddObserver(this); 353 354 service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE; 355 SetSyncEnabled(sync_enabled_); 356 OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType()); 357} 358 359void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 360 service_observers_.AddObserver(observer); 361} 362 363void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 364 file_status_observers_.AddObserver(observer); 365} 366 367void SyncEngine::RegisterOrigin(const GURL& origin, 368 const SyncStatusCallback& callback) { 369 if (!sync_worker_) { 370 // TODO(tzik): Record |origin| and retry the registration after late 371 // sign-in. Then, return SYNC_STATUS_OK. 372 if (!signin_manager_ || 373 signin_manager_->GetAuthenticatedAccountId().empty()) 374 callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED); 375 else 376 callback.Run(SYNC_STATUS_ABORT); 377 return; 378 } 379 380 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 381 FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(), 382 TrackCallback(callback))); 383 384 worker_task_runner_->PostTask( 385 FROM_HERE, 386 base::Bind(&SyncWorkerInterface::RegisterOrigin, 387 base::Unretained(sync_worker_.get()), 388 origin, relayed_callback)); 389} 390 391void SyncEngine::EnableOrigin( 392 const GURL& origin, const SyncStatusCallback& callback) { 393 if (!sync_worker_) { 394 // It's safe to return OK immediately since this is also checked in 395 // SyncWorker initialization. 396 callback.Run(SYNC_STATUS_OK); 397 return; 398 } 399 400 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 401 FROM_HERE, TrackCallback(callback)); 402 403 worker_task_runner_->PostTask( 404 FROM_HERE, 405 base::Bind(&SyncWorkerInterface::EnableOrigin, 406 base::Unretained(sync_worker_.get()), 407 origin, relayed_callback)); 408} 409 410void SyncEngine::DisableOrigin( 411 const GURL& origin, const SyncStatusCallback& callback) { 412 if (!sync_worker_) { 413 // It's safe to return OK immediately since this is also checked in 414 // SyncWorker initialization. 415 callback.Run(SYNC_STATUS_OK); 416 return; 417 } 418 419 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 420 FROM_HERE, TrackCallback(callback)); 421 422 worker_task_runner_->PostTask( 423 FROM_HERE, 424 base::Bind(&SyncWorkerInterface::DisableOrigin, 425 base::Unretained(sync_worker_.get()), 426 origin, 427 relayed_callback)); 428} 429 430void SyncEngine::UninstallOrigin( 431 const GURL& origin, 432 UninstallFlag flag, 433 const SyncStatusCallback& callback) { 434 if (!sync_worker_) { 435 // It's safe to return OK immediately since this is also checked in 436 // SyncWorker initialization. 437 callback.Run(SYNC_STATUS_OK); 438 return; 439 } 440 441 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 442 FROM_HERE, TrackCallback(callback)); 443 worker_task_runner_->PostTask( 444 FROM_HERE, 445 base::Bind(&SyncWorkerInterface::UninstallOrigin, 446 base::Unretained(sync_worker_.get()), 447 origin, flag, relayed_callback)); 448} 449 450void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 451 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 452 callback.Run(SYNC_STATUS_SYNC_DISABLED, fileapi::FileSystemURL()); 453 return; 454 } 455 456 base::Closure abort_closure = 457 base::Bind(callback, SYNC_STATUS_ABORT, fileapi::FileSystemURL()); 458 459 if (!sync_worker_) { 460 abort_closure.Run(); 461 return; 462 } 463 464 SyncFileCallback tracked_callback = callback_tracker_.Register( 465 abort_closure, callback); 466 SyncFileCallback relayed_callback = RelayCallbackToCurrentThread( 467 FROM_HERE, tracked_callback); 468 worker_task_runner_->PostTask( 469 FROM_HERE, 470 base::Bind(&SyncWorkerInterface::ProcessRemoteChange, 471 base::Unretained(sync_worker_.get()), 472 relayed_callback)); 473} 474 475void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) { 476 remote_change_processor_ = processor; 477 478 if (!sync_worker_) 479 return; 480 481 remote_change_processor_wrapper_.reset( 482 new RemoteChangeProcessorWrapper(processor)); 483 484 remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker( 485 remote_change_processor_wrapper_->AsWeakPtr(), 486 ui_task_runner_, worker_task_runner_)); 487 488 worker_task_runner_->PostTask( 489 FROM_HERE, 490 base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor, 491 base::Unretained(sync_worker_.get()), 492 remote_change_processor_on_worker_.get())); 493} 494 495LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 496 return this; 497} 498 499RemoteServiceState SyncEngine::GetCurrentState() const { 500 if (!sync_enabled_) 501 return REMOTE_SERVICE_DISABLED; 502 return service_state_; 503} 504 505void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) { 506 base::Closure abort_closure = 507 base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>())); 508 509 if (!sync_worker_) { 510 abort_closure.Run(); 511 return; 512 } 513 514 StatusMapCallback tracked_callback = 515 callback_tracker_.Register(abort_closure, callback); 516 StatusMapCallback relayed_callback = 517 RelayCallbackToCurrentThread(FROM_HERE, tracked_callback); 518 519 worker_task_runner_->PostTask( 520 FROM_HERE, 521 base::Bind(&SyncWorkerInterface::GetOriginStatusMap, 522 base::Unretained(sync_worker_.get()), 523 relayed_callback)); 524} 525 526void SyncEngine::DumpFiles(const GURL& origin, 527 const ListCallback& callback) { 528 base::Closure abort_closure = 529 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 530 531 if (!sync_worker_) { 532 abort_closure.Run(); 533 return; 534 } 535 536 ListCallback tracked_callback = 537 callback_tracker_.Register(abort_closure, callback); 538 539 PostTaskAndReplyWithResult( 540 worker_task_runner_, 541 FROM_HERE, 542 base::Bind(&SyncWorkerInterface::DumpFiles, 543 base::Unretained(sync_worker_.get()), 544 origin), 545 tracked_callback); 546} 547 548void SyncEngine::DumpDatabase(const ListCallback& callback) { 549 base::Closure abort_closure = 550 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 551 552 if (!sync_worker_) { 553 abort_closure.Run(); 554 return; 555 } 556 557 ListCallback tracked_callback = 558 callback_tracker_.Register(abort_closure, callback); 559 560 PostTaskAndReplyWithResult( 561 worker_task_runner_, 562 FROM_HERE, 563 base::Bind(&SyncWorkerInterface::DumpDatabase, 564 base::Unretained(sync_worker_.get())), 565 tracked_callback); 566} 567 568void SyncEngine::SetSyncEnabled(bool sync_enabled) { 569 sync_enabled_ = sync_enabled; 570 571 if (!sync_worker_) 572 return; 573 574 worker_task_runner_->PostTask( 575 FROM_HERE, 576 base::Bind(&SyncWorkerInterface::SetSyncEnabled, 577 base::Unretained(sync_worker_.get()), 578 sync_enabled)); 579} 580 581void SyncEngine::PromoteDemotedChanges() { 582 if (!sync_worker_) 583 return; 584 585 worker_task_runner_->PostTask( 586 FROM_HERE, 587 base::Bind(&SyncWorkerInterface::PromoteDemotedChanges, 588 base::Unretained(sync_worker_.get()))); 589} 590 591void SyncEngine::ApplyLocalChange( 592 const FileChange& local_change, 593 const base::FilePath& local_path, 594 const SyncFileMetadata& local_metadata, 595 const fileapi::FileSystemURL& url, 596 const SyncStatusCallback& callback) { 597 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 598 callback.Run(SYNC_STATUS_SYNC_DISABLED); 599 return; 600 } 601 602 if (!sync_worker_) { 603 callback.Run(SYNC_STATUS_ABORT); 604 return; 605 } 606 607 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 608 FROM_HERE, TrackCallback(callback)); 609 worker_task_runner_->PostTask( 610 FROM_HERE, 611 base::Bind(&SyncWorkerInterface::ApplyLocalChange, 612 base::Unretained(sync_worker_.get()), 613 local_change, 614 local_path, 615 local_metadata, 616 url, 617 relayed_callback)); 618} 619 620void SyncEngine::OnNotificationReceived() { 621 if (!sync_worker_) 622 return; 623 624 worker_task_runner_->PostTask( 625 FROM_HERE, 626 base::Bind(&SyncWorkerInterface::OnNotificationReceived, 627 base::Unretained(sync_worker_.get()))); 628} 629 630void SyncEngine::OnPushNotificationEnabled(bool) {} 631 632void SyncEngine::OnReadyToSendRequests() { 633 if (!sync_worker_) 634 return; 635 636 worker_task_runner_->PostTask( 637 FROM_HERE, 638 base::Bind(&SyncWorkerInterface::OnReadyToSendRequests, 639 base::Unretained(sync_worker_.get()))); 640} 641 642void SyncEngine::OnRefreshTokenInvalid() { 643 if (!sync_worker_) 644 return; 645 646 worker_task_runner_->PostTask( 647 FROM_HERE, 648 base::Bind(&SyncWorkerInterface::OnRefreshTokenInvalid, 649 base::Unretained(sync_worker_.get()))); 650} 651 652void SyncEngine::OnNetworkChanged( 653 net::NetworkChangeNotifier::ConnectionType type) { 654 if (!sync_worker_) 655 return; 656 657 worker_task_runner_->PostTask( 658 FROM_HERE, 659 base::Bind(&SyncWorkerInterface::OnNetworkChanged, 660 base::Unretained(sync_worker_.get()), 661 type)); 662} 663 664void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) { 665 Reset(); 666 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 667 "Failed to sign in."); 668} 669 670void SyncEngine::GoogleSigninSucceeded(const std::string& username, 671 const std::string& password) { 672 Initialize(); 673} 674 675void SyncEngine::GoogleSignedOut(const std::string& username) { 676 Reset(); 677 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 678 "User signed out."); 679} 680 681SyncEngine::SyncEngine( 682 base::SingleThreadTaskRunner* ui_task_runner, 683 base::SequencedTaskRunner* worker_task_runner, 684 base::SequencedTaskRunner* file_task_runner, 685 base::SequencedTaskRunner* drive_task_runner, 686 const base::FilePath& sync_file_system_dir, 687 TaskLogger* task_logger, 688 drive::DriveNotificationManager* notification_manager, 689 ExtensionServiceInterface* extension_service, 690 SigninManagerBase* signin_manager, 691 ProfileOAuth2TokenService* token_service, 692 net::URLRequestContextGetter* request_context, 693 leveldb::Env* env_override) 694 : ui_task_runner_(ui_task_runner), 695 worker_task_runner_(worker_task_runner), 696 file_task_runner_(file_task_runner), 697 drive_task_runner_(drive_task_runner), 698 sync_file_system_dir_(sync_file_system_dir), 699 task_logger_(task_logger), 700 notification_manager_(notification_manager), 701 extension_service_(extension_service), 702 signin_manager_(signin_manager), 703 token_service_(token_service), 704 request_context_(request_context), 705 remote_change_processor_(NULL), 706 service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE), 707 sync_enabled_(false), 708 env_override_(env_override), 709 weak_ptr_factory_(this) { 710 DCHECK(sync_file_system_dir_.IsAbsolute()); 711 if (notification_manager_) 712 notification_manager_->AddObserver(this); 713 if (signin_manager_) 714 signin_manager_->AddObserver(this); 715 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 716} 717 718void SyncEngine::OnPendingFileListUpdated(int item_count) { 719 FOR_EACH_OBSERVER( 720 SyncServiceObserver, 721 service_observers_, 722 OnRemoteChangeQueueUpdated(item_count)); 723} 724 725void SyncEngine::OnFileStatusChanged(const fileapi::FileSystemURL& url, 726 SyncFileStatus file_status, 727 SyncAction sync_action, 728 SyncDirection direction) { 729 FOR_EACH_OBSERVER(FileStatusObserver, 730 file_status_observers_, 731 OnFileStatusChanged( 732 url, file_status, sync_action, direction)); 733} 734 735void SyncEngine::UpdateServiceState(RemoteServiceState state, 736 const std::string& description) { 737 service_state_ = state; 738 739 FOR_EACH_OBSERVER( 740 SyncServiceObserver, service_observers_, 741 OnRemoteServiceStateUpdated(state, description)); 742} 743 744SyncStatusCallback SyncEngine::TrackCallback( 745 const SyncStatusCallback& callback) { 746 return callback_tracker_.Register( 747 base::Bind(callback, SYNC_STATUS_ABORT), 748 callback); 749} 750 751} // namespace drive_backend 752} // namespace sync_file_system 753