sync_worker.cc revision a02191e04bc25c4935f804f2c080ae28663d096d
1// Copyright 2014 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "chrome/browser/sync_file_system/drive_backend/sync_worker.h" 6 7#include <vector> 8 9#include "base/bind.h" 10#include "base/memory/weak_ptr.h" 11#include "base/threading/sequenced_worker_pool.h" 12#include "base/values.h" 13#include "chrome/browser/drive/drive_api_service.h" 14#include "chrome/browser/drive/drive_notification_manager.h" 15#include "chrome/browser/drive/drive_notification_manager_factory.h" 16#include "chrome/browser/drive/drive_service_interface.h" 17#include "chrome/browser/drive/drive_uploader.h" 18#include "chrome/browser/extensions/extension_service.h" 19#include "chrome/browser/profiles/profile.h" 20#include "chrome/browser/signin/profile_oauth2_token_service_factory.h" 21#include "chrome/browser/signin/signin_manager_factory.h" 22#include "chrome/browser/sync_file_system/drive_backend/conflict_resolver.h" 23#include "chrome/browser/sync_file_system/drive_backend/drive_backend_constants.h" 24#include "chrome/browser/sync_file_system/drive_backend/list_changes_task.h" 25#include "chrome/browser/sync_file_system/drive_backend/local_to_remote_syncer.h" 26#include "chrome/browser/sync_file_system/drive_backend/metadata_database.h" 27#include "chrome/browser/sync_file_system/drive_backend/register_app_task.h" 28#include "chrome/browser/sync_file_system/drive_backend/remote_to_local_syncer.h" 29#include "chrome/browser/sync_file_system/drive_backend/sync_engine.h" 30#include "chrome/browser/sync_file_system/drive_backend/sync_engine_context.h" 31#include "chrome/browser/sync_file_system/drive_backend/sync_engine_initializer.h" 32#include "chrome/browser/sync_file_system/drive_backend/sync_task.h" 33#include "chrome/browser/sync_file_system/drive_backend/uninstall_app_task.h" 34#include "chrome/browser/sync_file_system/file_status_observer.h" 35#include "chrome/browser/sync_file_system/logger.h" 36#include "chrome/browser/sync_file_system/syncable_file_system_util.h" 37#include "components/signin/core/browser/profile_oauth2_token_service.h" 38#include "components/signin/core/browser/signin_manager.h" 39#include "content/public/browser/browser_thread.h" 40#include "extensions/browser/extension_system.h" 41#include "extensions/browser/extension_system_provider.h" 42#include "extensions/browser/extensions_browser_client.h" 43#include "extensions/common/extension.h" 44#include "google_apis/drive/drive_api_url_generator.h" 45#include "google_apis/drive/gdata_wapi_url_generator.h" 46#include "webkit/common/blob/scoped_file.h" 47#include "webkit/common/fileapi/file_system_util.h" 48 49namespace sync_file_system { 50 51class RemoteChangeProcessor; 52 53namespace drive_backend { 54 55namespace { 56 57void EmptyStatusCallback(SyncStatusCode status) {} 58 59} // namespace 60 61scoped_ptr<SyncWorker> SyncWorker::CreateOnWorker( 62 const base::WeakPtr<drive_backend::SyncEngine>& sync_engine, 63 const base::FilePath& base_dir, 64 scoped_ptr<SyncEngineContext> sync_engine_context, 65 leveldb::Env* env_override) { 66 scoped_ptr<SyncWorker> sync_worker( 67 new SyncWorker(sync_engine, 68 base_dir, 69 sync_engine_context.Pass(), 70 env_override)); 71 sync_worker->Initialize(); 72 73 return sync_worker.Pass(); 74} 75 76SyncWorker::~SyncWorker() {} 77 78void SyncWorker::Initialize() { 79 DCHECK(!task_manager_); 80 81 task_manager_.reset(new SyncTaskManager( 82 weak_ptr_factory_.GetWeakPtr(), 0 /* maximum_background_task */)); 83 task_manager_->Initialize(SYNC_STATUS_OK); 84 85 PostInitializeTask(); 86 87 net::NetworkChangeNotifier::ConnectionType type = 88 net::NetworkChangeNotifier::GetConnectionType(); 89 network_available_ = 90 type != net::NetworkChangeNotifier::CONNECTION_NONE; 91} 92 93void SyncWorker::RegisterOrigin( 94 const GURL& origin, 95 const SyncStatusCallback& callback) { 96 if (!GetMetadataDatabase() && GetDriveService()->HasRefreshToken()) 97 PostInitializeTask(); 98 99 scoped_ptr<RegisterAppTask> task( 100 new RegisterAppTask(context_.get(), origin.host())); 101 if (task->CanFinishImmediately()) { 102 callback.Run(SYNC_STATUS_OK); 103 return; 104 } 105 106 task_manager_->ScheduleSyncTask( 107 FROM_HERE, 108 task.PassAs<SyncTask>(), 109 SyncTaskManager::PRIORITY_HIGH, 110 callback); 111} 112 113void SyncWorker::EnableOrigin( 114 const GURL& origin, 115 const SyncStatusCallback& callback) { 116 task_manager_->ScheduleTask( 117 FROM_HERE, 118 base::Bind(&SyncWorker::DoEnableApp, 119 weak_ptr_factory_.GetWeakPtr(), 120 origin.host()), 121 SyncTaskManager::PRIORITY_HIGH, 122 callback); 123} 124 125void SyncWorker::DisableOrigin( 126 const GURL& origin, 127 const SyncStatusCallback& callback) { 128 task_manager_->ScheduleTask( 129 FROM_HERE, 130 base::Bind(&SyncWorker::DoDisableApp, 131 weak_ptr_factory_.GetWeakPtr(), 132 origin.host()), 133 SyncTaskManager::PRIORITY_HIGH, 134 callback); 135} 136 137void SyncWorker::UninstallOrigin( 138 const GURL& origin, 139 RemoteFileSyncService::UninstallFlag flag, 140 const SyncStatusCallback& callback) { 141 task_manager_->ScheduleSyncTask( 142 FROM_HERE, 143 scoped_ptr<SyncTask>( 144 new UninstallAppTask(context_.get(), origin.host(), flag)), 145 SyncTaskManager::PRIORITY_HIGH, 146 callback); 147} 148 149void SyncWorker::ProcessRemoteChange( 150 const SyncFileCallback& callback) { 151 RemoteToLocalSyncer* syncer = new RemoteToLocalSyncer(context_.get()); 152 task_manager_->ScheduleSyncTask( 153 FROM_HERE, 154 scoped_ptr<SyncTask>(syncer), 155 SyncTaskManager::PRIORITY_MED, 156 base::Bind(&SyncWorker::DidProcessRemoteChange, 157 weak_ptr_factory_.GetWeakPtr(), 158 syncer, callback)); 159} 160 161void SyncWorker::SetRemoteChangeProcessor( 162 RemoteChangeProcessor* processor) { 163 context_->SetRemoteChangeProcessor(processor); 164} 165 166RemoteServiceState SyncWorker::GetCurrentState() const { 167 if (!sync_enabled_) 168 return REMOTE_SERVICE_DISABLED; 169 return service_state_; 170} 171 172void SyncWorker::GetOriginStatusMap( 173 RemoteFileSyncService::OriginStatusMap* status_map) { 174 DCHECK(status_map); 175 176 if (!GetMetadataDatabase()) 177 return; 178 179 std::vector<std::string> app_ids; 180 GetMetadataDatabase()->GetRegisteredAppIDs(&app_ids); 181 182 for (std::vector<std::string>::const_iterator itr = app_ids.begin(); 183 itr != app_ids.end(); ++itr) { 184 const std::string& app_id = *itr; 185 GURL origin = 186 extensions::Extension::GetBaseURLFromExtensionId(app_id); 187 (*status_map)[origin] = 188 GetMetadataDatabase()->IsAppEnabled(app_id) ? 189 "Enabled" : "Disabled"; 190 } 191} 192 193scoped_ptr<base::ListValue> SyncWorker::DumpFiles(const GURL& origin) { 194 if (!GetMetadataDatabase()) 195 return scoped_ptr<base::ListValue>(); 196 return GetMetadataDatabase()->DumpFiles(origin.host()); 197} 198 199scoped_ptr<base::ListValue> SyncWorker::DumpDatabase() { 200 if (!GetMetadataDatabase()) 201 return scoped_ptr<base::ListValue>(); 202 return GetMetadataDatabase()->DumpDatabase(); 203} 204 205void SyncWorker::SetSyncEnabled(bool enabled) { 206 if (sync_enabled_ == enabled) 207 return; 208 209 RemoteServiceState old_state = GetCurrentState(); 210 sync_enabled_ = enabled; 211 if (old_state == GetCurrentState()) 212 return; 213 214 // TODO(peria): PostTask() 215 sync_engine_->UpdateSyncEnabled(enabled); 216} 217 218SyncStatusCode SyncWorker::SetDefaultConflictResolutionPolicy( 219 ConflictResolutionPolicy policy) { 220 default_conflict_resolution_policy_ = policy; 221 return SYNC_STATUS_OK; 222} 223 224SyncStatusCode SyncWorker::SetConflictResolutionPolicy( 225 const GURL& origin, 226 ConflictResolutionPolicy policy) { 227 NOTIMPLEMENTED(); 228 default_conflict_resolution_policy_ = policy; 229 return SYNC_STATUS_OK; 230} 231 232ConflictResolutionPolicy SyncWorker::GetDefaultConflictResolutionPolicy() 233 const { 234 return default_conflict_resolution_policy_; 235} 236 237ConflictResolutionPolicy SyncWorker::GetConflictResolutionPolicy( 238 const GURL& origin) const { 239 NOTIMPLEMENTED(); 240 return default_conflict_resolution_policy_; 241} 242 243void SyncWorker::ApplyLocalChange( 244 const FileChange& local_change, 245 const base::FilePath& local_path, 246 const SyncFileMetadata& local_metadata, 247 const fileapi::FileSystemURL& url, 248 const SyncStatusCallback& callback) { 249 LocalToRemoteSyncer* syncer = new LocalToRemoteSyncer( 250 context_.get(), local_metadata, local_change, local_path, url); 251 task_manager_->ScheduleSyncTask( 252 FROM_HERE, 253 scoped_ptr<SyncTask>(syncer), 254 SyncTaskManager::PRIORITY_MED, 255 base::Bind(&SyncWorker::DidApplyLocalChange, 256 weak_ptr_factory_.GetWeakPtr(), 257 syncer, callback)); 258} 259 260void SyncWorker::MaybeScheduleNextTask() { 261 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) 262 return; 263 264 // TODO(tzik): Notify observer of OnRemoteChangeQueueUpdated. 265 // TODO(tzik): Add an interface to get the number of dirty trackers to 266 // MetadataDatabase. 267 268 MaybeStartFetchChanges(); 269} 270 271void SyncWorker::NotifyLastOperationStatus( 272 SyncStatusCode status, 273 bool used_network) { 274 UpdateServiceStateFromSyncStatusCode(status, used_network); 275 276 if (GetMetadataDatabase()) { 277 // TODO(peria): Post task 278 sync_engine_->NotifyLastOperationStatus(); 279 } 280} 281 282void SyncWorker::OnNotificationReceived() { 283 if (service_state_ == REMOTE_SERVICE_TEMPORARY_UNAVAILABLE) 284 UpdateServiceState(REMOTE_SERVICE_OK, "Got push notification for Drive."); 285 286 should_check_remote_change_ = true; 287 MaybeScheduleNextTask(); 288} 289 290void SyncWorker::OnReadyToSendRequests(const std::string& account_id) { 291 if (service_state_ == REMOTE_SERVICE_OK) 292 return; 293 UpdateServiceState(REMOTE_SERVICE_OK, "Authenticated"); 294 295 if (!GetMetadataDatabase() && !account_id.empty()) { 296 GetDriveService()->Initialize(account_id); 297 PostInitializeTask(); 298 return; 299 } 300 301 should_check_remote_change_ = true; 302 MaybeScheduleNextTask(); 303} 304 305void SyncWorker::OnRefreshTokenInvalid() { 306 UpdateServiceState( 307 REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 308 "Found invalid refresh token."); 309} 310 311void SyncWorker::OnNetworkChanged( 312 net::NetworkChangeNotifier::ConnectionType type) { 313 bool new_network_availability = 314 type != net::NetworkChangeNotifier::CONNECTION_NONE; 315 316 if (network_available_ && !new_network_availability) { 317 UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE, "Disconnected"); 318 } else if (!network_available_ && new_network_availability) { 319 UpdateServiceState(REMOTE_SERVICE_OK, "Connected"); 320 should_check_remote_change_ = true; 321 MaybeStartFetchChanges(); 322 } 323 network_available_ = new_network_availability; 324} 325 326drive::DriveServiceInterface* SyncWorker::GetDriveService() { 327 return context_->GetDriveService(); 328} 329 330drive::DriveUploaderInterface* SyncWorker::GetDriveUploader() { 331 return context_->GetDriveUploader(); 332} 333 334MetadataDatabase* SyncWorker::GetMetadataDatabase() { 335 return context_->GetMetadataDatabase(); 336} 337 338SyncTaskManager* SyncWorker::GetSyncTaskManager() { 339 return task_manager_.get(); 340} 341 342SyncWorker::SyncWorker( 343 const base::WeakPtr<drive_backend::SyncEngine>& sync_engine, 344 const base::FilePath& base_dir, 345 scoped_ptr<SyncEngineContext> sync_engine_context, 346 leveldb::Env* env_override) 347 : base_dir_(base_dir), 348 env_override_(env_override), 349 service_state_(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE), 350 should_check_conflict_(true), 351 should_check_remote_change_(true), 352 listing_remote_changes_(false), 353 sync_enabled_(false), 354 default_conflict_resolution_policy_( 355 CONFLICT_RESOLUTION_POLICY_LAST_WRITE_WIN), 356 network_available_(false), 357 context_(sync_engine_context.Pass()), 358 sync_engine_(sync_engine), 359 weak_ptr_factory_(this) {} 360 361void SyncWorker::DoDisableApp(const std::string& app_id, 362 const SyncStatusCallback& callback) { 363 if (GetMetadataDatabase()) 364 GetMetadataDatabase()->DisableApp(app_id, callback); 365 else 366 callback.Run(SYNC_STATUS_OK); 367} 368 369void SyncWorker::DoEnableApp(const std::string& app_id, 370 const SyncStatusCallback& callback) { 371 if (GetMetadataDatabase()) 372 GetMetadataDatabase()->EnableApp(app_id, callback); 373 else 374 callback.Run(SYNC_STATUS_OK); 375} 376 377void SyncWorker::PostInitializeTask() { 378 DCHECK(!GetMetadataDatabase()); 379 380 // This initializer task may not run if MetadataDatabase in context_ is 381 // already initialized when it runs. 382 SyncEngineInitializer* initializer = 383 new SyncEngineInitializer(context_.get(), 384 context_->GetBlockingTaskRunner(), 385 base_dir_.Append(kDatabaseName), 386 env_override_); 387 task_manager_->ScheduleSyncTask( 388 FROM_HERE, 389 scoped_ptr<SyncTask>(initializer), 390 SyncTaskManager::PRIORITY_HIGH, 391 base::Bind(&SyncWorker::DidInitialize, weak_ptr_factory_.GetWeakPtr(), 392 initializer)); 393} 394 395void SyncWorker::DidInitialize(SyncEngineInitializer* initializer, 396 SyncStatusCode status) { 397 if (status != SYNC_STATUS_OK) { 398 if (GetDriveService()->HasRefreshToken()) { 399 UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE, 400 "Could not initialize remote service"); 401 } else { 402 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 403 "Authentication required."); 404 } 405 return; 406 } 407 408 scoped_ptr<MetadataDatabase> metadata_database = 409 initializer->PassMetadataDatabase(); 410 if (metadata_database) 411 context_->SetMetadataDatabase(metadata_database.Pass()); 412 413 // TODO(peria): Post task 414 sync_engine_->UpdateRegisteredApps(); 415} 416 417void SyncWorker::DidProcessRemoteChange(RemoteToLocalSyncer* syncer, 418 const SyncFileCallback& callback, 419 SyncStatusCode status) { 420 if (syncer->is_sync_root_deletion()) { 421 MetadataDatabase::ClearDatabase(context_->PassMetadataDatabase()); 422 PostInitializeTask(); 423 callback.Run(status, syncer->url()); 424 return; 425 } 426 427 if (status == SYNC_STATUS_OK) { 428 // TODO(peria): Post task 429 sync_engine_->DidProcessRemoteChange(syncer); 430 431 if (syncer->sync_action() == SYNC_ACTION_DELETED && 432 syncer->url().is_valid() && 433 fileapi::VirtualPath::IsRootPath(syncer->url().path())) { 434 RegisterOrigin(syncer->url().origin(), base::Bind(&EmptyStatusCallback)); 435 } 436 should_check_conflict_ = true; 437 } 438 callback.Run(status, syncer->url()); 439} 440 441void SyncWorker::DidApplyLocalChange(LocalToRemoteSyncer* syncer, 442 const SyncStatusCallback& callback, 443 SyncStatusCode status) { 444 // TODO(peria): Post task 445 sync_engine_->DidApplyLocalChange(syncer, status); 446 447 if (status == SYNC_STATUS_UNKNOWN_ORIGIN && syncer->url().is_valid()) { 448 RegisterOrigin(syncer->url().origin(), 449 base::Bind(&EmptyStatusCallback)); 450 } 451 452 if (syncer->needs_remote_change_listing() && 453 !listing_remote_changes_) { 454 task_manager_->ScheduleSyncTask( 455 FROM_HERE, 456 scoped_ptr<SyncTask>(new ListChangesTask(context_.get())), 457 SyncTaskManager::PRIORITY_HIGH, 458 base::Bind(&SyncWorker::DidFetchChanges, 459 weak_ptr_factory_.GetWeakPtr())); 460 should_check_remote_change_ = false; 461 listing_remote_changes_ = true; 462 time_to_check_changes_ = 463 base::TimeTicks::Now() + 464 base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds); 465 } 466 467 if (status != SYNC_STATUS_OK && 468 status != SYNC_STATUS_NO_CHANGE_TO_SYNC) { 469 callback.Run(status); 470 return; 471 } 472 473 if (status == SYNC_STATUS_OK) 474 should_check_conflict_ = true; 475 476 callback.Run(status); 477} 478 479void SyncWorker::MaybeStartFetchChanges() { 480 if (GetCurrentState() == REMOTE_SERVICE_DISABLED) 481 return; 482 483 if (!GetMetadataDatabase()) 484 return; 485 486 if (listing_remote_changes_) 487 return; 488 489 base::TimeTicks now = base::TimeTicks::Now(); 490 if (!should_check_remote_change_ && now < time_to_check_changes_) { 491 if (!GetMetadataDatabase()->HasDirtyTracker() && 492 should_check_conflict_) { 493 should_check_conflict_ = false; 494 task_manager_->ScheduleSyncTaskIfIdle( 495 FROM_HERE, 496 scoped_ptr<SyncTask>(new ConflictResolver(context_.get())), 497 base::Bind(&SyncWorker::DidResolveConflict, 498 weak_ptr_factory_.GetWeakPtr())); 499 } 500 return; 501 } 502 503 if (task_manager_->ScheduleSyncTaskIfIdle( 504 FROM_HERE, 505 scoped_ptr<SyncTask>(new ListChangesTask(context_.get())), 506 base::Bind(&SyncWorker::DidFetchChanges, 507 weak_ptr_factory_.GetWeakPtr()))) { 508 should_check_remote_change_ = false; 509 listing_remote_changes_ = true; 510 time_to_check_changes_ = 511 now + base::TimeDelta::FromSeconds(kListChangesRetryDelaySeconds); 512 } 513} 514 515void SyncWorker::DidResolveConflict(SyncStatusCode status) { 516 if (status == SYNC_STATUS_OK) 517 should_check_conflict_ = true; 518} 519 520void SyncWorker::DidFetchChanges(SyncStatusCode status) { 521 if (status == SYNC_STATUS_OK) 522 should_check_conflict_ = true; 523 listing_remote_changes_ = false; 524} 525 526void SyncWorker::UpdateServiceStateFromSyncStatusCode( 527 SyncStatusCode status, 528 bool used_network) { 529 switch (status) { 530 case SYNC_STATUS_OK: 531 if (used_network) 532 UpdateServiceState(REMOTE_SERVICE_OK, std::string()); 533 break; 534 535 // Authentication error. 536 case SYNC_STATUS_AUTHENTICATION_FAILED: 537 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 538 "Authentication required"); 539 break; 540 541 // OAuth token error. 542 case SYNC_STATUS_ACCESS_FORBIDDEN: 543 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 544 "Access forbidden"); 545 break; 546 547 // Errors which could make the service temporarily unavailable. 548 case SYNC_STATUS_SERVICE_TEMPORARILY_UNAVAILABLE: 549 case SYNC_STATUS_NETWORK_ERROR: 550 case SYNC_STATUS_ABORT: 551 case SYNC_STATUS_FAILED: 552 if (GetDriveService()->HasRefreshToken()) { 553 UpdateServiceState(REMOTE_SERVICE_TEMPORARY_UNAVAILABLE, 554 "Network or temporary service error."); 555 } else { 556 UpdateServiceState(REMOTE_SERVICE_AUTHENTICATION_REQUIRED, 557 "Authentication required"); 558 } 559 break; 560 561 // Errors which would require manual user intervention to resolve. 562 case SYNC_DATABASE_ERROR_CORRUPTION: 563 case SYNC_DATABASE_ERROR_IO_ERROR: 564 case SYNC_DATABASE_ERROR_FAILED: 565 UpdateServiceState(REMOTE_SERVICE_DISABLED, 566 "Unrecoverable database error"); 567 break; 568 569 default: 570 // Other errors don't affect service state 571 break; 572 } 573} 574 575void SyncWorker::UpdateServiceState(RemoteServiceState state, 576 const std::string& description) { 577 RemoteServiceState old_state = GetCurrentState(); 578 service_state_ = state; 579 580 if (old_state == GetCurrentState()) 581 return; 582 583 util::Log(logging::LOG_VERBOSE, FROM_HERE, 584 "Service state changed: %d->%d: %s", 585 old_state, GetCurrentState(), description.c_str()); 586 // TODO(peria): Post task 587 sync_engine_->UpdateServiceState(description); 588} 589 590} // namespace drive_backend 591} // namespace sync_file_system 592