1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/internal_api/sync_manager_impl.h"
6
7#include <string>
8
9#include "base/base64.h"
10#include "base/bind.h"
11#include "base/callback.h"
12#include "base/compiler_specific.h"
13#include "base/json/json_writer.h"
14#include "base/memory/ref_counted.h"
15#include "base/metrics/histogram.h"
16#include "base/observer_list.h"
17#include "base/strings/string_number_conversions.h"
18#include "base/thread_task_runner_handle.h"
19#include "base/values.h"
20#include "sync/engine/sync_scheduler.h"
21#include "sync/engine/syncer_types.h"
22#include "sync/internal_api/change_reorder_buffer.h"
23#include "sync/internal_api/public/base/cancelation_signal.h"
24#include "sync/internal_api/public/base/invalidation_interface.h"
25#include "sync/internal_api/public/base/model_type.h"
26#include "sync/internal_api/public/base_node.h"
27#include "sync/internal_api/public/configure_reason.h"
28#include "sync/internal_api/public/engine/polling_constants.h"
29#include "sync/internal_api/public/http_post_provider_factory.h"
30#include "sync/internal_api/public/internal_components_factory.h"
31#include "sync/internal_api/public/read_node.h"
32#include "sync/internal_api/public/read_transaction.h"
33#include "sync/internal_api/public/sync_context.h"
34#include "sync/internal_api/public/sync_context_proxy.h"
35#include "sync/internal_api/public/user_share.h"
36#include "sync/internal_api/public/util/experiments.h"
37#include "sync/internal_api/public/write_node.h"
38#include "sync/internal_api/public/write_transaction.h"
39#include "sync/internal_api/sync_context_proxy_impl.h"
40#include "sync/internal_api/syncapi_internal.h"
41#include "sync/internal_api/syncapi_server_connection_manager.h"
42#include "sync/protocol/proto_value_conversions.h"
43#include "sync/protocol/sync.pb.h"
44#include "sync/sessions/directory_type_debug_info_emitter.h"
45#include "sync/syncable/directory.h"
46#include "sync/syncable/entry.h"
47#include "sync/syncable/in_memory_directory_backing_store.h"
48#include "sync/syncable/on_disk_directory_backing_store.h"
49
50using base::TimeDelta;
51using sync_pb::GetUpdatesCallerInfo;
52
53class GURL;
54
55namespace syncer {
56
57using sessions::SyncSessionContext;
58using syncable::ImmutableWriteTransactionInfo;
59using syncable::SPECIFICS;
60using syncable::UNIQUE_POSITION;
61
62namespace {
63
64GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
65    ConfigureReason reason) {
66  switch (reason) {
67    case CONFIGURE_REASON_RECONFIGURATION:
68      return GetUpdatesCallerInfo::RECONFIGURATION;
69    case CONFIGURE_REASON_MIGRATION:
70      return GetUpdatesCallerInfo::MIGRATION;
71    case CONFIGURE_REASON_NEW_CLIENT:
72      return GetUpdatesCallerInfo::NEW_CLIENT;
73    case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
74    case CONFIGURE_REASON_CRYPTO:
75      return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
76    case CONFIGURE_REASON_PROGRAMMATIC:
77      return GetUpdatesCallerInfo::PROGRAMMATIC;
78    default:
79      NOTREACHED();
80  }
81  return GetUpdatesCallerInfo::UNKNOWN;
82}
83
84}  // namespace
85
86SyncManagerImpl::SyncManagerImpl(const std::string& name)
87    : name_(name),
88      change_delegate_(NULL),
89      initialized_(false),
90      observing_network_connectivity_changes_(false),
91      report_unrecoverable_error_function_(NULL),
92      weak_ptr_factory_(this) {
93  // Pre-fill |notification_info_map_|.
94  for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
95    notification_info_map_.insert(
96        std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
97  }
98}
99
100SyncManagerImpl::~SyncManagerImpl() {
101  DCHECK(thread_checker_.CalledOnValidThread());
102  CHECK(!initialized_);
103}
104
105SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
106SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
107
108base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
109  base::DictionaryValue* value = new base::DictionaryValue();
110  value->SetInteger("totalCount", total_count);
111  value->SetString("payload", payload);
112  return value;
113}
114
115bool SyncManagerImpl::VisiblePositionsDiffer(
116    const syncable::EntryKernelMutation& mutation) const {
117  const syncable::EntryKernel& a = mutation.original;
118  const syncable::EntryKernel& b = mutation.mutated;
119  if (!b.ShouldMaintainPosition())
120    return false;
121  if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
122    return true;
123  if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
124    return true;
125  return false;
126}
127
128bool SyncManagerImpl::VisiblePropertiesDiffer(
129    const syncable::EntryKernelMutation& mutation,
130    Cryptographer* cryptographer) const {
131  const syncable::EntryKernel& a = mutation.original;
132  const syncable::EntryKernel& b = mutation.mutated;
133  const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
134  const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
135  DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
136            GetModelTypeFromSpecifics(b_specifics));
137  ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
138  // Suppress updates to items that aren't tracked by any browser model.
139  if (model_type < FIRST_REAL_MODEL_TYPE ||
140      !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
141    return false;
142  }
143  if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
144    return true;
145  if (!AreSpecificsEqual(cryptographer,
146                         a.ref(syncable::SPECIFICS),
147                         b.ref(syncable::SPECIFICS))) {
148    return true;
149  }
150  if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
151                                  b.ref(syncable::ATTACHMENT_METADATA))) {
152    return true;
153  }
154  // We only care if the name has changed if neither specifics is encrypted
155  // (encrypted nodes blow away the NON_UNIQUE_NAME).
156  if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
157      a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
158    return true;
159  if (VisiblePositionsDiffer(mutation))
160    return true;
161  return false;
162}
163
164ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
165  return directory()->InitialSyncEndedTypes();
166}
167
168ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
169    ModelTypeSet types) {
170  ModelTypeSet result;
171  for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
172    sync_pb::DataTypeProgressMarker marker;
173    directory()->GetDownloadProgress(i.Get(), &marker);
174
175    if (marker.token().empty())
176      result.Put(i.Get());
177  }
178  return result;
179}
180
181void SyncManagerImpl::ConfigureSyncer(
182    ConfigureReason reason,
183    ModelTypeSet to_download,
184    ModelTypeSet to_purge,
185    ModelTypeSet to_journal,
186    ModelTypeSet to_unapply,
187    const ModelSafeRoutingInfo& new_routing_info,
188    const base::Closure& ready_task,
189    const base::Closure& retry_task) {
190  DCHECK(thread_checker_.CalledOnValidThread());
191  DCHECK(!ready_task.is_null());
192  DCHECK(!retry_task.is_null());
193  DCHECK(initialized_);
194
195  DVLOG(1) << "Configuring -"
196           << "\n\t" << "current types: "
197           << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
198           << "\n\t" << "types to download: "
199           << ModelTypeSetToString(to_download)
200           << "\n\t" << "types to purge: "
201           << ModelTypeSetToString(to_purge)
202           << "\n\t" << "types to journal: "
203           << ModelTypeSetToString(to_journal)
204           << "\n\t" << "types to unapply: "
205           << ModelTypeSetToString(to_unapply);
206  if (!PurgeDisabledTypes(to_purge,
207                          to_journal,
208                          to_unapply)) {
209    // We failed to cleanup the types. Invoke the ready task without actually
210    // configuring any types. The caller should detect this as a configuration
211    // failure and act appropriately.
212    ready_task.Run();
213    return;
214  }
215
216  ConfigurationParams params(GetSourceFromReason(reason),
217                             to_download,
218                             new_routing_info,
219                             ready_task,
220                             retry_task);
221
222  scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
223  scheduler_->ScheduleConfiguration(params);
224}
225
226void SyncManagerImpl::Init(InitArgs* args) {
227  CHECK(!initialized_);
228  DCHECK(thread_checker_.CalledOnValidThread());
229  DCHECK(args->post_factory.get());
230  DCHECK(!args->credentials.email.empty());
231  DCHECK(!args->credentials.sync_token.empty());
232  DCHECK(!args->credentials.scope_set.empty());
233  DCHECK(args->cancelation_signal);
234  DVLOG(1) << "SyncManager starting Init...";
235
236  weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
237
238  change_delegate_ = args->change_delegate;
239
240  AddObserver(&js_sync_manager_observer_);
241  SetJsEventHandler(args->event_handler);
242
243  AddObserver(&debug_info_event_listener_);
244
245  database_path_ = args->database_location.Append(
246      syncable::Directory::kSyncDatabaseFilename);
247  unrecoverable_error_handler_ = args->unrecoverable_error_handler.Pass();
248  report_unrecoverable_error_function_ =
249      args->report_unrecoverable_error_function;
250
251  allstatus_.SetHasKeystoreKey(
252      !args->restored_keystore_key_for_bootstrapping.empty());
253  sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
254      &share_,
255      args->encryptor,
256      args->restored_key_for_bootstrapping,
257      args->restored_keystore_key_for_bootstrapping));
258  sync_encryption_handler_->AddObserver(this);
259  sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
260  sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
261
262  base::FilePath absolute_db_path = database_path_;
263  DCHECK(absolute_db_path.IsAbsolute());
264
265  scoped_ptr<syncable::DirectoryBackingStore> backing_store =
266      args->internal_components_factory->BuildDirectoryBackingStore(
267          InternalComponentsFactory::STORAGE_ON_DISK,
268          args->credentials.email, absolute_db_path).Pass();
269
270  DCHECK(backing_store.get());
271  share_.directory.reset(
272      new syncable::Directory(
273          backing_store.release(),
274          unrecoverable_error_handler_.get(),
275          report_unrecoverable_error_function_,
276          sync_encryption_handler_.get(),
277          sync_encryption_handler_->GetCryptographerUnsafe()));
278  share_.sync_credentials = args->credentials;
279
280  // UserShare is accessible to a lot of code that doesn't need access to the
281  // sync token so clear sync_token from the UserShare.
282  share_.sync_credentials.sync_token = "";
283
284  const std::string& username = args->credentials.email;
285  DVLOG(1) << "Username: " << username;
286  if (!OpenDirectory(username)) {
287    NotifyInitializationFailure();
288    LOG(ERROR) << "Sync manager initialization failed!";
289    return;
290  }
291
292  connection_manager_.reset(new SyncAPIServerConnectionManager(
293      args->service_url.host() + args->service_url.path(),
294      args->service_url.EffectiveIntPort(),
295      args->service_url.SchemeIsSecure(),
296      args->post_factory.release(),
297      args->cancelation_signal));
298  connection_manager_->set_client_id(directory()->cache_guid());
299  connection_manager_->AddListener(this);
300
301  std::string sync_id = directory()->cache_guid();
302
303  DVLOG(1) << "Setting sync client ID: " << sync_id;
304  allstatus_.SetSyncId(sync_id);
305  DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
306  allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
307
308  model_type_registry_.reset(
309      new ModelTypeRegistry(args->workers, directory(), this));
310  sync_encryption_handler_->AddObserver(model_type_registry_.get());
311
312  // Bind the SyncContext WeakPtr to this thread.  This helps us crash earlier
313  // if the pointer is misused in debug mode.
314  base::WeakPtr<SyncContext> weak_core = model_type_registry_->AsWeakPtr();
315  weak_core.get();
316
317  sync_context_proxy_.reset(
318      new SyncContextProxyImpl(base::ThreadTaskRunnerHandle::Get(), weak_core));
319
320  // Build a SyncSessionContext and store the worker in it.
321  DVLOG(1) << "Sync is bringing up SyncSessionContext.";
322  std::vector<SyncEngineEventListener*> listeners;
323  listeners.push_back(&allstatus_);
324  listeners.push_back(this);
325  session_context_ =
326      args->internal_components_factory->BuildContext(
327                                             connection_manager_.get(),
328                                             directory(),
329                                             args->extensions_activity,
330                                             listeners,
331                                             &debug_info_event_listener_,
332                                             model_type_registry_.get(),
333                                             args->invalidator_client_id)
334          .Pass();
335  session_context_->set_account_name(args->credentials.email);
336  scheduler_ = args->internal_components_factory->BuildScheduler(
337      name_, session_context_.get(), args->cancelation_signal).Pass();
338
339  scheduler_->Start(SyncScheduler::CONFIGURATION_MODE);
340
341  initialized_ = true;
342
343  net::NetworkChangeNotifier::AddIPAddressObserver(this);
344  net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
345  observing_network_connectivity_changes_ = true;
346
347  UpdateCredentials(args->credentials);
348
349  NotifyInitializationSuccess();
350}
351
352void SyncManagerImpl::NotifyInitializationSuccess() {
353  FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
354                    OnInitializationComplete(
355                        MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
356                        MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
357                        true, InitialSyncEndedTypes()));
358}
359
360void SyncManagerImpl::NotifyInitializationFailure() {
361  FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
362                    OnInitializationComplete(
363                        MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
364                        MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
365                        false, ModelTypeSet()));
366}
367
368void SyncManagerImpl::OnPassphraseRequired(
369    PassphraseRequiredReason reason,
370    const sync_pb::EncryptedData& pending_keys) {
371  // Does nothing.
372}
373
374void SyncManagerImpl::OnPassphraseAccepted() {
375  // Does nothing.
376}
377
378void SyncManagerImpl::OnBootstrapTokenUpdated(
379    const std::string& bootstrap_token,
380    BootstrapTokenType type) {
381  if (type == KEYSTORE_BOOTSTRAP_TOKEN)
382    allstatus_.SetHasKeystoreKey(true);
383}
384
385void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
386                                              bool encrypt_everything) {
387  allstatus_.SetEncryptedTypes(encrypted_types);
388}
389
390void SyncManagerImpl::OnEncryptionComplete() {
391  // Does nothing.
392}
393
394void SyncManagerImpl::OnCryptographerStateChanged(
395    Cryptographer* cryptographer) {
396  allstatus_.SetCryptographerReady(cryptographer->is_ready());
397  allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
398  allstatus_.SetKeystoreMigrationTime(
399      sync_encryption_handler_->migration_time());
400}
401
402void SyncManagerImpl::OnPassphraseTypeChanged(
403    PassphraseType type,
404    base::Time explicit_passphrase_time) {
405  allstatus_.SetPassphraseType(type);
406  allstatus_.SetKeystoreMigrationTime(
407      sync_encryption_handler_->migration_time());
408}
409
410void SyncManagerImpl::StartSyncingNormally(
411    const ModelSafeRoutingInfo& routing_info) {
412  // Start the sync scheduler.
413  // TODO(sync): We always want the newest set of routes when we switch back
414  // to normal mode. Figure out how to enforce set_routing_info is always
415  // appropriately set and that it's only modified when switching to normal
416  // mode.
417  DCHECK(thread_checker_.CalledOnValidThread());
418  session_context_->SetRoutingInfo(routing_info);
419  scheduler_->Start(SyncScheduler::NORMAL_MODE);
420}
421
422syncable::Directory* SyncManagerImpl::directory() {
423  return share_.directory.get();
424}
425
426const SyncScheduler* SyncManagerImpl::scheduler() const {
427  return scheduler_.get();
428}
429
430bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
431  return connection_manager_->HasInvalidAuthToken();
432}
433
434bool SyncManagerImpl::OpenDirectory(const std::string& username) {
435  DCHECK(!initialized_) << "Should only happen once";
436
437  // Set before Open().
438  change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
439  WeakHandle<syncable::TransactionObserver> transaction_observer(
440      MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
441
442  syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
443  open_result = directory()->Open(username, this, transaction_observer);
444  if (open_result != syncable::OPENED) {
445    LOG(ERROR) << "Could not open share for:" << username;
446    return false;
447  }
448
449  // Unapplied datatypes (those that do not have initial sync ended set) get
450  // re-downloaded during any configuration. But, it's possible for a datatype
451  // to have a progress marker but not have initial sync ended yet, making
452  // it a candidate for migration. This is a problem, as the DataTypeManager
453  // does not support a migration while it's already in the middle of a
454  // configuration. As a result, any partially synced datatype can stall the
455  // DTM, waiting for the configuration to complete, which it never will due
456  // to the migration error. In addition, a partially synced nigori will
457  // trigger the migration logic before the backend is initialized, resulting
458  // in crashes. We therefore detect and purge any partially synced types as
459  // part of initialization.
460  if (!PurgePartiallySyncedTypes())
461    return false;
462
463  return true;
464}
465
466bool SyncManagerImpl::PurgePartiallySyncedTypes() {
467  ModelTypeSet partially_synced_types = ModelTypeSet::All();
468  partially_synced_types.RemoveAll(InitialSyncEndedTypes());
469  partially_synced_types.RemoveAll(GetTypesWithEmptyProgressMarkerToken(
470      ModelTypeSet::All()));
471
472  DVLOG(1) << "Purging partially synced types "
473           << ModelTypeSetToString(partially_synced_types);
474  UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
475                       partially_synced_types.Size());
476  if (partially_synced_types.Empty())
477    return true;
478  return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
479                                             ModelTypeSet(),
480                                             ModelTypeSet());
481}
482
483bool SyncManagerImpl::PurgeDisabledTypes(
484    ModelTypeSet to_purge,
485    ModelTypeSet to_journal,
486    ModelTypeSet to_unapply) {
487  if (to_purge.Empty())
488    return true;
489  DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
490  DCHECK(to_purge.HasAll(to_journal));
491  DCHECK(to_purge.HasAll(to_unapply));
492  return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
493}
494
495void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
496  DCHECK(thread_checker_.CalledOnValidThread());
497  DCHECK(initialized_);
498  DCHECK(!credentials.email.empty());
499  DCHECK(!credentials.sync_token.empty());
500  DCHECK(!credentials.scope_set.empty());
501
502  observing_network_connectivity_changes_ = true;
503  if (!connection_manager_->SetAuthToken(credentials.sync_token))
504    return;  // Auth token is known to be invalid, so exit early.
505
506  scheduler_->OnCredentialsUpdated();
507
508  // TODO(zea): pass the credential age to the debug info event listener.
509}
510
511void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
512  DCHECK(thread_checker_.CalledOnValidThread());
513  observers_.AddObserver(observer);
514}
515
516void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
517  DCHECK(thread_checker_.CalledOnValidThread());
518  observers_.RemoveObserver(observer);
519}
520
521void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
522  DCHECK(thread_checker_.CalledOnValidThread());
523
524  // Prevent any in-flight method calls from running.  Also
525  // invalidates |weak_handle_this_| and |change_observer_|.
526  weak_ptr_factory_.InvalidateWeakPtrs();
527  js_mutation_event_observer_.InvalidateWeakPtrs();
528
529  scheduler_.reset();
530  session_context_.reset();
531
532  if (model_type_registry_)
533    sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
534
535  model_type_registry_.reset();
536
537  if (sync_encryption_handler_) {
538    sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
539    sync_encryption_handler_->RemoveObserver(this);
540  }
541
542  SetJsEventHandler(WeakHandle<JsEventHandler>());
543  RemoveObserver(&js_sync_manager_observer_);
544
545  RemoveObserver(&debug_info_event_listener_);
546
547  // |connection_manager_| may end up being NULL here in tests (in synchronous
548  // initialization mode).
549  //
550  // TODO(akalin): Fix this behavior.
551  if (connection_manager_)
552    connection_manager_->RemoveListener(this);
553  connection_manager_.reset();
554
555  net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
556  net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
557  observing_network_connectivity_changes_ = false;
558
559  if (initialized_ && directory()) {
560    directory()->SaveChanges();
561  }
562
563  share_.directory.reset();
564
565  change_delegate_ = NULL;
566
567  initialized_ = false;
568
569  // We reset these here, since only now we know they will not be
570  // accessed from other threads (since we shut down everything).
571  change_observer_.Reset();
572  weak_handle_this_.Reset();
573}
574
575void SyncManagerImpl::OnIPAddressChanged() {
576  if (!observing_network_connectivity_changes_) {
577    DVLOG(1) << "IP address change dropped.";
578    return;
579  }
580  DVLOG(1) << "IP address change detected.";
581  OnNetworkConnectivityChangedImpl();
582}
583
584void SyncManagerImpl::OnConnectionTypeChanged(
585  net::NetworkChangeNotifier::ConnectionType) {
586  if (!observing_network_connectivity_changes_) {
587    DVLOG(1) << "Connection type change dropped.";
588    return;
589  }
590  DVLOG(1) << "Connection type change detected.";
591  OnNetworkConnectivityChangedImpl();
592}
593
594void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
595  DCHECK(thread_checker_.CalledOnValidThread());
596  scheduler_->OnConnectionStatusChange();
597}
598
599void SyncManagerImpl::OnServerConnectionEvent(
600    const ServerConnectionEvent& event) {
601  DCHECK(thread_checker_.CalledOnValidThread());
602  if (event.connection_code ==
603      HttpResponse::SERVER_CONNECTION_OK) {
604    FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
605                      OnConnectionStatusChange(CONNECTION_OK));
606  }
607
608  if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
609    observing_network_connectivity_changes_ = false;
610    FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
611                      OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
612  }
613
614  if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
615    FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
616                      OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
617  }
618}
619
620void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
621    ModelTypeSet models_with_changes) {
622  // This notification happens immediately after the transaction mutex is
623  // released. This allows work to be performed without blocking other threads
624  // from acquiring a transaction.
625  if (!change_delegate_)
626    return;
627
628  // Call commit.
629  for (ModelTypeSet::Iterator it = models_with_changes.First();
630       it.Good(); it.Inc()) {
631    change_delegate_->OnChangesComplete(it.Get());
632    change_observer_.Call(
633        FROM_HERE,
634        &SyncManager::ChangeObserver::OnChangesComplete,
635        it.Get());
636  }
637}
638
639ModelTypeSet
640SyncManagerImpl::HandleTransactionEndingChangeEvent(
641    const ImmutableWriteTransactionInfo& write_transaction_info,
642    syncable::BaseTransaction* trans) {
643  // This notification happens immediately before a syncable WriteTransaction
644  // falls out of scope. It happens while the channel mutex is still held,
645  // and while the transaction mutex is held, so it cannot be re-entrant.
646  if (!change_delegate_ || change_records_.empty())
647    return ModelTypeSet();
648
649  // This will continue the WriteTransaction using a read only wrapper.
650  // This is the last chance for read to occur in the WriteTransaction
651  // that's closing. This special ReadTransaction will not close the
652  // underlying transaction.
653  ReadTransaction read_trans(GetUserShare(), trans);
654
655  ModelTypeSet models_with_changes;
656  for (ChangeRecordMap::const_iterator it = change_records_.begin();
657      it != change_records_.end(); ++it) {
658    DCHECK(!it->second.Get().empty());
659    ModelType type = ModelTypeFromInt(it->first);
660    change_delegate_->
661        OnChangesApplied(type, trans->directory()->GetTransactionVersion(type),
662                         &read_trans, it->second);
663    change_observer_.Call(FROM_HERE,
664        &SyncManager::ChangeObserver::OnChangesApplied,
665        type, write_transaction_info.Get().id, it->second);
666    models_with_changes.Put(type);
667  }
668  change_records_.clear();
669  return models_with_changes;
670}
671
672void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
673    const ImmutableWriteTransactionInfo& write_transaction_info,
674    syncable::BaseTransaction* trans,
675    std::vector<int64>* entries_changed) {
676  // We have been notified about a user action changing a sync model.
677  LOG_IF(WARNING, !change_records_.empty()) <<
678      "CALCULATE_CHANGES called with unapplied old changes.";
679
680  // The mutated model type, or UNSPECIFIED if nothing was mutated.
681  ModelTypeSet mutated_model_types;
682
683  const syncable::ImmutableEntryKernelMutationMap& mutations =
684      write_transaction_info.Get().mutations;
685  for (syncable::EntryKernelMutationMap::const_iterator it =
686           mutations.Get().begin(); it != mutations.Get().end(); ++it) {
687    if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
688      continue;
689    }
690
691    ModelType model_type =
692        GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
693    if (model_type < FIRST_REAL_MODEL_TYPE) {
694      NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
695      continue;
696    }
697
698    // Found real mutation.
699    if (model_type != UNSPECIFIED) {
700      mutated_model_types.Put(model_type);
701      entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
702    }
703  }
704
705  // Nudge if necessary.
706  if (!mutated_model_types.Empty()) {
707    if (weak_handle_this_.IsInitialized()) {
708      weak_handle_this_.Call(FROM_HERE,
709                             &SyncManagerImpl::RequestNudgeForDataTypes,
710                             FROM_HERE,
711                             mutated_model_types);
712    } else {
713      NOTREACHED();
714    }
715  }
716}
717
718void SyncManagerImpl::SetExtraChangeRecordData(int64 id,
719    ModelType type, ChangeReorderBuffer* buffer,
720    Cryptographer* cryptographer, const syncable::EntryKernel& original,
721    bool existed_before, bool exists_now) {
722  // If this is a deletion and the datatype was encrypted, we need to decrypt it
723  // and attach it to the buffer.
724  if (!exists_now && existed_before) {
725    sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
726    if (type == PASSWORDS) {
727      // Passwords must use their own legacy ExtraPasswordChangeRecordData.
728      scoped_ptr<sync_pb::PasswordSpecificsData> data(
729          DecryptPasswordSpecifics(original_specifics, cryptographer));
730      if (!data) {
731        NOTREACHED();
732        return;
733      }
734      buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
735    } else if (original_specifics.has_encrypted()) {
736      // All other datatypes can just create a new unencrypted specifics and
737      // attach it.
738      const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
739      if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
740        NOTREACHED();
741        return;
742      }
743    }
744    buffer->SetSpecificsForId(id, original_specifics);
745  }
746}
747
748void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
749    const ImmutableWriteTransactionInfo& write_transaction_info,
750    syncable::BaseTransaction* trans,
751    std::vector<int64>* entries_changed) {
752  // We only expect one notification per sync step, so change_buffers_ should
753  // contain no pending entries.
754  LOG_IF(WARNING, !change_records_.empty()) <<
755      "CALCULATE_CHANGES called with unapplied old changes.";
756
757  ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
758
759  Cryptographer* crypto = directory()->GetCryptographer(trans);
760  const syncable::ImmutableEntryKernelMutationMap& mutations =
761      write_transaction_info.Get().mutations;
762  for (syncable::EntryKernelMutationMap::const_iterator it =
763           mutations.Get().begin(); it != mutations.Get().end(); ++it) {
764    bool existed_before = !it->second.original.ref(syncable::IS_DEL);
765    bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
766
767    // Omit items that aren't associated with a model.
768    ModelType type =
769        GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
770    if (type < FIRST_REAL_MODEL_TYPE)
771      continue;
772
773    int64 handle = it->first;
774    if (exists_now && !existed_before)
775      change_buffers[type].PushAddedItem(handle);
776    else if (!exists_now && existed_before)
777      change_buffers[type].PushDeletedItem(handle);
778    else if (exists_now && existed_before &&
779             VisiblePropertiesDiffer(it->second, crypto)) {
780      change_buffers[type].PushUpdatedItem(handle);
781    }
782
783    SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
784                             it->second.original, existed_before, exists_now);
785  }
786
787  ReadTransaction read_trans(GetUserShare(), trans);
788  for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
789    if (!change_buffers[i].IsEmpty()) {
790      if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
791                                                     &(change_records_[i]))) {
792        for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
793          entries_changed->push_back((change_records_[i].Get())[j].id);
794      }
795      if (change_records_[i].Get().empty())
796        change_records_.erase(i);
797    }
798  }
799}
800
801void SyncManagerImpl::RequestNudgeForDataTypes(
802    const tracked_objects::Location& nudge_location,
803    ModelTypeSet types) {
804  debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
805
806  scheduler_->ScheduleLocalNudge(types, nudge_location);
807}
808
809void SyncManagerImpl::NudgeForInitialDownload(syncer::ModelType type) {
810  DCHECK(thread_checker_.CalledOnValidThread());
811  scheduler_->ScheduleInitialSyncNudge(type);
812}
813
814void SyncManagerImpl::NudgeForCommit(syncer::ModelType type) {
815  DCHECK(thread_checker_.CalledOnValidThread());
816  RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
817}
818
819void SyncManagerImpl::NudgeForRefresh(syncer::ModelType type) {
820  DCHECK(thread_checker_.CalledOnValidThread());
821  RefreshTypes(ModelTypeSet(type));
822}
823
824void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
825  DCHECK(thread_checker_.CalledOnValidThread());
826  // Only send an event if this is due to a cycle ending and this cycle
827  // concludes a canonical "sync" process; that is, based on what is known
828  // locally we are "all happy" and up-to-date.  There may be new changes on
829  // the server, but we'll get them on a subsequent sync.
830  //
831  // Notifications are sent at the end of every sync cycle, regardless of
832  // whether we should sync again.
833  if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
834    if (!initialized_) {
835      DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
836               << "initialized";
837      return;
838    }
839
840    DVLOG(1) << "Sending OnSyncCycleCompleted";
841    FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
842                      OnSyncCycleCompleted(event.snapshot));
843  }
844}
845
846void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
847  FOR_EACH_OBSERVER(
848      SyncManager::Observer, observers_,
849      OnActionableError(error));
850}
851
852void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
853
854void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
855
856void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
857  FOR_EACH_OBSERVER(
858      SyncManager::Observer, observers_,
859      OnMigrationRequested(types));
860}
861
862void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
863  protocol_event_buffer_.RecordProtocolEvent(event);
864  FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
865                    OnProtocolEvent(event));
866}
867
868void SyncManagerImpl::SetJsEventHandler(
869    const WeakHandle<JsEventHandler>& event_handler) {
870  js_sync_manager_observer_.SetJsEventHandler(event_handler);
871  js_mutation_event_observer_.SetJsEventHandler(event_handler);
872  js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
873}
874
875scoped_ptr<base::ListValue> SyncManagerImpl::GetAllNodesForType(
876    syncer::ModelType type) {
877  DirectoryTypeDebugInfoEmitterMap* emitter_map =
878      model_type_registry_->directory_type_debug_info_emitter_map();
879  DirectoryTypeDebugInfoEmitterMap::iterator it = emitter_map->find(type);
880
881  if (it == emitter_map->end()) {
882    // This can happen in some cases.  The UI thread makes requests of us
883    // when it doesn't really know which types are enabled or disabled.
884    DLOG(WARNING) << "Asked to return debug info for invalid type "
885                  << ModelTypeToString(type);
886    return scoped_ptr<base::ListValue>(new base::ListValue());
887  }
888
889  return it->second->GetAllNodes();
890}
891
892void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
893  DCHECK(thread_checker_.CalledOnValidThread());
894
895  DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
896  allstatus_.SetNotificationsEnabled(invalidator_enabled);
897  scheduler_->SetNotificationsEnabled(invalidator_enabled);
898}
899
900void SyncManagerImpl::OnIncomingInvalidation(
901    syncer::ModelType type,
902    scoped_ptr<InvalidationInterface> invalidation) {
903  DCHECK(thread_checker_.CalledOnValidThread());
904
905  scheduler_->ScheduleInvalidationNudge(
906      type,
907      invalidation.Pass(),
908      FROM_HERE);
909}
910
911void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
912  DCHECK(thread_checker_.CalledOnValidThread());
913  if (types.Empty()) {
914    LOG(WARNING) << "Sync received refresh request with no types specified.";
915  } else {
916    scheduler_->ScheduleLocalRefreshRequest(
917        types, FROM_HERE);
918  }
919}
920
921SyncStatus SyncManagerImpl::GetDetailedStatus() const {
922  return allstatus_.status();
923}
924
925void SyncManagerImpl::SaveChanges() {
926  directory()->SaveChanges();
927}
928
929UserShare* SyncManagerImpl::GetUserShare() {
930  DCHECK(initialized_);
931  return &share_;
932}
933
934syncer::SyncContextProxy* SyncManagerImpl::GetSyncContextProxy() {
935  DCHECK(initialized_);
936  return sync_context_proxy_.get();
937}
938
939const std::string SyncManagerImpl::cache_guid() {
940  DCHECK(initialized_);
941  return directory()->cache_guid();
942}
943
944bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
945  ReadTransaction trans(FROM_HERE, GetUserShare());
946  ReadNode nigori_node(&trans);
947  if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
948    DVLOG(1) << "Couldn't find Nigori node.";
949    return false;
950  }
951  bool found_experiment = false;
952
953  ReadNode favicon_sync_node(&trans);
954  if (favicon_sync_node.InitByClientTagLookup(
955          syncer::EXPERIMENTS,
956          syncer::kFaviconSyncTag) == BaseNode::INIT_OK) {
957    experiments->favicon_sync_limit =
958        favicon_sync_node.GetExperimentsSpecifics().favicon_sync().
959            favicon_sync_limit();
960    found_experiment = true;
961  }
962
963  ReadNode pre_commit_update_avoidance_node(&trans);
964  if (pre_commit_update_avoidance_node.InitByClientTagLookup(
965          syncer::EXPERIMENTS,
966          syncer::kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
967    session_context_->set_server_enabled_pre_commit_update_avoidance(
968        pre_commit_update_avoidance_node.GetExperimentsSpecifics().
969            pre_commit_update_avoidance().enabled());
970    // We don't bother setting found_experiment.  The frontend doesn't need to
971    // know about this.
972  }
973
974  ReadNode gcm_channel_node(&trans);
975  if (gcm_channel_node.InitByClientTagLookup(
976          syncer::EXPERIMENTS,
977          syncer::kGCMChannelTag) == BaseNode::INIT_OK &&
978      gcm_channel_node.GetExperimentsSpecifics().gcm_channel().has_enabled()) {
979    experiments->gcm_channel_state =
980        (gcm_channel_node.GetExperimentsSpecifics().gcm_channel().enabled() ?
981         syncer::Experiments::ENABLED : syncer::Experiments::SUPPRESSED);
982    found_experiment = true;
983  }
984
985  ReadNode enhanced_bookmarks_node(&trans);
986  if (enhanced_bookmarks_node.InitByClientTagLookup(
987          syncer::EXPERIMENTS, syncer::kEnhancedBookmarksTag) ==
988          BaseNode::INIT_OK &&
989      enhanced_bookmarks_node.GetExperimentsSpecifics()
990          .has_enhanced_bookmarks()) {
991    const sync_pb::EnhancedBookmarksFlags& enhanced_bookmarks =
992        enhanced_bookmarks_node.GetExperimentsSpecifics().enhanced_bookmarks();
993    if (enhanced_bookmarks.has_enabled())
994      experiments->enhanced_bookmarks_enabled = enhanced_bookmarks.enabled();
995    if (enhanced_bookmarks.has_extension_id()) {
996      experiments->enhanced_bookmarks_ext_id =
997          enhanced_bookmarks.extension_id();
998    }
999    found_experiment = true;
1000  }
1001
1002  ReadNode gcm_invalidations_node(&trans);
1003  if (gcm_invalidations_node.InitByClientTagLookup(
1004          syncer::EXPERIMENTS, syncer::kGCMInvalidationsTag) ==
1005      BaseNode::INIT_OK) {
1006    const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
1007        gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
1008    if (gcm_invalidations.has_enabled()) {
1009      experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
1010      found_experiment = true;
1011    }
1012  }
1013
1014  return found_experiment;
1015}
1016
1017bool SyncManagerImpl::HasUnsyncedItems() {
1018  ReadTransaction trans(FROM_HERE, GetUserShare());
1019  return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
1020}
1021
1022SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
1023  return sync_encryption_handler_.get();
1024}
1025
1026ScopedVector<syncer::ProtocolEvent>
1027    SyncManagerImpl::GetBufferedProtocolEvents() {
1028  return protocol_event_buffer_.GetBufferedProtocolEvents();
1029}
1030
1031void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
1032    syncer::TypeDebugInfoObserver* observer) {
1033  model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
1034}
1035
1036void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
1037    syncer::TypeDebugInfoObserver* observer) {
1038  model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
1039}
1040
1041bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
1042    syncer::TypeDebugInfoObserver* observer) {
1043  return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
1044}
1045
1046void SyncManagerImpl::RequestEmitDebugInfo() {
1047  model_type_registry_->RequestEmitDebugInfo();
1048}
1049
1050}  // namespace syncer
1051