// sync_engine.cc revision 03b57e008b61dfcb1fbad3aea950ae0e001748b0
1// Copyright 2013 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/metrics/histogram.h" 11#include "base/thread_task_runner_handle.h" 12#include "base/threading/sequenced_worker_pool.h" 13#include "base/time/time.h" 14#include "base/values.h" 15#include "chrome/browser/drive/drive_api_service.h" 16#include "chrome/browser/drive/drive_notification_manager.h" 17#include "chrome/browser/drive/drive_notification_manager_factory.h" 18#include "chrome/browser/drive/drive_service_interface.h" 19#include "chrome/browser/drive/drive_uploader.h" 20#include "chrome/browser/extensions/extension_service.h" 21#include "chrome/browser/profiles/profile.h" 22#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 23#include "chrome/browser/signin/signin_manager_factory.h" 24#include "chrome/browser/sync_file_system/drive_backend/callback_helper.h" 25#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 26#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 27#include "chrome/browser/sync_file_system/drive_backend/drive_service_on_worker.h" 28#include "chrome/browser/sync_file_system/drive_backend/drive_service_wrapper.h" 29#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_on_worker.h" 30#include "chrome/browser/sync_file_system/drive_backend/drive_uploader_wrapper.h" 31#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 32#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 33#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 34#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 35#include 
"chrome/browser/sync_file_system/drive_backend/remote_change_processor_on_worker.h" 36#include "chrome/browser/sync_file_system/drive_backend/remote_change_processor_wrapper.h" 37#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 38#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 39#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 40#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 41#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 42#include "chrome/browser/sync_file_system/drive_backend/sync_worker_interface.h" 43#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 44#include "chrome/browser/sync_file_system/file_status_observer.h" 45#include "chrome/browser/sync_file_system/logger.h" 46#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 47#include "components/signin/core/browser/profile_oauth2_token_service.h" 48#include "components/signin/core/browser/signin_manager.h" 49#include "content/public/browser/browser_thread.h" 50#include "extensions/browser/extension_system.h" 51#include "extensions/browser/extension_system_provider.h" 52#include "extensions/browser/extensions_browser_client.h" 53#include "extensions/common/extension.h" 54#include "google_apis/drive/drive_api_url_generator.h" 55#include "google_apis/drive/gdata_wapi_url_generator.h" 56#include "net/url_request/url_request_context_getter.h" 57#include "webkit/common/blob/scoped_file.h" 58#include "webkit/common/fileapi/file_system_util.h" 59 60namespace sync_file_system { 61 62class RemoteChangeProcessor; 63 64namespace drive_backend { 65 66scoped_ptr<drive::DriveServiceInterface> 67SyncEngine::DriveServiceFactory::CreateDriveService( 68 OAuth2TokenService* oauth2_token_service, 69 net::URLRequestContextGetter* url_request_context_getter, 70 base::SequencedTaskRunner* blocking_task_runner) { 71 return 
scoped_ptr<drive::DriveServiceInterface>( 72 new drive::DriveAPIService( 73 oauth2_token_service, 74 url_request_context_getter, 75 blocking_task_runner, 76 GURL(google_apis::DriveApiUrlGenerator::kBaseUrlForProduction), 77 GURL(google_apis::DriveApiUrlGenerator:: 78 kBaseDownloadUrlForProduction), 79 GURL(google_apis::GDataWapiUrlGenerator::kBaseUrlForProduction), 80 std::string() /* custom_user_agent */)); 81} 82 83class SyncEngine::WorkerObserver : public SyncWorkerInterface::Observer { 84 public: 85 WorkerObserver(base::SequencedTaskRunner* ui_task_runner, 86 base::WeakPtr<SyncEngine> sync_engine) 87 : ui_task_runner_(ui_task_runner), 88 sync_engine_(sync_engine) { 89 sequence_checker_.DetachFromSequence(); 90 } 91 92 virtual ~WorkerObserver() { 93 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 94 } 95 96 virtual void OnPendingFileListUpdated(int item_count) OVERRIDE { 97 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 98 if (sync_engine_) 99 sync_engine_->OnPendingFileListUpdated(item_count); 100 return; 101 } 102 103 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 104 ui_task_runner_->PostTask( 105 FROM_HERE, 106 base::Bind(&SyncEngine::OnPendingFileListUpdated, 107 sync_engine_, 108 item_count)); 109 } 110 111 virtual void OnFileStatusChanged(const storage::FileSystemURL& url, 112 SyncFileStatus file_status, 113 SyncAction sync_action, 114 SyncDirection direction) OVERRIDE { 115 if (ui_task_runner_->RunsTasksOnCurrentThread()) { 116 if (sync_engine_) 117 sync_engine_->OnFileStatusChanged( 118 url, file_status, sync_action, direction); 119 return; 120 } 121 122 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 123 ui_task_runner_->PostTask( 124 FROM_HERE, 125 base::Bind(&SyncEngine::OnFileStatusChanged, 126 sync_engine_, 127 url, file_status, sync_action, direction)); 128 } 129 130 virtual void UpdateServiceState(RemoteServiceState state, 131 const std::string& description) OVERRIDE { 132 if 
(ui_task_runner_->RunsTasksOnCurrentThread()) { 133 if (sync_engine_) 134 sync_engine_->UpdateServiceState(state, description); 135 return; 136 } 137 138 DCHECK(sequence_checker_.CalledOnValidSequencedThread()); 139 ui_task_runner_->PostTask( 140 FROM_HERE, 141 base::Bind(&SyncEngine::UpdateServiceState, 142 sync_engine_, state, description)); 143 } 144 145 void DetachFromSequence() { 146 sequence_checker_.DetachFromSequence(); 147 } 148 149 private: 150 scoped_refptr<base::SequencedTaskRunner> ui_task_runner_; 151 base::WeakPtr<SyncEngine> sync_engine_; 152 153 base::SequenceChecker sequence_checker_; 154 155 DISALLOW_COPY_AND_ASSIGN(WorkerObserver); 156}; 157 158namespace { 159 160void DidRegisterOrigin(const base::TimeTicks& start_time, 161 const SyncStatusCallback& callback, 162 SyncStatusCode status) { 163 base::TimeDelta delta(base::TimeTicks::Now() - start_time); 164 HISTOGRAM_TIMES("SyncFileSystem.RegisterOriginTime", delta); 165 callback.Run(status); 166} 167 168template <typename T> 169void DeleteSoonHelper(scoped_ptr<T>) {} 170 171template <typename T> 172void DeleteSoon(const tracked_objects::Location& from_here, 173 base::TaskRunner* task_runner, 174 scoped_ptr<T> obj) { 175 if (!obj) 176 return; 177 178 T* obj_ptr = obj.get(); 179 base::Closure deleter = 180 base::Bind(&DeleteSoonHelper<T>, base::Passed(&obj)); 181 if (!task_runner->PostTask(from_here, deleter)) { 182 obj_ptr->DetachFromSequence(); 183 deleter.Run(); 184 } 185} 186 187} // namespace 188 189scoped_ptr<SyncEngine> SyncEngine::CreateForBrowserContext( 190 content::BrowserContext* context, 191 TaskLogger* task_logger) { 192 scoped_refptr<base::SequencedWorkerPool> worker_pool = 193 content::BrowserThread::GetBlockingPool(); 194 195 scoped_refptr<base::SingleThreadTaskRunner> ui_task_runner = 196 base::ThreadTaskRunnerHandle::Get(); 197 scoped_refptr<base::SequencedTaskRunner> worker_task_runner = 198 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 199 
worker_pool->GetSequenceToken(), 200 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 201 scoped_refptr<base::SequencedTaskRunner> drive_task_runner = 202 worker_pool->GetSequencedTaskRunnerWithShutdownBehavior( 203 worker_pool->GetSequenceToken(), 204 base::SequencedWorkerPool::SKIP_ON_SHUTDOWN); 205 206 Profile* profile = Profile::FromBrowserContext(context); 207 drive::DriveNotificationManager* notification_manager = 208 drive::DriveNotificationManagerFactory::GetForBrowserContext(context); 209 ExtensionService* extension_service = 210 extensions::ExtensionSystem::Get(context)->extension_service(); 211 SigninManagerBase* signin_manager = 212 SigninManagerFactory::GetForProfile(profile); 213 OAuth2TokenService* token_service = 214 ProfileOAuth2TokenServiceFactory::GetForProfile(profile); 215 scoped_refptr<net::URLRequestContextGetter> request_context = 216 context->GetRequestContext(); 217 218 scoped_ptr<drive_backend::SyncEngine> sync_engine( 219 new SyncEngine(ui_task_runner, 220 worker_task_runner, 221 drive_task_runner, 222 GetSyncFileSystemDir(context->GetPath()), 223 task_logger, 224 notification_manager, 225 extension_service, 226 signin_manager, 227 token_service, 228 request_context, 229 make_scoped_ptr(new DriveServiceFactory()), 230 NULL /* env_override */)); 231 232 sync_engine->Initialize(); 233 return sync_engine.Pass(); 234} 235 236void SyncEngine::AppendDependsOnFactories( 237 std::set<BrowserContextKeyedServiceFactory*>* factories) { 238 DCHECK(factories); 239 factories->insert(drive::DriveNotificationManagerFactory::GetInstance()); 240 factories->insert(SigninManagerFactory::GetInstance()); 241 factories->insert( 242 extensions::ExtensionsBrowserClient::Get()->GetExtensionSystemFactory()); 243 factories->insert(ProfileOAuth2TokenServiceFactory::GetInstance()); 244} 245 246SyncEngine::~SyncEngine() { 247 Reset(); 248 249 net::NetworkChangeNotifier::RemoveNetworkChangeObserver(this); 250 if (signin_manager_) 251 signin_manager_->RemoveObserver(this); 
252 if (notification_manager_) 253 notification_manager_->RemoveObserver(this); 254} 255 256void SyncEngine::Reset() { 257 if (drive_service_) 258 drive_service_->RemoveObserver(this); 259 260 DeleteSoon(FROM_HERE, worker_task_runner_, sync_worker_.Pass()); 261 DeleteSoon(FROM_HERE, worker_task_runner_, worker_observer_.Pass()); 262 DeleteSoon(FROM_HERE, worker_task_runner_, 263 remote_change_processor_on_worker_.Pass()); 264 265 drive_service_wrapper_.reset(); 266 drive_service_.reset(); 267 drive_uploader_wrapper_.reset(); 268 drive_uploader_.reset(); 269 remote_change_processor_wrapper_.reset(); 270 callback_tracker_.AbortAll(); 271} 272 273void SyncEngine::Initialize() { 274 Reset(); 275 276 if (!signin_manager_ || 277 signin_manager_->GetAuthenticatedAccountId().empty()) 278 return; 279 280 DCHECK(drive_service_factory_); 281 scoped_ptr<drive::DriveServiceInterface> drive_service = 282 drive_service_factory_->CreateDriveService( 283 token_service_, request_context_, drive_task_runner_); 284 scoped_ptr<drive::DriveUploaderInterface> drive_uploader( 285 new drive::DriveUploader(drive_service.get(), drive_task_runner_)); 286 287 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 288 scoped_ptr<SyncWorkerInterface>()); 289} 290 291void SyncEngine::InitializeForTesting( 292 scoped_ptr<drive::DriveServiceInterface> drive_service, 293 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 294 scoped_ptr<SyncWorkerInterface> sync_worker) { 295 Reset(); 296 InitializeInternal(drive_service.Pass(), drive_uploader.Pass(), 297 sync_worker.Pass()); 298} 299 300void SyncEngine::InitializeInternal( 301 scoped_ptr<drive::DriveServiceInterface> drive_service, 302 scoped_ptr<drive::DriveUploaderInterface> drive_uploader, 303 scoped_ptr<SyncWorkerInterface> sync_worker) { 304 drive_service_ = drive_service.Pass(); 305 drive_service_wrapper_.reset(new DriveServiceWrapper(drive_service_.get())); 306 307 std::string account_id; 308 if (signin_manager_) 309 
account_id = signin_manager_->GetAuthenticatedAccountId(); 310 drive_service_->Initialize(account_id); 311 312 drive_uploader_ = drive_uploader.Pass(); 313 drive_uploader_wrapper_.reset( 314 new DriveUploaderWrapper(drive_uploader_.get())); 315 316 // DriveServiceWrapper and DriveServiceOnWorker relay communications 317 // between DriveService and syncers in SyncWorker. 318 scoped_ptr<drive::DriveServiceInterface> drive_service_on_worker( 319 new DriveServiceOnWorker(drive_service_wrapper_->AsWeakPtr(), 320 ui_task_runner_, 321 worker_task_runner_)); 322 scoped_ptr<drive::DriveUploaderInterface> drive_uploader_on_worker( 323 new DriveUploaderOnWorker(drive_uploader_wrapper_->AsWeakPtr(), 324 ui_task_runner_, 325 worker_task_runner_)); 326 scoped_ptr<SyncEngineContext> sync_engine_context( 327 new SyncEngineContext(drive_service_on_worker.Pass(), 328 drive_uploader_on_worker.Pass(), 329 task_logger_, 330 ui_task_runner_, 331 worker_task_runner_)); 332 333 worker_observer_.reset( 334 new WorkerObserver(ui_task_runner_, weak_ptr_factory_.GetWeakPtr())); 335 336 base::WeakPtr<ExtensionServiceInterface> extension_service_weak_ptr; 337 if (extension_service_) 338 extension_service_weak_ptr = extension_service_->AsWeakPtr(); 339 340 if (!sync_worker) { 341 sync_worker.reset(new SyncWorker( 342 sync_file_system_dir_, 343 extension_service_weak_ptr, 344 env_override_)); 345 } 346 347 sync_worker_ = sync_worker.Pass(); 348 sync_worker_->AddObserver(worker_observer_.get()); 349 350 worker_task_runner_->PostTask( 351 FROM_HERE, 352 base::Bind(&SyncWorkerInterface::Initialize, 353 base::Unretained(sync_worker_.get()), 354 base::Passed(&sync_engine_context))); 355 if (remote_change_processor_) 356 SetRemoteChangeProcessor(remote_change_processor_); 357 358 drive_service_->AddObserver(this); 359 360 service_state_ = REMOTE_SERVICE_TEMPORARY_UNAVAILABLE; 361 SetSyncEnabled(sync_enabled_); 362 OnNetworkChanged(net::NetworkChangeNotifier::GetConnectionType()); 363 if 
(drive_service_->HasRefreshToken()) 364 OnReadyToSendRequests(); 365 else 366 OnRefreshTokenInvalid(); 367} 368 369void SyncEngine::AddServiceObserver(SyncServiceObserver* observer) { 370 service_observers_.AddObserver(observer); 371} 372 373void SyncEngine::AddFileStatusObserver(FileStatusObserver* observer) { 374 file_status_observers_.AddObserver(observer); 375} 376 377void SyncEngine::RegisterOrigin(const GURL& origin, 378 const SyncStatusCallback& callback) { 379 if (!sync_worker_) { 380 // TODO(tzik): Record |origin| and retry the registration after late 381 // sign-in. Then, return SYNC_STATUS_OK. 382 if (!signin_manager_ || 383 signin_manager_->GetAuthenticatedAccountId().empty()) 384 callback.Run(SYNC_STATUS_AUTHENTICATION_FAILED); 385 else 386 callback.Run(SYNC_STATUS_ABORT); 387 return; 388 } 389 390 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 391 FROM_HERE, base::Bind(&DidRegisterOrigin, base::TimeTicks::Now(), 392 TrackCallback(callback))); 393 394 worker_task_runner_->PostTask( 395 FROM_HERE, 396 base::Bind(&SyncWorkerInterface::RegisterOrigin, 397 base::Unretained(sync_worker_.get()), 398 origin, relayed_callback)); 399} 400 401void SyncEngine::EnableOrigin( 402 const GURL& origin, const SyncStatusCallback& callback) { 403 if (!sync_worker_) { 404 // It's safe to return OK immediately since this is also checked in 405 // SyncWorker initialization. 406 callback.Run(SYNC_STATUS_OK); 407 return; 408 } 409 410 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 411 FROM_HERE, TrackCallback(callback)); 412 413 worker_task_runner_->PostTask( 414 FROM_HERE, 415 base::Bind(&SyncWorkerInterface::EnableOrigin, 416 base::Unretained(sync_worker_.get()), 417 origin, relayed_callback)); 418} 419 420void SyncEngine::DisableOrigin( 421 const GURL& origin, const SyncStatusCallback& callback) { 422 if (!sync_worker_) { 423 // It's safe to return OK immediately since this is also checked in 424 // SyncWorker initialization. 
425 callback.Run(SYNC_STATUS_OK); 426 return; 427 } 428 429 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 430 FROM_HERE, TrackCallback(callback)); 431 432 worker_task_runner_->PostTask( 433 FROM_HERE, 434 base::Bind(&SyncWorkerInterface::DisableOrigin, 435 base::Unretained(sync_worker_.get()), 436 origin, 437 relayed_callback)); 438} 439 440void SyncEngine::UninstallOrigin( 441 const GURL& origin, 442 UninstallFlag flag, 443 const SyncStatusCallback& callback) { 444 if (!sync_worker_) { 445 // It's safe to return OK immediately since this is also checked in 446 // SyncWorker initialization. 447 callback.Run(SYNC_STATUS_OK); 448 return; 449 } 450 451 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 452 FROM_HERE, TrackCallback(callback)); 453 worker_task_runner_->PostTask( 454 FROM_HERE, 455 base::Bind(&SyncWorkerInterface::UninstallOrigin, 456 base::Unretained(sync_worker_.get()), 457 origin, flag, relayed_callback)); 458} 459 460void SyncEngine::ProcessRemoteChange(const SyncFileCallback& callback) { 461 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 462 callback.Run(SYNC_STATUS_SYNC_DISABLED, storage::FileSystemURL()); 463 return; 464 } 465 466 base::Closure abort_closure = 467 base::Bind(callback, SYNC_STATUS_ABORT, storage::FileSystemURL()); 468 469 if (!sync_worker_) { 470 abort_closure.Run(); 471 return; 472 } 473 474 SyncFileCallback tracked_callback = callback_tracker_.Register( 475 abort_closure, callback); 476 SyncFileCallback relayed_callback = RelayCallbackToCurrentThread( 477 FROM_HERE, tracked_callback); 478 worker_task_runner_->PostTask( 479 FROM_HERE, 480 base::Bind(&SyncWorkerInterface::ProcessRemoteChange, 481 base::Unretained(sync_worker_.get()), 482 relayed_callback)); 483} 484 485void SyncEngine::SetRemoteChangeProcessor(RemoteChangeProcessor* processor) { 486 remote_change_processor_ = processor; 487 488 if (!sync_worker_) 489 return; 490 491 remote_change_processor_wrapper_.reset( 492 new 
RemoteChangeProcessorWrapper(processor)); 493 494 remote_change_processor_on_worker_.reset(new RemoteChangeProcessorOnWorker( 495 remote_change_processor_wrapper_->AsWeakPtr(), 496 ui_task_runner_, worker_task_runner_)); 497 498 worker_task_runner_->PostTask( 499 FROM_HERE, 500 base::Bind(&SyncWorkerInterface::SetRemoteChangeProcessor, 501 base::Unretained(sync_worker_.get()), 502 remote_change_processor_on_worker_.get())); 503} 504 505LocalChangeProcessor* SyncEngine::GetLocalChangeProcessor() { 506 return this; 507} 508 509RemoteServiceState SyncEngine::GetCurrentState() const { 510 if (!sync_enabled_) 511 return REMOTE_SERVICE_DISABLED; 512 if (!has_refresh_token_) 513 return REMOTE_SERVICE_AUTHENTICATION_REQUIRED; 514 return service_state_; 515} 516 517void SyncEngine::GetOriginStatusMap(const StatusMapCallback& callback) { 518 base::Closure abort_closure = 519 base::Bind(callback, base::Passed(scoped_ptr<OriginStatusMap>())); 520 521 if (!sync_worker_) { 522 abort_closure.Run(); 523 return; 524 } 525 526 StatusMapCallback tracked_callback = 527 callback_tracker_.Register(abort_closure, callback); 528 StatusMapCallback relayed_callback = 529 RelayCallbackToCurrentThread(FROM_HERE, tracked_callback); 530 531 worker_task_runner_->PostTask( 532 FROM_HERE, 533 base::Bind(&SyncWorkerInterface::GetOriginStatusMap, 534 base::Unretained(sync_worker_.get()), 535 relayed_callback)); 536} 537 538void SyncEngine::DumpFiles(const GURL& origin, 539 const ListCallback& callback) { 540 base::Closure abort_closure = 541 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 542 543 if (!sync_worker_) { 544 abort_closure.Run(); 545 return; 546 } 547 548 ListCallback tracked_callback = 549 callback_tracker_.Register(abort_closure, callback); 550 551 PostTaskAndReplyWithResult( 552 worker_task_runner_, 553 FROM_HERE, 554 base::Bind(&SyncWorkerInterface::DumpFiles, 555 base::Unretained(sync_worker_.get()), 556 origin), 557 tracked_callback); 558} 559 560void 
SyncEngine::DumpDatabase(const ListCallback& callback) { 561 base::Closure abort_closure = 562 base::Bind(callback, base::Passed(scoped_ptr<base::ListValue>())); 563 564 if (!sync_worker_) { 565 abort_closure.Run(); 566 return; 567 } 568 569 ListCallback tracked_callback = 570 callback_tracker_.Register(abort_closure, callback); 571 572 PostTaskAndReplyWithResult( 573 worker_task_runner_, 574 FROM_HERE, 575 base::Bind(&SyncWorkerInterface::DumpDatabase, 576 base::Unretained(sync_worker_.get())), 577 tracked_callback); 578} 579 580void SyncEngine::SetSyncEnabled(bool sync_enabled) { 581 sync_enabled_ = sync_enabled; 582 583 if (!sync_worker_) 584 return; 585 586 worker_task_runner_->PostTask( 587 FROM_HERE, 588 base::Bind(&SyncWorkerInterface::SetSyncEnabled, 589 base::Unretained(sync_worker_.get()), 590 sync_enabled)); 591} 592 593void SyncEngine::PromoteDemotedChanges(const base::Closure& callback) { 594 if (!sync_worker_) { 595 callback.Run(); 596 return; 597 } 598 599 base::Closure relayed_callback = RelayCallbackToCurrentThread( 600 FROM_HERE, callback_tracker_.Register(callback, callback)); 601 602 worker_task_runner_->PostTask( 603 FROM_HERE, 604 base::Bind(&SyncWorkerInterface::PromoteDemotedChanges, 605 base::Unretained(sync_worker_.get()), 606 relayed_callback)); 607} 608 609void SyncEngine::ApplyLocalChange(const FileChange& local_change, 610 const base::FilePath& local_path, 611 const SyncFileMetadata& local_metadata, 612 const storage::FileSystemURL& url, 613 const SyncStatusCallback& callback) { 614 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) { 615 callback.Run(SYNC_STATUS_SYNC_DISABLED); 616 return; 617 } 618 619 if (!sync_worker_) { 620 callback.Run(SYNC_STATUS_ABORT); 621 return; 622 } 623 624 SyncStatusCallback relayed_callback = RelayCallbackToCurrentThread( 625 FROM_HERE, TrackCallback(callback)); 626 worker_task_runner_->PostTask( 627 FROM_HERE, 628 base::Bind(&SyncWorkerInterface::ApplyLocalChange, 629 
base::Unretained(sync_worker_.get()), 630 local_change, 631 local_path, 632 local_metadata, 633 url, 634 relayed_callback)); 635} 636 637void SyncEngine::OnNotificationReceived() { 638 if (!sync_worker_) 639 return; 640 641 worker_task_runner_->PostTask( 642 FROM_HERE, 643 base::Bind(&SyncWorkerInterface::ActivateService, 644 base::Unretained(sync_worker_.get()), 645 REMOTE_SERVICE_OK, 646 "Got push notification for Drive")); 647} 648 649void SyncEngine::OnPushNotificationEnabled(bool) {} 650 651void SyncEngine::OnReadyToSendRequests() { 652 has_refresh_token_ = true; 653 if (!sync_worker_) 654 return; 655 656 worker_task_runner_->PostTask( 657 FROM_HERE, 658 base::Bind(&SyncWorkerInterface::ActivateService, 659 base::Unretained(sync_worker_.get()), 660 REMOTE_SERVICE_OK, 661 "Authenticated")); 662} 663 664void SyncEngine::OnRefreshTokenInvalid() { 665 has_refresh_token_ = false; 666 if (!sync_worker_) 667 return; 668 669 worker_task_runner_->PostTask( 670 FROM_HERE, 671 base::Bind(&SyncWorkerInterface::DeactivateService, 672 base::Unretained(sync_worker_.get()), 673 "Found invalid refresh token.")); 674} 675 676void SyncEngine::OnNetworkChanged( 677 net::NetworkChangeNotifier::ConnectionType type) { 678 if (!sync_worker_) 679 return; 680 681 bool network_available_old = network_available_; 682 network_available_ = (type != net::NetworkChangeNotifier::CONNECTION_NONE); 683 684 if (!network_available_old && network_available_) { 685 worker_task_runner_->PostTask( 686 FROM_HERE, 687 base::Bind(&SyncWorkerInterface::ActivateService, 688 base::Unretained(sync_worker_.get()), 689 REMOTE_SERVICE_OK, 690 "Connected")); 691 } else if (network_available_old && !network_available_) { 692 worker_task_runner_->PostTask( 693 FROM_HERE, 694 base::Bind(&SyncWorkerInterface::DeactivateService, 695 base::Unretained(sync_worker_.get()), 696 "Disconnected")); 697 } 698 699} 700 701void SyncEngine::GoogleSigninFailed(const GoogleServiceAuthError& error) { 702 Reset(); 703 
UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 704 "Failed to sign in."); 705} 706 707void SyncEngine::GoogleSigninSucceeded(const std::string& username, 708 const std::string& password) { 709 Initialize(); 710} 711 712void SyncEngine::GoogleSignedOut(const std::string& username) { 713 Reset(); 714 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 715 "User signed out."); 716} 717 718SyncEngine::SyncEngine( 719 base::SingleThreadTaskRunner* ui_task_runner, 720 base::SequencedTaskRunner* worker_task_runner, 721 base::SequencedTaskRunner* drive_task_runner, 722 const base::FilePath& sync_file_system_dir, 723 TaskLogger* task_logger, 724 drive::DriveNotificationManager* notification_manager, 725 ExtensionServiceInterface* extension_service, 726 SigninManagerBase* signin_manager, 727 OAuth2TokenService* token_service, 728 net::URLRequestContextGetter* request_context, 729 scoped_ptr<DriveServiceFactory> drive_service_factory, 730 leveldb::Env* env_override) 731 : ui_task_runner_(ui_task_runner), 732 worker_task_runner_(worker_task_runner), 733 drive_task_runner_(drive_task_runner), 734 sync_file_system_dir_(sync_file_system_dir), 735 task_logger_(task_logger), 736 notification_manager_(notification_manager), 737 extension_service_(extension_service), 738 signin_manager_(signin_manager), 739 token_service_(token_service), 740 request_context_(request_context), 741 drive_service_factory_(drive_service_factory.Pass()), 742 remote_change_processor_(NULL), 743 service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE), 744 has_refresh_token_(false), 745 network_available_(false), 746 sync_enabled_(false), 747 env_override_(env_override), 748 weak_ptr_factory_(this) { 749 DCHECK(sync_file_system_dir_.IsAbsolute()); 750 if (notification_manager_) 751 notification_manager_->AddObserver(this); 752 if (signin_manager_) 753 signin_manager_->AddObserver(this); 754 net::NetworkChangeNotifier::AddNetworkChangeObserver(this); 755} 756 757void 
SyncEngine::OnPendingFileListUpdated(int item_count) { 758 FOR_EACH_OBSERVER( 759 SyncServiceObserver, 760 service_observers_, 761 OnRemoteChangeQueueUpdated(item_count)); 762} 763 764void SyncEngine::OnFileStatusChanged(const storage::FileSystemURL& url, 765 SyncFileStatus file_status, 766 SyncAction sync_action, 767 SyncDirection direction) { 768 FOR_EACH_OBSERVER(FileStatusObserver, 769 file_status_observers_, 770 OnFileStatusChanged( 771 url, file_status, sync_action, direction)); 772} 773 774void SyncEngine::UpdateServiceState(RemoteServiceState state, 775 const std::string& description) { 776 service_state_ = state; 777 778 FOR_EACH_OBSERVER( 779 SyncServiceObserver, service_observers_, 780 OnRemoteServiceStateUpdated(GetCurrentState(), description)); 781} 782 783SyncStatusCallback SyncEngine::TrackCallback( 784 const SyncStatusCallback& callback) { 785 return callback_tracker_.Register( 786 base::Bind(callback, SYNC_STATUS_ABORT), 787 callback); 788} 789 790} // namespace drive_backend 791} // namespace sync_file_system 792