sync_manager_impl.cc revision 5f1c94371a64b3196d4be9466099bb892df9b88e
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "sync/internal_api/sync_manager_impl.h" 6 7#include <string> 8 9#include "base/base64.h" 10#include "base/bind.h" 11#include "base/callback.h" 12#include "base/compiler_specific.h" 13#include "base/json/json_writer.h" 14#include "base/memory/ref_counted.h" 15#include "base/metrics/histogram.h" 16#include "base/observer_list.h" 17#include "base/strings/string_number_conversions.h" 18#include "base/thread_task_runner_handle.h" 19#include "base/values.h" 20#include "sync/engine/sync_scheduler.h" 21#include "sync/engine/syncer_types.h" 22#include "sync/internal_api/change_reorder_buffer.h" 23#include "sync/internal_api/public/base/cancelation_signal.h" 24#include "sync/internal_api/public/base/invalidation_interface.h" 25#include "sync/internal_api/public/base/model_type.h" 26#include "sync/internal_api/public/base_node.h" 27#include "sync/internal_api/public/configure_reason.h" 28#include "sync/internal_api/public/engine/polling_constants.h" 29#include "sync/internal_api/public/http_post_provider_factory.h" 30#include "sync/internal_api/public/internal_components_factory.h" 31#include "sync/internal_api/public/read_node.h" 32#include "sync/internal_api/public/read_transaction.h" 33#include "sync/internal_api/public/sync_context.h" 34#include "sync/internal_api/public/sync_context_proxy.h" 35#include "sync/internal_api/public/user_share.h" 36#include "sync/internal_api/public/util/experiments.h" 37#include "sync/internal_api/public/write_node.h" 38#include "sync/internal_api/public/write_transaction.h" 39#include "sync/internal_api/sync_context_proxy_impl.h" 40#include "sync/internal_api/syncapi_internal.h" 41#include "sync/internal_api/syncapi_server_connection_manager.h" 42#include "sync/protocol/proto_value_conversions.h" 43#include "sync/protocol/sync.pb.h" 44#include "sync/sessions/directory_type_debug_info_emitter.h" 45#include "sync/syncable/directory.h" 46#include "sync/syncable/entry.h" 47#include "sync/syncable/in_memory_directory_backing_store.h" 48#include "sync/syncable/on_disk_directory_backing_store.h" 49 50using base::TimeDelta; 51using sync_pb::GetUpdatesCallerInfo; 52 53class GURL; 54 55namespace syncer { 56 57using sessions::SyncSessionContext; 58using syncable::ImmutableWriteTransactionInfo; 59using syncable::SPECIFICS; 60using syncable::UNIQUE_POSITION; 61 62namespace { 63 64// Delays for syncer nudges. 65static const int kDefaultNudgeDelayMilliseconds = 200; 66static const int kPreferencesNudgeDelayMilliseconds = 2000; 67static const int kSyncRefreshDelayMsec = 500; 68static const int kSyncSchedulerDelayMsec = 250; 69 70GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason( 71 ConfigureReason reason) { 72 switch (reason) { 73 case CONFIGURE_REASON_RECONFIGURATION: 74 return GetUpdatesCallerInfo::RECONFIGURATION; 75 case CONFIGURE_REASON_MIGRATION: 76 return GetUpdatesCallerInfo::MIGRATION; 77 case CONFIGURE_REASON_NEW_CLIENT: 78 return GetUpdatesCallerInfo::NEW_CLIENT; 79 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE: 80 case CONFIGURE_REASON_CRYPTO: 81 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE; 82 default: 83 NOTREACHED(); 84 } 85 return GetUpdatesCallerInfo::UNKNOWN; 86} 87 88} // namespace 89 90// A class to calculate nudge delays for types. 91class NudgeStrategy { 92 public: 93 static TimeDelta GetNudgeDelayTimeDelta(const ModelType& model_type, 94 SyncManagerImpl* core) { 95 NudgeDelayStrategy delay_type = GetNudgeDelayStrategy(model_type); 96 return GetNudgeDelayTimeDeltaFromType(delay_type, 97 model_type, 98 core); 99 } 100 101 private: 102 // Possible types of nudge delay for datatypes. 103 // Note: These are just hints. If a sync happens then all dirty entries 104 // would be committed as part of the sync. 105 enum NudgeDelayStrategy { 106 // Sync right away. 107 IMMEDIATE, 108 109 // Sync this change while syncing another change. 110 ACCOMPANY_ONLY, 111 112 // The datatype does not use one of the predefined wait times but defines 113 // its own wait time logic for nudge. 114 CUSTOM, 115 }; 116 117 static NudgeDelayStrategy GetNudgeDelayStrategy(const ModelType& type) { 118 switch (type) { 119 case AUTOFILL: 120 return ACCOMPANY_ONLY; 121 case PREFERENCES: 122 case SESSIONS: 123 case FAVICON_IMAGES: 124 case FAVICON_TRACKING: 125 return CUSTOM; 126 default: 127 return IMMEDIATE; 128 } 129 } 130 131 static TimeDelta GetNudgeDelayTimeDeltaFromType( 132 const NudgeDelayStrategy& delay_type, const ModelType& model_type, 133 const SyncManagerImpl* core) { 134 CHECK(core); 135 TimeDelta delay = TimeDelta::FromMilliseconds( 136 kDefaultNudgeDelayMilliseconds); 137 switch (delay_type) { 138 case IMMEDIATE: 139 delay = TimeDelta::FromMilliseconds( 140 kDefaultNudgeDelayMilliseconds); 141 break; 142 case ACCOMPANY_ONLY: 143 delay = TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds); 144 break; 145 case CUSTOM: 146 switch (model_type) { 147 case PREFERENCES: 148 delay = TimeDelta::FromMilliseconds( 149 kPreferencesNudgeDelayMilliseconds); 150 break; 151 case SESSIONS: 152 case FAVICON_IMAGES: 153 case FAVICON_TRACKING: 154 delay = core->scheduler()->GetSessionsCommitDelay(); 155 break; 156 default: 157 NOTREACHED(); 158 } 159 break; 160 default: 161 NOTREACHED(); 162 } 163 return delay; 164 } 165}; 166 167SyncManagerImpl::SyncManagerImpl(const std::string& name) 168 : name_(name), 169 change_delegate_(NULL), 170 initialized_(false), 171 observing_network_connectivity_changes_(false), 172 report_unrecoverable_error_function_(NULL), 173 weak_ptr_factory_(this) { 174 // Pre-fill |notification_info_map_|. 175 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 176 notification_info_map_.insert( 177 std::make_pair(ModelTypeFromInt(i), NotificationInfo())); 178 } 179} 180 181SyncManagerImpl::~SyncManagerImpl() { 182 DCHECK(thread_checker_.CalledOnValidThread()); 183 CHECK(!initialized_); 184} 185 186SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {} 187SyncManagerImpl::NotificationInfo::~NotificationInfo() {} 188 189base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const { 190 base::DictionaryValue* value = new base::DictionaryValue(); 191 value->SetInteger("totalCount", total_count); 192 value->SetString("payload", payload); 193 return value; 194} 195 196bool SyncManagerImpl::VisiblePositionsDiffer( 197 const syncable::EntryKernelMutation& mutation) const { 198 const syncable::EntryKernel& a = mutation.original; 199 const syncable::EntryKernel& b = mutation.mutated; 200 if (!b.ShouldMaintainPosition()) 201 return false; 202 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION))) 203 return true; 204 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID)) 205 return true; 206 return false; 207} 208 209bool SyncManagerImpl::VisiblePropertiesDiffer( 210 const syncable::EntryKernelMutation& mutation, 211 Cryptographer* cryptographer) const { 212 const syncable::EntryKernel& a = mutation.original; 213 const syncable::EntryKernel& b = mutation.mutated; 214 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS); 215 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS); 216 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics), 217 GetModelTypeFromSpecifics(b_specifics)); 218 ModelType model_type = GetModelTypeFromSpecifics(b_specifics); 219 // Suppress updates to items that aren't tracked by any browser model. 220 if (model_type < FIRST_REAL_MODEL_TYPE || 221 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) { 222 return false; 223 } 224 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR)) 225 return true; 226 if (!AreSpecificsEqual(cryptographer, 227 a.ref(syncable::SPECIFICS), 228 b.ref(syncable::SPECIFICS))) { 229 return true; 230 } 231 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA), 232 b.ref(syncable::ATTACHMENT_METADATA))) { 233 return true; 234 } 235 // We only care if the name has changed if neither specifics is encrypted 236 // (encrypted nodes blow away the NON_UNIQUE_NAME). 237 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() && 238 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME)) 239 return true; 240 if (VisiblePositionsDiffer(mutation)) 241 return true; 242 return false; 243} 244 245ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() { 246 return directory()->InitialSyncEndedTypes(); 247} 248 249ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken( 250 ModelTypeSet types) { 251 ModelTypeSet result; 252 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) { 253 sync_pb::DataTypeProgressMarker marker; 254 directory()->GetDownloadProgress(i.Get(), &marker); 255 256 if (marker.token().empty()) 257 result.Put(i.Get()); 258 } 259 return result; 260} 261 262void SyncManagerImpl::ConfigureSyncer( 263 ConfigureReason reason, 264 ModelTypeSet to_download, 265 ModelTypeSet to_purge, 266 ModelTypeSet to_journal, 267 ModelTypeSet to_unapply, 268 const ModelSafeRoutingInfo& new_routing_info, 269 const base::Closure& ready_task, 270 const base::Closure& retry_task) { 271 DCHECK(thread_checker_.CalledOnValidThread()); 272 DCHECK(!ready_task.is_null()); 273 DCHECK(!retry_task.is_null()); 274 275 DVLOG(1) << "Configuring -" 276 << "\n\t" << "current types: " 277 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info)) 278 << "\n\t" << "types to download: " 279 << ModelTypeSetToString(to_download) 280 << "\n\t" << "types to purge: " 281 << ModelTypeSetToString(to_purge) 282 << "\n\t" << "types to journal: " 283 << ModelTypeSetToString(to_journal) 284 << "\n\t" << "types to unapply: " 285 << ModelTypeSetToString(to_unapply); 286 if (!PurgeDisabledTypes(to_purge, 287 to_journal, 288 to_unapply)) { 289 // We failed to cleanup the types. Invoke the ready task without actually 290 // configuring any types. The caller should detect this as a configuration 291 // failure and act appropriately. 292 ready_task.Run(); 293 return; 294 } 295 296 ConfigurationParams params(GetSourceFromReason(reason), 297 to_download, 298 new_routing_info, 299 ready_task, 300 retry_task); 301 302 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 303 scheduler_->ScheduleConfiguration(params); 304} 305 306void SyncManagerImpl::Init( 307 const base::FilePath& database_location, 308 const WeakHandle<JsEventHandler>& event_handler, 309 const GURL& service_url, 310 scoped_ptr<HttpPostProviderFactory> post_factory, 311 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 312 ExtensionsActivity* extensions_activity, 313 SyncManager::ChangeDelegate* change_delegate, 314 const SyncCredentials& credentials, 315 const std::string& invalidator_client_id, 316 const std::string& restored_key_for_bootstrapping, 317 const std::string& restored_keystore_key_for_bootstrapping, 318 InternalComponentsFactory* internal_components_factory, 319 Encryptor* encryptor, 320 scoped_ptr<UnrecoverableErrorHandler> unrecoverable_error_handler, 321 ReportUnrecoverableErrorFunction report_unrecoverable_error_function, 322 CancelationSignal* cancelation_signal) { 323 CHECK(!initialized_); 324 DCHECK(thread_checker_.CalledOnValidThread()); 325 DCHECK(post_factory.get()); 326 DCHECK(!credentials.email.empty()); 327 DCHECK(!credentials.sync_token.empty()); 328 DCHECK(!credentials.scope_set.empty()); 329 DCHECK(cancelation_signal); 330 DVLOG(1) << "SyncManager starting Init..."; 331 332 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()); 333 334 change_delegate_ = change_delegate; 335 336 AddObserver(&js_sync_manager_observer_); 337 SetJsEventHandler(event_handler); 338 339 AddObserver(&debug_info_event_listener_); 340 341 database_path_ = database_location.Append( 342 syncable::Directory::kSyncDatabaseFilename); 343 unrecoverable_error_handler_ = unrecoverable_error_handler.Pass(); 344 report_unrecoverable_error_function_ = report_unrecoverable_error_function; 345 346 allstatus_.SetHasKeystoreKey( 347 !restored_keystore_key_for_bootstrapping.empty()); 348 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl( 349 &share_, 350 encryptor, 351 restored_key_for_bootstrapping, 352 restored_keystore_key_for_bootstrapping)); 353 sync_encryption_handler_->AddObserver(this); 354 sync_encryption_handler_->AddObserver(&debug_info_event_listener_); 355 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_); 356 357 base::FilePath absolute_db_path = database_path_; 358 DCHECK(absolute_db_path.IsAbsolute()); 359 360 scoped_ptr<syncable::DirectoryBackingStore> backing_store = 361 internal_components_factory->BuildDirectoryBackingStore( 362 credentials.email, absolute_db_path).Pass(); 363 364 DCHECK(backing_store.get()); 365 share_.directory.reset( 366 new syncable::Directory( 367 backing_store.release(), 368 unrecoverable_error_handler_.get(), 369 report_unrecoverable_error_function_, 370 sync_encryption_handler_.get(), 371 sync_encryption_handler_->GetCryptographerUnsafe())); 372 share_.sync_credentials = credentials; 373 374 // UserShare is accessible to a lot of code that doesn't need access to the 375 // sync token so clear sync_token from the UserShare. 376 share_.sync_credentials.sync_token = ""; 377 378 const std::string& username = credentials.email; 379 DVLOG(1) << "Username: " << username; 380 if (!OpenDirectory(username)) { 381 NotifyInitializationFailure(); 382 LOG(ERROR) << "Sync manager initialization failed!"; 383 return; 384 } 385 386 connection_manager_.reset(new SyncAPIServerConnectionManager( 387 service_url.host() + service_url.path(), 388 service_url.EffectiveIntPort(), 389 service_url.SchemeIsSecure(), 390 post_factory.release(), 391 cancelation_signal)); 392 connection_manager_->set_client_id(directory()->cache_guid()); 393 connection_manager_->AddListener(this); 394 395 std::string sync_id = directory()->cache_guid(); 396 397 DVLOG(1) << "Setting sync client ID: " << sync_id; 398 allstatus_.SetSyncId(sync_id); 399 DVLOG(1) << "Setting invalidator client ID: " << invalidator_client_id; 400 allstatus_.SetInvalidatorClientId(invalidator_client_id); 401 402 model_type_registry_.reset(new ModelTypeRegistry(workers, directory(), this)); 403 404 // Bind the SyncContext WeakPtr to this thread. This helps us crash earlier 405 // if the pointer is misused in debug mode. 406 base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr(); 407 weak_core.get(); 408 409 sync_context_proxy_.reset( 410 new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core)); 411 412 // Build a SyncSessionContext and store the worker in it. 413 DVLOG(1) << "Sync is bringing up SyncSessionContext."; 414 std::vector<SyncEngineEventListener*> listeners; 415 listeners.push_back(&allstatus_); 416 listeners.push_back(this); 417 session_context_ = internal_components_factory->BuildContext( 418 connection_manager_.get(), 419 directory(), 420 extensions_activity, 421 listeners, 422 &debug_info_event_listener_, 423 model_type_registry_.get(), 424 invalidator_client_id).Pass(); 425 session_context_->set_account_name(credentials.email); 426 scheduler_ = internal_components_factory->BuildScheduler( 427 name_, session_context_.get(), cancelation_signal).Pass(); 428 429 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE); 430 431 initialized_ = true; 432 433 net::NetworkChangeNotifier::AddIPAddressObserver(this); 434 net::NetworkChangeNotifier::AddConnectionTypeObserver(this); 435 observing_network_connectivity_changes_ = true; 436 437 UpdateCredentials(credentials); 438 439 NotifyInitializationSuccess(); 440} 441 442void SyncManagerImpl::NotifyInitializationSuccess() { 443 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 444 OnInitializationComplete( 445 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 446 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 447 true, InitialSyncEndedTypes())); 448} 449 450void SyncManagerImpl::NotifyInitializationFailure() { 451 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 452 OnInitializationComplete( 453 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), 454 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()), 455 false, ModelTypeSet())); 456} 457 458void SyncManagerImpl::OnPassphraseRequired( 459 PassphraseRequiredReason reason, 460 const sync_pb::EncryptedData& pending_keys) { 461 // Does nothing. 462} 463 464void SyncManagerImpl::OnPassphraseAccepted() { 465 // Does nothing. 466} 467 468void SyncManagerImpl::OnBootstrapTokenUpdated( 469 const std::string& bootstrap_token, 470 BootstrapTokenType type) { 471 if (type == KEYSTORE_BOOTSTRAP_TOKEN) 472 allstatus_.SetHasKeystoreKey(true); 473} 474 475void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types, 476 bool encrypt_everything) { 477 allstatus_.SetEncryptedTypes(encrypted_types); 478} 479 480void SyncManagerImpl::OnEncryptionComplete() { 481 // Does nothing. 482} 483 484void SyncManagerImpl::OnCryptographerStateChanged( 485 Cryptographer* cryptographer) { 486 allstatus_.SetCryptographerReady(cryptographer->is_ready()); 487 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys()); 488 allstatus_.SetKeystoreMigrationTime( 489 sync_encryption_handler_->migration_time()); 490} 491 492void SyncManagerImpl::OnPassphraseTypeChanged( 493 PassphraseType type, 494 base::Time explicit_passphrase_time) { 495 allstatus_.SetPassphraseType(type); 496 allstatus_.SetKeystoreMigrationTime( 497 sync_encryption_handler_->migration_time()); 498} 499 500void SyncManagerImpl::StartSyncingNormally( 501 const ModelSafeRoutingInfo& routing_info) { 502 // Start the sync scheduler. 503 // TODO(sync): We always want the newest set of routes when we switch back 504 // to normal mode. Figure out how to enforce set_routing_info is always 505 // appropriately set and that it's only modified when switching to normal 506 // mode. 507 DCHECK(thread_checker_.CalledOnValidThread()); 508 session_context_->SetRoutingInfo(routing_info); 509 scheduler_->Start(SyncScheduler::NORMAL_MODE); 510} 511 512syncable::Directory* SyncManagerImpl::directory() { 513 return share_.directory.get(); 514} 515 516const SyncScheduler* SyncManagerImpl::scheduler() const { 517 return scheduler_.get(); 518} 519 520bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const { 521 return connection_manager_->HasInvalidAuthToken(); 522} 523 524bool SyncManagerImpl::OpenDirectory(const std::string& username) { 525 DCHECK(!initialized_) << "Should only happen once"; 526 527 // Set before Open(). 528 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()); 529 WeakHandle<syncable::TransactionObserver> transaction_observer( 530 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr())); 531 532 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED; 533 open_result = directory()->Open(username, this, transaction_observer); 534 if (open_result != syncable::OPENED) { 535 LOG(ERROR) << "Could not open share for:" << username; 536 return false; 537 } 538 539 // Unapplied datatypes (those that do not have initial sync ended set) get 540 // re-downloaded during any configuration. But, it's possible for a datatype 541 // to have a progress marker but not have initial sync ended yet, making 542 // it a candidate for migration. This is a problem, as the DataTypeManager 543 // does not support a migration while it's already in the middle of a 544 // configuration. As a result, any partially synced datatype can stall the 545 // DTM, waiting for the configuration to complete, which it never will due 546 // to the migration error. In addition, a partially synced nigori will 547 // trigger the migration logic before the backend is initialized, resulting 548 // in crashes. We therefore detect and purge any partially synced types as 549 // part of initialization. 550 if (!PurgePartiallySyncedTypes()) 551 return false; 552 553 return true; 554} 555 556bool SyncManagerImpl::PurgePartiallySyncedTypes() { 557 ModelTypeSet partially_synced_types = ModelTypeSet::All(); 558 partially_synced_types.RemoveAll(InitialSyncEndedTypes()); 559 partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken( 560 ModelTypeSet::All())); 561 562 DVLOG(1) << "Purging partially synced types " 563 << ModelTypeSetToString(partially_synced_types); 564 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes", 565 partially_synced_types.Size()); 566 if (partially_synced_types.Empty()) 567 return true; 568 return directory()->PurgeEntriesWithTypeIn(partially_synced_types, 569 ModelTypeSet(), 570 ModelTypeSet()); 571} 572 573bool SyncManagerImpl::PurgeDisabledTypes( 574 ModelTypeSet to_purge, 575 ModelTypeSet to_journal, 576 ModelTypeSet to_unapply) { 577 if (to_purge.Empty()) 578 return true; 579 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge); 580 DCHECK(to_purge.HasAll(to_journal)); 581 DCHECK(to_purge.HasAll(to_unapply)); 582 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply); 583} 584 585void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) { 586 DCHECK(thread_checker_.CalledOnValidThread()); 587 DCHECK(initialized_); 588 DCHECK(!credentials.email.empty()); 589 DCHECK(!credentials.sync_token.empty()); 590 DCHECK(!credentials.scope_set.empty()); 591 592 observing_network_connectivity_changes_ = true; 593 if (!connection_manager_->SetAuthToken(credentials.sync_token)) 594 return; // Auth token is known to be invalid, so exit early. 595 596 scheduler_->OnCredentialsUpdated(); 597 598 // TODO(zea): pass the credential age to the debug info event listener. 599} 600 601void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) { 602 DCHECK(thread_checker_.CalledOnValidThread()); 603 observers_.AddObserver(observer); 604} 605 606void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) { 607 DCHECK(thread_checker_.CalledOnValidThread()); 608 observers_.RemoveObserver(observer); 609} 610 611void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) { 612 DCHECK(thread_checker_.CalledOnValidThread()); 613 614 // Prevent any in-flight method calls from running. Also 615 // invalidates |weak_handle_this_| and |change_observer_|. 616 weak_ptr_factory_.InvalidateWeakPtrs(); 617 js_mutation_event_observer_.InvalidateWeakPtrs(); 618 619 scheduler_.reset(); 620 session_context_.reset(); 621 model_type_registry_.reset(); 622 623 if (sync_encryption_handler_) { 624 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_); 625 sync_encryption_handler_->RemoveObserver(this); 626 } 627 628 SetJsEventHandler(WeakHandle<JsEventHandler>()); 629 RemoveObserver(&js_sync_manager_observer_); 630 631 RemoveObserver(&debug_info_event_listener_); 632 633 // |connection_manager_| may end up being NULL here in tests (in synchronous 634 // initialization mode). 635 // 636 // TODO(akalin): Fix this behavior. 637 if (connection_manager_) 638 connection_manager_->RemoveListener(this); 639 connection_manager_.reset(); 640 641 net::NetworkChangeNotifier::RemoveIPAddressObserver(this); 642 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this); 643 observing_network_connectivity_changes_ = false; 644 645 if (initialized_ && directory()) { 646 directory()->SaveChanges(); 647 } 648 649 share_.directory.reset(); 650 651 change_delegate_ = NULL; 652 653 initialized_ = false; 654 655 // We reset these here, since only now we know they will not be 656 // accessed from other threads (since we shut down everything). 657 change_observer_.Reset(); 658 weak_handle_this_.Reset(); 659} 660 661void SyncManagerImpl::OnIPAddressChanged() { 662 if (!observing_network_connectivity_changes_) { 663 DVLOG(1) << "IP address change dropped."; 664 return; 665 } 666 DVLOG(1) << "IP address change detected."; 667 OnNetworkConnectivityChangedImpl(); 668} 669 670void SyncManagerImpl::OnConnectionTypeChanged( 671 net::NetworkChangeNotifier::ConnectionType) { 672 if (!observing_network_connectivity_changes_) { 673 DVLOG(1) << "Connection type change dropped."; 674 return; 675 } 676 DVLOG(1) << "Connection type change detected."; 677 OnNetworkConnectivityChangedImpl(); 678} 679 680void SyncManagerImpl::OnNetworkConnectivityChangedImpl() { 681 DCHECK(thread_checker_.CalledOnValidThread()); 682 scheduler_->OnConnectionStatusChange(); 683} 684 685void SyncManagerImpl::OnServerConnectionEvent( 686 const ServerConnectionEvent& event) { 687 DCHECK(thread_checker_.CalledOnValidThread()); 688 if (event.connection_code == 689 HttpResponse::SERVER_CONNECTION_OK) { 690 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 691 OnConnectionStatusChange(CONNECTION_OK)); 692 } 693 694 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) { 695 observing_network_connectivity_changes_ = false; 696 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 697 OnConnectionStatusChange(CONNECTION_AUTH_ERROR)); 698 } 699 700 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) { 701 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 702 OnConnectionStatusChange(CONNECTION_SERVER_ERROR)); 703 } 704} 705 706void SyncManagerImpl::HandleTransactionCompleteChangeEvent( 707 ModelTypeSet models_with_changes) { 708 // This notification happens immediately after the transaction mutex is 709 // released. This allows work to be performed without blocking other threads 710 // from acquiring a transaction. 711 if (!change_delegate_) 712 return; 713 714 // Call commit. 715 for (ModelTypeSet::Iterator it = models_with_changes.First(); 716 it.Good(); it.Inc()) { 717 change_delegate_->OnChangesComplete(it.Get()); 718 change_observer_.Call( 719 FROM_HERE, 720 &SyncManager::ChangeObserver::OnChangesComplete, 721 it.Get()); 722 } 723} 724 725ModelTypeSet 726SyncManagerImpl::HandleTransactionEndingChangeEvent( 727 const ImmutableWriteTransactionInfo& write_transaction_info, 728 syncable::BaseTransaction* trans) { 729 // This notification happens immediately before a syncable WriteTransaction 730 // falls out of scope. It happens while the channel mutex is still held, 731 // and while the transaction mutex is held, so it cannot be re-entrant. 732 if (!change_delegate_ || change_records_.empty()) 733 return ModelTypeSet(); 734 735 // This will continue the WriteTransaction using a read only wrapper. 736 // This is the last chance for read to occur in the WriteTransaction 737 // that's closing. This special ReadTransaction will not close the 738 // underlying transaction. 739 ReadTransaction read_trans(GetUserShare(), trans); 740 741 ModelTypeSet models_with_changes; 742 for (ChangeRecordMap::const_iterator it = change_records_.begin(); 743 it != change_records_.end(); ++it) { 744 DCHECK(!it->second.Get().empty()); 745 ModelType type = ModelTypeFromInt(it->first); 746 change_delegate_-> 747 OnChangesApplied(type, trans->directory()->GetTransactionVersion(type), 748 &read_trans, it->second); 749 change_observer_.Call(FROM_HERE, 750 &SyncManager::ChangeObserver::OnChangesApplied, 751 type, write_transaction_info.Get().id, it->second); 752 models_with_changes.Put(type); 753 } 754 change_records_.clear(); 755 return models_with_changes; 756} 757 758void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi( 759 const ImmutableWriteTransactionInfo& write_transaction_info, 760 syncable::BaseTransaction* trans, 761 std::vector<int64>* entries_changed) { 762 // We have been notified about a user action changing a sync model. 763 LOG_IF(WARNING, !change_records_.empty()) << 764 "CALCULATE_CHANGES called with unapplied old changes."; 765 766 // The mutated model type, or UNSPECIFIED if nothing was mutated. 767 ModelTypeSet mutated_model_types; 768 769 const syncable::ImmutableEntryKernelMutationMap& mutations = 770 write_transaction_info.Get().mutations; 771 for (syncable::EntryKernelMutationMap::const_iterator it = 772 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 773 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) { 774 continue; 775 } 776 777 ModelType model_type = 778 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 779 if (model_type < FIRST_REAL_MODEL_TYPE) { 780 NOTREACHED() << "Permanent or underspecified item changed via syncapi."; 781 continue; 782 } 783 784 // Found real mutation. 785 if (model_type != UNSPECIFIED) { 786 mutated_model_types.Put(model_type); 787 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE)); 788 } 789 } 790 791 // Nudge if necessary. 792 if (!mutated_model_types.Empty()) { 793 if (weak_handle_this_.IsInitialized()) { 794 weak_handle_this_.Call(FROM_HERE, 795 &SyncManagerImpl::RequestNudgeForDataTypes, 796 FROM_HERE, 797 mutated_model_types); 798 } else { 799 NOTREACHED(); 800 } 801 } 802} 803 804void SyncManagerImpl::SetExtraChangeRecordData(int64 id, 805 ModelType type, ChangeReorderBuffer* buffer, 806 Cryptographer* cryptographer, const syncable::EntryKernel& original, 807 bool existed_before, bool exists_now) { 808 // If this is a deletion and the datatype was encrypted, we need to decrypt it 809 // and attach it to the buffer. 810 if (!exists_now && existed_before) { 811 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS)); 812 if (type == PASSWORDS) { 813 // Passwords must use their own legacy ExtraPasswordChangeRecordData. 814 scoped_ptr<sync_pb::PasswordSpecificsData> data( 815 DecryptPasswordSpecifics(original_specifics, cryptographer)); 816 if (!data) { 817 NOTREACHED(); 818 return; 819 } 820 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data)); 821 } else if (original_specifics.has_encrypted()) { 822 // All other datatypes can just create a new unencrypted specifics and 823 // attach it. 824 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted(); 825 if (!cryptographer->Decrypt(encrypted, &original_specifics)) { 826 NOTREACHED(); 827 return; 828 } 829 } 830 buffer->SetSpecificsForId(id, original_specifics); 831 } 832} 833 834void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer( 835 const ImmutableWriteTransactionInfo& write_transaction_info, 836 syncable::BaseTransaction* trans, 837 std::vector<int64>* entries_changed) { 838 // We only expect one notification per sync step, so change_buffers_ should 839 // contain no pending entries. 840 LOG_IF(WARNING, !change_records_.empty()) << 841 "CALCULATE_CHANGES called with unapplied old changes."; 842 843 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT]; 844 845 Cryptographer* crypto = directory()->GetCryptographer(trans); 846 const syncable::ImmutableEntryKernelMutationMap& mutations = 847 write_transaction_info.Get().mutations; 848 for (syncable::EntryKernelMutationMap::const_iterator it = 849 mutations.Get().begin(); it != mutations.Get().end(); ++it) { 850 bool existed_before = !it->second.original.ref(syncable::IS_DEL); 851 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL); 852 853 // Omit items that aren't associated with a model. 854 ModelType type = 855 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS)); 856 if (type < FIRST_REAL_MODEL_TYPE) 857 continue; 858 859 int64 handle = it->first; 860 if (exists_now && !existed_before) 861 change_buffers[type].PushAddedItem(handle); 862 else if (!exists_now && existed_before) 863 change_buffers[type].PushDeletedItem(handle); 864 else if (exists_now && existed_before && 865 VisiblePropertiesDiffer(it->second, crypto)) { 866 change_buffers[type].PushUpdatedItem(handle); 867 } 868 869 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto, 870 it->second.original, existed_before, exists_now); 871 } 872 873 ReadTransaction read_trans(GetUserShare(), trans); 874 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) { 875 if (!change_buffers[i].IsEmpty()) { 876 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans, 877 &(change_records_[i]))) { 878 for (size_t j = 0; j < change_records_[i].Get().size(); ++j) 879 entries_changed->push_back((change_records_[i].Get())[j].id); 880 } 881 if (change_records_[i].Get().empty()) 882 change_records_.erase(i); 883 } 884 } 885} 886 887TimeDelta SyncManagerImpl::GetNudgeDelayTimeDelta( 888 const ModelType& model_type) { 889 return NudgeStrategy::GetNudgeDelayTimeDelta(model_type, this); 890} 891 892void SyncManagerImpl::RequestNudgeForDataTypes( 893 const tracked_objects::Location& nudge_location, 894 ModelTypeSet types) { 895 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get()); 896 897 // TODO(lipalani) : Calculate the nudge delay based on all types. 898 base::TimeDelta nudge_delay = NudgeStrategy::GetNudgeDelayTimeDelta( 899 types.First().Get(), 900 this); 901 scheduler_->ScheduleLocalNudge(nudge_delay, 902 types, 903 nudge_location); 904} 905 906void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) { 907 DCHECK(thread_checker_.CalledOnValidThread()); 908 scheduler_->ScheduleInitialSyncNudge(type); 909} 910 911void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) { 912 DCHECK(thread_checker_.CalledOnValidThread()); 913 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type)); 914} 915 916void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) { 917 DCHECK(thread_checker_.CalledOnValidThread()); 918 RefreshTypes(ModelTypeSet(type)); 919} 920 921void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) { 922 DCHECK(thread_checker_.CalledOnValidThread()); 923 // Only send an event if this is due to a cycle ending and this cycle 924 // concludes a canonical "sync" process; that is, based on what is known 925 // locally we are "all happy" and up-to-date. There may be new changes on 926 // the server, but we'll get them on a subsequent sync. 927 // 928 // Notifications are sent at the end of every sync cycle, regardless of 929 // whether we should sync again. 930 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) { 931 if (!initialized_) { 932 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not " 933 << "initialized"; 934 return; 935 } 936 937 DVLOG(1) << "Sending OnSyncCycleCompleted"; 938 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 939 OnSyncCycleCompleted(event.snapshot)); 940 } 941} 942 943void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) { 944 FOR_EACH_OBSERVER( 945 SyncManager::Observer, observers_, 946 OnActionableError(error)); 947} 948 949void SyncManagerImpl::OnRetryTimeChanged(base::Time) {} 950 951void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {} 952 953void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) { 954 FOR_EACH_OBSERVER( 955 SyncManager::Observer, observers_, 956 OnMigrationRequested(types)); 957} 958 959void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) { 960 protocol_event_buffer_.RecordProtocolEvent(event); 961 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, 962 OnProtocolEvent(event)); 963} 964 965void SyncManagerImpl::SetJsEventHandler( 966 const WeakHandle<JsEventHandler>& event_handler) { 967 js_sync_manager_observer_.SetJsEventHandler(event_handler); 968 js_mutation_event_observer_.SetJsEventHandler(event_handler); 969 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler); 970} 971 972scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType( 973 syncer::ModelType type) { 974 DirectoryTypeDebugInfoEmitterMap* emitter_map = 975 model_type_registry_->directory_type_debug_info_emitter_map(); 976 DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type); 977 978 if (it == emitter_map->end()) { 979 // This can happen in some cases. The UI thread makes requests of us 980 // when it doesn't really know which types are enabled or disabled. 981 DLOG(WARNING) << "Asked to return debug info for invalid type " 982 << ModelTypeToString(type); 983 return scoped_ptr<base::ListValue>(); 984 } 985 986 return it->second->GetAllNodes(); 987} 988 989void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) { 990 DCHECK(thread_checker_.CalledOnValidThread()); 991 992 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled; 993 allstatus_.SetNotificationsEnabled(invalidator_enabled); 994 scheduler_->SetNotificationsEnabled(invalidator_enabled); 995} 996 997void SyncManagerImpl::OnIncomingInvalidation( 998 syncer::ModelType type, 999 scoped_ptr<InvalidationInterface> invalidation) { 1000 DCHECK(thread_checker_.CalledOnValidThread()); 1001 1002 scheduler_->ScheduleInvalidationNudge( 1003 TimeDelta::FromMilliseconds(kSyncSchedulerDelayMsec), 1004 type, 1005 invalidation.Pass(), 1006 FROM_HERE); 1007} 1008 1009void SyncManagerImpl::RefreshTypes(ModelTypeSet types) { 1010 DCHECK(thread_checker_.CalledOnValidThread()); 1011 if (types.Empty()) { 1012 LOG(WARNING) << "Sync received refresh request with no types specified."; 1013 } else { 1014 scheduler_->ScheduleLocalRefreshRequest( 1015 TimeDelta::FromMilliseconds(kSyncRefreshDelayMsec), 1016 types, FROM_HERE); 1017 } 1018} 1019 1020SyncStatus SyncManagerImpl::GetDetailedStatus() const { 1021 return allstatus_.status(); 1022} 1023 1024void SyncManagerImpl::SaveChanges() { 1025 directory()->SaveChanges(); 1026} 1027 1028UserShare* SyncManagerImpl::GetUserShare() { 1029 DCHECK(initialized_); 1030 return &share_; 1031} 1032 1033syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() { 1034 DCHECK(initialized_); 1035 return sync_context_proxy_.get(); 1036} 1037 1038const std::string SyncManagerImpl::cache_guid() { 1039 DCHECK(initialized_); 1040 return directory()->cache_guid(); 1041} 1042 1043bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) { 1044 ReadTransaction trans(FROM_HERE, GetUserShare()); 1045 ReadNode nigori_node(&trans); 1046 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) { 1047 DVLOG(1) << "Couldn't find Nigori node."; 1048 return false; 1049 } 1050 bool found_experiment = false; 1051 1052 ReadNode favicon_sync_node(&trans); 1053 if (favicon_sync_node.InitByClientTagLookup( 1054 syncer::EXPERIMENTS, 1055 syncer::kFaviconSyncTag) == BaseNode::INIT_OK) { 1056 experiments->favicon_sync_limit = 1057 favicon_sync_node.GetExperimentsSpecifics().favicon_sync(). 1058 favicon_sync_limit(); 1059 found_experiment = true; 1060 } 1061 1062 ReadNode pre_commit_update_avoidance_node(&trans); 1063 if (pre_commit_update_avoidance_node.InitByClientTagLookup( 1064 syncer::EXPERIMENTS, 1065 syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) { 1066 session_context_->set_server_enabled_pre_commit_update_avoidance( 1067 pre_commit_update_avoidance_node.GetExperimentsSpecifics(). 1068 pre_commit_update_avoidance().enabled()); 1069 // We don't bother setting found_experiment. The frontend doesn't need to 1070 // know about this. 1071 } 1072 1073 ReadNode gcm_channel_node(&trans); 1074 if (gcm_channel_node.InitByClientTagLookup( 1075 syncer::EXPERIMENTS, 1076 syncer::kGCMChannelTag) == BaseNode::INIT_OK && 1077 gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) { 1078 experiments->gcm_channel_state = 1079 (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ? 1080 syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED); 1081 found_experiment = true; 1082 } 1083 1084 ReadNode enhanced_bookmarks_node(&trans); 1085 if (enhanced_bookmarks_node.InitByClientTagLookup( 1086 syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) == 1087 BaseNode::INIT_OK && 1088 enhanced_bookmarks_node.GetExperimentsSpecifics() 1089 .has_enhanced_bookmarks()) { 1090 const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks = 1091 enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks(); 1092 if (enhanced_bookmarks.has_enabled()) 1093 experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled(); 1094 if (enhanced_bookmarks.has_extension_id()) { 1095 experiments->enhanced_bookmarks_ext_id = 1096 enhanced_bookmarks.extension_id(); 1097 } 1098 found_experiment = true; 1099 } 1100 1101 ReadNode gcm_invalidations_node(&trans); 1102 if (gcm_invalidations_node.InitByClientTagLookup( 1103 syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) == 1104 BaseNode::INIT_OK) { 1105 const sync_pb::GcmInvalidationsFlags& gcm_invalidations = 1106 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations(); 1107 if (gcm_invalidations.has_enabled()) { 1108 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled(); 1109 found_experiment = true; 1110 } 1111 } 1112 1113 return found_experiment; 1114} 1115 1116bool SyncManagerImpl::HasUnsyncedItems() { 1117 ReadTransaction trans(FROM_HERE, GetUserShare()); 1118 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0); 1119} 1120 1121SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() { 1122 return sync_encryption_handler_.get(); 1123} 1124 1125ScopedVector<syncer::ProtocolEvent> 1126 SyncManagerImpl::GetBufferedProtocolEvents() { 1127 return protocol_event_buffer_.GetBufferedProtocolEvents(); 1128} 1129 1130void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver( 1131 syncer::TypeDebugInfoObserver* observer) { 1132 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer); 1133} 1134 1135void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver( 1136 syncer::TypeDebugInfoObserver* observer) { 1137 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer); 1138} 1139 1140bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver( 1141 syncer::TypeDebugInfoObserver* observer) { 1142 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer); 1143} 1144 1145void SyncManagerImpl::RequestEmitDebugInfo() { 1146 model_type_registry_->RequestEmitDebugInfo(); 1147} 1148 1149// static. 1150int SyncManagerImpl::GetDefaultNudgeDelay() { 1151 return kDefaultNudgeDelayMilliseconds; 1152} 1153 1154// static. 1155int SyncManagerImpl::GetPreferencesNudgeDelay() { 1156 return kPreferencesNudgeDelayMilliseconds; 1157} 1158 1159} // namespace syncer 1160