1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "sync/notifier/sync_invalidation_listener.h"
6
7#include <vector>
8
9#include "base/bind.h"
10#include "base/callback.h"
11#include "base/compiler_specific.h"
12#include "base/logging.h"
13#include "base/tracked_objects.h"
14#include "google/cacheinvalidation/include/invalidation-client.h"
15#include "google/cacheinvalidation/include/types.h"
16#include "google/cacheinvalidation/types.pb.h"
17#include "jingle/notifier/listener/push_client.h"
18#include "sync/notifier/invalidation_util.h"
19#include "sync/notifier/object_id_invalidation_map.h"
20#include "sync/notifier/registration_manager.h"
21
22namespace {
23
24const char kApplicationName[] = "chrome-sync";
25
26}  // namespace
27
28namespace syncer {
29
30SyncInvalidationListener::Delegate::~Delegate() {}
31
32SyncInvalidationListener::SyncInvalidationListener(
33    scoped_ptr<notifier::PushClient> push_client)
34    : push_client_channel_(push_client.Pass()),
35      sync_system_resources_(&push_client_channel_, this),
36      delegate_(NULL),
37      ticl_state_(DEFAULT_INVALIDATION_ERROR),
38      push_client_state_(DEFAULT_INVALIDATION_ERROR),
39      weak_ptr_factory_(this) {
40  DCHECK(CalledOnValidThread());
41  push_client_channel_.AddObserver(this);
42}
43
44SyncInvalidationListener::~SyncInvalidationListener() {
45  DCHECK(CalledOnValidThread());
46  push_client_channel_.RemoveObserver(this);
47  Stop();
48  DCHECK(!delegate_);
49}
50
51void SyncInvalidationListener::Start(
52    const CreateInvalidationClientCallback&
53        create_invalidation_client_callback,
54    const std::string& client_id, const std::string& client_info,
55    const std::string& invalidation_bootstrap_data,
56    const UnackedInvalidationsMap& initial_unacked_invalidations,
57    const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
58    Delegate* delegate) {
59  DCHECK(CalledOnValidThread());
60  Stop();
61
62  sync_system_resources_.set_platform(client_info);
63  sync_system_resources_.Start();
64
65  // The Storage resource is implemented as a write-through cache.  We populate
66  // it with the initial state on startup, so subsequent writes go to disk and
67  // update the in-memory cache, while reads just return the cached state.
68  sync_system_resources_.storage()->SetInitialState(
69      invalidation_bootstrap_data);
70
71  unacked_invalidations_map_ = initial_unacked_invalidations;
72  invalidation_state_tracker_ = invalidation_state_tracker;
73  DCHECK(invalidation_state_tracker_.IsInitialized());
74
75  DCHECK(!delegate_);
76  DCHECK(delegate);
77  delegate_ = delegate;
78
79#if defined(OS_IOS)
80  int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS;
81#else
82  int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
83#endif
84  invalidation_client_.reset(
85      create_invalidation_client_callback.Run(
86          &sync_system_resources_, client_type, client_id,
87          kApplicationName, this));
88  invalidation_client_->Start();
89
90  registration_manager_.reset(
91      new RegistrationManager(invalidation_client_.get()));
92}
93
94void SyncInvalidationListener::UpdateCredentials(
95    const std::string& email, const std::string& token) {
96  DCHECK(CalledOnValidThread());
97  push_client_channel_.UpdateCredentials(email, token);
98}
99
100void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
101  DCHECK(CalledOnValidThread());
102  registered_ids_ = ids;
103  // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
104  // working XMPP connection (as observed by us), so check it instead
105  // of GetState() (see http://crbug.com/139424).
106  if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
107    DoRegistrationUpdate();
108  }
109}
110
111void SyncInvalidationListener::Ready(
112    invalidation::InvalidationClient* client) {
113  DCHECK(CalledOnValidThread());
114  DCHECK_EQ(client, invalidation_client_.get());
115  ticl_state_ = INVALIDATIONS_ENABLED;
116  EmitStateChange();
117  DoRegistrationUpdate();
118}
119
120void SyncInvalidationListener::Invalidate(
121    invalidation::InvalidationClient* client,
122    const invalidation::Invalidation& invalidation,
123    const invalidation::AckHandle& ack_handle) {
124  DCHECK(CalledOnValidThread());
125  DCHECK_EQ(client, invalidation_client_.get());
126  client->Acknowledge(ack_handle);
127
128  const invalidation::ObjectId& id = invalidation.object_id();
129
130  std::string payload;
131  // payload() CHECK()'s has_payload(), so we must check it ourselves first.
132  if (invalidation.has_payload())
133    payload = invalidation.payload();
134
135  DVLOG(2) << "Received invalidation with version " << invalidation.version()
136           << " for " << ObjectIdToString(id);
137
138  ObjectIdInvalidationMap invalidations;
139  Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
140  inv.set_ack_handler(GetThisAsAckHandler());
141  invalidations.Insert(inv);
142
143  DispatchInvalidations(invalidations);
144}
145
146void SyncInvalidationListener::InvalidateUnknownVersion(
147    invalidation::InvalidationClient* client,
148    const invalidation::ObjectId& object_id,
149    const invalidation::AckHandle& ack_handle) {
150  DCHECK(CalledOnValidThread());
151  DCHECK_EQ(client, invalidation_client_.get());
152  DVLOG(1) << "InvalidateUnknownVersion";
153  client->Acknowledge(ack_handle);
154
155  ObjectIdInvalidationMap invalidations;
156  Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
157  unknown_version.set_ack_handler(GetThisAsAckHandler());
158  invalidations.Insert(unknown_version);
159
160  DispatchInvalidations(invalidations);
161}
162
163// This should behave as if we got an invalidation with version
164// UNKNOWN_OBJECT_VERSION for all known data types.
165void SyncInvalidationListener::InvalidateAll(
166    invalidation::InvalidationClient* client,
167    const invalidation::AckHandle& ack_handle) {
168  DCHECK(CalledOnValidThread());
169  DCHECK_EQ(client, invalidation_client_.get());
170  DVLOG(1) << "InvalidateAll";
171  client->Acknowledge(ack_handle);
172
173  ObjectIdInvalidationMap invalidations;
174  for (ObjectIdSet::iterator it = registered_ids_.begin();
175       it != registered_ids_.end(); ++it) {
176    Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
177    unknown_version.set_ack_handler(GetThisAsAckHandler());
178    invalidations.Insert(unknown_version);
179  }
180
181  DispatchInvalidations(invalidations);
182}
183
184// If a handler is registered, emit right away.  Otherwise, save it for later.
185void SyncInvalidationListener::DispatchInvalidations(
186    const ObjectIdInvalidationMap& invalidations) {
187  DCHECK(CalledOnValidThread());
188
189  ObjectIdInvalidationMap to_save = invalidations;
190  ObjectIdInvalidationMap to_emit =
191      invalidations.GetSubsetWithObjectIds(registered_ids_);
192
193  SaveInvalidations(to_save);
194  EmitSavedInvalidations(to_emit);
195}
196
197void SyncInvalidationListener::SaveInvalidations(
198    const ObjectIdInvalidationMap& to_save) {
199  ObjectIdSet objects_to_save = to_save.GetObjectIds();
200  for (ObjectIdSet::const_iterator it = objects_to_save.begin();
201       it != objects_to_save.end(); ++it) {
202    UnackedInvalidationsMap::iterator lookup =
203        unacked_invalidations_map_.find(*it);
204    if (lookup == unacked_invalidations_map_.end()) {
205      lookup = unacked_invalidations_map_.insert(
206          std::make_pair(*it, UnackedInvalidationSet(*it))).first;
207    }
208    lookup->second.AddSet(to_save.ForObject(*it));
209  }
210
211  invalidation_state_tracker_.Call(
212      FROM_HERE,
213      &InvalidationStateTracker::SetSavedInvalidations,
214      unacked_invalidations_map_);
215}
216
217void SyncInvalidationListener::EmitSavedInvalidations(
218    const ObjectIdInvalidationMap& to_emit) {
219  DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
220  delegate_->OnInvalidate(to_emit);
221}
222
223void SyncInvalidationListener::InformRegistrationStatus(
224      invalidation::InvalidationClient* client,
225      const invalidation::ObjectId& object_id,
226      InvalidationListener::RegistrationState new_state) {
227  DCHECK(CalledOnValidThread());
228  DCHECK_EQ(client, invalidation_client_.get());
229  DVLOG(1) << "InformRegistrationStatus: "
230           << ObjectIdToString(object_id) << " " << new_state;
231
232  if (new_state != InvalidationListener::REGISTERED) {
233    // Let |registration_manager_| handle the registration backoff policy.
234    registration_manager_->MarkRegistrationLost(object_id);
235  }
236}
237
238void SyncInvalidationListener::InformRegistrationFailure(
239    invalidation::InvalidationClient* client,
240    const invalidation::ObjectId& object_id,
241    bool is_transient,
242    const std::string& error_message) {
243  DCHECK(CalledOnValidThread());
244  DCHECK_EQ(client, invalidation_client_.get());
245  DVLOG(1) << "InformRegistrationFailure: "
246           << ObjectIdToString(object_id)
247           << "is_transient=" << is_transient
248           << ", message=" << error_message;
249
250  if (is_transient) {
251    // We don't care about |unknown_hint|; we let
252    // |registration_manager_| handle the registration backoff policy.
253    registration_manager_->MarkRegistrationLost(object_id);
254  } else {
255    // Non-transient failures require an action to resolve. This could happen
256    // because:
257    // - the server doesn't yet recognize the data type, which could happen for
258    //   brand-new data types.
259    // - the user has changed his password and hasn't updated it yet locally.
260    // Either way, block future registration attempts for |object_id|. However,
261    // we don't forget any saved invalidation state since we may use it once the
262    // error is addressed.
263    registration_manager_->DisableId(object_id);
264  }
265}
266
267void SyncInvalidationListener::ReissueRegistrations(
268    invalidation::InvalidationClient* client,
269    const std::string& prefix,
270    int prefix_length) {
271  DCHECK(CalledOnValidThread());
272  DCHECK_EQ(client, invalidation_client_.get());
273  DVLOG(1) << "AllRegistrationsLost";
274  registration_manager_->MarkAllRegistrationsLost();
275}
276
277void SyncInvalidationListener::InformError(
278    invalidation::InvalidationClient* client,
279    const invalidation::ErrorInfo& error_info) {
280  DCHECK(CalledOnValidThread());
281  DCHECK_EQ(client, invalidation_client_.get());
282  LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
283             << error_info.error_message()
284             << " (transient = " << error_info.is_transient() << ")";
285  if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
286    ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
287  } else {
288    ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
289  }
290  EmitStateChange();
291}
292
293void SyncInvalidationListener::Acknowledge(
294  const invalidation::ObjectId& id,
295  const syncer::AckHandle& handle) {
296  UnackedInvalidationsMap::iterator lookup =
297      unacked_invalidations_map_.find(id);
298  if (lookup == unacked_invalidations_map_.end()) {
299    DLOG(WARNING) << "Received acknowledgement for untracked object ID";
300    return;
301  }
302  lookup->second.Acknowledge(handle);
303  invalidation_state_tracker_.Call(
304      FROM_HERE,
305      &InvalidationStateTracker::SetSavedInvalidations,
306      unacked_invalidations_map_);
307}
308
309void SyncInvalidationListener::Drop(
310    const invalidation::ObjectId& id,
311    const syncer::AckHandle& handle) {
312  UnackedInvalidationsMap::iterator lookup =
313      unacked_invalidations_map_.find(id);
314  if (lookup == unacked_invalidations_map_.end()) {
315    DLOG(WARNING) << "Received drop for untracked object ID";
316    return;
317  }
318  lookup->second.Drop(handle);
319  invalidation_state_tracker_.Call(
320      FROM_HERE,
321      &InvalidationStateTracker::SetSavedInvalidations,
322      unacked_invalidations_map_);
323}
324
325void SyncInvalidationListener::WriteState(const std::string& state) {
326  DCHECK(CalledOnValidThread());
327  DVLOG(1) << "WriteState";
328  invalidation_state_tracker_.Call(
329      FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
330}
331
332void SyncInvalidationListener::DoRegistrationUpdate() {
333  DCHECK(CalledOnValidThread());
334  const ObjectIdSet& unregistered_ids =
335      registration_manager_->UpdateRegisteredIds(registered_ids_);
336  for (ObjectIdSet::iterator it = unregistered_ids.begin();
337       it != unregistered_ids.end(); ++it) {
338    unacked_invalidations_map_.erase(*it);
339  }
340  invalidation_state_tracker_.Call(
341      FROM_HERE,
342      &InvalidationStateTracker::SetSavedInvalidations,
343      unacked_invalidations_map_);
344
345  ObjectIdInvalidationMap object_id_invalidation_map;
346  for (UnackedInvalidationsMap::iterator map_it =
347       unacked_invalidations_map_.begin();
348       map_it != unacked_invalidations_map_.end(); ++map_it) {
349    if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
350      continue;
351    }
352    map_it->second.ExportInvalidations(
353        GetThisAsAckHandler(),
354        &object_id_invalidation_map);
355  }
356
357  // There's no need to run these through DispatchInvalidations(); they've
358  // already been saved to storage (that's where we found them) so all we need
359  // to do now is emit them.
360  EmitSavedInvalidations(object_id_invalidation_map);
361}
362
363void SyncInvalidationListener::StopForTest() {
364  DCHECK(CalledOnValidThread());
365  Stop();
366}
367
368void SyncInvalidationListener::Stop() {
369  DCHECK(CalledOnValidThread());
370  if (!invalidation_client_) {
371    return;
372  }
373
374  registration_manager_.reset();
375  sync_system_resources_.Stop();
376  invalidation_client_->Stop();
377
378  invalidation_client_.reset();
379  delegate_ = NULL;
380
381  ticl_state_ = DEFAULT_INVALIDATION_ERROR;
382  push_client_state_ = DEFAULT_INVALIDATION_ERROR;
383}
384
385InvalidatorState SyncInvalidationListener::GetState() const {
386  DCHECK(CalledOnValidThread());
387  if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
388      push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
389    // If either the ticl or the push client rejected our credentials,
390    // return INVALIDATION_CREDENTIALS_REJECTED.
391    return INVALIDATION_CREDENTIALS_REJECTED;
392  }
393  if (ticl_state_ == INVALIDATIONS_ENABLED &&
394      push_client_state_ == INVALIDATIONS_ENABLED) {
395    // If the ticl is ready and the push client notifications are
396    // enabled, return INVALIDATIONS_ENABLED.
397    return INVALIDATIONS_ENABLED;
398  }
399  // Otherwise, we have a transient error.
400  return TRANSIENT_INVALIDATION_ERROR;
401}
402
403void SyncInvalidationListener::EmitStateChange() {
404  DCHECK(CalledOnValidThread());
405  delegate_->OnInvalidatorStateChange(GetState());
406}
407
408WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
409  DCHECK(CalledOnValidThread());
410  return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
411}
412
413void SyncInvalidationListener::OnNetworkChannelStateChanged(
414    InvalidatorState invalidator_state) {
415  DCHECK(CalledOnValidThread());
416  push_client_state_ = invalidator_state;
417  EmitStateChange();
418}
419
420}  // namespace syncer
421