// sync_engine.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/metrics/histogram.h" 11#include "base/thread_task_runner_handle.h" 12#include "base/threading/sequenced_worker_pool.h" 13#include "base/time/time.h" 14#include "base/values.h" 15#include "chrome/browser/drive/drive_api_service.h" 16#include "chrome/browser/drive/drive_notification_manager.h" 17#include "chrome/browser/drive/drive_notification_manager_factory.h" 18#include "chrome/browser/drive/drive_service_interface.h" 19#include "chrome/browser/drive/drive_uploader.h" 20#include "chrome/browser/extensions/extension_service.h" 21#include "chrome/browser/profiles/profile.h" 22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 23#include "chrome/browser/signin/signin_manager_factory.h" 24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h" 25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h" 28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h" 29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h" 30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h" 31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 35#include 
"chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h" 36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h" 37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h" 43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 44#include "chrome/browser/sync_file_system/file_status_observer.h" 45#include "chrome/browser/sync_file_system/logger.h" 46#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 47#include "components/signin/core/browser/profile_oauth2_token_service.h" 48#include "components/signin/core/browser/signin_manager.h" 49#include "content/public/browser/browser_thread.h" 50#include "extensions/browser/extension_system.h" 51#include "extensions/browser/extension_system_provider.h" 52#include "extensions/browser/extensions_browser_client.h" 53#include "extensions/common/extension.h" 54#include "google_apis/drive/drive_api_url_generator.h" 55#include "google_apis/drive/gdata_wapi_url_generator.h" 56#include "net/url_request/url_request_context_getter.h" 57#include "storage/common/blob/scoped_file.h" 58#include "storage/common/fileapi/file_system_util.h" 59 60namespace sync_file_system { 61 62class RemoteChangeProcessor; 63 64namespace drive_backend { 65 66scoped_ptr<drive::DriveServiceInterface> 67SyncEngine::DriveServiceFactory::CreateDriveService( 68 OAuth2TokenService* oauth2_token_service, 69 net::URLRequestContextGetter* url_request_context_getter, 70 base::SequencedTaskRunner* blocking_task_runner) { 71 return 
scoped_ptr<drive::DriveServiceInterface>( 72 new drive::DriveAPIService( 73 oauth2_token_service, 74 url_request_context_getter, 75 blocking_task_runner, 76 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction), 77 GURL(google_apis::DriveApiUrlGenerator:: 78 kBaseDownloadUrlForProduction), 79 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction), 80 std::string() /* custom_user_agent */)); 81} 82 83class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer { 84 public: 85 WorkerObserver(base::SequencedTaskRunner* ui_task_runner, 86 base::WeakPtr<SyncEngine> sync_engine) 87 : ui_task_runner_(ui_task_runner), 88 sync_engine_(sync_engine) { 89 sequence_checker_.DetachFromSequence(); 90 } 91 92 virtual ~WorkerObserver() { 93 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 94 } 95 96 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE { 97 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 98 if (sync_engine_) 99 sync_engine_->OnPendingFileListUpdated(item_count); 100 return; 101 } 102 103 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 104 ui_task_runner_->PostTask( 105 FROM_HERE, 106 base::Bind(&SyncEngine::OnPendingFileListUpdated, 107 sync_engine_, 108 item_count)); 109 } 110 111 virtual void OnFileStatusChanged(const storage::FileSystemURL& url, 112 SyncFileStatus file_status, 113 SyncAction sync_action, 114 SyncDirection direction) OVERRIDE { 115 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 116 if (sync_engine_) 117 sync_engine_->OnFileStatusChanged( 118 url, file_status, sync_action, direction); 119 return; 120 } 121 122 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 123 ui_task_runner_->PostTask( 124 FROM_HERE, 125 base::Bind(&SyncEngine::OnFileStatusChanged, 126 sync_engine_, 127 url, file_status, sync_action, direction)); 128 } 129 130 virtual void UpdateServiceState(RemoteServiceState state, 131 const std::string& description) OVERRIDE { 132 if 
(ui_task_runner_->RunsTasksOnCurrentThread()) { 133 if (sync_engine_) 134 sync_engine_->UpdateServiceState(state, description); 135 return; 136 } 137 138 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 139 ui_task_runner_->PostTask( 140 FROM_HERE, 141 base::Bind(&SyncEngine::UpdateServiceState, 142 sync_engine_, state, description)); 143 } 144 145 void DetachFromSequence() { 146 sequence_checker_.DetachFromSequence(); 147 } 148 149 private: 150 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; 151 base::WeakPtr<SyncEngine> sync_engine_; 152 153 base::SequenceChecker sequence_checker_; 154 155 DISALLOW_COPY_AND_ASSIGN(WorkerObserver); 156}; 157 158namespace { 159 160void DidRegisterOrigin(const base::TimeTicks& start_time, 161 const SyncStatusCallback& callback, 162 SyncStatusCode status) { 163 base::TimeDelta delta(base::TimeTicks::Now() - start_time); 164 LOCAL_HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta); 165 callback.Run(status); 166} 167 168template <typename T> 169void DeleteSoonHelper(scoped_ptr<T>) {} 170 171template <typename T> 172void DeleteSoon(const tracked_objects::Location& from_here, 173 base::TaskRunner* task_runner, 174 scoped_ptr<T> obj) { 175 if (!obj) 176 return; 177 178 T* obj_ptr = obj.get(); 179 base::Closure deleter = 180 base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj)); 181 if (!task_runner->PostTask(from_here, deleter)) { 182 obj_ptr->DetachFromSequence(); 183 deleter.Run(); 184 } 185} 186 187} // namespace 188 189scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext( 190 content::BrowserContext* context, 191 TaskLogger* task_logger) { 192 scoped_refptr<base::SequencedWorkerPool> worker_pool = 193 content::BrowserThread::GetBlockingPool(); 194 195 scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner = 196 base::ThreadTaskRunnerHandle::Get(); 197 scoped_refptr<base::SequencedTaskRunner> worker_task_runner = 198 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 199 
worker_pool->GetSequenceToken(), 200 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 201 scoped_refptr<base::SequencedTaskRunner> drive_task_runner = 202 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 203 worker_pool->GetSequenceToken(), 204 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 205 206 Profile* profile = Profile::FromBrowserContext(context); 207 drive::DriveNotificationManager* notification_manager = 208 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 209 ExtensionService* extension_service = 210 extensions::ExtensionSystem::Get(context)->extension_service(); 211 SigninManagerBase* signin_manager = 212 SigninManagerFactory::GetForProfile(profile); 213 OAuth2TokenService* token_service = 214 ProfileOAuth2TokenServiceFactory::GetForProfile(profile); 215 scoped_refptr<net::URLRequestContextGetter> request_context = 216 context->GetRequestContext(); 217 218 scoped_ptr<drive_backend::SyncEngine> sync_engine( 219 new SyncEngine(ui_task_runner.get(), 220 worker_task_runner.get(), 221 drive_task_runner.get(), 222 GetSyncFileSystemDir(context->GetPath()), 223 task_logger, 224 notification_manager, 225 extension_service, 226 signin_manager, 227 token_service, 228 request_context.get(), 229 make_scoped_ptr(new DriveServiceFactory()), 230 NULL /* env_override */)); 231 232 sync_engine->Initialize(); 233 return sync_engine.Pass(); 234} 235 236void SyncEngine::AppendDependsOnFactories( 237 std::set<BrowserContextKeyedServiceFactory*>* factories) { 238 DCHECK(factories); 239 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 240 factories->insert(SigninManagerFactory::GetInstance()); 241 factories->insert( 242 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 243 factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance()); 244} 245 246SyncEngine::~SyncEngine() { 247 Reset(); 248 249 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 250 if (signin_manager_) 251 
signin_manager_->RemoveObserver(this); 252 if (notification_manager_) 253 notification_manager_->RemoveObserver(this); 254} 255 256void SyncEngine::Reset() { 257 if (drive_service_) 258 drive_service_->RemoveObserver(this); 259 260 DeleteSoon(FROM_HERE, worker_task_runner_.get(), sync_worker_.Pass()); 261 DeleteSoon(FROM_HERE, worker_task_runner_.get(), worker_observer_.Pass()); 262 DeleteSoon(FROM_HERE, 263 worker_task_runner_.get(), 264 remote_change_processor_on_worker_.Pass()); 265 266 drive_service_wrapper_.reset(); 267 drive_service_.reset(); 268 drive_uploader_wrapper_.reset(); 269 drive_uploader_.reset(); 270 remote_change_processor_wrapper_.reset(); 271 callback_tracker_.AbortAll(); 272} 273 274void SyncEngine::Initialize() { 275 Reset(); 276 277 if (!signin_manager_ || !signin_manager_->IsAuthenticated()) 278 return; 279 280 DCHECK(drive_service_factory_); 281 scoped_ptr<drive::DriveServiceInterface> drive_service = 282 drive_service_factory_->CreateDriveService( 283 token_service_, request_context_.get(), drive_task_runner_.get()); 284 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 285 new drive::DriveUploader(drive_service.get(), drive_task_runner_.get())); 286 287 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 288 scoped_ptr<SyncWorkerInterface>()); 289} 290 291void SyncEngine::InitializeForTesting( 292 scoped_ptr<drive::DriveServiceInterface> drive_service, 293 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 294 scoped_ptr<SyncWorkerInterface> sync_worker) { 295 Reset(); 296 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 297 sync_worker.Pass()); 298} 299 300void SyncEngine::InitializeInternal( 301 scoped_ptr<drive::DriveServiceInterface> drive_service, 302 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 303 scoped_ptr<SyncWorkerInterface> sync_worker) { 304 drive_service_ = drive_service.Pass(); 305 drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get())); 306 307 
std::string account_id; 308 if (signin_manager_) 309 account_id = signin_manager_->GetAuthenticatedAccountId(); 310 drive_service_->Initialize(account_id); 311 312 drive_uploader_ = drive_uploader.Pass(); 313 drive_uploader_wrapper_.reset( 314 new DriveUploaderWrapper(drive_uploader_.get())); 315 316 // DriveServiceWrapper and DriveServiceOnWorker relay communications 317 // between DriveService and syncers in SyncWorker. 318 scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker( 319 new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(), 320 ui_task_runner_.get(), 321 worker_task_runner_.get())); 322 scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker( 323 new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(), 324 ui_task_runner_.get(), 325 worker_task_runner_.get())); 326 scoped_ptr<SyncEngineContext> sync_engine_context( 327 new SyncEngineContext(drive_service_on_worker.Pass(), 328 drive_uploader_on_worker.Pass(), 329 task_logger_, 330 ui_task_runner_.get(), 331 worker_task_runner_.get())); 332 333 worker_observer_.reset(new WorkerObserver(ui_task_runner_.get(), 334 weak_ptr_factory_.GetWeakPtr())); 335 336 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr; 337 if (extension_service_) 338 extension_service_weak_ptr = extension_service_->AsWeakPtr(); 339 340 if (!sync_worker) { 341 sync_worker.reset(new SyncWorker( 342 sync_file_system_dir_, 343 extension_service_weak_ptr, 344 env_override_)); 345 } 346 347 sync_worker_ = sync_worker.Pass(); 348 sync_worker_->AddObserver(worker_observer_.get()); 349 350 worker_task_runner_->PostTask( 351 FROM_HERE, 352 base::Bind(&SyncWorkerInterface::Initialize, 353 base::Unretained(sync_worker_.get()), 354 base::Passed(&sync_engine_context))); 355 if (remote_change_processor_) 356 SetRemoteChangeProcessor(remote_change_processor_); 357 358 drive_service_->AddObserver(this); 359 360 service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE; 361 
OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType()); 362 if (drive_service_->HasRefreshToken()) 363 OnReadyToSendRequests(); 364 else 365 OnRefreshTokenInvalid(); 366} 367 368void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 369 service_observers_.AddObserver(observer); 370} 371 372void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 373 file_status_observers_.AddObserver(observer); 374} 375 376void SyncEngine::RegisterOrigin(const GURL& origin, 377 const SyncStatusCallback& callback) { 378 if (!sync_worker_) { 379 // TODO(tzik): Record |origin| and retry the registration after late 380 // sign-in. Then, return SYNC_STATUS_OK. 381 if (!signin_manager_ || !signin_manager_->IsAuthenticated()) 382 callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED); 383 else 384 callback.Run(SYNC_STATUS_ABORT); 385 return; 386 } 387 388 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 389 FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(), 390 TrackCallback(callback))); 391 392 worker_task_runner_->PostTask( 393 FROM_HERE, 394 base::Bind(&SyncWorkerInterface::RegisterOrigin, 395 base::Unretained(sync_worker_.get()), 396 origin, relayed_callback)); 397} 398 399void SyncEngine::EnableOrigin( 400 const GURL& origin, const SyncStatusCallback& callback) { 401 if (!sync_worker_) { 402 // It's safe to return OK immediately since this is also checked in 403 // SyncWorker initialization. 
404 callback.Run(SYNC_STATUS_OK); 405 return; 406 } 407 408 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 409 FROM_HERE, TrackCallback(callback)); 410 411 worker_task_runner_->PostTask( 412 FROM_HERE, 413 base::Bind(&SyncWorkerInterface::EnableOrigin, 414 base::Unretained(sync_worker_.get()), 415 origin, relayed_callback)); 416} 417 418void SyncEngine::DisableOrigin( 419 const GURL& origin, const SyncStatusCallback& callback) { 420 if (!sync_worker_) { 421 // It's safe to return OK immediately since this is also checked in 422 // SyncWorker initialization. 423 callback.Run(SYNC_STATUS_OK); 424 return; 425 } 426 427 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 428 FROM_HERE, TrackCallback(callback)); 429 430 worker_task_runner_->PostTask( 431 FROM_HERE, 432 base::Bind(&SyncWorkerInterface::DisableOrigin, 433 base::Unretained(sync_worker_.get()), 434 origin, 435 relayed_callback)); 436} 437 438void SyncEngine::UninstallOrigin( 439 const GURL& origin, 440 UninstallFlag flag, 441 const SyncStatusCallback& callback) { 442 if (!sync_worker_) { 443 // It's safe to return OK immediately since this is also checked in 444 // SyncWorker initialization. 
445 callback.Run(SYNC_STATUS_OK); 446 return; 447 } 448 449 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 450 FROM_HERE, TrackCallback(callback)); 451 worker_task_runner_->PostTask( 452 FROM_HERE, 453 base::Bind(&SyncWorkerInterface::UninstallOrigin, 454 base::Unretained(sync_worker_.get()), 455 origin, flag, relayed_callback)); 456} 457 458void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 459 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 460 callback.Run(SYNC_STATUS_SYNC_DISABLED, storage::FileSystemURL()); 461 return; 462 } 463 464 base::Closure abort_closure = 465 base::Bind(callback, SYNC_STATUS_ABORT, storage::FileSystemURL()); 466 467 if (!sync_worker_) { 468 abort_closure.Run(); 469 return; 470 } 471 472 SyncFileCallback tracked_callback = callback_tracker_.Register( 473 abort_closure, callback); 474 SyncFileCallback relayed_callback = RelayCallbackToCurrentThread( 475 FROM_HERE, tracked_callback); 476 worker_task_runner_->PostTask( 477 FROM_HERE, 478 base::Bind(&SyncWorkerInterface::ProcessRemoteChange, 479 base::Unretained(sync_worker_.get()), 480 relayed_callback)); 481} 482 483void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) { 484 remote_change_processor_ = processor; 485 486 if (!sync_worker_) 487 return; 488 489 remote_change_processor_wrapper_.reset( 490 new RemoteChangeProcessorWrapper(processor)); 491 492 remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker( 493 remote_change_processor_wrapper_->AsWeakPtr(), 494 ui_task_runner_.get(), 495 worker_task_runner_.get())); 496 497 worker_task_runner_->PostTask( 498 FROM_HERE, 499 base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor, 500 base::Unretained(sync_worker_.get()), 501 remote_change_processor_on_worker_.get())); 502} 503 504LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 505 return this; 506} 507 508RemoteServiceState SyncEngine::GetCurrentState() const { 509 if (!sync_enabled_) 
510 return REMOTE_SERVICE_DISABLED; 511 if (!has_refresh_token_) 512 return REMOTE_SERVICE_AUTHENTICATION_REQUIRED; 513 return service_state_; 514} 515 516void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) { 517 base::Closure abort_closure = 518 base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>())); 519 520 if (!sync_worker_) { 521 abort_closure.Run(); 522 return; 523 } 524 525 StatusMapCallback tracked_callback = 526 callback_tracker_.Register(abort_closure, callback); 527 StatusMapCallback relayed_callback = 528 RelayCallbackToCurrentThread(FROM_HERE, tracked_callback); 529 530 worker_task_runner_->PostTask( 531 FROM_HERE, 532 base::Bind(&SyncWorkerInterface::GetOriginStatusMap, 533 base::Unretained(sync_worker_.get()), 534 relayed_callback)); 535} 536 537void SyncEngine::DumpFiles(const GURL& origin, 538 const ListCallback& callback) { 539 base::Closure abort_closure = 540 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 541 542 if (!sync_worker_) { 543 abort_closure.Run(); 544 return; 545 } 546 547 ListCallback tracked_callback = 548 callback_tracker_.Register(abort_closure, callback); 549 550 PostTaskAndReplyWithResult(worker_task_runner_.get(), 551 FROM_HERE, 552 base::Bind(&SyncWorkerInterface::DumpFiles, 553 base::Unretained(sync_worker_.get()), 554 origin), 555 tracked_callback); 556} 557 558void SyncEngine::DumpDatabase(const ListCallback& callback) { 559 base::Closure abort_closure = 560 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 561 562 if (!sync_worker_) { 563 abort_closure.Run(); 564 return; 565 } 566 567 ListCallback tracked_callback = 568 callback_tracker_.Register(abort_closure, callback); 569 570 PostTaskAndReplyWithResult(worker_task_runner_.get(), 571 FROM_HERE, 572 base::Bind(&SyncWorkerInterface::DumpDatabase, 573 base::Unretained(sync_worker_.get())), 574 tracked_callback); 575} 576 577void SyncEngine::SetSyncEnabled(bool sync_enabled) { 578 if (sync_enabled_ == 
sync_enabled) 579 return; 580 sync_enabled_ = sync_enabled; 581 582 if (sync_enabled_) { 583 if (!sync_worker_) 584 Initialize(); 585 586 // Have no login credential. 587 if (!sync_worker_) 588 return; 589 590 worker_task_runner_->PostTask( 591 FROM_HERE, 592 base::Bind(&SyncWorkerInterface::SetSyncEnabled, 593 base::Unretained(sync_worker_.get()), 594 sync_enabled_)); 595 return; 596 } 597 598 if (!sync_worker_) 599 return; 600 601 // TODO(tzik): Consider removing SyncWorkerInterface::SetSyncEnabled and 602 // let SyncEngine handle the flag. 603 worker_task_runner_->PostTask( 604 FROM_HERE, 605 base::Bind(&SyncWorkerInterface::SetSyncEnabled, 606 base::Unretained(sync_worker_.get()), 607 sync_enabled_)); 608 Reset(); 609} 610 611void SyncEngine::PromoteDemotedChanges(const base::Closure& callback) { 612 if (!sync_worker_) { 613 callback.Run(); 614 return; 615 } 616 617 base::Closure relayed_callback = RelayCallbackToCurrentThread( 618 FROM_HERE, callback_tracker_.Register(callback, callback)); 619 620 worker_task_runner_->PostTask( 621 FROM_HERE, 622 base::Bind(&SyncWorkerInterface::PromoteDemotedChanges, 623 base::Unretained(sync_worker_.get()), 624 relayed_callback)); 625} 626 627void SyncEngine::ApplyLocalChange(const FileChange& local_change, 628 const base::FilePath& local_path, 629 const SyncFileMetadata& local_metadata, 630 const storage::FileSystemURL& url, 631 const SyncStatusCallback& callback) { 632 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 633 callback.Run(SYNC_STATUS_SYNC_DISABLED); 634 return; 635 } 636 637 if (!sync_worker_) { 638 callback.Run(SYNC_STATUS_ABORT); 639 return; 640 } 641 642 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 643 FROM_HERE, TrackCallback(callback)); 644 worker_task_runner_->PostTask( 645 FROM_HERE, 646 base::Bind(&SyncWorkerInterface::ApplyLocalChange, 647 base::Unretained(sync_worker_.get()), 648 local_change, 649 local_path, 650 local_metadata, 651 url, 652 relayed_callback)); 653} 654 
655void SyncEngine::OnNotificationReceived() { 656 if (!sync_worker_) 657 return; 658 659 worker_task_runner_->PostTask( 660 FROM_HERE, 661 base::Bind(&SyncWorkerInterface::ActivateService, 662 base::Unretained(sync_worker_.get()), 663 REMOTE_SERVICE_OK, 664 "Got push notification for Drive")); 665} 666 667void SyncEngine::OnPushNotificationEnabled(bool /* enabled */) {} 668 669void SyncEngine::OnReadyToSendRequests() { 670 has_refresh_token_ = true; 671 if (!sync_worker_) 672 return; 673 674 worker_task_runner_->PostTask( 675 FROM_HERE, 676 base::Bind(&SyncWorkerInterface::ActivateService, 677 base::Unretained(sync_worker_.get()), 678 REMOTE_SERVICE_OK, 679 "Authenticated")); 680} 681 682void SyncEngine::OnRefreshTokenInvalid() { 683 has_refresh_token_ = false; 684 if (!sync_worker_) 685 return; 686 687 worker_task_runner_->PostTask( 688 FROM_HERE, 689 base::Bind(&SyncWorkerInterface::DeactivateService, 690 base::Unretained(sync_worker_.get()), 691 "Found invalid refresh token.")); 692} 693 694void SyncEngine::OnNetworkChanged( 695 net::NetworkChangeNotifier::ConnectionType type) { 696 if (!sync_worker_) 697 return; 698 699 bool network_available_old = network_available_; 700 network_available_ = (type != net::NetworkChangeNotifier::CONNECTION_NONE); 701 702 if (!network_available_old && network_available_) { 703 worker_task_runner_->PostTask( 704 FROM_HERE, 705 base::Bind(&SyncWorkerInterface::ActivateService, 706 base::Unretained(sync_worker_.get()), 707 REMOTE_SERVICE_OK, 708 "Connected")); 709 } else if (network_available_old && !network_available_) { 710 worker_task_runner_->PostTask( 711 FROM_HERE, 712 base::Bind(&SyncWorkerInterface::DeactivateService, 713 base::Unretained(sync_worker_.get()), 714 "Disconnected")); 715 } 716} 717 718void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) { 719 Reset(); 720 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 721 "Failed to sign in."); 722} 723 724void 
SyncEngine::GoogleSigninSucceeded(const std::string& account_id, 725 const std::string& username, 726 const std::string& password) { 727 Initialize(); 728} 729 730void SyncEngine::GoogleSignedOut(const std::string& account_id, 731 const std::string& username) { 732 Reset(); 733 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 734 "User signed out."); 735} 736 737SyncEngine::SyncEngine( 738 const scoped_refptr<base::SingleThreadTaskRunner>& ui_task_runner, 739 const scoped_refptr<base::SequencedTaskRunner>& worker_task_runner, 740 const scoped_refptr<base::SequencedTaskRunner>& drive_task_runner, 741 const base::FilePath& sync_file_system_dir, 742 TaskLogger* task_logger, 743 drive::DriveNotificationManager* notification_manager, 744 ExtensionServiceInterface* extension_service, 745 SigninManagerBase* signin_manager, 746 OAuth2TokenService* token_service, 747 net::URLRequestContextGetter* request_context, 748 scoped_ptr<DriveServiceFactory> drive_service_factory, 749 leveldb::Env* env_override) 750 : ui_task_runner_(ui_task_runner), 751 worker_task_runner_(worker_task_runner), 752 drive_task_runner_(drive_task_runner), 753 sync_file_system_dir_(sync_file_system_dir), 754 task_logger_(task_logger), 755 notification_manager_(notification_manager), 756 extension_service_(extension_service), 757 signin_manager_(signin_manager), 758 token_service_(token_service), 759 request_context_(request_context), 760 drive_service_factory_(drive_service_factory.Pass()), 761 remote_change_processor_(NULL), 762 service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE), 763 has_refresh_token_(false), 764 network_available_(false), 765 sync_enabled_(false), 766 env_override_(env_override), 767 weak_ptr_factory_(this) { 768 DCHECK(sync_file_system_dir_.IsAbsolute()); 769 if (notification_manager_) 770 notification_manager_->AddObserver(this); 771 if (signin_manager_) 772 signin_manager_->AddObserver(this); 773 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 774} 775 
776void SyncEngine::OnPendingFileListUpdated(int item_count) { 777 FOR_EACH_OBSERVER( 778 SyncServiceObserver, 779 service_observers_, 780 OnRemoteChangeQueueUpdated(item_count)); 781} 782 783void SyncEngine::OnFileStatusChanged(const storage::FileSystemURL& url, 784 SyncFileStatus file_status, 785 SyncAction sync_action, 786 SyncDirection direction) { 787 FOR_EACH_OBSERVER(FileStatusObserver, 788 file_status_observers_, 789 OnFileStatusChanged( 790 url, file_status, sync_action, direction)); 791} 792 793void SyncEngine::UpdateServiceState(RemoteServiceState state, 794 const std::string& description) { 795 service_state_ = state; 796 797 FOR_EACH_OBSERVER( 798 SyncServiceObserver, service_observers_, 799 OnRemoteServiceStateUpdated(GetCurrentState(), description)); 800} 801 802SyncStatusCallback SyncEngine::TrackCallback( 803 const SyncStatusCallback& callback) { 804 return callback_tracker_.Register( 805 base::Bind(callback, SYNC_STATUS_ABORT), 806 callback); 807} 808 809} // namespace drive_backend 810} // namespace sync_file_system 811